3 # Unit tests for the notification control
4 # Copyright (C) Stefan Metzmacher 2016
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 from __future__ import print_function
24 sys.path.insert(0, "bin/python")
27 from samba.tests.subunitrun import SubunitOptions, TestProgram
29 import samba.getopt as options
31 from samba.auth import system_session
33 from samba.samdb import SamDB
34 from samba.ndr import ndr_unpack
35 from samba import gensec
36 from samba.credentials import Credentials
39 from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES
41 from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError
42 from ldb import ERR_TIME_LIMIT_EXCEEDED, ERR_ADMIN_LIMIT_EXCEEDED, ERR_UNWILLING_TO_PERFORM
43 from ldb import Message
45 parser = optparse.OptionParser("notification.py [options] <host>")
46 sambaopts = options.SambaOptions(parser)
47 parser.add_option_group(sambaopts)
48 parser.add_option_group(options.VersionOptions(parser))
49 # use command line creds if available
50 credopts = options.CredentialsOptions(parser)
51 parser.add_option_group(credopts)
52 subunitopts = SubunitOptions(parser)
53 parser.add_option_group(subunitopts)
54 opts, args = parser.parse_args()
62 lp = sambaopts.get_loadparm()
63 creds = credopts.get_credentials(lp)
64 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
67 class LDAPNotificationTest(samba.tests.TestCase):
70 super(samba.tests.TestCase, self).setUp()
71 self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
72 self.base_dn = self.ldb.domain_dn()
74 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
75 self.assertEquals(len(res), 1)
77 self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]))
79 def test_simple_search(self):
80 """Testing a notification with an modify and a timeout"""
81 if not url.startswith("ldap"):
82 self.fail(msg="This test is only valid on ldap")
85 search1 = self.ldb.search_iterator(base=self.user_sid_dn,
86 expression="(objectClass=*)",
87 scope=ldb.SCOPE_SUBTREE,
88 attrs=["name", "objectGUID", "displayName"])
90 self.assertIsInstance(reply, ldb.Message)
91 self.assertIsNone(msg1)
93 res1 = search1.result()
95 search2 = self.ldb.search_iterator(base=self.base_dn,
96 expression="(objectClass=*)",
97 scope=ldb.SCOPE_SUBTREE,
98 attrs=["name", "objectGUID", "displayName"])
101 for reply in search2:
102 if isinstance(reply, str):
105 self.assertIsInstance(reply, ldb.Message)
106 if reply["objectGUID"][0] == msg1["objectGUID"][0]:
107 self.assertIsNone(msg2)
109 self.assertEqual(msg1.dn, msg2.dn)
110 self.assertEqual(len(msg1), len(msg2))
111 self.assertEqual(msg1["name"], msg2["name"])
112 #self.assertEqual(msg1["displayName"], msg2["displayName"])
113 res2 = search2.result()
115 self.ldb.modify_ldif("""
116 dn: """ + self.user_sid_dn + """
118 replace: otherLoginWorkstations
119 otherLoginWorkstations: BEFORE"
121 notify1 = self.ldb.search_iterator(base=self.base_dn,
122 expression="(objectClass=*)",
123 scope=ldb.SCOPE_SUBTREE,
124 attrs=["name", "objectGUID", "displayName"],
125 controls=["notification:1"],
128 self.ldb.modify_ldif("""
129 dn: """ + self.user_sid_dn + """
131 replace: otherLoginWorkstations
132 otherLoginWorkstations: AFTER"
136 for reply in notify1:
137 self.assertIsInstance(reply, ldb.Message)
138 if reply["objectGUID"][0] == msg1["objectGUID"][0]:
139 self.assertIsNone(msg3)
141 self.assertEqual(msg1.dn, msg3.dn)
142 self.assertEqual(len(msg1), len(msg3))
143 self.assertEqual(msg1["name"], msg3["name"])
144 #self.assertEqual(msg1["displayName"], msg3["displayName"])
146 res = notify1.result()
148 except LdbError as e10:
150 self.assertEquals(num, ERR_TIME_LIMIT_EXCEEDED)
151 self.assertIsNotNone(msg3)
153 self.ldb.modify_ldif("""
154 dn: """ + self.user_sid_dn + """
156 delete: otherLoginWorkstations
159 def test_max_search(self):
160 """Testing the max allowed notifications"""
161 if not url.startswith("ldap"):
162 self.fail(msg="This test is only valid on ldap")
164 max_notifications = 5
166 notifies = [None] * (max_notifications + 1)
167 for i in range(0, max_notifications + 1):
168 notifies[i] = self.ldb.search_iterator(base=self.base_dn,
169 expression="(objectClass=*)",
170 scope=ldb.SCOPE_SUBTREE,
172 controls=["notification:1"],
176 for i in range(0, max_notifications + 1):
178 for msg in notifies[i]:
180 res = notifies[i].result()
182 except LdbError as e:
184 if num == ERR_ADMIN_LIMIT_EXCEEDED:
187 if num == ERR_TIME_LIMIT_EXCEEDED:
191 self.assertEqual(num_admin_limit, 1)
192 self.assertEqual(num_time_limit, max_notifications)
194 def test_invalid_filter(self):
195 """Testing invalid filters for notifications"""
196 if not url.startswith("ldap"):
197 self.fail(msg="This test is only valid on ldap")
199 valid_attrs = ["objectClass", "objectGUID", "distinguishedName", "name"]
201 for va in valid_attrs:
203 hnd = self.ldb.search_iterator(base=self.base_dn,
204 expression="(%s=*)" % va,
205 scope=ldb.SCOPE_SUBTREE,
207 controls=["notification:1"],
213 except LdbError as e1:
215 self.assertEquals(num, ERR_TIME_LIMIT_EXCEEDED)
218 hnd = self.ldb.search_iterator(base=self.base_dn,
219 expression="(|(%s=*)(%s=value))" % (va, va),
220 scope=ldb.SCOPE_SUBTREE,
222 controls=["notification:1"],
228 except LdbError as e2:
230 self.assertEquals(num, ERR_TIME_LIMIT_EXCEEDED)
233 hnd = self.ldb.search_iterator(base=self.base_dn,
234 expression="(&(%s=*)(%s=value))" % (va, va),
235 scope=ldb.SCOPE_SUBTREE,
237 controls=["notification:1"],
243 except LdbError as e3:
245 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
248 hnd = self.ldb.search_iterator(base=self.base_dn,
249 expression="(%s=value)" % va,
250 scope=ldb.SCOPE_SUBTREE,
252 controls=["notification:1"],
258 except LdbError as e4:
260 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
263 hnd = self.ldb.search_iterator(base=self.base_dn,
264 expression="(%s>=value)" % va,
265 scope=ldb.SCOPE_SUBTREE,
267 controls=["notification:1"],
273 except LdbError as e5:
275 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
278 hnd = self.ldb.search_iterator(base=self.base_dn,
279 expression="(%s<=value)" % va,
280 scope=ldb.SCOPE_SUBTREE,
282 controls=["notification:1"],
288 except LdbError as e6:
290 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
293 hnd = self.ldb.search_iterator(base=self.base_dn,
294 expression="(%s=*value*)" % va,
295 scope=ldb.SCOPE_SUBTREE,
297 controls=["notification:1"],
303 except LdbError as e7:
305 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
308 hnd = self.ldb.search_iterator(base=self.base_dn,
309 expression="(!(%s=*))" % va,
310 scope=ldb.SCOPE_SUBTREE,
312 controls=["notification:1"],
318 except LdbError as e8:
320 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
322 res = self.ldb.search(base=self.ldb.get_schema_basedn(),
323 expression="(objectClass=attributeSchema)",
324 scope=ldb.SCOPE_ONELEVEL,
325 attrs=["lDAPDisplayName"],
326 controls=["paged_results:1:2500"])
328 va = msg["lDAPDisplayName"][0]
329 if va in valid_attrs:
333 hnd = self.ldb.search_iterator(base=self.base_dn,
334 expression="(%s=*)" % va,
335 scope=ldb.SCOPE_SUBTREE,
337 controls=["notification:1"],
343 except LdbError as e9:
345 if num != ERR_UNWILLING_TO_PERFORM:
347 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
350 va = "noneAttributeName"
351 hnd = self.ldb.search_iterator(base=self.base_dn,
352 expression="(%s=*)" % va,
353 scope=ldb.SCOPE_SUBTREE,
355 controls=["notification:1"],
361 except LdbError as e11:
363 if num != ERR_UNWILLING_TO_PERFORM:
365 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
369 if os.path.isfile(url):
370 url = "tdb://%s" % url
372 url = "ldap://%s" % url
374 TestProgram(module=__name__, opts=subunitopts)