python:tests: Create a test user for the dsdb test
[metze/samba/wip.git] / python / samba / tests / dsdb.py
1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for samba.dsdb."""
19
20 from samba.credentials import Credentials
21 from samba.samdb import SamDB
22 from samba.auth import system_session
23 from samba.tests import TestCase
24 from samba.tests import delete_force
25 from samba.ndr import ndr_unpack, ndr_pack
26 from samba.dcerpc import drsblobs
27 from samba import dsdb
28 import ldb
29 import os
30 import samba
31 import gc
32 import time
33
34 class DsdbTests(TestCase):
35
36     def setUp(self):
37         super(DsdbTests, self).setUp()
38         self.lp = samba.tests.env_loadparm()
39         self.creds = Credentials()
40         self.creds.guess(self.lp)
41         self.session = system_session()
42         self.samdb = SamDB(session_info=self.session,
43                            credentials=self.creds,
44                            lp=self.lp)
45
46         # Create a test user
47         user_name = "samdb-testuser"
48         user_pass = samba.generate_random_password(32, 32)
49         user_description = "Test user for dsdb test"
50
51         base_dn = self.samdb.domain_dn()
52
53         self.account_dn = "cn=" + user_name + ",cn=Users," + base_dn
54         delete_force(self.samdb, self.account_dn)
55         self.samdb.newuser(username=user_name,
56                            password=user_pass,
57                            description=user_description)
58
59     def test_get_oid_from_attrid(self):
60         oid = self.samdb.get_oid_from_attid(591614)
61         self.assertEquals(oid, "1.2.840.113556.1.4.1790")
62
63     def test_error_replpropertymetadata(self):
64         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
65                                 base=self.account_dn,
66                                 attrs=["replPropertyMetaData"])
67         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
68                             str(res[0]["replPropertyMetaData"]))
69         ctr = repl.ctr
70         for o in ctr.array:
71             # Search for Description
72             if o.attid == 13:
73                 old_version = o.version
74                 o.version = o.version + 1
75         replBlob = ndr_pack(repl)
76         msg = ldb.Message()
77         msg.dn = res[0].dn
78         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
79         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
80
81     def test_error_replpropertymetadata_nochange(self):
82         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
83                                 base=self.account_dn,
84                                 attrs=["replPropertyMetaData"])
85         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
86                             str(res[0]["replPropertyMetaData"]))
87         replBlob = ndr_pack(repl)
88         msg = ldb.Message()
89         msg.dn = res[0].dn
90         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
91         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
92
93     def test_error_replpropertymetadata_allow_sort(self):
94         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
95                                 base=self.account_dn,
96                                 attrs=["replPropertyMetaData"])
97         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
98                             str(res[0]["replPropertyMetaData"]))
99         replBlob = ndr_pack(repl)
100         msg = ldb.Message()
101         msg.dn = res[0].dn
102         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
103         self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
104
105     def test_twoatt_replpropertymetadata(self):
106         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
107                                 base=self.account_dn,
108                                 attrs=["replPropertyMetaData", "uSNChanged"])
109         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
110                             str(res[0]["replPropertyMetaData"]))
111         ctr = repl.ctr
112         for o in ctr.array:
113             # Search for Description
114             if o.attid == 13:
115                 old_version = o.version
116                 o.version = o.version + 1
117                 o.local_usn = long(str(res[0]["uSNChanged"])) + 1
118         replBlob = ndr_pack(repl)
119         msg = ldb.Message()
120         msg.dn = res[0].dn
121         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
122         msg["description"] = ldb.MessageElement("new val", ldb.FLAG_MOD_REPLACE, "description")
123         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
124
125     def test_set_replpropertymetadata(self):
126         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
127                                 base=self.account_dn,
128                                 attrs=["replPropertyMetaData", "uSNChanged"])
129         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
130                             str(res[0]["replPropertyMetaData"]))
131         ctr = repl.ctr
132         for o in ctr.array:
133             # Search for Description
134             if o.attid == 13:
135                 old_version = o.version
136                 o.version = o.version + 1
137                 o.local_usn = long(str(res[0]["uSNChanged"])) + 1
138                 o.originating_usn = long(str(res[0]["uSNChanged"])) + 1
139         replBlob = ndr_pack(repl)
140         msg = ldb.Message()
141         msg.dn = res[0].dn
142         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
143         self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
144
145     def test_ok_get_attribute_from_attid(self):
146         self.assertEquals(self.samdb.get_attribute_from_attid(13), "description")
147
148     def test_ko_get_attribute_from_attid(self):
149         self.assertEquals(self.samdb.get_attribute_from_attid(11979), None)
150
151     def test_get_attribute_replmetadata_version(self):
152         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
153                                 base=self.account_dn,
154                                 attrs=["dn"])
155         self.assertEquals(len(res), 1)
156         dn = str(res[0].dn)
157         self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 2)
158
159     def test_set_attribute_replmetadata_version(self):
160         res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
161                                 base=self.account_dn,
162                                 attrs=["dn"])
163         self.assertEquals(len(res), 1)
164         dn = str(res[0].dn)
165         version = self.samdb.get_attribute_replmetadata_version(dn, "description")
166         self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
167         self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)
168
169     def test_db_lock1(self):
170         basedn = self.samdb.get_default_basedn()
171         (r1, w1) = os.pipe()
172
173         pid = os.fork()
174         if pid == 0:
175             # In the child, close the main DB, re-open just one DB
176             del(self.samdb)
177             gc.collect()
178             self.samdb = SamDB(session_info=self.session,
179                                credentials=self.creds,
180                                lp=self.lp)
181
182             self.samdb.transaction_start()
183
184             dn = "cn=test_db_lock_user,cn=users," + str(basedn)
185             self.samdb.add({
186                  "dn": dn,
187                  "objectclass": "user",
188             })
189             self.samdb.delete(dn)
190
191             # Obtain a write lock
192             self.samdb.transaction_prepare_commit()
193             os.write(w1, b"prepared")
194             time.sleep(2)
195
196             # Drop the write lock
197             self.samdb.transaction_cancel()
198             os._exit(0)
199
200         self.assertEqual(os.read(r1, 8), b"prepared")
201
202         start = time.time()
203
204         # We need to hold this iterator open to hold the all-record lock.
205         res = self.samdb.search_iterator()
206
207         # This should take at least 2 seconds because the transaction
208         # has a write lock on one backend db open
209
210         # Release the locks
211         for l in res:
212             pass
213
214         end = time.time()
215         self.assertGreater(end - start, 1.9)
216
217         (got_pid, status) = os.waitpid(pid, 0)
218         self.assertEqual(got_pid, pid)
219         self.assertTrue(os.WIFEXITED(status))
220         self.assertEqual(os.WEXITSTATUS(status), 0)
221
222     def test_db_lock2(self):
223         basedn = self.samdb.get_default_basedn()
224         (r1, w1) = os.pipe()
225         (r2, w2) = os.pipe()
226
227         pid = os.fork()
228         if pid == 0:
229             # In the child, close the main DB, re-open
230             del(self.samdb)
231             gc.collect()
232             self.samdb = SamDB(session_info=self.session,
233                            credentials=self.creds,
234                            lp=self.lp)
235
236             # We need to hold this iterator open to hold the all-record lock.
237             res = self.samdb.search_iterator()
238
239             os.write(w2, b"start")
240             if (os.read(r1, 7) != b"started"):
241                 os._exit(1)
242
243             os.write(w2, b"add")
244             if (os.read(r1, 5) != b"added"):
245                 os._exit(2)
246
247             # Wait 2 seconds to block prepare_commit() in the child.
248             os.write(w2, b"prepare")
249             time.sleep(2)
250
251             # Release the locks
252             for l in res:
253                 pass
254
255             if (os.read(r1, 8) != b"prepared"):
256                 os._exit(3)
257
258             os._exit(0)
259
260         # We can start the transaction during the search
261         # because both just grab the all-record read lock.
262         self.assertEqual(os.read(r2, 5), b"start")
263         self.samdb.transaction_start()
264         os.write(w1, b"started")
265
266         self.assertEqual(os.read(r2, 3), b"add")
267         dn = "cn=test_db_lock_user,cn=users," + str(basedn)
268         self.samdb.add({
269              "dn": dn,
270              "objectclass": "user",
271         })
272         self.samdb.delete(dn)
273         os.write(w1, b"added")
274
275         # Obtain a write lock, this will block until
276         # the parent releases the read lock.
277         self.assertEqual(os.read(r2, 7), b"prepare")
278         start = time.time()
279         self.samdb.transaction_prepare_commit()
280         end = time.time()
281         try:
282             self.assertGreater(end - start, 1.9)
283         except:
284             raise
285         finally:
286             os.write(w1, b"prepared")
287
288             # Drop the write lock
289             self.samdb.transaction_cancel()
290
291             (got_pid, status) = os.waitpid(pid, 0)
292             self.assertEqual(got_pid, pid)
293             self.assertTrue(os.WIFEXITED(status))
294             self.assertEqual(os.WEXITSTATUS(status), 0)
295
296     def test_db_lock3(self):
297         basedn = self.samdb.get_default_basedn()
298         (r1, w1) = os.pipe()
299         (r2, w2) = os.pipe()
300
301         pid = os.fork()
302         if pid == 0:
303             # In the child, close the main DB, re-open
304             del(self.samdb)
305             gc.collect()
306             self.samdb = SamDB(session_info=self.session,
307                            credentials=self.creds,
308                            lp=self.lp)
309
310             # We need to hold this iterator open to hold the all-record lock.
311             res = self.samdb.search_iterator()
312
313             os.write(w2, b"start")
314             if (os.read(r1, 7) != b"started"):
315                 os._exit(1)
316
317             os.write(w2, b"add")
318             if (os.read(r1, 5) != b"added"):
319                 os._exit(2)
320
321             # Wait 2 seconds to block prepare_commit() in the child.
322             os.write(w2, b"prepare")
323             time.sleep(2)
324
325             # Release the locks
326             for l in res:
327                 pass
328
329             if (os.read(r1, 8) != b"prepared"):
330                 os._exit(3)
331
332             os._exit(0)
333
334         # We can start the transaction during the search
335         # because both just grab the all-record read lock.
336         self.assertEqual(os.read(r2, 5), b"start")
337         self.samdb.transaction_start()
338         os.write(w1, b"started")
339
340         self.assertEqual(os.read(r2, 3), b"add")
341
342         # This will end up in the top level db
343         dn = "@DSDB_LOCK_TEST"
344         self.samdb.add({
345              "dn": dn})
346         self.samdb.delete(dn)
347         os.write(w1, b"added")
348
349         # Obtain a write lock, this will block until
350         # the child releases the read lock.
351         self.assertEqual(os.read(r2, 7), b"prepare")
352         start = time.time()
353         self.samdb.transaction_prepare_commit()
354         end = time.time()
355         self.assertGreater(end - start, 1.9)
356         os.write(w1, b"prepared")
357
358         # Drop the write lock
359         self.samdb.transaction_cancel()
360
361         (got_pid, status) = os.waitpid(pid, 0)
362         self.assertTrue(os.WIFEXITED(status))
363         self.assertEqual(os.WEXITSTATUS(status), 0)
364         self.assertEqual(got_pid, pid)
365
366
367     def _test_full_db_lock1(self, backend_path):
368         (r1, w1) = os.pipe()
369
370         pid = os.fork()
371         if pid == 0:
372             # In the child, close the main DB, re-open just one DB
373             del(self.samdb)
374             gc.collect()
375
376             backenddb = ldb.Ldb(backend_path)
377
378
379             backenddb.transaction_start()
380
381             backenddb.add({"dn":"@DSDB_LOCK_TEST"})
382             backenddb.delete("@DSDB_LOCK_TEST")
383
384             # Obtain a write lock
385             backenddb.transaction_prepare_commit()
386             os.write(w1, b"prepared")
387             time.sleep(2)
388
389             # Drop the write lock
390             backenddb.transaction_cancel()
391             os._exit(0)
392
393         self.assertEqual(os.read(r1, 8), b"prepared")
394
395         start = time.time()
396
397         # We need to hold this iterator open to hold the all-record lock.
398         res = self.samdb.search_iterator()
399
400         # This should take at least 2 seconds because the transaction
401         # has a write lock on one backend db open
402
403         end = time.time()
404         self.assertGreater(end - start, 1.9)
405
406         # Release the locks
407         for l in res:
408             pass
409
410         (got_pid, status) = os.waitpid(pid, 0)
411         self.assertEqual(got_pid, pid)
412         self.assertTrue(os.WIFEXITED(status))
413         self.assertEqual(os.WEXITSTATUS(status), 0)
414
415     def test_full_db_lock1(self):
416         basedn = self.samdb.get_default_basedn()
417         backend_filename = "%s.ldb" % basedn.get_casefold()
418         backend_subpath = os.path.join("sam.ldb.d",
419                                        backend_filename)
420         backend_path = self.lp.private_path(backend_subpath)
421         self._test_full_db_lock1(backend_path)
422
423
424     def test_full_db_lock1_config(self):
425         basedn = self.samdb.get_config_basedn()
426         backend_filename = "%s.ldb" % basedn.get_casefold()
427         backend_subpath = os.path.join("sam.ldb.d",
428                                        backend_filename)
429         backend_path = self.lp.private_path(backend_subpath)
430         self._test_full_db_lock1(backend_path)
431
432
433     def _test_full_db_lock2(self, backend_path):
434         (r1, w1) = os.pipe()
435         (r2, w2) = os.pipe()
436
437         pid = os.fork()
438         if pid == 0:
439
440             # In the child, close the main DB, re-open
441             del(self.samdb)
442             gc.collect()
443             self.samdb = SamDB(session_info=self.session,
444                            credentials=self.creds,
445                            lp=self.lp)
446
447             # We need to hold this iterator open to hold the all-record lock.
448             res = self.samdb.search_iterator()
449
450             os.write(w2, b"start")
451             if (os.read(r1, 7) != b"started"):
452                 os._exit(1)
453             os.write(w2, b"add")
454             if (os.read(r1, 5) != b"added"):
455                 os._exit(2)
456
457             # Wait 2 seconds to block prepare_commit() in the child.
458             os.write(w2, b"prepare")
459             time.sleep(2)
460
461             # Release the locks
462             for l in res:
463                 pass
464
465             if (os.read(r1, 8) != b"prepared"):
466                 os._exit(3)
467
468             os._exit(0)
469
470         # In the parent, close the main DB, re-open just one DB
471         del(self.samdb)
472         gc.collect()
473         backenddb = ldb.Ldb(backend_path)
474
475         # We can start the transaction during the search
476         # because both just grab the all-record read lock.
477         self.assertEqual(os.read(r2, 5), b"start")
478         backenddb.transaction_start()
479         os.write(w1, b"started")
480
481         self.assertEqual(os.read(r2, 3), b"add")
482         backenddb.add({"dn":"@DSDB_LOCK_TEST"})
483         backenddb.delete("@DSDB_LOCK_TEST")
484         os.write(w1, b"added")
485
486         # Obtain a write lock, this will block until
487         # the child releases the read lock.
488         self.assertEqual(os.read(r2, 7), b"prepare")
489         start = time.time()
490         backenddb.transaction_prepare_commit()
491         end = time.time()
492
493         try:
494             self.assertGreater(end - start, 1.9)
495         except:
496             raise
497         finally:
498             os.write(w1, b"prepared")
499
500             # Drop the write lock
501             backenddb.transaction_cancel()
502
503             (got_pid, status) = os.waitpid(pid, 0)
504             self.assertEqual(got_pid, pid)
505             self.assertTrue(os.WIFEXITED(status))
506             self.assertEqual(os.WEXITSTATUS(status), 0)
507
508     def test_full_db_lock2(self):
509         basedn = self.samdb.get_default_basedn()
510         backend_filename = "%s.ldb" % basedn.get_casefold()
511         backend_subpath = os.path.join("sam.ldb.d",
512                                        backend_filename)
513         backend_path = self.lp.private_path(backend_subpath)
514         self._test_full_db_lock2(backend_path)
515
516     def test_full_db_lock2_config(self):
517         basedn = self.samdb.get_config_basedn()
518         backend_filename = "%s.ldb" % basedn.get_casefold()
519         backend_subpath = os.path.join("sam.ldb.d",
520                                        backend_filename)
521         backend_path = self.lp.private_path(backend_subpath)
522         self._test_full_db_lock2(backend_path)
523
524     def test_no_error_on_invalid_control(self):
525         try:
526             res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
527                                     base=self.account_dn,
528                                     attrs=["replPropertyMetaData"],
529                                     controls=["local_oid:%s:0"
530                                               % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
531         except ldb.LdbError as e:
532             self.fail("Should have not raised an exception")
533
534     def test_error_on_invalid_critical_control(self):
535         try:
536             res = self.samdb.search(scope=ldb.SCOPE_SUBTREE,
537                                     base=self.account_dn,
538                                     attrs=["replPropertyMetaData"],
539                                     controls=["local_oid:%s:1"
540                                               % dsdb.DSDB_CONTROL_INVALID_NOT_IMPLEMENTED])
541         except ldb.LdbError as e:
542             if e[0] != ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION:
543                 self.fail("Got %s should have got ERR_UNSUPPORTED_CRITICAL_EXTENSION"
544                           % e[1])