r26630: Split up big tests into various smaller functions, making it easier to debug.
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
1 #!/usr/bin/python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2007
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6 #
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
8 #   
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #   
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #   
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
24
25 import os
26 import sys
27 import samba
28 import ldb
29 from ldb import SCOPE_DEFAULT, SCOPE_BASE
30 from samba import Ldb, substitute_var
31 from samba.tests import LdbTestCase, TestCaseInTempDir
32
33 datadir = os.path.join(os.path.dirname(__file__), "../../../../../testdata/samba3")
34
class MapBaseTestCase(TestCaseInTempDir):
    """Common fixture for tests that exercise the samba3sam mapping module.

    setUp() prepares three tdb-backed ``Target`` helpers (``samba4``,
    ``samba3`` and ``templates``) inside the test's temporary directory and
    records the path/URL of the main test database (``ldbfile``/``ldburl``).
    Subclasses are expected to open ``ldburl`` themselves.
    """

    def setup_data(self, obj, ldif):
        """Load LDIF text into the database of a target.

        :param obj: Target whose ``db`` receives the records; its
            ``substvars`` are applied to the LDIF via substitute_var().
        :param ldif: LDIF text to add; must not be None.
        """
        self.assertTrue(ldif is not None)
        obj.db.add_ldif(substitute_var(ldif, obj.substvars))

    def setup_modules(self, ldb, s3, s4):
        """Add the control records that configure mapping and partitions.

        :param ldb: Open database connection the records are added to
            (note: this parameter shadows the imported ``ldb`` module
            inside this method).
        :param s3: Target describing the Samba3 side of the mapping.
        :param s4: Target describing the Samba4 side of the mapping.
        """
        # Map the Samba4 base DN onto the TESTS domain below the Samba3 base.
        ldb.add({"dn": "@MAP=samba3sam",
                 "@FROM": s4.basedn,
                 "@TO": "sambaDomainName=TESTS," + s3.basedn})

        ldb.add({"dn": "@MODULES",
                 "@LIST": "rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})

        # One partition per target, each given as "<basedn>:<url>".
        ldb.add({"dn": "@PARTITION",
                 "partition": [s4.basedn + ":" + s4.url, s3.basedn + ":" + s3.url],
                 "replicateEntries": ["@SUBCLASSES", "@ATTRIBUTES", "@INDEXLIST"]})

    def setUp(self):
        """Create the targets and connect each of them to its tdb file."""
        super(MapBaseTestCase, self).setUp()

        def make_dn(basedn, rdn):
            # Samba3 entries live below the TESTS domain object.
            return rdn + ",sambaDomainName=TESTS," + basedn

        def make_s4dn(basedn, rdn):
            return rdn + "," + basedn

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        # Target is defined locally and cannot see ``self``; hand it the
        # temporary directory through a closure variable instead.
        tempdir = self.tempdir
        print(tempdir)

        class Target:
            """Simple helper class that contains data for a specific SAM connection."""

            def __init__(self, filename, basedn, dn):
                """
                :param filename: Name of the tdb file, relative to tempdir.
                :param basedn: Base DN of this target.
                :param dn: Callable ``(basedn, rdn) -> dn`` used by dn(),
                    or None when no DN builder is needed.
                """
                self.file = os.path.join(tempdir, filename)
                self.url = "tdb://" + self.file
                self.basedn = basedn
                self.substvars = {"BASEDN": self.basedn}
                self.db = Ldb()
                self._dn = dn

            def dn(self, rdn):
                """Return the full DN for ``rdn`` below this target's base DN."""
                # The builders are declared as (basedn, rdn); passing the
                # arguments the other way round yields a reversed DN.
                return self._dn(self.basedn, rdn)

            def connect(self):
                """Connect this target's database to its tdb URL."""
                return self.db.connect(self.url)

        self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
        self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
        # No DN builder for the templates target: its dn() must not be called.
        self.templates = Target("templates.ldb", "cn=templates", None)

        self.samba3.connect()
        self.templates.connect()
        self.samba4.connect()

    def tearDown(self):
        """Remove the database files created for the test."""
        os.unlink(self.ldbfile)
        os.unlink(self.samba3.file)
        os.unlink(self.templates.file)
        os.unlink(self.samba4.file)
        super(MapBaseTestCase, self).tearDown()
97
98
class Samba3SamTestCase(MapBaseTestCase):
    """Search and modify tests run against the provisioned Samba3 test data."""

    def _read_datafile(self, name):
        """Return the contents of ``name`` inside ``datadir``.

        The file is closed before returning so no handle is leaked.
        """
        with open(os.path.join(datadir, name), 'r') as f:
            return f.read()

    def setUp(self):
        """Load the Samba3, template and Samba4 LDIF fixtures and set up the modules."""
        super(Samba3SamTestCase, self).setUp()
        ldb = Ldb(self.ldburl)
        self.setup_data(self.samba3, self._read_datafile("samba3.ldif"))
        self.setup_data(self.templates,
                        self._read_datafile("provision_samba3sam_templates.ldif"))
        ldif = self._read_datafile("provision_samba3sam.ldif")
        ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))
        self.setup_modules(ldb, self.samba3, self.samba4)

        # A second connection is opened for the tests themselves.
        # NOTE(review): presumably so the freshly written @MODULES record is
        # picked up on open -- confirm.
        self.ldb = Ldb(self.ldburl)

    def test_s3sam_search(self):
        """Search mapped and non-mapped attributes through the module."""
        ldb = self.ldb
        print("Looking up by non-mapped attribute")
        msg = ldb.search(expression="(cn=Administrator)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["cn"], "Administrator")

        print("Looking up by mapped attribute")
        msg = ldb.search(expression="(name=Backup Operators)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["name"], "Backup Operators")

        print("Looking up by old name of renamed attribute")
        msg = ldb.search(expression="(displayName=Backup Operators)")
        self.assertEquals(len(msg), 0)

        print("Looking up mapped entry containing SID")
        msg = ldb.search(expression="(cn=Replicator)")
        self.assertEquals(len(msg), 1)
        print(msg[0].dn)
        self.assertEquals(str(msg[0].dn), "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertEquals(msg[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")

        print("Checking mapping of objectClass")
        oc = set(msg[0]["objectClass"])
        self.assertTrue(oc is not None)
        # Every objectClass value must be one of the two mapped names.
        # (A set cannot be indexed, so iterate over the values directly.)
        for value in oc:
            self.assertTrue(value == "posixGroup" or value == "group")

        print("Looking up by objectClass")
        msg = ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEquals(len(msg), 2)
        expected_dns = ("unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                        "unixName=nobody,ou=Users,dc=vernstok,dc=nl")
        for i in range(len(msg)):
            # Each hit must be one of the two expected entries.
            self.assertTrue(str(msg[i].dn) in expected_dns)

    def test_s3sam_modify(self):
        """Add, modify, rename and delete records through the module."""
        ldb = self.ldb
        s3 = self.samba3
        print("Adding a record that will be fallbacked")
        ldb.add({"dn": "cn=Foo",
                 "foo": "bar",
                 "blah": "Blie",
                 "cn": "Foo",
                 "showInAdvancedViewOnly": "TRUE"})

        print("Checking for existence of record (local)")
        # TODO: This record must be searched in the local database, which is
        # currently only supported for base searches.  A plain
        # ldb.search(expression="(cn=Foo)", attrs=[...]) should work as well
        # but doesn't...
        msg = ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=SCOPE_BASE,
                         attrs=['foo', 'blah', 'cn', 'showInAdvancedViewOnly'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE")
        self.assertEquals(msg[0]["foo"], "bar")
        self.assertEquals(msg[0]["blah"], "Blie")

        print("Adding record that will be mapped")
        ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                 "objectClass": "user",
                 "unixName": "bin",
                 "sambaUnicodePwd": "geheim",
                 "cn": "Niemand"})

        print("Checking for existence of record (remote)")
        msg = ldb.search(expression="(unixName=bin)",
                         attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")

        print("Checking for existence of record (local && remote)")
        msg = ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)           # TODO: should check with more records
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["unixName"], "bin")
        self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")

        print("Checking for existence of record (local || remote)")
        msg = ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        print("got %d replies" % len(msg))
        self.assertEquals(len(msg), 1)        # TODO: should check with more records
        self.assertEquals(msg[0]["cn"], "Niemand")
        # Either side of the disjunction may have produced the match.
        self.assertTrue(msg[0]["unixName"] == "bin" or
                        msg[0]["sambaUnicodePwd"] == "geheim")

        print("Checking for data in destination database")
        # The first positional argument of search() is the base DN, so the
        # filter has to be passed as ``expression``.
        msg = s3.db.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(msg[0]["sambaSID"], "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEquals(msg[0]["displayName"], "Niemand")

        print("Adding attribute...")
        ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        print("Checking whether changes are still there...")
        msg = ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["description"], "Blah")

        print("Modifying attribute...")
        ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        print("Checking whether changes are still there...")
        msg = ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(msg[0]["description"], "Blie")

        print("Deleting attribute...")
        ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        print("Checking whether changes are no longer there...")
        msg = ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        # The attribute must be absent from the returned message.
        self.assertTrue("description" not in msg[0])

        print("Renaming record...")
        ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        print("Checking whether DN has changed...")
        msg = ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn), "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        print("Deleting record...")
        ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        print("Checking whether record is gone...")
        msg = ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 0)
