dsdb: Add test showing a search can't start while a transaction is already prepared
[samba.git] / python / samba / tests / dsdb.py
1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for samba.dsdb."""
19
20 from samba.credentials import Credentials
21 from samba.samdb import SamDB
22 from samba.auth import system_session
23 from samba.tests import TestCase
24 from samba.ndr import ndr_unpack, ndr_pack
25 from samba.dcerpc import drsblobs
26 import ldb
27 import os
28 import samba
29 import gc
30 import time
31
class DsdbTests(TestCase):
    """Tests for the dsdb-specific operations exposed through SamDB.

    Every test runs against the database opened in setUp() and, for the
    replPropertyMetaData tests, against the entry matching cn=Administrator.
    """

    # attid of the "description" attribute; test_ok_get_attribute_from_attid
    # checks this mapping.
    _DESCRIPTION_ATTID = 13

    def setUp(self):
        """Open a SamDB using the test environment's loadparm settings."""
        super(DsdbTests, self).setUp()
        self.lp = samba.tests.env_loadparm()
        self.creds = Credentials()
        self.creds.guess(self.lp)
        self.session = system_session()
        self.samdb = SamDB(session_info=self.session,
                           credentials=self.creds,
                           lp=self.lp)

    def _search_administrator(self, attrs):
        """Subtree-search for cn=Administrator, requesting only `attrs`."""
        return self.samdb.search(expression="cn=Administrator",
                                 scope=ldb.SCOPE_SUBTREE,
                                 attrs=attrs)

    def _unpack_repl_metadata(self, entry):
        """Decode the replPropertyMetaData blob of a search result entry."""
        return ndr_unpack(drsblobs.replPropertyMetaDataBlob,
                          str(entry["replPropertyMetaData"]))

    def _repl_metadata_replace_msg(self, dn, repl):
        """Build a message replacing replPropertyMetaData on `dn` with `repl`."""
        msg = ldb.Message()
        msg.dn = dn
        msg["replPropertyMetaData"] = ldb.MessageElement(
            ndr_pack(repl), ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
        return msg

    def test_get_oid_from_attrid(self):
        """get_oid_from_attid() maps attid 591614 to its OID string."""
        oid = self.samdb.get_oid_from_attid(591614)
        self.assertEqual(oid, "1.2.840.113556.1.4.1790")

    def test_error_replpropertymetadata(self):
        """modify() must fail when only the description version is bumped."""
        res = self._search_administrator(["replPropertyMetaData"])
        repl = self._unpack_repl_metadata(res[0])
        for entry in repl.ctr.array:
            if entry.attid == self._DESCRIPTION_ATTID:
                entry.version = entry.version + 1
        msg = self._repl_metadata_replace_msg(res[0].dn, repl)
        self.assertRaises(ldb.LdbError, self.samdb.modify, msg,
                          ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])

    def test_error_replpropertymetadata_nochange(self):
        """modify() must fail when the blob is written back unchanged."""
        res = self._search_administrator(["replPropertyMetaData"])
        repl = self._unpack_repl_metadata(res[0])
        msg = self._repl_metadata_replace_msg(res[0].dn, repl)
        self.assertRaises(ldb.LdbError, self.samdb.modify, msg,
                          ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])

    def test_error_replpropertymetadata_allow_sort(self):
        """The unchanged blob is accepted once the second control is added."""
        res = self._search_administrator(["replPropertyMetaData"])
        repl = self._unpack_repl_metadata(res[0])
        msg = self._repl_metadata_replace_msg(res[0].dn, repl)
        self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0",
                                "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])

    def test_twoatt_replpropertymetadata(self):
        """modify() must fail when description is replaced alongside the blob."""
        res = self._search_administrator(["replPropertyMetaData",
                                          "uSNChanged"])
        repl = self._unpack_repl_metadata(res[0])
        # int() rather than the Python 2-only long(); int() promotes
        # automatically when the value does not fit a machine word.
        next_usn = int(str(res[0]["uSNChanged"])) + 1
        for entry in repl.ctr.array:
            if entry.attid == self._DESCRIPTION_ATTID:
                entry.version = entry.version + 1
                entry.local_usn = next_usn
        msg = self._repl_metadata_replace_msg(res[0].dn, repl)
        msg["description"] = ldb.MessageElement("new val",
                                                ldb.FLAG_MOD_REPLACE,
                                                "description")
        self.assertRaises(ldb.LdbError, self.samdb.modify, msg,
                          ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])

    def test_set_replpropertymetadata(self):
        """Bumping version, local_usn and originating_usn together succeeds."""
        res = self._search_administrator(["replPropertyMetaData",
                                          "uSNChanged"])
        repl = self._unpack_repl_metadata(res[0])
        # int() rather than the Python 2-only long(); see
        # test_twoatt_replpropertymetadata.
        next_usn = int(str(res[0]["uSNChanged"])) + 1
        for entry in repl.ctr.array:
            if entry.attid == self._DESCRIPTION_ATTID:
                entry.version = entry.version + 1
                entry.local_usn = next_usn
                entry.originating_usn = next_usn
        msg = self._repl_metadata_replace_msg(res[0].dn, repl)
        self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])

    def test_ok_get_attribute_from_attid(self):
        """A known attid resolves to its attribute name."""
        self.assertEqual(
            self.samdb.get_attribute_from_attid(self._DESCRIPTION_ATTID),
            "description")

    def test_ko_get_attribute_from_attid(self):
        """An attid with no matching attribute resolves to None."""
        self.assertEqual(self.samdb.get_attribute_from_attid(11979), None)

    def test_get_attribute_replmetadata_version(self):
        """The Administrator's unicodePwd metadata version is expected to be 1."""
        res = self._search_administrator(["dn"])
        self.assertEqual(len(res), 1)
        dn = str(res[0].dn)
        self.assertEqual(
            self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"),
            1)

    def test_set_attribute_replmetadata_version(self):
        """A version written with the setter is read back by the getter."""
        res = self._search_administrator(["dn"])
        self.assertEqual(len(res), 1)
        dn = str(res[0].dn)
        version = self.samdb.get_attribute_replmetadata_version(
            dn, "description")
        self.samdb.set_attribute_replmetadata_version(
            dn, "description", version + 2)
        self.assertEqual(
            self.samdb.get_attribute_replmetadata_version(dn, "description"),
            version + 2)

    def test_db_lock(self):
        """A search must wait for a prepared transaction in another process.

        A forked child prepares a commit (taking the write lock), signals the
        parent through a pipe, holds the lock for two seconds and cancels.
        The parent then iterates a search and checks that it was delayed.
        """
        basedn = self.samdb.get_default_basedn()
        (r1, w1) = os.pipe()

        pid = os.fork()
        if pid == 0:
            # Child.  Whatever happens it must leave through os._exit();
            # an escaping exception would otherwise let the child carry on
            # running the remainder of the test suite as a second process.
            exit_status = 1
            try:
                os.close(r1)

                # Close the main DB, re-open just one DB
                del(self.samdb)
                gc.collect()
                self.samdb = SamDB(session_info=self.session,
                                   credentials=self.creds,
                                   lp=self.lp)

                self.samdb.transaction_start()

                self.samdb.add({
                    "dn": "cn=test_db_lock_user,cn=users," + str(basedn),
                    "objectclass": "user",
                })

                # Obtain a write lock
                self.samdb.transaction_prepare_commit()
                os.write(w1, b"added")
                time.sleep(2)

                # Drop the write lock
                self.samdb.transaction_cancel()
                exit_status = 0
            except BaseException:
                # Report why the child failed before it exits non-zero.
                import traceback
                traceback.print_exc()
            finally:
                os._exit(exit_status)

        # Parent.  Drop our copy of the write end so that os.read() returns
        # EOF, instead of blocking forever, if the child dies before it has
        # written its marker.
        os.close(w1)
        try:
            self.assertEqual(os.read(r1, 5), b"added")

            start = time.time()

            # We need to hold this iterator open to hold the all-record lock.
            res = self.samdb.search_iterator()

            # Draining the iterator releases the locks.  This should take at
            # least 2 seconds because the child's prepared transaction holds
            # a write lock on one backend db for that long.
            for _ in res:
                pass

            end = time.time()
            self.assertGreater(end - start, 1.9)
        finally:
            os.close(r1)
            # Always reap the child, even when an assertion above failed.
            (got_pid, status) = os.waitpid(pid, 0)

        self.assertEqual(got_pid, pid)
        self.assertTrue(os.WIFEXITED(status))
        self.assertEqual(os.WEXITSTATUS(status), 0)