The samba3sam test does not really need the extended_dn module
[kai/samba.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
1 #!/usr/bin/python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6 #
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
8 #   
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #   
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #   
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
24
25 import os
26 import ldb
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
28 from samba import Ldb, substitute_var
29 from samba.tests import LdbTestCase, TestCaseInTempDir, cmdline_loadparm
30 import samba.dcerpc.dom_sid
31 import samba.security
32 import samba.ndr
33
# Directory holding the Samba3 test fixtures, resolved relative to this file.
datadir = os.path.join(os.path.dirname(__file__),
                       "../../../../../testdata/samba3")


def read_datafile(filename):
    """Return the full text of a fixture file.

    :param filename: Name of the file, joined onto `datadir` with
        os.path.join().
    :return: The file contents as a string.
    """
    f = open(os.path.join(datadir, filename), 'r')
    # Close the handle explicitly instead of relying on garbage collection.
    try:
        return f.read()
    finally:
        f.close()
39
def ldb_debug(l, text):
    """Debug callback that writes `text` to standard output.

    :param l: Unused here; accepted so the function has a two-argument
        callback signature.
    :param text: The message to print.
    """
    # The parenthesised single-argument form behaves the same as the
    # Python 2 print statement and is also valid Python 3.
    print(text)
42
43
class MapBaseTestCase(TestCaseInTempDir):
    """Base test case for mapping tests.

    setUp() creates three Target helpers (self.samba4, self.samba3 and
    self.templates), each with its own tdb file inside self.tempdir, and
    records the path and URL of the main "test.ldb" in self.ldbfile and
    self.ldburl.  tearDown() removes all four files again.
    """

    def setup_modules(self, ldb, s3, s4):
        """Add the @MAP, @MODULES and @PARTITION control records to `ldb`.

        :param ldb: Database connection to add the records to.  Note that
            inside this method the name hides the imported `ldb` module.
        :param s3: Target for the Samba3 side; its basedn and url are used.
        :param s4: Target for the Samba4 side; its basedn and url are used.
        """
        ldb.add({"dn": "@MAP=samba3sam",
                 "@FROM": s4.basedn,
                 "@TO": "sambaDomainName=TESTS," + s3.basedn})

        ldb.add({"dn": "@MODULES",
                 "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})

        ldb.add({"dn": "@PARTITION",
            "partition": ["%s:%s" % (s4.basedn, s4.url),
                          "%s:%s" % (s3.basedn, s3.url)],
            "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]})

    def setUp(self):
        """Create the Target helpers and connect each one to its tdb file."""
        super(MapBaseTestCase, self).setUp()

        def make_dn(basedn, rdn):
            # DN below the sambaDomainName=TESTS container of the base DN.
            return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)

        def make_s4dn(basedn, rdn):
            # DN directly below the base DN.
            return "%s,%s" % (rdn, basedn)

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        # Bound to a local so the nested class below can close over it.
        tempdir = self.tempdir

        class Target(object):
            """Simple helper class that contains data for a specific SAM
            connection."""

            def __init__(self, filename, basedn, dn):
                """
                :param filename: Database file name, relative to the test's
                    temporary directory.
                :param basedn: Base DN used for this connection.
                :param dn: Callable (basedn, rdn) -> DN string used by dn(),
                    or None if dn() is never called on this target.
                """
                self.file = os.path.join(tempdir, filename)
                self.url = "tdb://" + self.file
                self.basedn = basedn
                self.substvars = {"BASEDN": self.basedn}
                self.db = Ldb(lp=cmdline_loadparm)
                self._dn = dn

            def dn(self, rdn):
                """Build a full DN for `rdn` using this target's DN maker."""
                return self._dn(self.basedn, rdn)

            def connect(self):
                """Connect this target's database handle to its tdb URL."""
                return self.db.connect(self.url)

            def setup_data(self, path):
                """Load the fixture file `path` into this database."""
                self.add_ldif(read_datafile(path))

            def subst(self, text):
                """Substitute this target's variables (BASEDN) in `text`."""
                return substitute_var(text, self.substvars)

            def add_ldif(self, ldif):
                """Add records from `ldif` after variable substitution."""
                self.db.add_ldif(self.subst(ldif))

            def modify_ldif(self, ldif):
                """Apply modifications from `ldif` after substitution."""
                self.db.modify_ldif(self.subst(ldif))

        self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
        self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
        self.templates = Target("templates.ldb", "cn=templates", None)

        self.samba3.connect()
        self.templates.connect()
        self.samba4.connect()

    def tearDown(self):
        """Delete the database files, then run the parent tearDown."""
        os.unlink(self.ldbfile)
        os.unlink(self.samba3.file)
        os.unlink(self.templates.file)
        os.unlink(self.samba4.file)
        super(MapBaseTestCase, self).tearDown()

    def assertSidEquals(self, text, ndr_sid):
        """Assert that an NDR-packed SID matches a SID given as a string.

        Only the revision number and the number of sub-authorities are
        compared at present (see the FIXMEs below).

        :param text: Expected SID in string form.
        :param ndr_sid: Attribute value whose first element is the
            NDR-packed SID.
        """
        sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.dom_sid.dom_sid,
                                        str(ndr_sid[0]))
        sid_obj2 = samba.security.Sid(text)
        # For now, this is the only way we can compare these since the
        # classes are in different places. Should reconcile that at some point.
        self.assertEquals(sid_obj1.sid_rev_num, sid_obj2.sid_rev_num)
        self.assertEquals(sid_obj1.num_auths, sid_obj2.num_auths)
        # FIXME: self.assertEquals(sid_obj1.id_auth, sid_obj2.id_auth)
        # FIXME: self.assertEquals(sid_obj1.sub_auths[:sid_obj1.num_auths],
        #                  sid_obj2.sub_auths[:sid_obj2.num_auths])
129
130
class Samba3SamTestCase(MapBaseTestCase):
    """Tests for the samba3sam mapping run against the samba3.ldif fixture."""

    def setUp(self):
        """Load the fixtures, configure the modules and reopen the database."""
        super(Samba3SamTestCase, self).setUp()
        ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
        self.samba3.setup_data("samba3.ldif")
        self.templates.setup_data("provision_samba3sam_templates.ldif")
        ldif = read_datafile("provision_samba3sam.ldif")
        ldb.add_ldif(self.samba4.subst(ldif))
        self.setup_modules(ldb, self.samba3, self.samba4)
        # Drop the provisioning handle before opening the one the tests use.
        del ldb
        self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)

    def test_search_non_mapped(self):
        """Looking up by non-mapped attribute"""
        msg = self.ldb.search(expression="(cn=Administrator)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["cn"], "Administrator")

    # This test previously reused the name test_search_non_mapped, which
    # replaced the method above so that it was never run.
    def test_search_mapped(self):
        """Looking up by mapped attribute"""
        msg = self.ldb.search(expression="(name=Backup Operators)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["name"], "Backup Operators")

    def test_old_name_of_renamed(self):
        """Looking up by old name of renamed attribute"""
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
        self.assertEquals(len(msg), 0)

    def test_mapped_containing_sid(self):
        """Looking up mapped entry containing SID"""
        msg = self.ldb.search(expression="(cn=Replicator)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn),
                          "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertTrue("objectSid" in msg[0])
        self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
                             msg[0]["objectSid"])
        oc = set(msg[0]["objectClass"])
        self.assertEquals(oc, set(["group"]))

    def test_search_by_objclass(self):
        """Looking up by objectClass"""
        msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEquals(set([str(m.dn) for m in msg]),
                set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                     "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))

    def test_s3sam_modify(self):
        """Add, search, modify, rename and delete records through the map."""
        # Adding a record that will be fallbacked
        self.ldb.add({"dn": "cn=Foo",
            "foo": "bar",
            "blah": "Blie",
            "cn": "Foo",
            "showInAdvancedViewOnly": "TRUE"}
            )

        # Checking for existence of record (local)
        # TODO: This record must be searched in the local database, which is
        # currently only supported for base searches
        # msg = ldb.search(expression="(cn=Foo)",
        #                  attrs=['foo','blah','cn','showInAdvancedViewOnly'])
        # TODO: Actually, this version should work as well but doesn't...
        msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
                scope=SCOPE_BASE,
                attrs=['foo','blah','cn','showInAdvancedViewOnly'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE")
        self.assertEquals(msg[0]["foo"], "bar")
        self.assertEquals(msg[0]["blah"], "Blie")

        # Adding record that will be mapped
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                 "objectClass": "user",
                 "unixName": "bin",
                 "sambaUnicodePwd": "geheim",
                 "cn": "Niemand"})

        # Checking for existence of record (remote)
        msg = self.ldb.search(expression="(unixName=bin)",
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")

        # Checking for existence of record (local && remote)
        msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)           # TODO: should check with more records
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["unixName"], "bin")
        self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")

        # Checking for existence of record (local || remote)
        msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)        # TODO: should check with more records
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["unixName"], "bin")
        self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")

        # Checking for data in destination database
        msg = self.samba3.db.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(msg[0]["sambaSID"],
                "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEquals(msg[0]["displayName"], "Niemand")

        # Adding attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["description"], "Blah")

        # Modifying attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(msg[0]["description"], "Blie")

        # Deleting attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        # Checking whether changes are no longer there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertFalse("description" in msg[0])

        # Renaming record...
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                        "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether DN has changed...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn),
                          "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Deleting record...
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether record is gone...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 0)
297
298
299 class MapTestCase(MapBaseTestCase):
300
301     def setUp(self):
302         super(MapTestCase, self).setUp()
303         ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
304         self.templates.setup_data("provision_samba3sam_templates.ldif")
305         ldif = read_datafile("provision_samba3sam.ldif")
306         ldb.add_ldif(self.samba4.subst(ldif))
307         self.setup_modules(ldb, self.samba3, self.samba4)
308         del ldb
309         self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
310
311     def test_map_search(self):
312         """Running search tests on mapped data."""
313         self.samba3.db.add({
314             "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
315             "objectclass": ["sambaDomain", "top"],
316             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
317             "sambaNextRid": "2000",
318             "sambaDomainName": "TESTS"
319             })
320
321         # Add a set of split records
322         self.ldb.add_ldif("""
323 dn: """+ self.samba4.dn("cn=X") + """
324 objectClass: user
325 cn: X
326 codePage: x
327 revision: x
328 dnsHostName: x
329 nextRid: y
330 lastLogon: x
331 description: x
332 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
333 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512
334
335 """)
336
337         self.ldb.add({
338             "dn": self.samba4.dn("cn=Y"),
339             "objectClass": "top",
340             "cn": "Y",
341             "codePage": "x",
342             "revision": "x",
343             "dnsHostName": "y",
344             "nextRid": "y",
345             "lastLogon": "y",
346             "description": "x"})
347
348         self.ldb.add({
349             "dn": self.samba4.dn("cn=Z"),
350             "objectClass": "top",
351             "cn": "Z",
352             "codePage": "x",
353             "revision": "y",
354             "dnsHostName": "z",
355             "nextRid": "y",
356             "lastLogon": "z",
357             "description": "y"})
358
359         # Add a set of remote records
360
361         self.samba3.db.add({
362             "dn": self.samba3.dn("cn=A"),
363             "objectClass": "posixAccount",
364             "cn": "A",
365             "sambaNextRid": "x",
366             "sambaBadPasswordCount": "x", 
367             "sambaLogonTime": "x",
368             "description": "x",
369             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
370             "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
371
372         self.samba3.db.add({
373             "dn": self.samba3.dn("cn=B"),
374             "objectClass": "top",
375             "cn": "B",
376             "sambaNextRid": "x",
377             "sambaBadPasswordCount": "x",
378             "sambaLogonTime": "y",
379             "description": "x"})
380
381         self.samba3.db.add({
382             "dn": self.samba3.dn("cn=C"),
383             "objectClass": "top",
384             "cn": "C",
385             "sambaNextRid": "x",
386             "sambaBadPasswordCount": "y",
387             "sambaLogonTime": "z",
388             "description": "y"})
389
390         # Testing search by DN
391
392         # Search remote record by local DN
393         dn = self.samba4.dn("cn=A")
394         res = self.ldb.search(dn, scope=SCOPE_BASE, 
395                 attrs=["dnsHostName", "lastLogon"])
396         self.assertEquals(len(res), 1)
397         self.assertEquals(str(res[0].dn), dn)
398         self.assertTrue(not "dnsHostName" in res[0])
399         self.assertEquals(res[0]["lastLogon"], "x")
400
401         # Search remote record by remote DN
402         dn = self.samba3.dn("cn=A")
403         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
404                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
405         self.assertEquals(len(res), 1)
406         self.assertEquals(str(res[0].dn), dn)
407         self.assertTrue(not "dnsHostName" in res[0])
408         self.assertTrue(not "lastLogon" in res[0])
409         self.assertEquals(res[0]["sambaLogonTime"], "x")
410
411         # Search split record by local DN
412         dn = self.samba4.dn("cn=X")
413         res = self.ldb.search(dn, scope=SCOPE_BASE, 
414                 attrs=["dnsHostName", "lastLogon"])
415         self.assertEquals(len(res), 1)
416         self.assertEquals(str(res[0].dn), dn)
417         self.assertEquals(res[0]["dnsHostName"], "x")
418         self.assertEquals(res[0]["lastLogon"], "x")
419
420         # Search split record by remote DN
421         dn = self.samba3.dn("cn=X")
422         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
423                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
424         self.assertEquals(len(res), 1)
425         self.assertEquals(str(res[0].dn), dn)
426         self.assertTrue(not "dnsHostName" in res[0])
427         self.assertTrue(not "lastLogon" in res[0])
428         self.assertEquals(res[0]["sambaLogonTime"], "x")
429
430         # Testing search by attribute
431
432         # Search by ignored attribute
433         res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, 
434                 attrs=["dnsHostName", "lastLogon"])
435         self.assertEquals(len(res), 2)
436         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
437         self.assertEquals(res[0]["dnsHostName"], "y")
438         self.assertEquals(res[0]["lastLogon"], "y")
439         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
440         self.assertEquals(res[1]["dnsHostName"], "x")
441         self.assertEquals(res[1]["lastLogon"], "x")
442
443         # Search by kept attribute
444         res = self.ldb.search(expression="(description=y)", 
445                 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
446         self.assertEquals(len(res), 2)
447         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
448         self.assertEquals(res[0]["dnsHostName"], "z")
449         self.assertEquals(res[0]["lastLogon"], "z")
450         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
451         self.assertTrue(not "dnsHostName" in res[1])
452         self.assertEquals(res[1]["lastLogon"], "z")
453
454         # Search by renamed attribute
455         res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
456                               attrs=["dnsHostName", "lastLogon"])
457         self.assertEquals(len(res), 2)
458         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
459         self.assertTrue(not "dnsHostName" in res[0])
460         self.assertEquals(res[0]["lastLogon"], "y")
461         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
462         self.assertTrue(not "dnsHostName" in res[1])
463         self.assertEquals(res[1]["lastLogon"], "x")
464
465         # Search by converted attribute
466         # TODO:
467         #   Using the SID directly in the parse tree leads to conversion
468         #   errors, letting the search fail with no results.
469         #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
470         res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
471         self.assertEquals(len(res), 3)
472         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
473         self.assertEquals(res[0]["dnsHostName"], "x")
474         self.assertEquals(res[0]["lastLogon"], "x")
475         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552", 
476                              res[0]["objectSid"])
477         self.assertTrue("objectSid" in res[0])
478         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
479         self.assertTrue(not "dnsHostName" in res[1])
480         self.assertEquals(res[1]["lastLogon"], "x")
481         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
482                              res[1]["objectSid"])
483         self.assertTrue("objectSid" in res[1])
484
485         # Search by generated attribute 
486         # In most cases, this even works when the mapping is missing
487         # a `convert_operator' by enumerating the remote db.
488         res = self.ldb.search(expression="(primaryGroupID=512)", 
489                            attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
490         self.assertEquals(len(res), 1)
491         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
492         self.assertTrue(not "dnsHostName" in res[0])
493         self.assertEquals(res[0]["lastLogon"], "x")
494         self.assertEquals(res[0]["primaryGroupID"], "512")
495
496         # TODO: There should actually be two results, A and X.  The
497         # primaryGroupID of X seems to get corrupted somewhere, and the
498         # objectSid isn't available during the generation of remote (!) data,
499         # which can be observed with the following search.  Also note that Xs
500         # objectSid seems to be fine in the previous search for objectSid... */
501         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
502         #print len(res) + " results found"
503         #for i in range(len(res)):
504         #    for (obj in res[i]) {
505         #        print obj + ": " + res[i][obj]
506         #    }
507         #    print "---"
508         #    
509
510         # Search by remote name of renamed attribute */
511         res = self.ldb.search(expression="(sambaBadPasswordCount=*)", 
512                               attrs=["dnsHostName", "lastLogon"])
513         self.assertEquals(len(res), 0)
514
515         # Search by objectClass
516         attrs = ["dnsHostName", "lastLogon", "objectClass"]
517         res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
518         self.assertEquals(len(res), 2)
519         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
520         self.assertEquals(res[0]["dnsHostName"], "x")
521         self.assertEquals(res[0]["lastLogon"], "x")
522         self.assertEquals(res[0]["objectClass"][0], "user")
523         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
524         self.assertTrue(not "dnsHostName" in res[1])
525         self.assertEquals(res[1]["lastLogon"], "x")
526         self.assertEquals(res[1]["objectClass"][0], "user")
527
528         # Prove that the objectClass is actually used for the search
529         res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
530                               attrs=attrs)
531         self.assertEquals(len(res), 3)
532         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
533         self.assertTrue(not "dnsHostName" in res[0])
534         self.assertEquals(res[0]["lastLogon"], "y")
535         self.assertEquals(set(res[0]["objectClass"]), set(["top"]))
536         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
537         self.assertEquals(res[1]["dnsHostName"], "x")
538         self.assertEquals(res[1]["lastLogon"], "x")
539         self.assertEquals(res[1]["objectClass"][0], "user")
540         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
541         self.assertTrue(not "dnsHostName" in res[2])
542         self.assertEquals(res[2]["lastLogon"], "x")
543         self.assertEquals(res[2]["objectClass"][0], "user")
544
545         # Testing search by parse tree
546
547         # Search by conjunction of local attributes
548         res = self.ldb.search(expression="(&(codePage=x)(revision=x))", 
549                               attrs=["dnsHostName", "lastLogon"])
550         self.assertEquals(len(res), 2)
551         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
552         self.assertEquals(res[0]["dnsHostName"], "y")
553         self.assertEquals(res[0]["lastLogon"], "y")
554         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
555         self.assertEquals(res[1]["dnsHostName"], "x")
556         self.assertEquals(res[1]["lastLogon"], "x")
557
558         # Search by conjunction of remote attributes
559         res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", 
560                               attrs=["dnsHostName", "lastLogon"])
561         self.assertEquals(len(res), 2)
562         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
563         self.assertEquals(res[0]["dnsHostName"], "x")
564         self.assertEquals(res[0]["lastLogon"], "x")
565         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
566         self.assertTrue(not "dnsHostName" in res[1])
567         self.assertEquals(res[1]["lastLogon"], "x")
568         
569         # Search by conjunction of local and remote attribute 
570         res = self.ldb.search(expression="(&(codePage=x)(description=x))", 
571                               attrs=["dnsHostName", "lastLogon"])
572         self.assertEquals(len(res), 2)
573         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
574         self.assertEquals(res[0]["dnsHostName"], "y")
575         self.assertEquals(res[0]["lastLogon"], "y")
576         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
577         self.assertEquals(res[1]["dnsHostName"], "x")
578         self.assertEquals(res[1]["lastLogon"], "x")
579
580         # Search by conjunction of local and remote attribute w/o match
581         attrs = ["dnsHostName", "lastLogon"]
582         res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", 
583                               attrs=attrs)
584         self.assertEquals(len(res), 0)
585         res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", 
586                               attrs=attrs)
587         self.assertEquals(len(res), 0)
588
589         # Search by disjunction of local attributes
590         res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", 
591                               attrs=["dnsHostName", "lastLogon"])
592         self.assertEquals(len(res), 2)
593         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
594         self.assertEquals(res[0]["dnsHostName"], "y")
595         self.assertEquals(res[0]["lastLogon"], "y")
596         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
597         self.assertEquals(res[1]["dnsHostName"], "x")
598         self.assertEquals(res[1]["lastLogon"], "x")
599
600         # Search by disjunction of remote attributes
601         res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", 
602                               attrs=["dnsHostName", "lastLogon"])
603         self.assertEquals(len(res), 3)
604         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
605         self.assertFalse("dnsHostName" in res[0])
606         self.assertEquals(res[0]["lastLogon"], "y")
607         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
608         self.assertEquals(res[1]["dnsHostName"], "x")
609         self.assertEquals(res[1]["lastLogon"], "x")
610         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
611         self.assertFalse("dnsHostName" in res[2])
612         self.assertEquals(res[2]["lastLogon"], "x")
613
614         # Search by disjunction of local and remote attribute
615         res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", 
616                               attrs=["dnsHostName", "lastLogon"])
617         self.assertEquals(len(res), 3)
618         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
619         self.assertEquals(res[0]["dnsHostName"], "y")
620         self.assertEquals(res[0]["lastLogon"], "y")
621         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
622         self.assertFalse("dnsHostName" in res[1])
623         self.assertEquals(res[1]["lastLogon"], "y")
624         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
625         self.assertEquals(res[2]["dnsHostName"], "x")
626         self.assertEquals(res[2]["lastLogon"], "x")
627
628         # Search by disjunction of local and remote attribute w/o match
629         res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", 
630                               attrs=["dnsHostName", "lastLogon"])
631         self.assertEquals(len(res), 0)
632
633         # Search by negated local attribute
634         res = self.ldb.search(expression="(!(revision=x))", 
635                               attrs=["dnsHostName", "lastLogon"])
636         self.assertEquals(len(res), 5)
637         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
638         self.assertTrue(not "dnsHostName" in res[0])
639         self.assertEquals(res[0]["lastLogon"], "y")
640         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
641         self.assertTrue(not "dnsHostName" in res[1])
642         self.assertEquals(res[1]["lastLogon"], "x")
643         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
644         self.assertEquals(res[2]["dnsHostName"], "z")
645         self.assertEquals(res[2]["lastLogon"], "z")
646         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
647         self.assertTrue(not "dnsHostName" in res[3])
648         self.assertEquals(res[3]["lastLogon"], "z")
649
650         # Search by negated remote attribute
651         res = self.ldb.search(expression="(!(description=x))", 
652                               attrs=["dnsHostName", "lastLogon"])
653         self.assertEquals(len(res), 3)
654         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
655         self.assertEquals(res[0]["dnsHostName"], "z")
656         self.assertEquals(res[0]["lastLogon"], "z")
657         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
658         self.assertTrue(not "dnsHostName" in res[1])
659         self.assertEquals(res[1]["lastLogon"], "z")
660
661         # Search by negated conjunction of local attributes
662         res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", 
663                               attrs=["dnsHostName", "lastLogon"])
664         self.assertEquals(len(res), 5)
665         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
666         self.assertTrue(not "dnsHostName" in res[0])
667         self.assertEquals(res[0]["lastLogon"], "y")
668         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
669         self.assertTrue(not "dnsHostName" in res[1])
670         self.assertEquals(res[1]["lastLogon"], "x")
671         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
672         self.assertEquals(res[2]["dnsHostName"], "z")
673         self.assertEquals(res[2]["lastLogon"], "z")
674         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
675         self.assertTrue(not "dnsHostName" in res[3])
676         self.assertEquals(res[3]["lastLogon"], "z")
677
678         # Search by negated conjunction of remote attributes
679         res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", 
680                               attrs=["dnsHostName", "lastLogon"])
681         self.assertEquals(len(res), 5)
682         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
683         self.assertEquals(res[0]["dnsHostName"], "y")
684         self.assertEquals(res[0]["lastLogon"], "y")
685         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
686         self.assertTrue(not "dnsHostName" in res[1])
687         self.assertEquals(res[1]["lastLogon"], "y")
688         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
689         self.assertEquals(res[2]["dnsHostName"], "z")
690         self.assertEquals(res[2]["lastLogon"], "z")
691         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
692         self.assertTrue(not "dnsHostName" in res[3])
693         self.assertEquals(res[3]["lastLogon"], "z")
694
695         # Search by negated conjunction of local and remote attribute
696         res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", 
697                               attrs=["dnsHostName", "lastLogon"])
698         self.assertEquals(len(res), 5)
699         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
700         self.assertTrue(not "dnsHostName" in res[0])
701         self.assertEquals(res[0]["lastLogon"], "y")
702         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
703         self.assertTrue(not "dnsHostName" in res[1])
704         self.assertEquals(res[1]["lastLogon"], "x")
705         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
706         self.assertEquals(res[2]["dnsHostName"], "z")
707         self.assertEquals(res[2]["lastLogon"], "z")
708         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
709         self.assertTrue(not "dnsHostName" in res[3])
710         self.assertEquals(res[3]["lastLogon"], "z")
711
712         # Search by negated disjunction of local attributes
713         res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", 
714                               attrs=["dnsHostName", "lastLogon"])
715         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
716         self.assertTrue(not "dnsHostName" in res[0])
717         self.assertEquals(res[0]["lastLogon"], "y")
718         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
719         self.assertTrue(not "dnsHostName" in res[1])
720         self.assertEquals(res[1]["lastLogon"], "x")
721         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
722         self.assertEquals(res[2]["dnsHostName"], "z")
723         self.assertEquals(res[2]["lastLogon"], "z")
724         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
725         self.assertTrue(not "dnsHostName" in res[3])
726         self.assertEquals(res[3]["lastLogon"], "z")
727
728         # Search by negated disjunction of remote attributes
729         res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", 
730                               attrs=["dnsHostName", "lastLogon"])
731         self.assertEquals(len(res), 4)
732         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
733         self.assertEquals(res[0]["dnsHostName"], "y")
734         self.assertEquals(res[0]["lastLogon"], "y")
735         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
736         self.assertEquals(res[1]["dnsHostName"], "z")
737         self.assertEquals(res[1]["lastLogon"], "z")
738         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
739         self.assertTrue(not "dnsHostName" in res[2])
740         self.assertEquals(res[2]["lastLogon"], "z")
741
742         # Search by negated disjunction of local and remote attribute
743         res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", 
744                               attrs=["dnsHostName", "lastLogon"])
745         self.assertEquals(len(res), 4)
746         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
747         self.assertTrue(not "dnsHostName" in res[0])
748         self.assertEquals(res[0]["lastLogon"], "x")
749         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
750         self.assertEquals(res[1]["dnsHostName"], "z")
751         self.assertEquals(res[1]["lastLogon"], "z")
752         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
753         self.assertTrue(not "dnsHostName" in res[2])
754         self.assertEquals(res[2]["lastLogon"], "z")
755
756         # Search by complex parse tree
757         res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
758         self.assertEquals(len(res), 6)
759         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
760         self.assertTrue(not "dnsHostName" in res[0])
761         self.assertEquals(res[0]["lastLogon"], "y")
762         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
763         self.assertEquals(res[1]["dnsHostName"], "x")
764         self.assertEquals(res[1]["lastLogon"], "x")
765         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
766         self.assertTrue(not "dnsHostName" in res[2])
767         self.assertEquals(res[2]["lastLogon"], "x")
768         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
769         self.assertEquals(res[3]["dnsHostName"], "z")
770         self.assertEquals(res[3]["lastLogon"], "z")
771         self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
772         self.assertTrue(not "dnsHostName" in res[4])
773         self.assertEquals(res[4]["lastLogon"], "z")
774
775         # Clean up
776         dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
777         for dn in dns:
778             self.ldb.delete(dn)
779
780     def test_map_modify_local(self):
781         """Modification of local records."""
782         # Add local record
783         dn = "cn=test,dc=idealx,dc=org"
784         self.ldb.add({"dn": dn, 
785                  "cn": "test",
786                  "foo": "bar",
787                  "revision": "1",
788                  "description": "test"})
789         # Check it's there
790         attrs = ["foo", "revision", "description"]
791         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
792         self.assertEquals(len(res), 1)
793         self.assertEquals(str(res[0].dn), dn)
794         self.assertEquals(res[0]["foo"], "bar")
795         self.assertEquals(res[0]["revision"], "1")
796         self.assertEquals(res[0]["description"], "test")
797         # Check it's not in the local db
798         res = self.samba4.db.search(expression="(cn=test)", 
799                                     scope=SCOPE_DEFAULT, attrs=attrs)
800         self.assertEquals(len(res), 0)
801         # Check it's not in the remote db
802         res = self.samba3.db.search(expression="(cn=test)", 
803                                     scope=SCOPE_DEFAULT, attrs=attrs)
804         self.assertEquals(len(res), 0)
805
806         # Modify local record
807         ldif = """
808 dn: """ + dn + """
809 replace: foo
810 foo: baz
811 replace: description
812 description: foo
813 """
814         self.ldb.modify_ldif(ldif)
815         # Check in local db
816         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
817         self.assertEquals(len(res), 1)
818         self.assertEquals(str(res[0].dn), dn)
819         self.assertEquals(res[0]["foo"], "baz")
820         self.assertEquals(res[0]["revision"], "1")
821         self.assertEquals(res[0]["description"], "foo")
822
823         # Rename local record
824         dn2 = "cn=toast,dc=idealx,dc=org"
825         self.ldb.rename(dn, dn2)
826         # Check in local db
827         res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
828         self.assertEquals(len(res), 1)
829         self.assertEquals(str(res[0].dn), dn2)
830         self.assertEquals(res[0]["foo"], "baz")
831         self.assertEquals(res[0]["revision"], "1")
832         self.assertEquals(res[0]["description"], "foo")
833
834         # Delete local record
835         self.ldb.delete(dn2)
836         # Check it's gone
837         res = self.ldb.search(dn2, scope=SCOPE_BASE)
838         self.assertEquals(len(res), 0)
839
840     def test_map_modify_remote_remote(self):
841         """Modification of remote data of remote records"""
842         # Add remote record
843         dn = self.samba4.dn("cn=test")
844         dn2 = self.samba3.dn("cn=test")
845         self.samba3.db.add({"dn": dn2, 
846                    "cn": "test",
847                    "description": "foo",
848                    "sambaBadPasswordCount": "3",
849                    "sambaNextRid": "1001"})
850         # Check it's there
851         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
852                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
853         self.assertEquals(len(res), 1)
854         self.assertEquals(str(res[0].dn), dn2)
855         self.assertEquals(res[0]["description"], "foo")
856         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
857         self.assertEquals(res[0]["sambaNextRid"], "1001")
858         # Check in mapped db
859         attrs = ["description", "badPwdCount", "nextRid"]
860         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
861         self.assertEquals(len(res), 1)
862         self.assertEquals(str(res[0].dn), dn)
863         self.assertEquals(res[0]["description"], "foo")
864         self.assertEquals(res[0]["badPwdCount"], "3")
865         self.assertEquals(res[0]["nextRid"], "1001")
866         # Check in local db
867         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
868         self.assertEquals(len(res), 0)
869
870         # Modify remote data of remote record
871         ldif = """
872 dn: """ + dn + """
873 replace: description
874 description: test
875 replace: badPwdCount
876 badPwdCount: 4
877 """
878         self.ldb.modify_ldif(ldif)
879         # Check in mapped db
880         res = self.ldb.search(dn, scope=SCOPE_BASE, 
881                 attrs=["description", "badPwdCount", "nextRid"])
882         self.assertEquals(len(res), 1)
883         self.assertEquals(str(res[0].dn), dn)
884         self.assertEquals(res[0]["description"], "test")
885         self.assertEquals(res[0]["badPwdCount"], "4")
886         self.assertEquals(res[0]["nextRid"], "1001")
887         # Check in remote db
888         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
889                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
890         self.assertEquals(len(res), 1)
891         self.assertEquals(str(res[0].dn), dn2)
892         self.assertEquals(res[0]["description"], "test")
893         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
894         self.assertEquals(res[0]["sambaNextRid"], "1001")
895
896         # Rename remote record
897         dn2 = self.samba4.dn("cn=toast")
898         self.ldb.rename(dn, dn2)
899         # Check in mapped db
900         dn = dn2
901         res = self.ldb.search(dn, scope=SCOPE_BASE, 
902                 attrs=["description", "badPwdCount", "nextRid"])
903         self.assertEquals(len(res), 1)
904         self.assertEquals(str(res[0].dn), dn)
905         self.assertEquals(res[0]["description"], "test")
906         self.assertEquals(res[0]["badPwdCount"], "4")
907         self.assertEquals(res[0]["nextRid"], "1001")
908         # Check in remote db 
909         dn2 = self.samba3.dn("cn=toast")
910         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
911                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
912         self.assertEquals(len(res), 1)
913         self.assertEquals(str(res[0].dn), dn2)
914         self.assertEquals(res[0]["description"], "test")
915         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
916         self.assertEquals(res[0]["sambaNextRid"], "1001")
917
918         # Delete remote record
919         self.ldb.delete(dn)
920         # Check in mapped db that it's removed
921         res = self.ldb.search(dn, scope=SCOPE_BASE)
922         self.assertEquals(len(res), 0)
923         # Check in remote db
924         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
925         self.assertEquals(len(res), 0)
926
927     def test_map_modify_remote_local(self):
928         """Modification of local data of remote records"""
929         # Add remote record (same as before)
930         dn = self.samba4.dn("cn=test")
931         dn2 = self.samba3.dn("cn=test")
932         self.samba3.db.add({"dn": dn2, 
933                    "cn": "test",
934                    "description": "foo",
935                    "sambaBadPasswordCount": "3",
936                    "sambaNextRid": "1001"})
937
938         # Modify local data of remote record
939         ldif = """
940 dn: """ + dn + """
941 add: revision
942 revision: 1
943 replace: description
944 description: test
945
946 """
947         self.ldb.modify_ldif(ldif)
948         # Check in mapped db
949         attrs = ["revision", "description"]
950         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
951         self.assertEquals(len(res), 1)
952         self.assertEquals(str(res[0].dn), dn)
953         self.assertEquals(res[0]["description"], "test")
954         self.assertEquals(res[0]["revision"], "1")
955         # Check in remote db
956         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
957         self.assertEquals(len(res), 1)
958         self.assertEquals(str(res[0].dn), dn2)
959         self.assertEquals(res[0]["description"], "test")
960         self.assertTrue(not "revision" in res[0])
961         # Check in local db
962         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
963         self.assertEquals(len(res), 1)
964         self.assertEquals(str(res[0].dn), dn)
965         self.assertTrue(not "description" in res[0])
966         self.assertEquals(res[0]["revision"], "1")
967
968         # Delete (newly) split record
969         self.ldb.delete(dn)
970
971     def test_map_modify_split(self):
972         """Testing modification of split records"""
973         # Add split record
974         dn = self.samba4.dn("cn=test")
975         dn2 = self.samba3.dn("cn=test")
976         self.ldb.add({
977             "dn": dn,
978             "cn": "test",
979             "description": "foo",
980             "badPwdCount": "3",
981             "nextRid": "1001",
982             "revision": "1"})
983         # Check it's there
984         attrs = ["description", "badPwdCount", "nextRid", "revision"]
985         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
986         self.assertEquals(len(res), 1)
987         self.assertEquals(str(res[0].dn), dn)
988         self.assertEquals(res[0]["description"], "foo")
989         self.assertEquals(res[0]["badPwdCount"], "3")
990         self.assertEquals(res[0]["nextRid"], "1001")
991         self.assertEquals(res[0]["revision"], "1")
992         # Check in local db
993         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
994         self.assertEquals(len(res), 1)
995         self.assertEquals(str(res[0].dn), dn)
996         self.assertTrue(not "description" in res[0])
997         self.assertTrue(not "badPwdCount" in res[0])
998         self.assertTrue(not "nextRid" in res[0])
999         self.assertEquals(res[0]["revision"], "1")
1000         # Check in remote db
1001         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
1002                  "revision"]
1003         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1004         self.assertEquals(len(res), 1)
1005         self.assertEquals(str(res[0].dn), dn2)
1006         self.assertEquals(res[0]["description"], "foo")
1007         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
1008         self.assertEquals(res[0]["sambaNextRid"], "1001")
1009         self.assertTrue(not "revision" in res[0])
1010
1011         # Modify of split record
1012         ldif = """
1013 dn: """ + dn + """
1014 replace: description
1015 description: test
1016 replace: badPwdCount
1017 badPwdCount: 4
1018 replace: revision
1019 revision: 2
1020 """
1021         self.ldb.modify_ldif(ldif)
1022         # Check in mapped db
1023         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1024         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1025         self.assertEquals(len(res), 1)
1026         self.assertEquals(str(res[0].dn), dn)
1027         self.assertEquals(res[0]["description"], "test")
1028         self.assertEquals(res[0]["badPwdCount"], "4")
1029         self.assertEquals(res[0]["nextRid"], "1001")
1030         self.assertEquals(res[0]["revision"], "2")
1031         # Check in local db
1032         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1033         self.assertEquals(len(res), 1)
1034         self.assertEquals(str(res[0].dn), dn)
1035         self.assertTrue(not "description" in res[0])
1036         self.assertTrue(not "badPwdCount" in res[0])
1037         self.assertTrue(not "nextRid" in res[0])
1038         self.assertEquals(res[0]["revision"], "2")
1039         # Check in remote db
1040         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
1041                  "revision"]
1042         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1043         self.assertEquals(len(res), 1)
1044         self.assertEquals(str(res[0].dn), dn2)
1045         self.assertEquals(res[0]["description"], "test")
1046         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1047         self.assertEquals(res[0]["sambaNextRid"], "1001")
1048         self.assertTrue(not "revision" in res[0])
1049
1050         # Rename split record
1051         dn2 = self.samba4.dn("cn=toast")
1052         self.ldb.rename(dn, dn2)
1053         # Check in mapped db
1054         dn = dn2
1055         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1056         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1057         self.assertEquals(len(res), 1)
1058         self.assertEquals(str(res[0].dn), dn)
1059         self.assertEquals(res[0]["description"], "test")
1060         self.assertEquals(res[0]["badPwdCount"], "4")
1061         self.assertEquals(res[0]["nextRid"], "1001")
1062         self.assertEquals(res[0]["revision"], "2")
1063         # Check in local db
1064         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1065         self.assertEquals(len(res), 1)
1066         self.assertEquals(str(res[0].dn), dn)
1067         self.assertTrue(not "description" in res[0])
1068         self.assertTrue(not "badPwdCount" in res[0])
1069         self.assertTrue(not "nextRid" in res[0])
1070         self.assertEquals(res[0]["revision"], "2")
1071         # Check in remote db
1072         dn2 = self.samba3.dn("cn=toast")
1073         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
1074           attrs=["description", "sambaBadPasswordCount", "sambaNextRid", 
1075                  "revision"])
1076         self.assertEquals(len(res), 1)
1077         self.assertEquals(str(res[0].dn), dn2)
1078         self.assertEquals(res[0]["description"], "test")
1079         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1080         self.assertEquals(res[0]["sambaNextRid"], "1001")
1081         self.assertTrue(not "revision" in res[0])
1082
1083         # Delete split record
1084         self.ldb.delete(dn)
1085         # Check in mapped db
1086         res = self.ldb.search(dn, scope=SCOPE_BASE)
1087         self.assertEquals(len(res), 0)
1088         # Check in local db
1089         res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1090         self.assertEquals(len(res), 0)
1091         # Check in remote db
1092         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1093         self.assertEquals(len(res), 0)