Simplify code, remove print statements.
[kai/samba.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
1 #!/usr/bin/python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6 #
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
8 #   
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #   
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #   
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
24
25 import os
26 import sys
27 import samba
28 import ldb
29 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
30 from samba import Ldb, substitute_var
31 from samba.tests import LdbTestCase, TestCaseInTempDir
32
# Directory containing the Samba3 test data; the LDIF fixtures loaded by the
# test cases below are opened relative to it.  Resolved relative to this file.
datadir = os.path.join(os.path.dirname(__file__), "../../../../../testdata/samba3")
34
def ldb_debug(l, text):
    """Print a debug message to standard output.

    :param l: Unused.  NOTE(review): presumably the LDB object when this is
        installed as a debug callback -- confirm against the caller.
    :param text: Message to print.
    """
    # Parenthesised form prints the same thing on Python 2 (single argument)
    # and is also valid syntax on Python 3.
    print(text)
37
38
class MapBaseTestCase(TestCaseInTempDir):
    """Base test case for mapping tests.

    setUp() creates three helper targets (samba4, samba3 and templates), each
    backed by its own tdb file inside the test's temporary directory, and
    records the path/URL of the main LDB file in self.ldbfile / self.ldburl.
    tearDown() removes all four files again.
    """

    def setup_modules(self, ldb, s3, s4):
        """Write the mapping, module list and partition records into ldb.

        :param ldb: Database to configure (note: shadows the ldb module
            inside this method).
        :param s3: Target describing the Samba3 side (basedn and url are read).
        :param s4: Target describing the Samba4 side (basedn and url are read).
        """
        # Map the Samba4 base DN onto the TESTS domain below the Samba3 base.
        ldb.add({"dn": "@MAP=samba3sam",
                 "@FROM": s4.basedn,
                 "@TO": "sambaDomainName=TESTS," + s3.basedn})

        ldb.add({"dn": "@MODULES",
                 "@LIST": "rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})

        # One partition per target, each entry is "<basedn>:<url>".
        ldb.add({"dn": "@PARTITION",
            "partition": [s4.basedn + ":" + s4.url, s3.basedn + ":" + s3.url],
            "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]})


    def setUp(self):
        """Create the samba4, samba3 and templates targets and connect them."""
        super(MapBaseTestCase, self).setUp()

        def make_dn(basedn, rdn):
            # Samba3 entries live below the TESTS domain entry.
            return rdn + ",sambaDomainName=TESTS," + basedn

        def make_s4dn(basedn, rdn):
            return rdn + "," + basedn

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        # Captured by the Target class below, which cannot see self.
        tempdir = self.tempdir

        class Target:
            """Simple helper class that contains data for a specific SAM connection."""
            def __init__(self, file, basedn, dn):
                # file: file name inside the temporary directory
                # basedn: base DN of this target
                # dn: callable (basedn, rdn) -> full DN string, or None
                self.file = os.path.join(tempdir, file)
                self.url = "tdb://" + self.file
                self.basedn = basedn
                self.substvars = {"BASEDN": self.basedn}
                self.db = Ldb()
                self._dn = dn

            def dn(self, rdn):
                """Return the full DN for rdn below this target's base DN."""
                # The DN builders are declared as (basedn, rdn); passing the
                # arguments the other way round produced reversed DNs such as
                # "dc=vernstok,dc=nl,cn=X".
                return self._dn(self.basedn, rdn)

            def connect(self):
                """Connect the target's database to its tdb URL."""
                return self.db.connect(self.url)

            def setup_data(self, path):
                """Load the LDIF file at path (relative to datadir) into the db."""
                f = open(os.path.join(datadir, path), 'r')
                try:
                    ldif = f.read()
                finally:
                    # Do not leak the file handle, even if reading fails.
                    f.close()
                self.add_ldif(ldif)

            def subst(self, text):
                """Substitute this target's variables (BASEDN) in text."""
                return substitute_var(text, self.substvars)

            def add_ldif(self, ldif):
                """Add the given LDIF to the db after variable substitution."""
                self.db.add_ldif(self.subst(ldif))

            def modify_ldif(self, ldif):
                """Apply the given LDIF modification after variable substitution."""
                self.db.modify_ldif(self.subst(ldif))

        self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
        self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
        # The templates target has no DN builder; dn() must not be called on it.
        self.templates = Target("templates.ldb", "cn=templates", None)

        self.samba3.connect()
        self.templates.connect()
        self.samba4.connect()

    def tearDown(self):
        """Remove the database files created for the test."""
        os.unlink(self.ldbfile)
        os.unlink(self.samba3.file)
        os.unlink(self.templates.file)
        os.unlink(self.samba4.file)
        super(MapBaseTestCase, self).tearDown()
112
113
class Samba3SamTestCase(MapBaseTestCase):
    """Search and modification tests run through the samba3sam mapping."""

    def setUp(self):
        """Load the fixture data, configure the modules and open self.ldb."""
        super(Samba3SamTestCase, self).setUp()
        ldb = Ldb(self.ldburl)
        self.samba3.setup_data("samba3.ldif")
        self.templates.setup_data("provision_samba3sam_templates.ldif")
        f = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r')
        try:
            ldif = f.read()
        finally:
            # Do not leak the file handle, even if reading fails.
            f.close()
        ldb.add_ldif(self.samba4.subst(ldif))
        self.setup_modules(ldb, self.samba3, self.samba4)
        # Open the database a second time after the module configuration has
        # been written (presumably so that the new @MODULES list is used).
        self.ldb = Ldb(self.ldburl)

    def test_search_non_mapped(self):
        """Looking up by non-mapped attribute"""
        msg = self.ldb.search(expression="(cn=Administrator)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["cn"], "Administrator")

    # This test used to be named test_search_non_mapped as well, which
    # silently replaced the test above so that it never ran.
    def test_search_mapped(self):
        """Looking up by mapped attribute"""
        msg = self.ldb.search(expression="(name=Backup Operators)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["name"], "Backup Operators")

    def test_old_name_of_renamed(self):
        """Looking up by old name of renamed attribute"""
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
        self.assertEquals(len(msg), 0)

    def test_mapped_containing_sid(self):
        """Looking up mapped entry containing SID"""
        msg = self.ldb.search(expression="(cn=Replicator)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn), "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertTrue("objectSid" in msg[0])
        # FIXME: NDR unpack msg[0]["objectSid"] before comparing:
        # self.assertEquals(msg[0]["objectSid"],
        #                   "S-1-5-21-4231626423-2410014848-2360679739-552")
        # Check mapping of objectClass
        oc = set(msg[0]["objectClass"])
        self.assertTrue(oc is not None)
        self.assertEquals(oc, set(["group"]))

    def test_search_by_objclass(self):
        """Looking up by objectClass"""
        msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEquals(set([str(m.dn) for m in msg]),
                set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl", "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))

    def test_s3sam_modify(self):
        # Adding a record that will be fallbacked
        self.ldb.add({"dn": "cn=Foo",
            "foo": "bar",
            "blah": "Blie",
            "cn": "Foo",
            "showInAdvancedViewOnly": "TRUE"}
            )

        # Checking for existence of record (local)
        # TODO: This record must be searched in the local database, which is
        # currently only supported for base searches.  A plain
        # search(expression="(cn=Foo)", attrs=[...]) without base/scope should
        # work as well but doesn't.
        msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
                scope=SCOPE_BASE,
                attrs=['foo','blah','cn','showInAdvancedViewOnly'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE")
        self.assertEquals(msg[0]["foo"], "bar")
        self.assertEquals(msg[0]["blah"], "Blie")

        # Adding record that will be mapped
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                 "objectClass": "user",
                 "unixName": "bin",
                 "sambaUnicodePwd": "geheim",
                 "cn": "Niemand"})

        # Checking for existence of record (remote)
        msg = self.ldb.search(expression="(unixName=bin)",
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")

        # Checking for existence of record (local && remote)
        msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)           # TODO: should check with more records
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["unixName"], "bin")
        self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")

        # Checking for existence of record (local || remote)
        msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)        # TODO: should check with more records
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["unixName"], "bin")
        self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")

        # Checking for data in destination database
        msg = self.samba3.db.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(msg[0]["sambaSID"],
                "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEquals(msg[0]["displayName"], "Niemand")

        # Adding attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["description"], "Blah")

        # Modifying attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(msg[0]["description"], "Blie")

        # Deleting attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        # Checking whether changes are no longer there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertTrue("description" not in msg[0])

        # Renaming record...
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                        "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether DN has changed...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn), "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Deleting record...
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether record is gone...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 0)
277
278
279
280 class MapTestCase(MapBaseTestCase):
281     def setUp(self):
282         super(MapTestCase, self).setUp()
283         ldb = Ldb(self.ldburl)
284         self.samba3.setup_data("samba3.ldif")
285         self.templates.setup_data("provision_samba3sam_templates.ldif")
286         ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()
287         ldb.add_ldif(self.samba4.subst(ldif))
288         self.setup_modules(ldb, self.samba3, self.samba4)
289         self.ldb = Ldb(self.ldburl)
290
291     def test_map_search(self):
292         """Running search tests on mapped data."""
293         ldif = """
294 dn: sambaDomainName=TESTS,""" + self.samba3.basedn + """
295 objectclass: sambaDomain
296 objectclass: top
297 sambaSID: S-1-5-21-4231626423-2410014848-2360679739
298 sambaNextRid: 2000
299 sambaDomainName: TESTS"""
300         self.samba3.add_ldif(ldif)
301
302         # Add a set of split records
303         ldif = """
304 dn: """ + self.samba4.dn("cn=X") + """
305 objectClass: user
306 cn: X
307 codePage: x
308 revision: x
309 dnsHostName: x
310 nextRid: y
311 lastLogon: x
312 description: x
313 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
314 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512
315 """
316
317         self.ldb.add_ldif(self.samba4.subst(ldif))
318
319         ldif = """
320 dn: """ + self.samba4.dn("cn=Y") + """
321 objectClass: top
322 cn: Y
323 codePage: x
324 revision: x
325 dnsHostName: y
326 nextRid: y
327 lastLogon: y
328 description: x
329 """
330         self.ldb.add_ldif(self.samba4.subst(ldif))
331
332         ldif = """
333 dn: """ + self.samba4.dn("cn=Z") + """
334 objectClass: top
335 cn: Z
336 codePage: x
337 revision: y
338 dnsHostName: z
339 nextRid: y
340 lastLogon: z
341 description: y
342 """
343
344         self.ldb.add_ldif(self.samba4.subst(ldif))
345
346         # Add a set of remote records
347
348         ldif = """
349 dn: """ + self.samba3.dn("cn=A") + """
350 objectClass: posixAccount
351 cn: A
352 sambaNextRid: x
353 sambaBadPasswordCount: x
354 sambaLogonTime: x
355 description: x
356 sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552
357 sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512
358
359 dn: """ + self.samba3.dn("cn=B") + """
360 objectClass: top
361 cn:B
362 sambaNextRid: x
363 sambaBadPasswordCount: x
364 sambaLogonTime: y
365 description: x
366
367 dn: """ + self.samba3.dn("cn=C") + """
368 objectClass: top
369 cn: C
370 sambaNextRid: x
371 sambaBadPasswordCount: y
372 sambaLogonTime: z
373 description: y
374 """
375         self.samba3.add_ldif(ldif)
376
377         # Testing search by DN
378
379         # Search remote record by local DN
380         dn = self.samba4.dn("cn=A")
381         res = self.ldb.search(dn, scope=SCOPE_BASE, 
382                 attrs=["dnsHostName", "lastLogon"])
383         self.assertEquals(len(res), 1)
384         self.assertEquals(str(str(res[0].dn)), dn)
385         self.assertTrue(not "dnsHostName" in res[0])
386         self.assertEquals(res[0]["lastLogon"], "x")
387
388         # Search remote record by remote DN
389         dn = self.samba3.dn("cn=A")
390         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
391                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
392         self.assertEquals(len(res), 1)
393         self.assertEquals(str(str(res[0].dn)), dn)
394         self.assertTrue(not "dnsHostName" in res[0])
395         self.assertTrue(not "lastLogon" in res[0])
396         self.assertEquals(res[0]["sambaLogonTime"], "x")
397
398         # Search split record by local DN
399         dn = self.samba4.dn("cn=X")
400         res = self.ldb.search(dn, scope=SCOPE_BASE, 
401                 attrs=["dnsHostName", "lastLogon"])
402         self.assertEquals(len(res), 1)
403         self.assertEquals(str(str(res[0].dn)), dn)
404         self.assertEquals(res[0]["dnsHostName"], "x")
405         self.assertEquals(res[0]["lastLogon"], "x")
406
407         # Search split record by remote DN
408         dn = self.samba3.dn("cn=X")
409         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
410                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
411         self.assertEquals(len(res), 1)
412         self.assertEquals(str(str(res[0].dn)), dn)
413         self.assertTrue(not "dnsHostName" in res[0])
414         self.assertTrue(not "lastLogon" in res[0])
415         self.assertEquals(res[0]["sambaLogonTime"], "x")
416
417         # Testing search by attribute
418
419         # Search by ignored attribute
420         res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, 
421                 attrs=["dnsHostName", "lastLogon"])
422         self.assertEquals(len(res), 2)
423         self.assertEquals(str(str(res[0].dn)), self.samba4.dn("cn=Y"))
424         self.assertEquals(res[0]["dnsHostName"], "y")
425         self.assertEquals(res[0]["lastLogon"], "y")
426         self.assertEquals(str(str(res[1].dn)), self.samba4.dn("cn=X"))
427         self.assertEquals(res[1]["dnsHostName"], "x")
428         self.assertEquals(res[1]["lastLogon"], "x")
429
430         # Search by kept attribute
431         res = self.ldb.search(expression="(description=y)", 
432                 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
433         self.assertEquals(len(res), 2)
434         self.assertEquals(str(str(res[0].dn)), self.samba4.dn("cn=Z"))
435         self.assertEquals(res[0]["dnsHostName"], "z")
436         self.assertEquals(res[0]["lastLogon"], "z")
437         self.assertEquals(str(str(res[1].dn)), self.samba4.dn("cn=C"))
438         self.assertTrue(not "dnsHostName" in res[1])
439         self.assertEquals(res[1]["lastLogon"], "z")
440
441         # Search by renamed attribute
442         res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
443         self.assertEquals(len(res), 2)
444         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
445         self.assertTrue(not "dnsHostName" in res[0])
446         self.assertEquals(res[0]["lastLogon"], "y")
447         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
448         self.assertTrue(not "dnsHostName" in res[1])
449         self.assertEquals(res[1]["lastLogon"], "x")
450
451         # Search by converted attribute
452         # TODO:
453         #   Using the SID directly in the parse tree leads to conversion
454         #   errors, letting the search fail with no results.
455         #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
456         res = self.ldb.search(expression="(objectSid=*)", attrs=["dnsHostName", "lastLogon", "objectSid"])
457         self.assertEquals(len(res), 3)
458         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
459         self.assertEquals(res[0]["dnsHostName"], "x")
460         self.assertEquals(res[0]["lastLogon"], "x")
461         self.assertEquals(res[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
462         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
463         self.assertTrue(not "dnsHostName" in res[1])
464         self.assertEquals(res[1]["lastLogon"], "x")
465         self.assertEquals(res[1]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
466
467         # Search by generated attribute 
468         # In most cases, this even works when the mapping is missing
469         # a `convert_operator' by enumerating the remote db.
470         res = self.ldb.search(expression="(primaryGroupID=512)", attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
471         self.assertEquals(len(res), 1)
472         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
473         self.assertTrue(not "dnsHostName" in res[0])
474         self.assertEquals(res[0]["lastLogon"], "x")
475         self.assertEquals(res[0]["primaryGroupID"], "512")
476
477         # TODO: There should actually be two results, A and X.  The
478         # primaryGroupID of X seems to get corrupted somewhere, and the
479         # objectSid isn't available during the generation of remote (!) data,
480         # which can be observed with the following search.  Also note that Xs
481         # objectSid seems to be fine in the previous search for objectSid... */
482         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
483         #print len(res) + " results found"
484         #for i in range(len(res)):
485         #    for (obj in res[i]) {
486         #        print obj + ": " + res[i][obj]
487         #    }
488         #    print "---"
489         #    
490
491         # Search by remote name of renamed attribute */
492         res = self.ldb.search(expression="(sambaBadPasswordCount=*)", attrs=["dnsHostName", "lastLogon"])
493         self.assertEquals(len(res), 0)
494
495         # Search by objectClass
496         attrs = ["dnsHostName", "lastLogon", "objectClass"]
497         res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
498         self.assertEquals(len(res), 2)
499         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
500         self.assertEquals(res[0]["dnsHostName"], "x")
501         self.assertEquals(res[0]["lastLogon"], "x")
502         self.assertTrue(res[0]["objectClass"] is not None)
503         self.assertEquals(res[0]["objectClass"][0], "user")
504         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
505         self.assertTrue(not "dnsHostName" in res[1])
506         self.assertEquals(res[1]["lastLogon"], "x")
507         self.assertTrue(res[1]["objectClass"] is not None)
508         self.assertEquals(res[1]["objectClass"][0], "user")
509
510         # Prove that the objectClass is actually used for the search
511         res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs)
512         self.assertEquals(len(res), 3)
513         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
514         self.assertTrue(not "dnsHostName" in res[0])
515         self.assertEquals(res[0]["lastLogon"], "y")
516         self.assertTrue(res[0]["objectClass"] is not None)
517         self.assertEquals(set(res[0]["objectClass"]), set(["user"]))
518         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
519         self.assertEquals(res[1]["dnsHostName"], "x")
520         self.assertEquals(res[1]["lastLogon"], "x")
521         self.assertTrue(res[1]["objectClass"] is not None)
522         self.assertEquals(res[1]["objectClass"][0], "user")
523         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
524         self.assertTrue(not "dnsHostName" in res[2])
525         self.assertEquals(res[2]["lastLogon"], "x")
526         self.assertTrue(res[2]["objectClass"] is not None)
527         self.assertEquals(res[2]["objectClass"][0], "user")
528
529         # Testing search by parse tree
530
531         # Search by conjunction of local attributes
532         res = self.ldb.search(expression="(&(codePage=x)(revision=x))", attrs=["dnsHostName", "lastLogon"])
533         self.assertEquals(len(res), 2)
534         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
535         self.assertEquals(res[0]["dnsHostName"], "y")
536         self.assertEquals(res[0]["lastLogon"], "y")
537         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
538         self.assertEquals(res[1]["dnsHostName"], "x")
539         self.assertEquals(res[1]["lastLogon"], "x")
540
541         # Search by conjunction of remote attributes
542         res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", attrs=["dnsHostName", "lastLogon"])
543         self.assertEquals(len(res), 2)
544         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
545         self.assertEquals(res[0]["dnsHostName"], "x")
546         self.assertEquals(res[0]["lastLogon"], "x")
547         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
548         self.assertTrue(not "dnsHostName" in res[1])
549         self.assertEquals(res[1]["lastLogon"], "x")
550         
551         # Search by conjunction of local and remote attribute 
552         res = self.ldb.search(expression="(&(codePage=x)(description=x))", attrs=["dnsHostName", "lastLogon"])
553         self.assertEquals(len(res), 2)
554         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
555         self.assertEquals(res[0]["dnsHostName"], "y")
556         self.assertEquals(res[0]["lastLogon"], "y")
557         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
558         self.assertEquals(res[1]["dnsHostName"], "x")
559         self.assertEquals(res[1]["lastLogon"], "x")
560
561         # Search by conjunction of local and remote attribute w/o match
562         attrs = ["dnsHostName", "lastLogon"]
563         res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", attrs=attrs)
564         self.assertEquals(len(res), 0)
565         res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", attrs=attrs)
566         self.assertEquals(len(res), 0)
567
568         # Search by disjunction of local attributes
569         res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", attrs=["dnsHostName", "lastLogon"])
570         self.assertEquals(len(res), 2)
571         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
572         self.assertEquals(res[0]["dnsHostName"], "y")
573         self.assertEquals(res[0]["lastLogon"], "y")
574         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
575         self.assertEquals(res[1]["dnsHostName"], "x")
576         self.assertEquals(res[1]["lastLogon"], "x")
577
578         # Search by disjunction of remote attributes
579         res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", attrs=["dnsHostName", "lastLogon"])
580         self.assertEquals(len(res), 3)
581         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
582         self.assertTrue("dnsHostName" in res[0])
583         self.assertEquals(res[0]["lastLogon"], "y")
584         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
585         self.assertEquals(res[1]["dnsHostName"], "x")
586         self.assertEquals(res[1]["lastLogon"], "x")
587         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
588         self.assertTrue("dnsHostName" in res[2])
589         self.assertEquals(res[2]["lastLogon"], "x")
590
591         # Search by disjunction of local and remote attribute
592         res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", attrs=["dnsHostName", "lastLogon"])
593         self.assertEquals(len(res), 3)
594         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
595         self.assertEquals(res[0]["dnsHostName"], "y")
596         self.assertEquals(res[0]["lastLogon"], "y")
597         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
598         self.assertTrue("dnsHostName" in res[1])
599         self.assertEquals(res[1]["lastLogon"], "y")
600         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
601         self.assertEquals(res[2]["dnsHostName"], "x")
602         self.assertEquals(res[2]["lastLogon"], "x")
603
604         # Search by disjunction of local and remote attribute w/o match
605         res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", attrs=["dnsHostName", "lastLogon"])
606         self.assertEquals(len(res), 0)
607
608         # Search by negated local attribute
609         res = self.ldb.search(expression="(!(revision=x))", attrs=["dnsHostName", "lastLogon"])
610         self.assertEquals(len(res), 5)
611         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
612         self.assertTrue(not "dnsHostName" in res[0])
613         self.assertEquals(res[0]["lastLogon"], "y")
614         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
615         self.assertTrue(not "dnsHostName" in res[1])
616         self.assertEquals(res[1]["lastLogon"], "x")
617         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
618         self.assertEquals(res[2]["dnsHostName"], "z")
619         self.assertEquals(res[2]["lastLogon"], "z")
620         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
621         self.assertTrue(not "dnsHostName" in res[3])
622         self.assertEquals(res[3]["lastLogon"], "z")
623
624         # Search by negated remote attribute
625         res = self.ldb.search(expression="(!(description=x))", attrs=["dnsHostName", "lastLogon"])
626         self.assertEquals(len(res), 3)
627         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
628         self.assertEquals(res[0]["dnsHostName"], "z")
629         self.assertEquals(res[0]["lastLogon"], "z")
630         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
631         self.assertTrue(not "dnsHostName" in res[1])
632         self.assertEquals(res[1]["lastLogon"], "z")
633
634         # Search by negated conjunction of local attributes
635         res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", attrs=["dnsHostName", "lastLogon"])
636         self.assertEquals(len(res), 5)
637         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
638         self.assertTrue(not "dnsHostName" in res[0])
639         self.assertEquals(res[0]["lastLogon"], "y")
640         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
641         self.assertTrue(not "dnsHostName" in res[1])
642         self.assertEquals(res[1]["lastLogon"], "x")
643         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
644         self.assertEquals(res[2]["dnsHostName"], "z")
645         self.assertEquals(res[2]["lastLogon"], "z")
646         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
647         self.assertTrue(not "dnsHostName" in res[3])
648         self.assertEquals(res[3]["lastLogon"], "z")
649
650         # Search by negated conjunction of remote attributes
651         res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", attrs=["dnsHostName", "lastLogon"])
652         self.assertEquals(len(res), 5)
653         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
654         self.assertEquals(res[0]["dnsHostName"], "y")
655         self.assertEquals(res[0]["lastLogon"], "y")
656         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
657         self.assertTrue(not "dnsHostName" in res[1])
658         self.assertEquals(res[1]["lastLogon"], "y")
659         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
660         self.assertEquals(res[2]["dnsHostName"], "z")
661         self.assertEquals(res[2]["lastLogon"], "z")
662         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
663         self.assertTrue(not "dnsHostName" in res[3])
664         self.assertEquals(res[3]["lastLogon"], "z")
665
666         # Search by negated conjunction of local and remote attribute
667         res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", attrs=["dnsHostName", "lastLogon"])
668         self.assertEquals(len(res), 5)
669         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
670         self.assertTrue(not "dnsHostName" in res[0])
671         self.assertEquals(res[0]["lastLogon"], "y")
672         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
673         self.assertTrue(not "dnsHostName" in res[1])
674         self.assertEquals(res[1]["lastLogon"], "x")
675         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
676         self.assertEquals(res[2]["dnsHostName"], "z")
677         self.assertEquals(res[2]["lastLogon"], "z")
678         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
679         self.assertTrue(not "dnsHostName" in res[3])
680         self.assertEquals(res[3]["lastLogon"], "z")
681
682         # Search by negated disjunction of local attributes
683         res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", attrs=["dnsHostName", "lastLogon"])
684         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
685         self.assertTrue(not "dnsHostName" in res[0])
686         self.assertEquals(res[0]["lastLogon"], "y")
687         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
688         self.assertTrue(not "dnsHostName" in res[1])
689         self.assertEquals(res[1]["lastLogon"], "x")
690         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
691         self.assertEquals(res[2]["dnsHostName"], "z")
692         self.assertEquals(res[2]["lastLogon"], "z")
693         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
694         self.assertTrue(not "dnsHostName" in res[3])
695         self.assertEquals(res[3]["lastLogon"], "z")
696
697         # Search by negated disjunction of remote attributes
698         res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", attrs=["dnsHostName", "lastLogon"])
699         self.assertEquals(len(res), 4)
700         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
701         self.assertEquals(res[0]["dnsHostName"], "y")
702         self.assertEquals(res[0]["lastLogon"], "y")
703         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
704         self.assertEquals(res[1]["dnsHostName"], "z")
705         self.assertEquals(res[1]["lastLogon"], "z")
706         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
707         self.assertTrue(not "dnsHostName" in res[2])
708         self.assertEquals(res[2]["lastLogon"], "z")
709
710         # Search by negated disjunction of local and remote attribute
711         res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", 
712                               attrs=["dnsHostName", "lastLogon"])
713         self.assertEquals(len(res), 4)
714         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
715         self.assertTrue(not "dnsHostName" in res[0])
716         self.assertEquals(res[0]["lastLogon"], "x")
717         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
718         self.assertEquals(res[1]["dnsHostName"], "z")
719         self.assertEquals(res[1]["lastLogon"], "z")
720         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
721         self.assertTrue(not "dnsHostName" in res[2])
722         self.assertEquals(res[2]["lastLogon"], "z")
723
724         # Search by complex parse tree
725         res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
726         self.assertEquals(len(res), 6)
727         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
728         self.assertTrue(not "dnsHostName" in res[0])
729         self.assertEquals(res[0]["lastLogon"], "y")
730         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
731         self.assertEquals(res[1]["dnsHostName"], "x")
732         self.assertEquals(res[1]["lastLogon"], "x")
733         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
734         self.assertTrue(not "dnsHostName" in res[2])
735         self.assertEquals(res[2]["lastLogon"], "x")
736         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
737         self.assertEquals(res[3]["dnsHostName"], "z")
738         self.assertEquals(res[3]["lastLogon"], "z")
739         self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
740         self.assertTrue(not "dnsHostName" in res[4])
741         self.assertEquals(res[4]["lastLogon"], "z")
742
743         # Clean up
744         dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
745         for dn in dns:
746             self.ldb.delete(dn)
747
748     def test_map_modify_local(self):
749         """Modification of local records."""
750         # Add local record
751         dn = "cn=test,dc=idealx,dc=org"
752         self.ldb.add({"dn": dn, 
753                  "cn": "test",
754                  "foo": "bar",
755                  "revision": "1",
756                  "description": "test"})
757         # Check it's there
758         attrs = ["foo", "revision", "description"]
759         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
760         self.assertEquals(len(res), 1)
761         self.assertEquals(str(res[0].dn), dn)
762         self.assertEquals(res[0]["foo"], "bar")
763         self.assertEquals(res[0]["revision"], "1")
764         self.assertEquals(res[0]["description"], "test")
765         # Check it's not in the local db
766         res = self.samba4.db.search(expression="(cn=test)", 
767                                     scope=SCOPE_DEFAULT, attrs=attrs)
768         self.assertEquals(len(res), 0)
769         # Check it's not in the remote db
770         res = self.samba3.db.search(expression="(cn=test)", 
771                                     scope=SCOPE_DEFAULT, attrs=attrs)
772         self.assertEquals(len(res), 0)
773
774         # Modify local record
775         ldif = """
776 dn: """ + dn + """
777 replace: foo
778 foo: baz
779 replace: description
780 description: foo
781 """
782         self.ldb.modify_ldif(ldif)
783         # Check in local db
784         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
785         self.assertEquals(len(res), 1)
786         self.assertEquals(str(res[0].dn), dn)
787         self.assertEquals(res[0]["foo"], "baz")
788         self.assertEquals(res[0]["revision"], "1")
789         self.assertEquals(res[0]["description"], "foo")
790
791         # Rename local record
792         dn2 = "cn=toast,dc=idealx,dc=org"
793         self.ldb.rename(dn, dn2)
794         # Check in local db
795         res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
796         self.assertEquals(len(res), 1)
797         self.assertEquals(str(res[0].dn), dn2)
798         self.assertEquals(res[0]["foo"], "baz")
799         self.assertEquals(res[0]["revision"], "1")
800         self.assertEquals(res[0]["description"], "foo")
801
802         # Delete local record
803         self.ldb.delete(dn2)
804         # Check it's gone
805         res = self.ldb.search(dn2, scope=SCOPE_BASE)
806         self.assertEquals(len(res), 0)
807
808     def test_map_modify_remote_remote(self):
809         """Modification of remote data of remote records"""
810         # Add remote record
811         dn = self.samba4.dn("cn=test")
812         dn2 = self.samba3.dn("cn=test")
813         self.samba3.db.add({"dn": dn2, 
814                    "cn": "test",
815                    "description": "foo",
816                    "sambaBadPasswordCount": "3",
817                    "sambaNextRid": "1001"})
818         # Check it's there
819         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
820                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
821         self.assertEquals(len(res), 1)
822         self.assertEquals(str(res[0].dn), dn2)
823         self.assertEquals(res[0]["description"], "foo")
824         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
825         self.assertEquals(res[0]["sambaNextRid"], "1001")
826         # Check in mapped db
827         attrs = ["description", "badPwdCount", "nextRid"]
828         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
829         self.assertEquals(len(res), 1)
830         self.assertEquals(str(res[0].dn), dn)
831         self.assertEquals(res[0]["description"], "foo")
832         self.assertEquals(res[0]["badPwdCount"], "3")
833         self.assertEquals(res[0]["nextRid"], "1001")
834         # Check in local db
835         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
836         self.assertEquals(len(res), 0)
837
838         # Modify remote data of remote record
839         ldif = """
840 dn: """ + dn + """
841 replace: description
842 description: test
843 replace: badPwdCount
844 badPwdCount: 4
845 """
846         self.ldb.modify_ldif(ldif)
847         # Check in mapped db
848         res = self.ldb.search(dn, scope=SCOPE_BASE, 
849                 attrs=["description", "badPwdCount", "nextRid"])
850         self.assertEquals(len(res), 1)
851         self.assertEquals(str(res[0].dn), dn)
852         self.assertEquals(res[0]["description"], "test")
853         self.assertEquals(res[0]["badPwdCount"], "4")
854         self.assertEquals(res[0]["nextRid"], "1001")
855         # Check in remote db
856         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
857                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
858         self.assertEquals(len(res), 1)
859         self.assertEquals(str(res[0].dn), dn2)
860         self.assertEquals(res[0]["description"], "test")
861         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
862         self.assertEquals(res[0]["sambaNextRid"], "1001")
863
864         # Rename remote record
865         dn2 = self.samba4.dn("cn=toast")
866         self.ldb.rename(dn, dn2)
867         # Check in mapped db
868         dn = dn2
869         res = self.ldb.search(dn, scope=SCOPE_BASE, 
870                 attrs=["description", "badPwdCount", "nextRid"])
871         self.assertEquals(len(res), 1)
872         self.assertEquals(str(res[0].dn), dn)
873         self.assertEquals(res[0]["description"], "test")
874         self.assertEquals(res[0]["badPwdCount"], "4")
875         self.assertEquals(res[0]["nextRid"], "1001")
876         # Check in remote db 
877         dn2 = self.samba3.dn("cn=toast")
878         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
879                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
880         self.assertEquals(len(res), 1)
881         self.assertEquals(str(res[0].dn), dn2)
882         self.assertEquals(res[0]["description"], "test")
883         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
884         self.assertEquals(res[0]["sambaNextRid"], "1001")
885
886         # Delete remote record
887         self.ldb.delete(dn)
888         # Check in mapped db that it's removed
889         res = self.ldb.search(dn, scope=SCOPE_BASE)
890         self.assertEquals(len(res), 0)
891         # Check in remote db
892         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
893         self.assertEquals(len(res), 0)
894
895     def test_map_modify_remote_local(self):
896         """Modification of local data of remote records"""
897         # Add remote record (same as before)
898         dn = self.samba4.dn("cn=test")
899         dn2 = self.samba3.dn("cn=test")
900         self.samba3.db.add({"dn": dn2, 
901                    "cn": "test",
902                    "description": "foo",
903                    "sambaBadPasswordCount": "3",
904                    "sambaNextRid": "1001"})
905
906         # Modify local data of remote record
907         ldif = """
908 dn: """ + dn + """
909 add: revision
910 revision: 1
911 replace: description
912 description: test
913
914 """
915         self.ldb.modify_ldif(ldif)
916         # Check in mapped db
917         attrs = ["revision", "description"]
918         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
919         self.assertEquals(len(res), 1)
920         self.assertEquals(str(res[0].dn), dn)
921         self.assertEquals(res[0]["description"], "test")
922         self.assertEquals(res[0]["revision"], "1")
923         # Check in remote db
924         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
925         self.assertEquals(len(res), 1)
926         self.assertEquals(str(res[0].dn), dn2)
927         self.assertEquals(res[0]["description"], "test")
928         self.assertTrue(not "revision" in res[0])
929         # Check in local db
930         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
931         self.assertEquals(len(res), 1)
932         self.assertEquals(str(res[0].dn), dn)
933         self.assertTrue(not "description" in res[0])
934         self.assertEquals(res[0]["revision"], "1")
935
936         # Delete (newly) split record
937         self.ldb.delete(dn)
938
939     def test_map_modify_split(self):
940         """Testing modification of split records"""
941         # Add split record
942         dn = self.samba4.dn("cn=test")
943         dn2 = self.samba3.dn("cn=test")
944         self.ldb.add({
945             "dn": dn,
946             "cn": "test",
947             "description": "foo",
948             "badPwdCount": "3",
949             "nextRid": "1001",
950             "revision": "1"})
951         # Check it's there
952         attrs = ["description", "badPwdCount", "nextRid", "revision"]
953         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
954         self.assertEquals(len(res), 1)
955         self.assertEquals(str(res[0].dn), dn)
956         self.assertEquals(res[0]["description"], "foo")
957         self.assertEquals(res[0]["badPwdCount"], "3")
958         self.assertEquals(res[0]["nextRid"], "1001")
959         self.assertEquals(res[0]["revision"], "1")
960         # Check in local db
961         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
962         self.assertEquals(len(res), 1)
963         self.assertEquals(str(res[0].dn), dn)
964         self.assertTrue(not "description" in res[0])
965         self.assertTrue(not "badPwdCount" in res[0])
966         self.assertTrue(not "nextRid" in res[0])
967         self.assertEquals(res[0]["revision"], "1")
968         # Check in remote db
969         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
970         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
971         self.assertEquals(len(res), 1)
972         self.assertEquals(str(res[0].dn), dn2)
973         self.assertEquals(res[0]["description"], "foo")
974         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
975         self.assertEquals(res[0]["sambaNextRid"], "1001")
976         self.assertTrue(not "revision" in res[0])
977
978         # Modify of split record
979         ldif = """
980 dn: """ + dn + """
981 replace: description
982 description: test
983 replace: badPwdCount
984 badPwdCount: 4
985 replace: revision
986 revision: 2
987 """
988         self.ldb.modify_ldif(ldif)
989         # Check in mapped db
990         attrs = ["description", "badPwdCount", "nextRid", "revision"]
991         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
992         self.assertEquals(len(res), 1)
993         self.assertEquals(str(res[0].dn), dn)
994         self.assertEquals(res[0]["description"], "test")
995         self.assertEquals(res[0]["badPwdCount"], "4")
996         self.assertEquals(res[0]["nextRid"], "1001")
997         self.assertEquals(res[0]["revision"], "2")
998         # Check in local db
999         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1000         self.assertEquals(len(res), 1)
1001         self.assertEquals(str(res[0].dn), dn)
1002         self.assertTrue(not "description" in res[0])
1003         self.assertTrue(not "badPwdCount" in res[0])
1004         self.assertTrue(not "nextRid" in res[0])
1005         self.assertEquals(res[0]["revision"], "2")
1006         # Check in remote db
1007         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
1008         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1009         self.assertEquals(len(res), 1)
1010         self.assertEquals(str(res[0].dn), dn2)
1011         self.assertEquals(res[0]["description"], "test")
1012         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1013         self.assertEquals(res[0]["sambaNextRid"], "1001")
1014         self.assertTrue(not "revision" in res[0])
1015
1016         # Rename split record
1017         dn2 = self.samba4.dn("cn=toast")
1018         self.ldb.rename(dn, dn2)
1019         # Check in mapped db
1020         dn = dn2
1021         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1022         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1023         self.assertEquals(len(res), 1)
1024         self.assertEquals(str(res[0].dn), dn)
1025         self.assertEquals(res[0]["description"], "test")
1026         self.assertEquals(res[0]["badPwdCount"], "4")
1027         self.assertEquals(res[0]["nextRid"], "1001")
1028         self.assertEquals(res[0]["revision"], "2")
1029         # Check in local db
1030         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1031         self.assertEquals(len(res), 1)
1032         self.assertEquals(str(res[0].dn), dn)
1033         self.assertTrue(not "description" in res[0])
1034         self.assertTrue(not "badPwdCount" in res[0])
1035         self.assertTrue(not "nextRid" in res[0])
1036         self.assertEquals(res[0]["revision"], "2")
1037         # Check in remote db
1038         dn2 = self.samba3.dn("cn=toast")
1039         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
1040         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1041         self.assertEquals(len(res), 1)
1042         self.assertEquals(str(res[0].dn), dn2)
1043         self.assertEquals(res[0]["description"], "test")
1044         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1045         self.assertEquals(res[0]["sambaNextRid"], "1001")
1046         self.assertTrue(not "revision" in res[0])
1047
1048         # Delete split record
1049         self.ldb.delete(dn)
1050         # Check in mapped db
1051         res = self.ldb.search(dn, scope=SCOPE_BASE)
1052         self.assertEquals(len(res), 0)
1053         # Check in local db
1054         res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1055         self.assertEquals(len(res), 0)
1056         # Check in remote db
1057         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1058         self.assertEquals(len(res), 0)