3 # Unit tests for the notification control
4 # Copyright (C) Stefan Metzmacher 2016
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 from __future__ import print_function
24 sys.path.insert(0, "bin/python")
27 from samba.tests.subunitrun import SubunitOptions, TestProgram
29 import samba.getopt as options
31 from samba.auth import system_session
33 from samba.samdb import SamDB
34 from samba.ndr import ndr_unpack
35 from samba import gensec
36 from samba.credentials import Credentials
39 from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES
41 from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError
42 from ldb import ERR_TIME_LIMIT_EXCEEDED, ERR_ADMIN_LIMIT_EXCEEDED, ERR_UNWILLING_TO_PERFORM
43 from ldb import Message
45 parser = optparse.OptionParser("notification.py [options] <host>")
46 sambaopts = options.SambaOptions(parser)
47 parser.add_option_group(sambaopts)
48 parser.add_option_group(options.VersionOptions(parser))
49 # use command line creds if available
50 credopts = options.CredentialsOptions(parser)
51 parser.add_option_group(credopts)
52 subunitopts = SubunitOptions(parser)
53 parser.add_option_group(subunitopts)
54 opts, args = parser.parse_args()
62 lp = sambaopts.get_loadparm()
63 creds = credopts.get_credentials(lp)
66 class LDAPNotificationTest(samba.tests.TestCase):
69 super(LDAPNotificationTest, self).setUp()
70 self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
71 self.base_dn = self.ldb.domain_dn()
73 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
74 self.assertEqual(len(res), 1)
76 self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]))
78 def test_simple_search(self):
79 """Testing a notification with an modify and a timeout"""
80 if not url.startswith("ldap"):
81 self.fail(msg="This test is only valid on ldap")
84 search1 = self.ldb.search_iterator(base=self.user_sid_dn,
85 expression="(objectClass=*)",
86 scope=ldb.SCOPE_SUBTREE,
87 attrs=["name", "objectGUID", "displayName"])
89 self.assertIsInstance(reply, ldb.Message)
90 self.assertIsNone(msg1)
92 res1 = search1.result()
94 search2 = self.ldb.search_iterator(base=self.base_dn,
95 expression="(objectClass=*)",
96 scope=ldb.SCOPE_SUBTREE,
97 attrs=["name", "objectGUID", "displayName"])
100 for reply in search2:
101 if isinstance(reply, str):
104 self.assertIsInstance(reply, ldb.Message)
105 if reply["objectGUID"][0] == msg1["objectGUID"][0]:
106 self.assertIsNone(msg2)
108 self.assertEqual(msg1.dn, msg2.dn)
109 self.assertEqual(len(msg1), len(msg2))
110 self.assertEqual(msg1["name"], msg2["name"])
111 #self.assertEqual(msg1["displayName"], msg2["displayName"])
112 res2 = search2.result()
114 self.ldb.modify_ldif("""
115 dn: """ + self.user_sid_dn + """
117 replace: otherLoginWorkstations
118 otherLoginWorkstations: BEFORE"
120 notify1 = self.ldb.search_iterator(base=self.base_dn,
121 expression="(objectClass=*)",
122 scope=ldb.SCOPE_SUBTREE,
123 attrs=["name", "objectGUID", "displayName"],
124 controls=["notification:1"],
127 self.ldb.modify_ldif("""
128 dn: """ + self.user_sid_dn + """
130 replace: otherLoginWorkstations
131 otherLoginWorkstations: AFTER"
135 for reply in notify1:
136 self.assertIsInstance(reply, ldb.Message)
137 if reply["objectGUID"][0] == msg1["objectGUID"][0]:
138 self.assertIsNone(msg3)
140 self.assertEqual(msg1.dn, msg3.dn)
141 self.assertEqual(len(msg1), len(msg3))
142 self.assertEqual(msg1["name"], msg3["name"])
143 #self.assertEqual(msg1["displayName"], msg3["displayName"])
145 res = notify1.result()
147 except LdbError as e10:
149 self.assertEqual(num, ERR_TIME_LIMIT_EXCEEDED)
150 self.assertIsNotNone(msg3)
152 self.ldb.modify_ldif("""
153 dn: """ + self.user_sid_dn + """
155 delete: otherLoginWorkstations
158 def test_max_search(self):
159 """Testing the max allowed notifications"""
160 if not url.startswith("ldap"):
161 self.fail(msg="This test is only valid on ldap")
163 max_notifications = 5
165 notifies = [None] * (max_notifications + 1)
166 for i in range(0, max_notifications + 1):
167 notifies[i] = self.ldb.search_iterator(base=self.base_dn,
168 expression="(objectClass=*)",
169 scope=ldb.SCOPE_SUBTREE,
171 controls=["notification:1"],
175 for i in range(0, max_notifications + 1):
177 for msg in notifies[i]:
179 res = notifies[i].result()
181 except LdbError as e:
183 if num == ERR_ADMIN_LIMIT_EXCEEDED:
186 if num == ERR_TIME_LIMIT_EXCEEDED:
190 self.assertEqual(num_admin_limit, 1)
191 self.assertEqual(num_time_limit, max_notifications)
193 def test_invalid_filter(self):
194 """Testing invalid filters for notifications"""
195 if not url.startswith("ldap"):
196 self.fail(msg="This test is only valid on ldap")
198 valid_attrs = ["objectClass", "objectGUID", "distinguishedName", "name"]
200 for va in valid_attrs:
202 hnd = self.ldb.search_iterator(base=self.base_dn,
203 expression="(%s=*)" % va,
204 scope=ldb.SCOPE_SUBTREE,
206 controls=["notification:1"],
212 except LdbError as e1:
214 self.assertEqual(num, ERR_TIME_LIMIT_EXCEEDED)
217 hnd = self.ldb.search_iterator(base=self.base_dn,
218 expression="(|(%s=*)(%s=value))" % (va, va),
219 scope=ldb.SCOPE_SUBTREE,
221 controls=["notification:1"],
227 except LdbError as e2:
229 self.assertEqual(num, ERR_TIME_LIMIT_EXCEEDED)
232 hnd = self.ldb.search_iterator(base=self.base_dn,
233 expression="(&(%s=*)(%s=value))" % (va, va),
234 scope=ldb.SCOPE_SUBTREE,
236 controls=["notification:1"],
242 except LdbError as e3:
244 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
247 hnd = self.ldb.search_iterator(base=self.base_dn,
248 expression="(%s=value)" % va,
249 scope=ldb.SCOPE_SUBTREE,
251 controls=["notification:1"],
257 except LdbError as e4:
259 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
262 hnd = self.ldb.search_iterator(base=self.base_dn,
263 expression="(%s>=value)" % va,
264 scope=ldb.SCOPE_SUBTREE,
266 controls=["notification:1"],
272 except LdbError as e5:
274 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
277 hnd = self.ldb.search_iterator(base=self.base_dn,
278 expression="(%s<=value)" % va,
279 scope=ldb.SCOPE_SUBTREE,
281 controls=["notification:1"],
287 except LdbError as e6:
289 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
292 hnd = self.ldb.search_iterator(base=self.base_dn,
293 expression="(%s=*value*)" % va,
294 scope=ldb.SCOPE_SUBTREE,
296 controls=["notification:1"],
302 except LdbError as e7:
304 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
307 hnd = self.ldb.search_iterator(base=self.base_dn,
308 expression="(!(%s=*))" % va,
309 scope=ldb.SCOPE_SUBTREE,
311 controls=["notification:1"],
317 except LdbError as e8:
319 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
321 res = self.ldb.search(base=self.ldb.get_schema_basedn(),
322 expression="(objectClass=attributeSchema)",
323 scope=ldb.SCOPE_ONELEVEL,
324 attrs=["lDAPDisplayName"],
325 controls=["paged_results:1:2500"])
327 va = str(msg["lDAPDisplayName"][0])
328 if va in valid_attrs:
332 hnd = self.ldb.search_iterator(base=self.base_dn,
333 expression="(%s=*)" % va,
334 scope=ldb.SCOPE_SUBTREE,
336 controls=["notification:1"],
342 except LdbError as e9:
344 if num != ERR_UNWILLING_TO_PERFORM:
346 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
349 va = "noneAttributeName"
350 hnd = self.ldb.search_iterator(base=self.base_dn,
351 expression="(%s=*)" % va,
352 scope=ldb.SCOPE_SUBTREE,
354 controls=["notification:1"],
360 except LdbError as e11:
362 if num != ERR_UNWILLING_TO_PERFORM:
364 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
368 if os.path.isfile(url):
369 url = "tdb://%s" % url
371 url = "ldap://%s" % url
373 TestProgram(module=__name__, opts=subunitopts)