bb3280aaece8a13b5d39e0d8b29a4a87be44f7ac
[nivanova/samba-autobuild/.git] / source4 / dsdb / tests / python / notification.py
1 #!/usr/bin/env python
2 #
3 # Unit tests for the notification control
4 # Copyright (C) Stefan Metzmacher 2016
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19 from __future__ import print_function
20 import optparse
21 import sys
22 import os
23
24 sys.path.insert(0, "bin/python")
25 import samba
26
27 from samba.tests.subunitrun import SubunitOptions, TestProgram
28
29 import samba.getopt as options
30
31 from samba.auth import system_session
32 from samba import ldb
33 from samba.samdb import SamDB
34 from samba.ndr import ndr_unpack
35 from samba import gensec
36 from samba.credentials import Credentials
37 import samba.tests
38
39 from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES
40
41 from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError
42 from ldb import ERR_TIME_LIMIT_EXCEEDED, ERR_ADMIN_LIMIT_EXCEEDED, ERR_UNWILLING_TO_PERFORM
43 from ldb import Message
44
# Command line handling: Samba common options, version, credentials and
# subunit options are registered in this order; the single positional
# argument is the server (or database) to test against.
parser = optparse.OptionParser("notification.py [options] <host>")

sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

parser.add_option_group(options.VersionOptions(parser))

# Credentials given on the command line are used when present.
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

(opts, args) = parser.parse_args()

# The target host is mandatory.
if not args:
    parser.print_usage()
    sys.exit(1)

url = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Add FEATURE_SEAL to whatever gensec features the credentials already have.
creds.set_gensec_features(gensec.FEATURE_SEAL | creds.get_gensec_features())
65
66
class LDAPNotificationTest(samba.tests.TestCase):
    """Tests for the LDAP notification control ("notification:1").

    Every test talks to the server named by the module-level ``url`` using
    the module-level ``creds`` and ``lp``, and fails straight away when that
    URL does not start with "ldap".
    """

    def setUp(self):
        """Connect to the server and record the domain DN and a SID-form user DN."""
        # Name this class (not its parent) so that the lookup starts right
        # after LDAPNotificationTest and samba.tests.TestCase.setUp() runs.
        super(LDAPNotificationTest, self).setUp()
        self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
        self.base_dn = self.ldb.domain_dn()

        res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        # Presumably the first tokenGroups value of the rootDSE is the SID of
        # the connected user -- verify against the server documentation.
        # NOTE(review): samba.dcerpc.security is reached without an explicit
        # import here; it seems to rely on another import having loaded it.
        self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]))

    def _require_ldap(self):
        """Fail the running test unless the target URL starts with "ldap"."""
        if not url.startswith("ldap"):
            self.fail(msg="This test is only valid on ldap")

    def _notify_search(self, expression, attrs, timeout):
        """Start a subtree search below the domain DN with the notification control.

        :param expression: LDAP filter string.
        :param attrs: list of attribute names to request.
        :param timeout: value handed to search_iterator() as ``timeout``.
        :return: the handle returned by search_iterator().
        """
        return self.ldb.search_iterator(base=self.base_dn,
                                        expression=expression,
                                        scope=ldb.SCOPE_SUBTREE,
                                        attrs=attrs,
                                        controls=["notification:1"],
                                        timeout=timeout)

    def _assert_notify_error(self, expression, timeout, expected_error):
        """Check that a notification search returns nothing and ends in an error.

        The search requests only "name".  Any reply, or a result() call that
        returns normally, fails the test.

        :param expression: LDAP filter string to probe.
        :param timeout: value handed to search_iterator() as ``timeout``.
        :param expected_error: ldb error code the LdbError must carry.
        """
        try:
            hnd = self._notify_search(expression, ["name"], timeout)
            for reply in hnd:
                self.fail("unexpected reply for filter %s" % expression)
            hnd.result()
            self.fail("no error raised for filter %s" % expression)
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, expected_error,
                             "filter %s: got ldb error %s, expected %s"
                             % (expression, num, expected_error))

    def _assert_same_entry(self, expected, actual):
        """Check that two messages agree on DN, element count and "name".

        displayName is requested by the callers but not compared; that
        comparison was already disabled in the original test.
        """
        self.assertEqual(expected.dn, actual.dn)
        self.assertEqual(len(expected), len(actual))
        self.assertEqual(expected["name"], actual["name"])

    def _replace_workstations(self, value):
        """Replace otherLoginWorkstations on the test user with ``value``."""
        self.ldb.modify_ldif("""
dn: """ + self.user_sid_dn + """
changetype: modify
replace: otherLoginWorkstations
otherLoginWorkstations: """ + value + """
""")

    def test_simple_search(self):
        """Testing a notification with a modify and a timeout"""
        self._require_ldap()

        attrs = ["name", "objectGUID", "displayName"]

        # Look the user up directly; at most one reply is accepted.
        msg1 = None
        search1 = self.ldb.search_iterator(base=self.user_sid_dn,
                                           expression="(objectClass=*)",
                                           scope=ldb.SCOPE_SUBTREE,
                                           attrs=attrs)
        for reply in search1:
            self.assertIsInstance(reply, ldb.Message)
            self.assertIsNone(msg1)
            msg1 = reply
        search1.result()
        # Everything below indexes msg1, so fail clearly if nothing came back.
        self.assertIsNotNone(msg1)

        # The same entry seen through a domain-wide search must match.
        search2 = self.ldb.search_iterator(base=self.base_dn,
                                           expression="(objectClass=*)",
                                           scope=ldb.SCOPE_SUBTREE,
                                           attrs=attrs)
        msg2 = None
        for reply in search2:
            if isinstance(reply, str):
                # String replies (referrals, presumably) are skipped.
                continue
            self.assertIsInstance(reply, ldb.Message)
            if reply["objectGUID"][0] == msg1["objectGUID"][0]:
                self.assertIsNone(msg2)
                msg2 = reply
                self._assert_same_entry(msg1, msg2)
        search2.result()

        # Modify once before and once after the notification search starts.
        # The values (including the trailing double quote) are kept exactly
        # as the original test wrote them.
        self._replace_workstations('BEFORE"')
        notify1 = self._notify_search("(objectClass=*)", attrs, 1)
        self._replace_workstations('AFTER"')

        msg3 = None
        for reply in notify1:
            self.assertIsInstance(reply, ldb.Message)
            if reply["objectGUID"][0] == msg1["objectGUID"][0]:
                self.assertIsNone(msg3)
                msg3 = reply
                self._assert_same_entry(msg1, msg3)
        # The notification search is expected to end with a time limit error.
        try:
            notify1.result()
            self.fail()
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_TIME_LIMIT_EXCEEDED)
        self.assertIsNotNone(msg3)

        # Remove the attribute again.  This has to be a "modify" change
        # record: in LDIF "changetype: delete" addresses the whole entry and
        # takes no "delete: <attr>" line.
        self.ldb.modify_ldif("""
dn: """ + self.user_sid_dn + """
changetype: modify
delete: otherLoginWorkstations
""")

    def test_max_search(self):
        """Testing the max allowed notifications"""
        self._require_ldap()

        # Presumably the server-side limit on concurrent notification
        # searches -- TODO confirm against the server implementation.
        max_notifications = 5

        # Start one search more than the limit before reading any of them.
        notifies = [self._notify_search("(objectClass=*)", ["name"], 1)
                    for _ in range(max_notifications + 1)]

        num_admin_limit = 0
        num_time_limit = 0
        for notify in notifies:
            try:
                for msg in notify:
                    continue
                notify.result()
                self.fail()
            except LdbError as e:
                (num, _) = e.args
                if num == ERR_ADMIN_LIMIT_EXCEEDED:
                    num_admin_limit += 1
                elif num == ERR_TIME_LIMIT_EXCEEDED:
                    num_time_limit += 1
                else:
                    raise
        # Exactly one search is refused; all the others time out.
        self.assertEqual(num_admin_limit, 1)
        self.assertEqual(num_time_limit, max_notifications)

    def test_invalid_filter(self):
        """Testing invalid filters for notifications"""
        self._require_ldap()

        valid_attrs = ["objectClass", "objectGUID", "distinguishedName", "name"]

        for va in valid_attrs:
            # A presence filter, alone or as a branch of an OR, is expected
            # to run and then time out.
            self._assert_notify_error("(%s=*)" % va, 1, ERR_TIME_LIMIT_EXCEEDED)
            self._assert_notify_error("(|(%s=*)(%s=value))" % (va, va), 1,
                                      ERR_TIME_LIMIT_EXCEEDED)

            # Every other filter shape is expected to be refused.
            refused = [
                "(&(%s=*)(%s=value))" % (va, va),
                "(%s=value)" % va,
                "(%s>=value)" % va,
                "(%s<=value)" % va,
                "(%s=*value*)" % va,
                "(!(%s=*))" % va,
            ]
            for expression in refused:
                self._assert_notify_error(expression, 0, ERR_UNWILLING_TO_PERFORM)

        # Presence filters on any other schema attribute are expected to be
        # refused as well.
        res = self.ldb.search(base=self.ldb.get_schema_basedn(),
                              expression="(objectClass=attributeSchema)",
                              scope=ldb.SCOPE_ONELEVEL,
                              attrs=["lDAPDisplayName"],
                              controls=["paged_results:1:2500"])
        for msg in res:
            va = msg["lDAPDisplayName"][0]
            if not isinstance(va, str):
                # On Python 3 the value arrives as bytes; without decoding,
                # the membership test below never matches and the filter
                # would contain "b'...'".
                va = va.decode("utf-8")
            if va in valid_attrs:
                continue
            self._assert_notify_error("(%s=*)" % va, 0, ERR_UNWILLING_TO_PERFORM)

        # The same is expected for a made-up attribute name.
        self._assert_notify_error("(%s=*)" % "noneAttributeName", 0,
                                  ERR_UNWILLING_TO_PERFORM)
366
367
# An argument without a scheme is treated as a local tdb database when it
# names an existing file, and as an LDAP host otherwise.
if "://" not in url:
    url = ("tdb://%s" if os.path.isfile(url) else "ldap://%s") % url

# Hand control to the subunit-aware test runner for this module.
TestProgram(module=__name__, opts=subunitopts)