dsdb: Add more locking more tests, confirming blocking locks in both directions
[samba.git] / python / samba / tests / dsdb.py
1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for samba.dsdb."""
19
20 from samba.credentials import Credentials
21 from samba.samdb import SamDB
22 from samba.auth import system_session
23 from samba.tests import TestCase
24 from samba.ndr import ndr_unpack, ndr_pack
25 from samba.dcerpc import drsblobs
26 import ldb
27 import os
28 import samba
29 import gc
30 import time
31
32 class DsdbTests(TestCase):
33
34     def setUp(self):
35         super(DsdbTests, self).setUp()
36         self.lp = samba.tests.env_loadparm()
37         self.creds = Credentials()
38         self.creds.guess(self.lp)
39         self.session = system_session()
40         self.samdb = SamDB(session_info=self.session,
41                            credentials=self.creds,
42                            lp=self.lp)
43
44     def test_get_oid_from_attrid(self):
45         oid = self.samdb.get_oid_from_attid(591614)
46         self.assertEquals(oid, "1.2.840.113556.1.4.1790")
47
48     def test_error_replpropertymetadata(self):
49         res = self.samdb.search(expression="cn=Administrator",
50                             scope=ldb.SCOPE_SUBTREE,
51                             attrs=["replPropertyMetaData"])
52         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
53                             str(res[0]["replPropertyMetaData"]))
54         ctr = repl.ctr
55         for o in ctr.array:
56             # Search for Description
57             if o.attid == 13:
58                 old_version = o.version
59                 o.version = o.version + 1
60         replBlob = ndr_pack(repl)
61         msg = ldb.Message()
62         msg.dn = res[0].dn
63         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
64         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
65
66     def test_error_replpropertymetadata_nochange(self):
67         res = self.samdb.search(expression="cn=Administrator",
68                             scope=ldb.SCOPE_SUBTREE,
69                             attrs=["replPropertyMetaData"])
70         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
71                             str(res[0]["replPropertyMetaData"]))
72         replBlob = ndr_pack(repl)
73         msg = ldb.Message()
74         msg.dn = res[0].dn
75         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
76         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
77
78     def test_error_replpropertymetadata_allow_sort(self):
79         res = self.samdb.search(expression="cn=Administrator",
80                             scope=ldb.SCOPE_SUBTREE,
81                             attrs=["replPropertyMetaData"])
82         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
83                             str(res[0]["replPropertyMetaData"]))
84         replBlob = ndr_pack(repl)
85         msg = ldb.Message()
86         msg.dn = res[0].dn
87         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
88         self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
89
90     def test_twoatt_replpropertymetadata(self):
91         res = self.samdb.search(expression="cn=Administrator",
92                             scope=ldb.SCOPE_SUBTREE,
93                             attrs=["replPropertyMetaData", "uSNChanged"])
94         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
95                             str(res[0]["replPropertyMetaData"]))
96         ctr = repl.ctr
97         for o in ctr.array:
98             # Search for Description
99             if o.attid == 13:
100                 old_version = o.version
101                 o.version = o.version + 1
102                 o.local_usn = long(str(res[0]["uSNChanged"])) + 1
103         replBlob = ndr_pack(repl)
104         msg = ldb.Message()
105         msg.dn = res[0].dn
106         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
107         msg["description"] = ldb.MessageElement("new val", ldb.FLAG_MOD_REPLACE, "description")
108         self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
109
110     def test_set_replpropertymetadata(self):
111         res = self.samdb.search(expression="cn=Administrator",
112                             scope=ldb.SCOPE_SUBTREE,
113                             attrs=["replPropertyMetaData", "uSNChanged"])
114         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
115                             str(res[0]["replPropertyMetaData"]))
116         ctr = repl.ctr
117         for o in ctr.array:
118             # Search for Description
119             if o.attid == 13:
120                 old_version = o.version
121                 o.version = o.version + 1
122                 o.local_usn = long(str(res[0]["uSNChanged"])) + 1
123                 o.originating_usn = long(str(res[0]["uSNChanged"])) + 1
124         replBlob = ndr_pack(repl)
125         msg = ldb.Message()
126         msg.dn = res[0].dn
127         msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
128         self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
129
130     def test_ok_get_attribute_from_attid(self):
131         self.assertEquals(self.samdb.get_attribute_from_attid(13), "description")
132
133     def test_ko_get_attribute_from_attid(self):
134         self.assertEquals(self.samdb.get_attribute_from_attid(11979), None)
135
136     def test_get_attribute_replmetadata_version(self):
137         res = self.samdb.search(expression="cn=Administrator",
138                             scope=ldb.SCOPE_SUBTREE,
139                             attrs=["dn"])
140         self.assertEquals(len(res), 1)
141         dn = str(res[0].dn)
142         self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 1)
143
144     def test_set_attribute_replmetadata_version(self):
145         res = self.samdb.search(expression="cn=Administrator",
146                             scope=ldb.SCOPE_SUBTREE,
147                             attrs=["dn"])
148         self.assertEquals(len(res), 1)
149         dn = str(res[0].dn)
150         version = self.samdb.get_attribute_replmetadata_version(dn, "description")
151         self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
152         self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)
153
154     def test_db_lock1(self):
155         basedn = self.samdb.get_default_basedn()
156         (r1, w1) = os.pipe()
157
158         pid = os.fork()
159         if pid == 0:
160             # In the child, close the main DB, re-open just one DB
161             del(self.samdb)
162             gc.collect()
163             self.samdb = SamDB(session_info=self.session,
164                                credentials=self.creds,
165                                lp=self.lp)
166
167             self.samdb.transaction_start()
168
169             dn = "cn=test_db_lock_user,cn=users," + str(basedn)
170             self.samdb.add({
171                  "dn": dn,
172                  "objectclass": "user",
173             })
174             self.samdb.delete(dn)
175
176             # Obtain a write lock
177             self.samdb.transaction_prepare_commit()
178             os.write(w1, b"prepared")
179             time.sleep(2)
180
181             # Drop the write lock
182             self.samdb.transaction_cancel()
183             os._exit(0)
184
185         self.assertEqual(os.read(r1, 8), b"prepared")
186
187         start = time.time()
188
189         # We need to hold this iterator open to hold the all-record lock.
190         res = self.samdb.search_iterator()
191
192         # This should take at least 2 seconds because the transaction
193         # has a write lock on one backend db open
194
195         # Release the locks
196         for l in res:
197             pass
198
199         end = time.time()
200         self.assertGreater(end - start, 1.9)
201
202         (got_pid, status) = os.waitpid(pid, 0)
203         self.assertEqual(got_pid, pid)
204         self.assertTrue(os.WIFEXITED(status))
205         self.assertEqual(os.WEXITSTATUS(status), 0)
206
207     def test_db_lock2(self):
208         basedn = self.samdb.get_default_basedn()
209         (r1, w1) = os.pipe()
210         (r2, w2) = os.pipe()
211
212         pid = os.fork()
213         if pid == 0:
214             # In the child, close the main DB, re-open
215             del(self.samdb)
216             gc.collect()
217             self.samdb = SamDB(session_info=self.session,
218                            credentials=self.creds,
219                            lp=self.lp)
220
221             # We need to hold this iterator open to hold the all-record lock.
222             res = self.samdb.search_iterator()
223
224             os.write(w2, b"start")
225             if (os.read(r1, 7) != b"started"):
226                 os._exit(1)
227
228             os.write(w2, b"add")
229             if (os.read(r1, 5) != b"added"):
230                 os._exit(2)
231
232             # Wait 2 seconds to block prepare_commit() in the child.
233             os.write(w2, b"prepare")
234             time.sleep(2)
235
236             # Release the locks
237             for l in res:
238                 pass
239
240             if (os.read(r1, 8) != b"prepared"):
241                 os._exit(3)
242
243             os._exit(0)
244
245         # We can start the transaction during the search
246         # because both just grab the all-record read lock.
247         self.assertEqual(os.read(r2, 5), b"start")
248         self.samdb.transaction_start()
249         os.write(w1, b"started")
250
251         self.assertEqual(os.read(r2, 3), b"add")
252         dn = "cn=test_db_lock_user,cn=users," + str(basedn)
253         self.samdb.add({
254              "dn": dn,
255              "objectclass": "user",
256         })
257         self.samdb.delete(dn)
258         os.write(w1, b"added")
259
260         # Obtain a write lock, this will block until
261         # the parent releases the read lock.
262         self.assertEqual(os.read(r2, 7), b"prepare")
263         start = time.time()
264         self.samdb.transaction_prepare_commit()
265         end = time.time()
266         try:
267             self.assertGreater(end - start, 1.9)
268         except:
269             raise
270         finally:
271             os.write(w1, b"prepared")
272
273             # Drop the write lock
274             self.samdb.transaction_cancel()
275
276             (got_pid, status) = os.waitpid(pid, 0)
277             self.assertEqual(got_pid, pid)
278             self.assertTrue(os.WIFEXITED(status))
279             self.assertEqual(os.WEXITSTATUS(status), 0)
280
281     def test_full_db_lock1(self):
282         basedn = self.samdb.get_default_basedn()
283         backend_filename = "%s.ldb" % basedn.get_casefold()
284         backend_subpath = os.path.join("sam.ldb.d",
285                                        backend_filename)
286         backend_path = self.lp.private_path(backend_subpath)
287         (r1, w1) = os.pipe()
288
289         pid = os.fork()
290         if pid == 0:
291             # In the child, close the main DB, re-open just one DB
292             del(self.samdb)
293             gc.collect()
294
295             backenddb = ldb.Ldb(backend_path)
296
297
298             backenddb.transaction_start()
299
300             backenddb.add({"dn":"@DSDB_LOCK_TEST"})
301             backenddb.delete("@DSDB_LOCK_TEST")
302
303             # Obtain a write lock
304             backenddb.transaction_prepare_commit()
305             os.write(w1, b"prepared")
306             time.sleep(2)
307
308             # Drop the write lock
309             backenddb.transaction_cancel()
310             os._exit(0)
311
312         self.assertEqual(os.read(r1, 8), b"prepared")
313
314         start = time.time()
315
316         # We need to hold this iterator open to hold the all-record lock.
317         res = self.samdb.search_iterator()
318
319         # This should take at least 2 seconds because the transaction
320         # has a write lock on one backend db open
321
322         end = time.time()
323         self.assertGreater(end - start, 1.9)
324
325         # Release the locks
326         for l in res:
327             pass
328
329         (got_pid, status) = os.waitpid(pid, 0)
330         self.assertEqual(got_pid, pid)
331         self.assertTrue(os.WIFEXITED(status))
332         self.assertEqual(os.WEXITSTATUS(status), 0)
333
334     def test_full_db_lock2(self):
335         basedn = self.samdb.get_default_basedn()
336         backend_filename = "%s.ldb" % basedn.get_casefold()
337         backend_subpath = os.path.join("sam.ldb.d",
338                                        backend_filename)
339         backend_path = self.lp.private_path(backend_subpath)
340         (r1, w1) = os.pipe()
341         (r2, w2) = os.pipe()
342
343         pid = os.fork()
344         if pid == 0:
345
346             # In the child, close the main DB, re-open
347             del(self.samdb)
348             gc.collect()
349             self.samdb = SamDB(session_info=self.session,
350                            credentials=self.creds,
351                            lp=self.lp)
352
353             # We need to hold this iterator open to hold the all-record lock.
354             res = self.samdb.search_iterator()
355
356             os.write(w2, b"start")
357             if (os.read(r1, 7) != b"started"):
358                 os._exit(1)
359             os.write(w2, b"add")
360             if (os.read(r1, 5) != b"added"):
361                 os._exit(2)
362
363             # Wait 2 seconds to block prepare_commit() in the child.
364             os.write(w2, b"prepare")
365             time.sleep(2)
366
367             # Release the locks
368             for l in res:
369                 pass
370
371             if (os.read(r1, 8) != b"prepared"):
372                 os._exit(3)
373
374             os._exit(0)
375
376         # In the parent, close the main DB, re-open just one DB
377         del(self.samdb)
378         gc.collect()
379         backenddb = ldb.Ldb(backend_path)
380
381         # We can start the transaction during the search
382         # because both just grab the all-record read lock.
383         self.assertEqual(os.read(r2, 5), b"start")
384         backenddb.transaction_start()
385         os.write(w1, b"started")
386
387         self.assertEqual(os.read(r2, 3), b"add")
388         backenddb.add({"dn":"@DSDB_LOCK_TEST"})
389         backenddb.delete("@DSDB_LOCK_TEST")
390         os.write(w1, b"added")
391
392         # Obtain a write lock, this will block until
393         # the child releases the read lock.
394         self.assertEqual(os.read(r2, 7), b"prepare")
395         start = time.time()
396         backenddb.transaction_prepare_commit()
397         end = time.time()
398
399         try:
400             self.assertGreater(end - start, 1.9)
401         except:
402             raise
403         finally:
404             os.write(w1, b"prepared")
405
406             # Drop the write lock
407             backenddb.transaction_cancel()
408
409             (got_pid, status) = os.waitpid(pid, 0)
410             self.assertEqual(got_pid, pid)
411             self.assertTrue(os.WIFEXITED(status))
412             self.assertEqual(os.WEXITSTATUS(status), 0)