pytests: heed assertEquals deprecation warning en-masse
[gd/samba-autobuild/.git] / source4 / dsdb / tests / python / notification.py
1 #!/usr/bin/env python3
2 #
3 # Unit tests for the notification control
4 # Copyright (C) Stefan Metzmacher 2016
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19 from __future__ import print_function
20 import optparse
21 import sys
22 import os
23
24 sys.path.insert(0, "bin/python")
25 import samba
26
27 from samba.tests.subunitrun import SubunitOptions, TestProgram
28
29 import samba.getopt as options
30
31 from samba.auth import system_session
32 from samba import ldb
33 from samba.samdb import SamDB
34 from samba.ndr import ndr_unpack
35 from samba import gensec
36 from samba.credentials import Credentials
37 import samba.tests
38
39 from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES
40
41 from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError
42 from ldb import ERR_TIME_LIMIT_EXCEEDED, ERR_ADMIN_LIMIT_EXCEEDED, ERR_UNWILLING_TO_PERFORM
43 from ldb import Message
44
45 parser = optparse.OptionParser("notification.py [options] <host>")
46 sambaopts = options.SambaOptions(parser)
47 parser.add_option_group(sambaopts)
48 parser.add_option_group(options.VersionOptions(parser))
49 # use command line creds if available
50 credopts = options.CredentialsOptions(parser)
51 parser.add_option_group(credopts)
52 subunitopts = SubunitOptions(parser)
53 parser.add_option_group(subunitopts)
54 opts, args = parser.parse_args()
55
56 if len(args) < 1:
57     parser.print_usage()
58     sys.exit(1)
59
60 url = args[0]
61
62 lp = sambaopts.get_loadparm()
63 creds = credopts.get_credentials(lp)
64
65
66 class LDAPNotificationTest(samba.tests.TestCase):
67
68     def setUp(self):
69         super(LDAPNotificationTest, self).setUp()
70         self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
71         self.base_dn = self.ldb.domain_dn()
72
73         res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
74         self.assertEqual(len(res), 1)
75
76         self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]))
77
78     def test_simple_search(self):
79         """Testing a notification with an modify and a timeout"""
80         if not url.startswith("ldap"):
81             self.fail(msg="This test is only valid on ldap")
82
83         msg1 = None
84         search1 = self.ldb.search_iterator(base=self.user_sid_dn,
85                                            expression="(objectClass=*)",
86                                            scope=ldb.SCOPE_SUBTREE,
87                                            attrs=["name", "objectGUID", "displayName"])
88         for reply in search1:
89             self.assertIsInstance(reply, ldb.Message)
90             self.assertIsNone(msg1)
91             msg1 = reply
92         res1 = search1.result()
93
94         search2 = self.ldb.search_iterator(base=self.base_dn,
95                                            expression="(objectClass=*)",
96                                            scope=ldb.SCOPE_SUBTREE,
97                                            attrs=["name", "objectGUID", "displayName"])
98         refs2 = 0
99         msg2 = None
100         for reply in search2:
101             if isinstance(reply, str):
102                 refs2 += 1
103                 continue
104             self.assertIsInstance(reply, ldb.Message)
105             if reply["objectGUID"][0] == msg1["objectGUID"][0]:
106                 self.assertIsNone(msg2)
107                 msg2 = reply
108                 self.assertEqual(msg1.dn, msg2.dn)
109                 self.assertEqual(len(msg1), len(msg2))
110                 self.assertEqual(msg1["name"], msg2["name"])
111                 #self.assertEqual(msg1["displayName"], msg2["displayName"])
112         res2 = search2.result()
113
114         self.ldb.modify_ldif("""
115 dn: """ + self.user_sid_dn + """
116 changetype: modify
117 replace: otherLoginWorkstations
118 otherLoginWorkstations: BEFORE"
119 """)
120         notify1 = self.ldb.search_iterator(base=self.base_dn,
121                                            expression="(objectClass=*)",
122                                            scope=ldb.SCOPE_SUBTREE,
123                                            attrs=["name", "objectGUID", "displayName"],
124                                            controls=["notification:1"],
125                                            timeout=1)
126
127         self.ldb.modify_ldif("""
128 dn: """ + self.user_sid_dn + """
129 changetype: modify
130 replace: otherLoginWorkstations
131 otherLoginWorkstations: AFTER"
132 """)
133
134         msg3 = None
135         for reply in notify1:
136             self.assertIsInstance(reply, ldb.Message)
137             if reply["objectGUID"][0] == msg1["objectGUID"][0]:
138                 self.assertIsNone(msg3)
139                 msg3 = reply
140                 self.assertEqual(msg1.dn, msg3.dn)
141                 self.assertEqual(len(msg1), len(msg3))
142                 self.assertEqual(msg1["name"], msg3["name"])
143                 #self.assertEqual(msg1["displayName"], msg3["displayName"])
144         try:
145             res = notify1.result()
146             self.fail()
147         except LdbError as e10:
148             (num, _) = e10.args
149             self.assertEqual(num, ERR_TIME_LIMIT_EXCEEDED)
150         self.assertIsNotNone(msg3)
151
152         self.ldb.modify_ldif("""
153 dn: """ + self.user_sid_dn + """
154 changetype: delete
155 delete: otherLoginWorkstations
156 """)
157
158     def test_max_search(self):
159         """Testing the max allowed notifications"""
160         if not url.startswith("ldap"):
161             self.fail(msg="This test is only valid on ldap")
162
163         max_notifications = 5
164
165         notifies = [None] * (max_notifications + 1)
166         for i in range(0, max_notifications + 1):
167             notifies[i] = self.ldb.search_iterator(base=self.base_dn,
168                                                    expression="(objectClass=*)",
169                                                    scope=ldb.SCOPE_SUBTREE,
170                                                    attrs=["name"],
171                                                    controls=["notification:1"],
172                                                    timeout=1)
173         num_admin_limit = 0
174         num_time_limit = 0
175         for i in range(0, max_notifications + 1):
176             try:
177                 for msg in notifies[i]:
178                     continue
179                 res = notifies[i].result()
180                 self.fail()
181             except LdbError as e:
182                 (num, _) = e.args
183                 if num == ERR_ADMIN_LIMIT_EXCEEDED:
184                     num_admin_limit += 1
185                     continue
186                 if num == ERR_TIME_LIMIT_EXCEEDED:
187                     num_time_limit += 1
188                     continue
189                 raise
190         self.assertEqual(num_admin_limit, 1)
191         self.assertEqual(num_time_limit, max_notifications)
192
193     def test_invalid_filter(self):
194         """Testing invalid filters for notifications"""
195         if not url.startswith("ldap"):
196             self.fail(msg="This test is only valid on ldap")
197
198         valid_attrs = ["objectClass", "objectGUID", "distinguishedName", "name"]
199
200         for va in valid_attrs:
201             try:
202                 hnd = self.ldb.search_iterator(base=self.base_dn,
203                                                expression="(%s=*)" % va,
204                                                scope=ldb.SCOPE_SUBTREE,
205                                                attrs=["name"],
206                                                controls=["notification:1"],
207                                                timeout=1)
208                 for reply in hnd:
209                     self.fail()
210                 res = hnd.result()
211                 self.fail()
212             except LdbError as e1:
213                 (num, _) = e1.args
214                 self.assertEqual(num, ERR_TIME_LIMIT_EXCEEDED)
215
216             try:
217                 hnd = self.ldb.search_iterator(base=self.base_dn,
218                                                expression="(|(%s=*)(%s=value))" % (va, va),
219                                                scope=ldb.SCOPE_SUBTREE,
220                                                attrs=["name"],
221                                                controls=["notification:1"],
222                                                timeout=1)
223                 for reply in hnd:
224                     self.fail()
225                 res = hnd.result()
226                 self.fail()
227             except LdbError as e2:
228                 (num, _) = e2.args
229                 self.assertEqual(num, ERR_TIME_LIMIT_EXCEEDED)
230
231             try:
232                 hnd = self.ldb.search_iterator(base=self.base_dn,
233                                                expression="(&(%s=*)(%s=value))" % (va, va),
234                                                scope=ldb.SCOPE_SUBTREE,
235                                                attrs=["name"],
236                                                controls=["notification:1"],
237                                                timeout=0)
238                 for reply in hnd:
239                     self.fail()
240                 res = hnd.result()
241                 self.fail()
242             except LdbError as e3:
243                 (num, _) = e3.args
244                 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
245
246             try:
247                 hnd = self.ldb.search_iterator(base=self.base_dn,
248                                                expression="(%s=value)" % va,
249                                                scope=ldb.SCOPE_SUBTREE,
250                                                attrs=["name"],
251                                                controls=["notification:1"],
252                                                timeout=0)
253                 for reply in hnd:
254                     self.fail()
255                 res = hnd.result()
256                 self.fail()
257             except LdbError as e4:
258                 (num, _) = e4.args
259                 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
260
261             try:
262                 hnd = self.ldb.search_iterator(base=self.base_dn,
263                                                expression="(%s>=value)" % va,
264                                                scope=ldb.SCOPE_SUBTREE,
265                                                attrs=["name"],
266                                                controls=["notification:1"],
267                                                timeout=0)
268                 for reply in hnd:
269                     self.fail()
270                 res = hnd.result()
271                 self.fail()
272             except LdbError as e5:
273                 (num, _) = e5.args
274                 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
275
276             try:
277                 hnd = self.ldb.search_iterator(base=self.base_dn,
278                                                expression="(%s<=value)" % va,
279                                                scope=ldb.SCOPE_SUBTREE,
280                                                attrs=["name"],
281                                                controls=["notification:1"],
282                                                timeout=0)
283                 for reply in hnd:
284                     self.fail()
285                 res = hnd.result()
286                 self.fail()
287             except LdbError as e6:
288                 (num, _) = e6.args
289                 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
290
291             try:
292                 hnd = self.ldb.search_iterator(base=self.base_dn,
293                                                expression="(%s=*value*)" % va,
294                                                scope=ldb.SCOPE_SUBTREE,
295                                                attrs=["name"],
296                                                controls=["notification:1"],
297                                                timeout=0)
298                 for reply in hnd:
299                     self.fail()
300                 res = hnd.result()
301                 self.fail()
302             except LdbError as e7:
303                 (num, _) = e7.args
304                 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
305
306             try:
307                 hnd = self.ldb.search_iterator(base=self.base_dn,
308                                                expression="(!(%s=*))" % va,
309                                                scope=ldb.SCOPE_SUBTREE,
310                                                attrs=["name"],
311                                                controls=["notification:1"],
312                                                timeout=0)
313                 for reply in hnd:
314                     self.fail()
315                 res = hnd.result()
316                 self.fail()
317             except LdbError as e8:
318                 (num, _) = e8.args
319                 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
320
321         res = self.ldb.search(base=self.ldb.get_schema_basedn(),
322                               expression="(objectClass=attributeSchema)",
323                               scope=ldb.SCOPE_ONELEVEL,
324                               attrs=["lDAPDisplayName"],
325                               controls=["paged_results:1:2500"])
326         for msg in res:
327             va = str(msg["lDAPDisplayName"][0])
328             if va in valid_attrs:
329                 continue
330
331             try:
332                 hnd = self.ldb.search_iterator(base=self.base_dn,
333                                                expression="(%s=*)" % va,
334                                                scope=ldb.SCOPE_SUBTREE,
335                                                attrs=["name"],
336                                                controls=["notification:1"],
337                                                timeout=0)
338                 for reply in hnd:
339                     self.fail()
340                 res = hnd.result()
341                 self.fail()
342             except LdbError as e9:
343                 (num, _) = e9.args
344                 if num != ERR_UNWILLING_TO_PERFORM:
345                     print("va[%s]" % va)
346                 self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
347
348         try:
349             va = "noneAttributeName"
350             hnd = self.ldb.search_iterator(base=self.base_dn,
351                                            expression="(%s=*)" % va,
352                                            scope=ldb.SCOPE_SUBTREE,
353                                            attrs=["name"],
354                                            controls=["notification:1"],
355                                            timeout=0)
356             for reply in hnd:
357                 self.fail()
358             res = hnd.result()
359             self.fail()
360         except LdbError as e11:
361             (num, _) = e11.args
362             if num != ERR_UNWILLING_TO_PERFORM:
363                 print("va[%s]" % va)
364             self.assertEqual(num, ERR_UNWILLING_TO_PERFORM)
365
366
367 if "://" not in url:
368     if os.path.isfile(url):
369         url = "tdb://%s" % url
370     else:
371         url = "ldap://%s" % url
372
373 TestProgram(module=__name__, opts=subunitopts)