259
260
261
262 class MapTestCase(MapBaseTestCase):
263     def setUp(self):
264         super(MapTestCase, self).setUp()
265         ldb = Ldb(self.ldburl)
266         self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read())
267         ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()
268         ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))
269         self.setup_modules(ldb, self.samba3, self.samba4)
270         self.ldb = Ldb(self.ldburl)
271
272     def test_map_search(self):
273         s3 = self.samba3
274         ldb = self.ldb
275         s4 = self.samba4
276         print "Running search tests on mapped data"
277         ldif = """
278 dn: """ + "sambaDomainName=TESTS,""" + s3.basedn + """
279 objectclass: sambaDomain
280 objectclass: top
281 sambaSID: S-1-5-21-4231626423-2410014848-2360679739
282 sambaNextRid: 2000
283 sambaDomainName: TESTS"""
284         s3.db.add_ldif(substitute_var(ldif, s3.substvars))
285
286         print "Add a set of split records"
287         ldif = """
288 dn: """ + s4.dn("cn=X") + """
289 objectClass: user
290 cn: X
291 codePage: x
292 revision: x
293 dnsHostName: x
294 nextRid: y
295 lastLogon: x
296 description: x
297 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
298 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512
299
300 dn: """ + s4.dn("cn=Y") + """
301 objectClass: top
302 cn: Y
303 codePage: x
304 revision: x
305 dnsHostName: y
306 nextRid: y
307 lastLogon: y
308 description: x
309
310 dn: """ + s4.dn("cn=Z") + """
311 objectClass: top
312 cn: Z
313 codePage: x
314 revision: y
315 dnsHostName: z
316 nextRid: y
317 lastLogon: z
318 description: y
319 """
320
321         ldb.add_ldif(substitute_var(ldif, s4.substvars))
322
323         print "Add a set of remote records"
324
325         ldif = """
326 dn: """ + s3.dn("cn=A") + """
327 objectClass: posixAccount
328 cn: A
329 sambaNextRid: x
330 sambaBadPasswordCount: x
331 sambaLogonTime: x
332 description: x
333 sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552
334 sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512
335
336 dn: """ + s3.dn("cn=B") + """
337 objectClass: top
338 cn:B
339 sambaNextRid: x
340 sambaBadPasswordCount: x
341 sambaLogonTime: y
342 description: x
343
344 dn: """ + s3.dn("cn=C") + """
345 objectClass: top
346 cn: C
347 sambaNextRid: x
348 sambaBadPasswordCount: y
349 sambaLogonTime: z
350 description: y
351 """
352         s3.add_ldif(substitute_var(ldif, s3.substvars))
353
354         print "Testing search by DN"
355
356         # Search remote record by local DN
357         dn = s4.dn("cn=A")
358         attrs = ["dnsHostName", "lastLogon"]
359         res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
360         self.assertEquals(len(res), 1)
361         self.assertEquals(str(str(res[0].dn)), dn)
362         self.assertEquals(res[0]["dnsHostName"], undefined)
363         self.assertEquals(res[0]["lastLogon"], "x")
364
365         # Search remote record by remote DN
366         dn = s3.dn("cn=A")
367         attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"]
368         res = s3.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
369         self.assertEquals(len(res), 1)
370         self.assertEquals(str(str(res[0].dn)), dn)
371         self.assertEquals(res[0]["dnsHostName"], undefined)
372         self.assertEquals(res[0]["lastLogon"], undefined)
373         self.assertEquals(res[0]["sambaLogonTime"], "x")
374
375         # Search split record by local DN
376         dn = s4.dn("cn=X")
377         attrs = ["dnsHostName", "lastLogon"]
378         res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
379         self.assertEquals(len(res), 1)
380         self.assertEquals(str(str(res[0].dn)), dn)
381         self.assertEquals(res[0]["dnsHostName"], "x")
382         self.assertEquals(res[0]["lastLogon"], "x")
383
384         # Search split record by remote DN
385         dn = s3.dn("cn=X")
386         attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"]
387         res = s3.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
388         self.assertEquals(len(res), 1)
389         self.assertEquals(str(str(res[0].dn)), dn)
390         self.assertEquals(res[0]["dnsHostName"], undefined)
391         self.assertEquals(res[0]["lastLogon"], undefined)
392         self.assertEquals(res[0]["sambaLogonTime"], "x")
393
394         print "Testing search by attribute"
395
396         # Search by ignored attribute
397         attrs = ["dnsHostName", "lastLogon"]
398         res = ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, attrs=attrs)
399         self.assertEquals(len(res), 2)
400         self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Y"))
401         self.assertEquals(res[0]["dnsHostName"], "y")
402         self.assertEquals(res[0]["lastLogon"], "y")
403         self.assertEquals(str(str(res[1].dn)), s4.dn("cn=X"))
404         self.assertEquals(res[1]["dnsHostName"], "x")
405         self.assertEquals(res[1]["lastLogon"], "x")
406
407         # Search by kept attribute
408         attrs = ["dnsHostName", "lastLogon"]
409         res = ldb.search(expression="(description=y)", scope=SCOPE_DEFAULT, attrs=attrs)
410         self.assertEquals(len(res), 2)
411         self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Z"))
412         self.assertEquals(res[0]["dnsHostName"], "z")
413         self.assertEquals(res[0]["lastLogon"], "z")
414         self.assertEquals(str(str(res[1].dn)), s4.dn("cn=C"))
415         self.assertEquals(res[1]["dnsHostName"], undefined)
416         self.assertEquals(res[1]["lastLogon"], "z")
417
418         # Search by renamed attribute
419         attrs = ["dnsHostName", "lastLogon"]
420         res = ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT, attrs=attrs)
421         self.assertEquals(len(res), 2)
422         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
423         self.assertEquals(res[0]["dnsHostName"], undefined)
424         self.assertEquals(res[0]["lastLogon"], "y")
425         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
426         self.assertEquals(res[1]["dnsHostName"], undefined)
427         self.assertEquals(res[1]["lastLogon"], "x")
428
429         # Search by converted attribute
430         attrs = ["dnsHostName", "lastLogon", "objectSid"]
431         # TODO:
432         #   Using the SID directly in the parse tree leads to conversion
433         #   errors, letting the search fail with no results.
434         #res = ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", NULL, ldb. SCOPE_DEFAULT, attrs)
435         res = ldb.search(expression="(objectSid=*)", attrs=attrs)
436         self.assertEquals(len(res), 3)
437         self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
438         self.assertEquals(res[0]["dnsHostName"], "x")
439         self.assertEquals(res[0]["lastLogon"], "x")
440         self.assertEquals(res[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
441         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
442         self.assertEquals(res[1]["dnsHostName"], undefined)
443         self.assertEquals(res[1]["lastLogon"], "x")
444         self.assertEquals(res[1]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
445
446         # Search by generated attribute 
447         # In most cases, this even works when the mapping is missing
448         # a `convert_operator' by enumerating the remote db.
449         attrs = ["dnsHostName", "lastLogon", "primaryGroupID"]
450         res = ldb.search(expression="(primaryGroupID=512)", attrs=attrs)
451         self.assertEquals(len(res), 1)
452         self.assertEquals(str(res[0].dn), s4.dn("cn=A"))
453         self.assertEquals(res[0]["dnsHostName"], undefined)
454         self.assertEquals(res[0]["lastLogon"], "x")
455         self.assertEquals(res[0]["primaryGroupID"], "512")
456
457         # TODO: There should actually be two results, A and X.  The
458         # primaryGroupID of X seems to get corrupted somewhere, and the
459         # objectSid isn't available during the generation of remote (!) data,
460         # which can be observed with the following search.  Also note that Xs
461         # objectSid seems to be fine in the previous search for objectSid... */
462         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
463         #print len(res) + " results found"
464         #for i in range(len(res)):
465         #    for (obj in res[i]) {
466         #        print obj + ": " + res[i][obj]
467         #    }
468         #    print "---"
469         #    
470
471         # Search by remote name of renamed attribute */
472         attrs = ["dnsHostName", "lastLogon"]
473         res = ldb.search(expression="(sambaBadPasswordCount=*)", attrs=attrs)
474         self.assertEquals(len(res), 0)
475
476         # Search by objectClass
477         attrs = ["dnsHostName", "lastLogon", "objectClass"]
478         res = ldb.search(expression="(objectClass=user)", attrs=attrs)
479         self.assertEquals(len(res), 2)
480         self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
481         self.assertEquals(res[0]["dnsHostName"], "x")
482         self.assertEquals(res[0]["lastLogon"], "x")
483         self.assertTrue(res[0]["objectClass"] is not None)
484         self.assertEquals(res[0]["objectClass"][0], "user")
485         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
486         self.assertEquals(res[1]["dnsHostName"], undefined)
487         self.assertEquals(res[1]["lastLogon"], "x")
488         self.assertTrue(res[1]["objectClass"] is not None)
489         self.assertEquals(res[1]["objectClass"][0], "user")
490
491         # Prove that the objectClass is actually used for the search
492         res = ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs)
493         self.assertEquals(len(res), 3)
494         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
495         self.assertEquals(res[0]["dnsHostName"], undefined)
496         self.assertEquals(res[0]["lastLogon"], "y")
497         self.assertTrue(res[0]["objectClass"] is not None)
498         for oc in set(res[0]["objectClass"]):
499             self.assertEquals(oc, "user")
500         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
501         self.assertEquals(res[1]["dnsHostName"], "x")
502         self.assertEquals(res[1]["lastLogon"], "x")
503         self.assertTrue(res[1]["objectClass"] is not None)
504         self.assertEquals(res[1]["objectClass"][0], "user")
505         self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
506         self.assertEquals(res[2]["dnsHostName"], undefined)
507         self.assertEquals(res[2]["lastLogon"], "x")
508         self.assertTrue(res[2]["objectClass"] is not None)
509         self.assertEquals(res[2]["objectClass"][0], "user")
510
511         print "Testing search by parse tree"
512
513         # Search by conjunction of local attributes
514         attrs = ["dnsHostName", "lastLogon"]
515         res = ldb.search(expression="(&(codePage=x)(revision=x))", attrs=attrs)
516         self.assertEquals(len(res), 2)
517         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
518         self.assertEquals(res[0]["dnsHostName"], "y")
519         self.assertEquals(res[0]["lastLogon"], "y")
520         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
521         self.assertEquals(res[1]["dnsHostName"], "x")
522         self.assertEquals(res[1]["lastLogon"], "x")
523
524         # Search by conjunction of remote attributes
525         attrs = ["dnsHostName", "lastLogon"]
526         res = ldb.search(expression="(&(lastLogon=x)(description=x))", attrs=attrs)
527         self.assertEquals(len(res), 2)
528         self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
529         self.assertEquals(res[0]["dnsHostName"], "x")
530         self.assertEquals(res[0]["lastLogon"], "x")
531         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
532         self.assertEquals(res[1]["dnsHostName"], undefined)
533         self.assertEquals(res[1]["lastLogon"], "x")
534         
535         # Search by conjunction of local and remote attribute 
536         attrs = ["dnsHostName", "lastLogon"]
537         res = ldb.search(expression="(&(codePage=x)(description=x))", attrs=attrs)
538         self.assertEquals(len(res), 2)
539         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
540         self.assertEquals(res[0]["dnsHostName"], "y")
541         self.assertEquals(res[0]["lastLogon"], "y")
542         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
543         self.assertEquals(res[1]["dnsHostName"], "x")
544         self.assertEquals(res[1]["lastLogon"], "x")
545
546         # Search by conjunction of local and remote attribute w/o match
547         attrs = ["dnsHostName", "lastLogon"]
548         res = ldb.search(expression="(&(codePage=x)(nextRid=x))", attrs=attrs)
549         self.assertEquals(len(res), 0)
550         res = ldb.search(expression="(&(revision=x)(lastLogon=z))", attrs=attrs)
551         self.assertEquals(len(res), 0)
552
553         # Search by disjunction of local attributes
554         attrs = ["dnsHostName", "lastLogon"]
555         res = ldb.search(expression="(|(revision=x)(dnsHostName=x))", attrs=attrs)
556         self.assertEquals(len(res), 2)
557         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
558         self.assertEquals(res[0]["dnsHostName"], "y")
559         self.assertEquals(res[0]["lastLogon"], "y")
560         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
561         self.assertEquals(res[1]["dnsHostName"], "x")
562         self.assertEquals(res[1]["lastLogon"], "x")
563
564         # Search by disjunction of remote attributes
565         attrs = ["dnsHostName", "lastLogon"]
566         res = ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", attrs=attrs)
567         self.assertEquals(len(res), 3)
568         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
569         self.assertEquals(res[0]["dnsHostName"], undefined)
570         self.assertEquals(res[0]["lastLogon"], "y")
571         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
572         self.assertEquals(res[1]["dnsHostName"], "x")
573         self.assertEquals(res[1]["lastLogon"], "x")
574         self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
575         self.assertEquals(res[2]["dnsHostName"], undefined)
576         self.assertEquals(res[2]["lastLogon"], "x")
577
578         # Search by disjunction of local and remote attribute
579         attrs = ["dnsHostName", "lastLogon"]
580         res = ldb.search(expression="(|(revision=x)(lastLogon=y))", attrs=attrs)
581         self.assertEquals(len(res), 3)
582         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
583         self.assertEquals(res[0]["dnsHostName"], "y")
584         self.assertEquals(res[0]["lastLogon"], "y")
585         self.assertEquals(str(res[1].dn), s4.dn("cn=B"))
586         self.assertEquals(res[1]["dnsHostName"], undefined)
587         self.assertEquals(res[1]["lastLogon"], "y")
588         self.assertEquals(str(res[2].dn), s4.dn("cn=X"))
589         self.assertEquals(res[2]["dnsHostName"], "x")
590         self.assertEquals(res[2]["lastLogon"], "x")
591
592         # Search by disjunction of local and remote attribute w/o match
593         attrs = ["dnsHostName", "lastLogon"]
594         res = ldb.search(expression="(|(codePage=y)(nextRid=z))", attrs=attrs)
595         self.assertEquals(len(res), 0)
596
597         # Search by negated local attribute
598         attrs = ["dnsHostName", "lastLogon"]
599         res = ldb.search(expression="(!(revision=x))", attrs=attrs)
600         self.assertEquals(len(res), 5)
601         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
602         self.assertEquals(res[0]["dnsHostName"], undefined)
603         self.assertEquals(res[0]["lastLogon"], "y")
604         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
605         self.assertEquals(res[1]["dnsHostName"], undefined)
606         self.assertEquals(res[1]["lastLogon"], "x")
607         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
608         self.assertEquals(res[2]["dnsHostName"], "z")
609         self.assertEquals(res[2]["lastLogon"], "z")
610         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
611         self.assertEquals(res[3]["dnsHostName"], undefined)
612         self.assertEquals(res[3]["lastLogon"], "z")
613
614         # Search by negated remote attribute
615         attrs = ["dnsHostName", "lastLogon"]
616         res = ldb.search(expression="(!(description=x))", attrs=attrs)
617         self.assertEquals(len(res), 3)
618         self.assertEquals(str(res[0].dn), s4.dn("cn=Z"))
619         self.assertEquals(res[0]["dnsHostName"], "z")
620         self.assertEquals(res[0]["lastLogon"], "z")
621         self.assertEquals(str(res[1].dn), s4.dn("cn=C"))
622         self.assertEquals(res[1]["dnsHostName"], undefined)
623         self.assertEquals(res[1]["lastLogon"], "z")
624
625         # Search by negated conjunction of local attributes
626         attrs = ["dnsHostName", "lastLogon"]
627         res = ldb.search(expression="(!(&(codePage=x)(revision=x)))", attrs=attrs)
628         self.assertEquals(len(res), 5)
629         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
630         self.assertEquals(res[0]["dnsHostName"], undefined)
631         self.assertEquals(res[0]["lastLogon"], "y")
632         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
633         self.assertEquals(res[1]["dnsHostName"], undefined)
634         self.assertEquals(res[1]["lastLogon"], "x")
635         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
636         self.assertEquals(res[2]["dnsHostName"], "z")
637         self.assertEquals(res[2]["lastLogon"], "z")
638         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
639         self.assertEquals(res[3]["dnsHostName"], undefined)
640         self.assertEquals(res[3]["lastLogon"], "z")
641
642         # Search by negated conjunction of remote attributes
643         attrs = ["dnsHostName", "lastLogon"]
644         res = ldb.search(expression="(!(&(lastLogon=x)(description=x)))", attrs=attrs)
645         self.assertEquals(len(res), 5)
646         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
647         self.assertEquals(res[0]["dnsHostName"], "y")
648         self.assertEquals(res[0]["lastLogon"], "y")
649         self.assertEquals(str(res[1].dn), s4.dn("cn=B"))
650         self.assertEquals(res[1]["dnsHostName"], undefined)
651         self.assertEquals(res[1]["lastLogon"], "y")
652         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
653         self.assertEquals(res[2]["dnsHostName"], "z")
654         self.assertEquals(res[2]["lastLogon"], "z")
655         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
656         self.assertEquals(res[3]["dnsHostName"], undefined)
657         self.assertEquals(res[3]["lastLogon"], "z")
658
659         # Search by negated conjunction of local and remote attribute
660         attrs = ["dnsHostName", "lastLogon"]
661         res = ldb.search(expression="(!(&(codePage=x)(description=x)))", attrs=attrs)
662         self.assertEquals(len(res), 5)
663         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
664         self.assertEquals(res[0]["dnsHostName"], undefined)
665         self.assertEquals(res[0]["lastLogon"], "y")
666         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
667         self.assertEquals(res[1]["dnsHostName"], undefined)
668         self.assertEquals(res[1]["lastLogon"], "x")
669         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
670         self.assertEquals(res[2]["dnsHostName"], "z")
671         self.assertEquals(res[2]["lastLogon"], "z")
672         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
673         self.assertEquals(res[3]["dnsHostName"], undefined)
674         self.assertEquals(res[3]["lastLogon"], "z")
675
676         # Search by negated disjunction of local attributes
677         attrs = ["dnsHostName", "lastLogon"]
678         res = ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", attrs=attrs)
679         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
680         self.assertEquals(res[0]["dnsHostName"], undefined)
681         self.assertEquals(res[0]["lastLogon"], "y")
682         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
683         self.assertEquals(res[1]["dnsHostName"], undefined)
684         self.assertEquals(res[1]["lastLogon"], "x")
685         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
686         self.assertEquals(res[2]["dnsHostName"], "z")
687         self.assertEquals(res[2]["lastLogon"], "z")
688         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
689         self.assertEquals(res[3]["dnsHostName"], undefined)
690         self.assertEquals(res[3]["lastLogon"], "z")
691
692         # Search by negated disjunction of remote attributes
693         attrs = ["dnsHostName", "lastLogon"]
694         res = ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", attrs=attrs)
695         self.assertEquals(len(res), 4)
696         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
697         self.assertEquals(res[0]["dnsHostName"], "y")
698         self.assertEquals(res[0]["lastLogon"], "y")
699         self.assertEquals(str(res[1].dn), s4.dn("cn=Z"))
700         self.assertEquals(res[1]["dnsHostName"], "z")
701         self.assertEquals(res[1]["lastLogon"], "z")
702         self.assertEquals(str(res[2].dn), s4.dn("cn=C"))
703         self.assertEquals(res[2]["dnsHostName"], undefined)
704         self.assertEquals(res[2]["lastLogon"], "z")
705
706         # Search by negated disjunction of local and remote attribute
707         attrs = ["dnsHostName", "lastLogon"]
708         res = ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", attrs=attrs)
709         self.assertEquals(len(res), 4)
710         self.assertEquals(str(res[0].dn), s4.dn("cn=A"))
711         self.assertEquals(res[0]["dnsHostName"], undefined)
712         self.assertEquals(res[0]["lastLogon"], "x")
713         self.assertEquals(str(res[1].dn), s4.dn("cn=Z"))
714         self.assertEquals(res[1]["dnsHostName"], "z")
715         self.assertEquals(res[1]["lastLogon"], "z")
716         self.assertEquals(str(res[2].dn), s4.dn("cn=C"))
717         self.assertEquals(res[2]["dnsHostName"], undefined)
718         self.assertEquals(res[2]["lastLogon"], "z")
719
720         print "Search by complex parse tree"
721         attrs = ["dnsHostName", "lastLogon"]
722         res = ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=attrs)
723         self.assertEquals(len(res), 6)
724         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
725         self.assertEquals(res[0]["dnsHostName"], undefined)
726         self.assertEquals(res[0]["lastLogon"], "y")
727         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
728         self.assertEquals(res[1]["dnsHostName"], "x")
729         self.assertEquals(res[1]["lastLogon"], "x")
730         self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
731         self.assertEquals(res[2]["dnsHostName"], undefined)
732         self.assertEquals(res[2]["lastLogon"], "x")
733         self.assertEquals(str(res[3].dn), s4.dn("cn=Z"))
734         self.assertEquals(res[3]["dnsHostName"], "z")
735         self.assertEquals(res[3]["lastLogon"], "z")
736         self.assertEquals(str(res[4].dn), s4.dn("cn=C"))
737         self.assertEquals(res[4]["dnsHostName"], undefined)
738         self.assertEquals(res[4]["lastLogon"], "z")
739
740         # Clean up
741         dns = [s4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
742         for dn in dns:
743             ldb.delete(dn)
744
745     def test_map_modify_local(self):
746         """Modification of local records."""
747         s3 = self.samba3
748         ldb = self.ldb
749         s4 = self.samba4
750
751         # Add local record
752         dn = "cn=test,dc=idealx,dc=org"
753         ldb.add({"dn": dn, 
754                  "cn": "test",
755                  "foo": "bar",
756                  "revision": "1",
757                  "description": "test"})
758         # Check it's there
759         attrs = ["foo", "revision", "description"]
760         res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
761         self.assertEquals(len(res), 1)
762         self.assertEquals(str(res[0].dn), dn)
763         self.assertEquals(res[0]["foo"], "bar")
764         self.assertEquals(res[0]["revision"], "1")
765         self.assertEquals(res[0]["description"], "test")
766         # Check it's not in the local db
767         res = s4.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs)
768         self.assertEquals(len(res), 0)
769         # Check it's not in the remote db
770         res = s3.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs)
771         self.assertEquals(len(res), 0)
772
773         # Modify local record
774         ldif = """
775 dn: """ + dn + """
776 replace: foo
777 foo: baz
778 replace: description
779 description: foo
780 """
781         ldb.modify_ldif(ldif)
782         # Check in local db
783         res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
784         self.assertEquals(len(res), 1)
785         self.assertEquals(str(res[0].dn), dn)
786         self.assertEquals(res[0]["foo"], "baz")
787         self.assertEquals(res[0]["revision"], "1")
788         self.assertEquals(res[0]["description"], "foo")
789
790         # Rename local record
791         dn2 = "cn=toast,dc=idealx,dc=org"
792         ldb.rename(dn, dn2)
793         # Check in local db
794         res = ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
795         self.assertEquals(len(res), 1)
796         self.assertEquals(str(res[0].dn), dn2)
797         self.assertEquals(res[0]["foo"], "baz")
798         self.assertEquals(res[0]["revision"], "1")
799         self.assertEquals(res[0]["description"], "foo")
800
801         # Delete local record
802         ldb.delete(dn2)
803         # Check it's gone
804         res = ldb.search(dn2, scope=SCOPE_BASE)
805         self.assertEquals(len(res), 0)
806
807     def test_map_modify_remote_remote(self):
808         """Modification of remote data of remote records"""
809         s3 = self.samba3
810         ldb = self.ldb
811         s4 = self.samba4
812
813         # Add remote record
814         dn = s4.dn("cn=test")
815         dn2 = s3.dn("cn=test")
816         s3.db.add({"dn": dn2, 
817                    "cn": "test",
818                    "description": "foo",
819                    "sambaBadPasswordCount": "3",
820                    "sambaNextRid": "1001"})
821         # Check it's there
822         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
823         res = s3.db.search("", dn2, SCOPE_BASE, attrs)
824         self.assertEquals(len(res), 1)
825         self.assertEquals(str(res[0].dn), dn2)
826         self.assertEquals(res[0]["description"], "foo")
827         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
828         self.assertEquals(res[0]["sambaNextRid"], "1001")
829         # Check in mapped db
830         attrs = ["description", "badPwdCount", "nextRid"]
831         res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
832         self.assertEquals(len(res), 1)
833         self.assertEquals(str(res[0].dn), dn)
834         self.assertEquals(res[0]["description"], "foo")
835         self.assertEquals(res[0]["badPwdCount"], "3")
836         self.assertEquals(res[0]["nextRid"], "1001")
837         # Check in local db
838         res = s4.db.search("", dn, SCOPE_BASE, attrs)
839         self.assertEquals(len(res), 0)
840
841         # Modify remote data of remote record
842         ldif = """
843 dn: """ + dn + """
844 replace: description
845 description: test
846 replace: badPwdCount
847 badPwdCount: 4
848 """
849         ldb.modify_ldif(ldif)
850         # Check in mapped db
851         attrs = ["description", "badPwdCount", "nextRid"]
852         res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
853         self.assertEquals(len(res), 1)
854         self.assertEquals(str(res[0].dn), dn)
855         self.assertEquals(res[0]["description"], "test")
856         self.assertEquals(res[0]["badPwdCount"], "4")
857         self.assertEquals(res[0]["nextRid"], "1001")
858         # Check in remote db
859         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
860         res = s3.db.search("", dn2, SCOPE_BASE, attrs)
861         self.assertEquals(len(res), 1)
862         self.assertEquals(str(res[0].dn), dn2)
863         self.assertEquals(res[0]["description"], "test")
864         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
865         self.assertEquals(res[0]["sambaNextRid"], "1001")
866
867         # Rename remote record
868         dn2 = s4.dn("cn=toast")
869         ldb.rename(dn, dn2)
870         # Check in mapped db
871         dn = dn2
872         attrs = ["description", "badPwdCount", "nextRid"]
873         res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
874         self.assertEquals(len(res), 1)
875         self.assertEquals(str(res[0].dn), dn)
876         self.assertEquals(res[0]["description"], "test")
877         self.assertEquals(res[0]["badPwdCount"], "4")
878         self.assertEquals(res[0]["nextRid"], "1001")
879         # Check in remote db 
880         dn2 = s3.dn("cn=toast")
881         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
882         res = s3.db.search("", dn2, SCOPE_BASE, attrs)
883         self.assertEquals(len(res), 1)
884         self.assertEquals(str(res[0].dn), dn2)
885         self.assertEquals(res[0]["description"], "test")
886         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
887         self.assertEquals(res[0]["sambaNextRid"], "1001")
888
889         # Delete remote record
890         ldb.delete(dn)
891         # Check in mapped db
892         res = ldb.search(dn, scope=SCOPE_BASE)
893         self.assertEquals(len(res), 0)
894         # Check in remote db
895         res = s3.db.search("", dn2, SCOPE_BASE)
896         self.assertEquals(len(res), 0)
897
898     def test_map_modify_remote_local(self):
899         """Modification of local data of remote records"""
900         s3 = self.samba3
901         ldb = self.ldb
902         s4 = self.samba4
903
904         # Add remote record (same as before)
905         dn = s4.dn("cn=test")
906         dn2 = s3.dn("cn=test")
907         s3.db.add({"dn": dn2, 
908                    "cn": "test",
909                    "description": "foo",
910                    "sambaBadPasswordCount": "3",
911                    "sambaNextRid": "1001"})
912
913         # Modify local data of remote record
914         ldif = """
915 dn: """ + dn + """
916 add: revision
917 revision: 1
918 replace: description
919 description: test
920 """
921         ldb.modify_ldif(ldif)
922         # Check in mapped db
923         attrs = ["revision", "description"]
924         res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
925         self.assertEquals(len(res), 1)
926         self.assertEquals(str(res[0].dn), dn)
927         self.assertEquals(res[0]["description"], "test")
928         self.assertEquals(res[0]["revision"], "1")
929         # Check in remote db
930         res = s3.db.search("", dn2, SCOPE_BASE, attrs)
931         self.assertEquals(len(res), 1)
932         self.assertEquals(str(res[0].dn), dn2)
933         self.assertEquals(res[0]["description"], "test")
934         self.assertEquals(res[0]["revision"], undefined)
935         # Check in local db
936         res = s4.db.search("", dn, SCOPE_BASE, attrs)
937         self.assertEquals(len(res), 1)
938         self.assertEquals(str(res[0].dn), dn)
939         self.assertEquals(res[0]["description"], undefined)
940         self.assertEquals(res[0]["revision"], "1")
941
942         # Delete (newly) split record
943         ldb.delete(dn)
944
945     def test_map_modify_split(self):
946         """Testing modification of split records"""
947         s3 = self.samba3
948         ldb = self.ldb
949         s4 = self.samba4
950
951         # Add split record
952         dn = s4.dn("cn=test")
953         dn2 = s3.dn("cn=test")
954         ldb.add({
955             "dn": dn,
956             "cn": "test",
957             "description": "foo",
958             "badPwdCount": "3",
959             "nextRid": "1001",
960             "revision": "1"})
961         # Check it's there
962         attrs = ["description", "badPwdCount", "nextRid", "revision"]
963         res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
964         self.assertEquals(len(res), 1)
965         self.assertEquals(str(res[0].dn), dn)
966         self.assertEquals(res[0]["description"], "foo")
967         self.assertEquals(res[0]["badPwdCount"], "3")
968         self.assertEquals(res[0]["nextRid"], "1001")
969         self.assertEquals(res[0]["revision"], "1")
970         # Check in local db
971         res = s4.db.search("", dn, SCOPE_BASE, attrs)
972         self.assertEquals(len(res), 1)
973         self.assertEquals(str(res[0].dn), dn)
974         self.assertEquals(res[0]["description"], undefined)
975         self.assertEquals(res[0]["badPwdCount"], undefined)
976         self.assertEquals(res[0]["nextRid"], undefined)
977         self.assertEquals(res[0]["revision"], "1")
978         # Check in remote db
979         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
980         res = s3.db.search("", dn2, SCOPE_BASE, attrs)
981         self.assertEquals(len(res), 1)
982         self.assertEquals(str(res[0].dn), dn2)
983         self.assertEquals(res[0]["description"], "foo")
984         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
985         self.assertEquals(res[0]["sambaNextRid"], "1001")
986         self.assertEquals(res[0]["revision"], undefined)
987
988         # Modify of split record
989         ldif = """
990 dn: """ + dn + """
991 replace: description
992 description: test
993 replace: badPwdCount
994 badPwdCount: 4
995 replace: revision
996 revision: 2
997 """
998         ldb.modify_ldif(ldif)
999         # Check in mapped db
1000         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1001         res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1002         self.assertEquals(len(res), 1)
1003         self.assertEquals(str(res[0].dn), dn)
1004         self.assertEquals(res[0]["description"], "test")
1005         self.assertEquals(res[0]["badPwdCount"], "4")
1006         self.assertEquals(res[0]["nextRid"], "1001")
1007         self.assertEquals(res[0]["revision"], "2")
1008         # Check in local db
1009         res = s4.db.search("", dn, SCOPE_BASE, attrs)
1010         self.assertEquals(len(res), 1)
1011         self.assertEquals(str(res[0].dn), dn)
1012         self.assertEquals(res[0]["description"], undefined)
1013         self.assertEquals(res[0]["badPwdCount"], undefined)
1014         self.assertEquals(res[0]["nextRid"], undefined)
1015         self.assertEquals(res[0]["revision"], "2")
1016         # Check in remote db
1017         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
1018         res = s3.db.search("", dn2, SCOPE_BASE, attrs)
1019         self.assertEquals(len(res), 1)
1020         self.assertEquals(str(res[0].dn), dn2)
1021         self.assertEquals(res[0]["description"], "test")
1022         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1023         self.assertEquals(res[0]["sambaNextRid"], "1001")
1024         self.assertEquals(res[0]["revision"], undefined)
1025
1026         # Rename split record
1027         dn2 = s4.dn("cn=toast")
1028         ldb.rename(dn, dn2)
1029         # Check in mapped db
1030         dn = dn2
1031         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1032         res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1033         self.assertEquals(len(res), 1)
1034         self.assertEquals(str(res[0].dn), dn)
1035         self.assertEquals(res[0]["description"], "test")
1036         self.assertEquals(res[0]["badPwdCount"], "4")
1037         self.assertEquals(res[0]["nextRid"], "1001")
1038         self.assertEquals(res[0]["revision"], "2")
1039         # Check in local db
1040         res = s4.db.search("", dn, SCOPE_BASE, attrs)
1041         self.assertEquals(len(res), 1)
1042         self.assertEquals(str(res[0].dn), dn)
1043         self.assertEquals(res[0]["description"], undefined)
1044         self.assertEquals(res[0]["badPwdCount"], undefined)
1045         self.assertEquals(res[0]["nextRid"], undefined)
1046         self.assertEquals(res[0]["revision"], "2")
1047         # Check in remote db
1048         dn2 = s3.dn("cn=toast")
1049         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
1050         res = s3.db.search("", dn2, SCOPE_BASE, attrs)
1051         self.assertEquals(len(res), 1)
1052         self.assertEquals(str(res[0].dn), dn2)
1053         self.assertEquals(res[0]["description"], "test")
1054         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1055         self.assertEquals(res[0]["sambaNextRid"], "1001")
1056         self.assertEquals(res[0]["revision"], undefined)
1057
1058         # Delete split record
1059         ldb.delete(dn)
1060         # Check in mapped db
1061         res = ldb.search(dn, scope=SCOPE_BASE)
1062         self.assertEquals(len(res), 0)
1063         # Check in local db
1064         res = s4.db.search("", dn, SCOPE_BASE)
1065         self.assertEquals(len(res), 0)
1066         # Check in remote db
1067         res = s3.db.search("", dn2, SCOPE_BASE)
1068         self.assertEquals(len(res), 0)