Remove references to the unused @SUBCLASS feature.
[metze/samba/wip.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
1 #!/usr/bin/python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2007
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6 #
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
8 #   
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #   
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #   
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
24
25 import os
26 import sys
27 import samba
28 import ldb
29 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
30 from samba import Ldb, substitute_var
31 from samba.tests import LdbTestCase, TestCaseInTempDir
32
33 datadir = os.path.join(os.path.dirname(__file__), "../../../../../testdata/samba3")
34
35 class MapBaseTestCase(TestCaseInTempDir):
36     def setup_data(self, obj, ldif):
37         self.assertTrue(ldif is not None)
38         obj.db.add_ldif(substitute_var(ldif, obj.substvars))
39
40     def setup_modules(self, ldb, s3, s4):
41         ldb.add({"dn": "@MAP=samba3sam",
42                  "@FROM": s4.basedn,
43                  "@TO": "sambaDomainName=TESTS," + s3.basedn})
44
45         ldb.add({"dn": "@MODULES",
46                  "@LIST": "rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})
47
48         ldb.add({"dn": "@PARTITION",
49             "partition": [s4.basedn + ":" + s4.url, s3.basedn + ":" + s3.url],
50             "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]})
51
52     def setUp(self):
53         super(MapBaseTestCase, self).setUp()
54
55         def make_dn(basedn, rdn):
56             return rdn + ",sambaDomainName=TESTS," + basedn
57
58         def make_s4dn(basedn, rdn):
59             return rdn + "," + basedn
60
61         self.ldbfile = os.path.join(self.tempdir, "test.ldb")
62         self.ldburl = "tdb://" + self.ldbfile
63
64         tempdir = self.tempdir
65         print tempdir
66
67         class Target:
68             """Simple helper class that contains data for a specific SAM connection."""
69             def __init__(self, file, basedn, dn):
70                 self.file = os.path.join(tempdir, file)
71                 self.url = "tdb://" + self.file
72                 self.basedn = basedn
73                 self.substvars = {"BASEDN": self.basedn}
74                 self.db = Ldb()
75                 self._dn = dn
76
77             def dn(self, rdn):
78                 return self._dn(rdn, self.basedn)
79
80             def connect(self):
81                 return self.db.connect(self.url)
82
83         self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
84         self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
85         self.templates = Target("templates.ldb", "cn=templates", None)
86
87         self.samba3.connect()
88         self.templates.connect()
89         self.samba4.connect()
90
91     def tearDown(self):
92         os.unlink(self.ldbfile)
93         os.unlink(self.samba3.file)
94         os.unlink(self.templates.file)
95         os.unlink(self.samba4.file)
96         super(MapBaseTestCase, self).tearDown()
97
98
99 class Samba3SamTestCase(MapBaseTestCase):
100     def setUp(self):
101         super(Samba3SamTestCase, self).setUp()
102         ldb = Ldb(self.ldburl)
103         self.setup_data(self.samba3, open(os.path.join(datadir, "samba3.ldif"), 'r').read())
104         self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read())
105         ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()
106         ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))
107         self.setup_modules(ldb, self.samba3, self.samba4)
108         self.ldb = Ldb(self.ldburl)
109
110     def test_s3sam_search(self):
111         print "Looking up by non-mapped attribute"
112         msg = self.ldb.search(expression="(cn=Administrator)")
113         self.assertEquals(len(msg), 1)
114         self.assertEquals(msg[0]["cn"], "Administrator")
115
116         print "Looking up by mapped attribute"
117         msg = self.ldb.search(expression="(name=Backup Operators)")
118         self.assertEquals(len(msg), 1)
119         self.assertEquals(msg[0]["name"], "Backup Operators")
120
121         print "Looking up by old name of renamed attribute"
122         msg = self.ldb.search(expression="(displayName=Backup Operators)")
123         self.assertEquals(len(msg), 0)
124
125         print "Looking up mapped entry containing SID"
126         msg = self.ldb.search(expression="(cn=Replicator)")
127         self.assertEquals(len(msg), 1)
128         print msg[0].dn
129         self.assertEquals(str(msg[0].dn), "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
130         self.assertEquals(msg[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
131
132         print "Checking mapping of objectClass"
133         oc = set(msg[0]["objectClass"])
134         self.assertTrue(oc is not None)
135         for i in oc:
136             self.assertEquals(oc[i] == "posixGroup" or oc[i], "group")
137
138         print "Looking up by objectClass"
139         msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
140         self.assertEquals(len(msg), 2)
141         for i in range(len(msg)):
142             self.assertEquals((str(msg[i].dn), "unixName=Administrator,ou=Users,dc=vernstok,dc=nl") or
143                    (str(msg[i].dn) == "unixName=nobody,ou=Users,dc=vernstok,dc=nl"))
144
145
146     def test_s3sam_modify(self):
147         print "Adding a record that will be fallbacked"
148         self.ldb.add({"dn": "cn=Foo", 
149             "foo": "bar", 
150             "blah": "Blie", 
151             "cn": "Foo", 
152             "showInAdvancedViewOnly": "TRUE"}
153             )
154
155         print "Checking for existence of record (local)"
156         # TODO: This record must be searched in the local database, which is currently only supported for base searches
157         # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
158         # TODO: Actually, this version should work as well but doesn't...
159         # 
160         #    
161         msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=SCOPE_BASE, attrs=['foo','blah','cn','showInAdvancedViewOnly'])
162         self.assertEquals(len(msg), 1)
163         self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE")
164         self.assertEquals(msg[0]["foo"], "bar")
165         self.assertEquals(msg[0]["blah"], "Blie")
166
167         print "Adding record that will be mapped"
168         self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
169                  "objectClass": "user",
170                  "unixName": "bin",
171                  "sambaUnicodePwd": "geheim",
172                  "cn": "Niemand"})
173
174         print "Checking for existence of record (remote)"
175         msg = self.ldb.search(expression="(unixName=bin)", 
176                               attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
177         self.assertEquals(len(msg), 1)
178         self.assertEquals(msg[0]["cn"], "Niemand")
179         self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
180
181         print "Checking for existence of record (local && remote)"
182         msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))", 
183                          attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
184         self.assertEquals(len(msg), 1)           # TODO: should check with more records
185         self.assertEquals(msg[0]["cn"], "Niemand")
186         self.assertEquals(msg[0]["unixName"], "bin")
187         self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
188
189         print "Checking for existence of record (local || remote)"
190         msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))", 
191                          attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
192         print "got " + len(msg) + " replies"
193         self.assertEquals(len(msg), 1)        # TODO: should check with more records
194         self.assertEquals(msg[0]["cn"], "Niemand")
195         self.assertEquals(msg[0]["unixName"] == "bin" or msg[0]["sambaUnicodePwd"], "geheim")
196
197         print "Checking for data in destination database"
198         msg = s3.db.search("(cn=Niemand)")
199         self.assertTrue(len(msg) >= 1)
200         self.assertEquals(msg[0]["sambaSID"], "S-1-5-21-4231626423-2410014848-2360679739-2001")
201         self.assertEquals(msg[0]["displayName"], "Niemand")
202
203         print "Adding attribute..."
204         self.ldb.modify_ldif("""
205 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
206 changetype: modify
207 add: description
208 description: Blah
209 """)
210
211         print "Checking whether changes are still there..."
212         msg = self.ldb.search(expression="(cn=Niemand)")
213         self.assertTrue(len(msg) >= 1)
214         self.assertEquals(msg[0]["cn"], "Niemand")
215         self.assertEquals(msg[0]["description"], "Blah")
216
217         print "Modifying attribute..."
218         self.ldb.modify_ldif("""
219 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
220 changetype: modify
221 replace: description
222 description: Blie
223 """)
224
225         print "Checking whether changes are still there..."
226         msg = self.ldb.search(expression="(cn=Niemand)")
227         self.assertTrue(len(msg) >= 1)
228         self.assertEquals(msg[0]["description"], "Blie")
229
230         print "Deleting attribute..."
231         self.ldb.modify_ldif("""
232 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
233 changetype: modify
234 delete: description
235 """)
236
237         print "Checking whether changes are no longer there..."
238         msg = self.ldb.search(expression="(cn=Niemand)")
239         self.assertTrue(len(msg) >= 1)
240         self.assertTrue(not "description" in res[0])
241
242         print "Renaming record..."
243         self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
244
245         print "Checking whether DN has changed..."
246         msg = self.ldb.search(expression="(cn=Niemand2)")
247         self.assertEquals(len(msg), 1)
248         self.assertEquals(str(msg[0].dn), "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
249
250         print "Deleting record..."
251         self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
252
253         print "Checking whether record is gone..."
254         msg = self.ldb.search(expression="(cn=Niemand2)")
255         self.assertEquals(len(msg), 0)
256
257
258
259 class MapTestCase(MapBaseTestCase):
260     def setUp(self):
261         super(MapTestCase, self).setUp()
262         ldb = Ldb(self.ldburl)
263         self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read())
264         ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()
265         ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))
266         self.setup_modules(ldb, self.samba3, self.samba4)
267         self.ldb = Ldb(self.ldburl)
268
269     def test_map_search(self):
270         print "Running search tests on mapped data"
271         ldif = """
272 dn: """ + "sambaDomainName=TESTS,""" + self.samba3.basedn + """
273 objectclass: sambaDomain
274 objectclass: top
275 sambaSID: S-1-5-21-4231626423-2410014848-2360679739
276 sambaNextRid: 2000
277 sambaDomainName: TESTS"""
278         self.samba3.db.add_ldif(substitute_var(ldif, self.samba3.substvars))
279
280         print "Add a set of split records"
281         ldif = """
282 dn: """ + self.samba4.dn("cn=X") + """
283 objectClass: user
284 cn: X
285 codePage: x
286 revision: x
287 dnsHostName: x
288 nextRid: y
289 lastLogon: x
290 description: x
291 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
292 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512
293
294 dn: """ + self.samba4.dn("cn=Y") + """
295 objectClass: top
296 cn: Y
297 codePage: x
298 revision: x
299 dnsHostName: y
300 nextRid: y
301 lastLogon: y
302 description: x
303
304 dn: """ + self.samba4.dn("cn=Z") + """
305 objectClass: top
306 cn: Z
307 codePage: x
308 revision: y
309 dnsHostName: z
310 nextRid: y
311 lastLogon: z
312 description: y
313 """
314
315         self.ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))
316
317         print "Add a set of remote records"
318
319         ldif = """
320 dn: """ + self.samba3.dn("cn=A") + """
321 objectClass: posixAccount
322 cn: A
323 sambaNextRid: x
324 sambaBadPasswordCount: x
325 sambaLogonTime: x
326 description: x
327 sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552
328 sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512
329
330 dn: """ + self.samba3.dn("cn=B") + """
331 objectClass: top
332 cn:B
333 sambaNextRid: x
334 sambaBadPasswordCount: x
335 sambaLogonTime: y
336 description: x
337
338 dn: """ + self.samba3.dn("cn=C") + """
339 objectClass: top
340 cn: C
341 sambaNextRid: x
342 sambaBadPasswordCount: y
343 sambaLogonTime: z
344 description: y
345 """
346         self.samba3.add_ldif(substitute_var(ldif, self.samba3.substvars))
347
348         print "Testing search by DN"
349
350         # Search remote record by local DN
351         dn = self.samba4.dn("cn=A")
352         attrs = ["dnsHostName", "lastLogon"]
353         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
354         self.assertEquals(len(res), 1)
355         self.assertEquals(str(str(res[0].dn)), dn)
356         self.assertTrue(not "dnsHostName" in res[0])
357         self.assertEquals(res[0]["lastLogon"], "x")
358
359         # Search remote record by remote DN
360         dn = self.samba3.dn("cn=A")
361         attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"]
362         res = self.samba3.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
363         self.assertEquals(len(res), 1)
364         self.assertEquals(str(str(res[0].dn)), dn)
365         self.assertTrue(not "dnsHostName" in res[0])
366         self.assertTrue(not "lastLogon" in res[0])
367         self.assertEquals(res[0]["sambaLogonTime"], "x")
368
369         # Search split record by local DN
370         dn = self.samba4.dn("cn=X")
371         attrs = ["dnsHostName", "lastLogon"]
372         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
373         self.assertEquals(len(res), 1)
374         self.assertEquals(str(str(res[0].dn)), dn)
375         self.assertEquals(res[0]["dnsHostName"], "x")
376         self.assertEquals(res[0]["lastLogon"], "x")
377
378         # Search split record by remote DN
379         dn = self.samba3.dn("cn=X")
380         attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"]
381         res = self.samba3.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
382         self.assertEquals(len(res), 1)
383         self.assertEquals(str(str(res[0].dn)), dn)
384         self.assertTrue(not "dnsHostName" in res[0])
385         self.assertTrue(not "lastLogon" in res[0])
386         self.assertEquals(res[0]["sambaLogonTime"], "x")
387
388         print "Testing search by attribute"
389
390         # Search by ignored attribute
391         attrs = ["dnsHostName", "lastLogon"]
392         res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, attrs=attrs)
393         self.assertEquals(len(res), 2)
394         self.assertEquals(str(str(res[0].dn)), self.samba4.dn("cn=Y"))
395         self.assertEquals(res[0]["dnsHostName"], "y")
396         self.assertEquals(res[0]["lastLogon"], "y")
397         self.assertEquals(str(str(res[1].dn)), self.samba4.dn("cn=X"))
398         self.assertEquals(res[1]["dnsHostName"], "x")
399         self.assertEquals(res[1]["lastLogon"], "x")
400
401         # Search by kept attribute
402         attrs = ["dnsHostName", "lastLogon"]
403         res = self.ldb.search(expression="(description=y)", scope=SCOPE_DEFAULT, attrs=attrs)
404         self.assertEquals(len(res), 2)
405         self.assertEquals(str(str(res[0].dn)), self.samba4.dn("cn=Z"))
406         self.assertEquals(res[0]["dnsHostName"], "z")
407         self.assertEquals(res[0]["lastLogon"], "z")
408         self.assertEquals(str(str(res[1].dn)), self.samba4.dn("cn=C"))
409         self.assertTrue(not "dnsHostName" in res[1])
410         self.assertEquals(res[1]["lastLogon"], "z")
411
412         # Search by renamed attribute
413         attrs = ["dnsHostName", "lastLogon"]
414         res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT, attrs=attrs)
415         self.assertEquals(len(res), 2)
416         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
417         self.assertTrue(not "dnsHostName" in res[0])
418         self.assertEquals(res[0]["lastLogon"], "y")
419         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
420         self.assertTrue(not "dnsHostName" in res[1])
421         self.assertEquals(res[1]["lastLogon"], "x")
422
423         # Search by converted attribute
424         attrs = ["dnsHostName", "lastLogon", "objectSid"]
425         # TODO:
426         #   Using the SID directly in the parse tree leads to conversion
427         #   errors, letting the search fail with no results.
428         #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
429         res = self.ldb.search(expression="(objectSid=*)", attrs=attrs)
430         self.assertEquals(len(res), 3)
431         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
432         self.assertEquals(res[0]["dnsHostName"], "x")
433         self.assertEquals(res[0]["lastLogon"], "x")
434         self.assertEquals(res[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
435         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
436         self.assertTrue(not "dnsHostName" in res[1])
437         self.assertEquals(res[1]["lastLogon"], "x")
438         self.assertEquals(res[1]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
439
440         # Search by generated attribute 
441         # In most cases, this even works when the mapping is missing
442         # a `convert_operator' by enumerating the remote db.
443         attrs = ["dnsHostName", "lastLogon", "primaryGroupID"]
444         res = self.ldb.search(expression="(primaryGroupID=512)", attrs=attrs)
445         self.assertEquals(len(res), 1)
446         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
447         self.assertTrue(not "dnsHostName" in res[0])
448         self.assertEquals(res[0]["lastLogon"], "x")
449         self.assertEquals(res[0]["primaryGroupID"], "512")
450
451         # TODO: There should actually be two results, A and X.  The
452         # primaryGroupID of X seems to get corrupted somewhere, and the
453         # objectSid isn't available during the generation of remote (!) data,
454         # which can be observed with the following search.  Also note that Xs
455         # objectSid seems to be fine in the previous search for objectSid... */
456         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
457         #print len(res) + " results found"
458         #for i in range(len(res)):
459         #    for (obj in res[i]) {
460         #        print obj + ": " + res[i][obj]
461         #    }
462         #    print "---"
463         #    
464
465         # Search by remote name of renamed attribute */
466         attrs = ["dnsHostName", "lastLogon"]
467         res = self.ldb.search(expression="(sambaBadPasswordCount=*)", attrs=attrs)
468         self.assertEquals(len(res), 0)
469
470         # Search by objectClass
471         attrs = ["dnsHostName", "lastLogon", "objectClass"]
472         res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
473         self.assertEquals(len(res), 2)
474         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
475         self.assertEquals(res[0]["dnsHostName"], "x")
476         self.assertEquals(res[0]["lastLogon"], "x")
477         self.assertTrue(res[0]["objectClass"] is not None)
478         self.assertEquals(res[0]["objectClass"][0], "user")
479         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
480         self.assertTrue(not "dnsHostName" in res[1])
481         self.assertEquals(res[1]["lastLogon"], "x")
482         self.assertTrue(res[1]["objectClass"] is not None)
483         self.assertEquals(res[1]["objectClass"][0], "user")
484
485         # Prove that the objectClass is actually used for the search
486         res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs)
487         self.assertEquals(len(res), 3)
488         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
489         self.assertTrue(not "dnsHostName" in res[0])
490         self.assertEquals(res[0]["lastLogon"], "y")
491         self.assertTrue(res[0]["objectClass"] is not None)
492         for oc in set(res[0]["objectClass"]):
493             self.assertEquals(oc, "user")
494         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
495         self.assertEquals(res[1]["dnsHostName"], "x")
496         self.assertEquals(res[1]["lastLogon"], "x")
497         self.assertTrue(res[1]["objectClass"] is not None)
498         self.assertEquals(res[1]["objectClass"][0], "user")
499         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
500         self.assertTrue(not "dnsHostName" in res[2])
501         self.assertEquals(res[2]["lastLogon"], "x")
502         self.assertTrue(res[2]["objectClass"] is not None)
503         self.assertEquals(res[2]["objectClass"][0], "user")
504
505         print "Testing search by parse tree"
506
507         # Search by conjunction of local attributes
508         attrs = ["dnsHostName", "lastLogon"]
509         res = self.ldb.search(expression="(&(codePage=x)(revision=x))", attrs=attrs)
510         self.assertEquals(len(res), 2)
511         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
512         self.assertEquals(res[0]["dnsHostName"], "y")
513         self.assertEquals(res[0]["lastLogon"], "y")
514         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
515         self.assertEquals(res[1]["dnsHostName"], "x")
516         self.assertEquals(res[1]["lastLogon"], "x")
517
518         # Search by conjunction of remote attributes
519         attrs = ["dnsHostName", "lastLogon"]
520         res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", attrs=attrs)
521         self.assertEquals(len(res), 2)
522         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
523         self.assertEquals(res[0]["dnsHostName"], "x")
524         self.assertEquals(res[0]["lastLogon"], "x")
525         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
526         self.assertTrue(not "dnsHostName" in res[1])
527         self.assertEquals(res[1]["lastLogon"], "x")
528         
529         # Search by conjunction of local and remote attribute 
530         attrs = ["dnsHostName", "lastLogon"]
531         res = self.ldb.search(expression="(&(codePage=x)(description=x))", attrs=attrs)
532         self.assertEquals(len(res), 2)
533         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
534         self.assertEquals(res[0]["dnsHostName"], "y")
535         self.assertEquals(res[0]["lastLogon"], "y")
536         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
537         self.assertEquals(res[1]["dnsHostName"], "x")
538         self.assertEquals(res[1]["lastLogon"], "x")
539
540         # Search by conjunction of local and remote attribute w/o match
541         attrs = ["dnsHostName", "lastLogon"]
542         res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", attrs=attrs)
543         self.assertEquals(len(res), 0)
544         res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", attrs=attrs)
545         self.assertEquals(len(res), 0)
546
547         # Search by disjunction of local attributes
548         attrs = ["dnsHostName", "lastLogon"]
549         res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", attrs=attrs)
550         self.assertEquals(len(res), 2)
551         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
552         self.assertEquals(res[0]["dnsHostName"], "y")
553         self.assertEquals(res[0]["lastLogon"], "y")
554         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
555         self.assertEquals(res[1]["dnsHostName"], "x")
556         self.assertEquals(res[1]["lastLogon"], "x")
557
558         # Search by disjunction of remote attributes
559         attrs = ["dnsHostName", "lastLogon"]
560         res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", attrs=attrs)
561         self.assertEquals(len(res), 3)
562         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
563         self.assertTrue("dnsHostName" in res[0])
564         self.assertEquals(res[0]["lastLogon"], "y")
565         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
566         self.assertEquals(res[1]["dnsHostName"], "x")
567         self.assertEquals(res[1]["lastLogon"], "x")
568         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
569         self.assertTrue("dnsHostName" in res[2])
570         self.assertEquals(res[2]["lastLogon"], "x")
571
572         # Search by disjunction of local and remote attribute
573         attrs = ["dnsHostName", "lastLogon"]
574         res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", attrs=attrs)
575         self.assertEquals(len(res), 3)
576         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
577         self.assertEquals(res[0]["dnsHostName"], "y")
578         self.assertEquals(res[0]["lastLogon"], "y")
579         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
580         self.assertTrue("dnsHostName" in res[1])
581         self.assertEquals(res[1]["lastLogon"], "y")
582         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
583         self.assertEquals(res[2]["dnsHostName"], "x")
584         self.assertEquals(res[2]["lastLogon"], "x")
585
586         # Search by disjunction of local and remote attribute w/o match
587         attrs = ["dnsHostName", "lastLogon"]
588         res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", attrs=attrs)
589         self.assertEquals(len(res), 0)
590
591         # Search by negated local attribute
592         attrs = ["dnsHostName", "lastLogon"]
593         res = self.ldb.search(expression="(!(revision=x))", attrs=attrs)
594         self.assertEquals(len(res), 5)
595         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
596         self.assertTrue(not "dnsHostName" in res[0])
597         self.assertEquals(res[0]["lastLogon"], "y")
598         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
599         self.assertTrue(not "dnsHostName" in res[1])
600         self.assertEquals(res[1]["lastLogon"], "x")
601         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
602         self.assertEquals(res[2]["dnsHostName"], "z")
603         self.assertEquals(res[2]["lastLogon"], "z")
604         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
605         self.assertTrue(not "dnsHostName" in res[3])
606         self.assertEquals(res[3]["lastLogon"], "z")
607
608         # Search by negated remote attribute
609         attrs = ["dnsHostName", "lastLogon"]
610         res = self.ldb.search(expression="(!(description=x))", attrs=attrs)
611         self.assertEquals(len(res), 3)
612         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
613         self.assertEquals(res[0]["dnsHostName"], "z")
614         self.assertEquals(res[0]["lastLogon"], "z")
615         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
616         self.assertTrue(not "dnsHostName" in res[1])
617         self.assertEquals(res[1]["lastLogon"], "z")
618
619         # Search by negated conjunction of local attributes
620         attrs = ["dnsHostName", "lastLogon"]
621         res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", attrs=attrs)
622         self.assertEquals(len(res), 5)
623         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
624         self.assertTrue(not "dnsHostName" in res[0])
625         self.assertEquals(res[0]["lastLogon"], "y")
626         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
627         self.assertTrue(not "dnsHostName" in res[1])
628         self.assertEquals(res[1]["lastLogon"], "x")
629         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
630         self.assertEquals(res[2]["dnsHostName"], "z")
631         self.assertEquals(res[2]["lastLogon"], "z")
632         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
633         self.assertTrue(not "dnsHostName" in res[3])
634         self.assertEquals(res[3]["lastLogon"], "z")
635
636         # Search by negated conjunction of remote attributes
637         attrs = ["dnsHostName", "lastLogon"]
638         res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", attrs=attrs)
639         self.assertEquals(len(res), 5)
640         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
641         self.assertEquals(res[0]["dnsHostName"], "y")
642         self.assertEquals(res[0]["lastLogon"], "y")
643         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
644         self.assertTrue(not "dnsHostName" in res[1])
645         self.assertEquals(res[1]["lastLogon"], "y")
646         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
647         self.assertEquals(res[2]["dnsHostName"], "z")
648         self.assertEquals(res[2]["lastLogon"], "z")
649         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
650         self.assertTrue(not "dnsHostName" in res[3])
651         self.assertEquals(res[3]["lastLogon"], "z")
652
653         # Search by negated conjunction of local and remote attribute
654         attrs = ["dnsHostName", "lastLogon"]
655         res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", attrs=attrs)
656         self.assertEquals(len(res), 5)
657         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
658         self.assertTrue(not "dnsHostName" in res[0])
659         self.assertEquals(res[0]["lastLogon"], "y")
660         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
661         self.assertTrue(not "dnsHostName" in res[1])
662         self.assertEquals(res[1]["lastLogon"], "x")
663         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
664         self.assertEquals(res[2]["dnsHostName"], "z")
665         self.assertEquals(res[2]["lastLogon"], "z")
666         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
667         self.assertTrue(not "dnsHostName" in res[3])
668         self.assertEquals(res[3]["lastLogon"], "z")
669
670         # Search by negated disjunction of local attributes
671         attrs = ["dnsHostName", "lastLogon"]
672         res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", attrs=attrs)
673         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
674         self.assertTrue(not "dnsHostName" in res[0])
675         self.assertEquals(res[0]["lastLogon"], "y")
676         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
677         self.assertTrue(not "dnsHostName" in res[1])
678         self.assertEquals(res[1]["lastLogon"], "x")
679         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
680         self.assertEquals(res[2]["dnsHostName"], "z")
681         self.assertEquals(res[2]["lastLogon"], "z")
682         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
683         self.assertTrue(not "dnsHostName" in res[3])
684         self.assertEquals(res[3]["lastLogon"], "z")
685
686         # Search by negated disjunction of remote attributes
687         attrs = ["dnsHostName", "lastLogon"]
688         res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", attrs=attrs)
689         self.assertEquals(len(res), 4)
690         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
691         self.assertEquals(res[0]["dnsHostName"], "y")
692         self.assertEquals(res[0]["lastLogon"], "y")
693         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
694         self.assertEquals(res[1]["dnsHostName"], "z")
695         self.assertEquals(res[1]["lastLogon"], "z")
696         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
697         self.assertTrue(not "dnsHostName" in res[2])
698         self.assertEquals(res[2]["lastLogon"], "z")
699
700         # Search by negated disjunction of local and remote attribute
701         attrs = ["dnsHostName", "lastLogon"]
702         res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", attrs=attrs)
703         self.assertEquals(len(res), 4)
704         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
705         self.assertTrue(not "dnsHostName" in res[0])
706         self.assertEquals(res[0]["lastLogon"], "x")
707         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
708         self.assertEquals(res[1]["dnsHostName"], "z")
709         self.assertEquals(res[1]["lastLogon"], "z")
710         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
711         self.assertTrue(not "dnsHostName" in res[2])
712         self.assertEquals(res[2]["lastLogon"], "z")
713
714         print "Search by complex parse tree"
715         attrs = ["dnsHostName", "lastLogon"]
716         res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=attrs)
717         self.assertEquals(len(res), 6)
718         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
719         self.assertTrue(not "dnsHostName" in res[0])
720         self.assertEquals(res[0]["lastLogon"], "y")
721         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
722         self.assertEquals(res[1]["dnsHostName"], "x")
723         self.assertEquals(res[1]["lastLogon"], "x")
724         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
725         self.assertTrue(not "dnsHostName" in res[2])
726         self.assertEquals(res[2]["lastLogon"], "x")
727         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
728         self.assertEquals(res[3]["dnsHostName"], "z")
729         self.assertEquals(res[3]["lastLogon"], "z")
730         self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
731         self.assertTrue(not "dnsHostName" in res[4])
732         self.assertEquals(res[4]["lastLogon"], "z")
733
734         # Clean up
735         dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
736         for dn in dns:
737             self.ldb.delete(dn)
738
739     def test_map_modify_local(self):
740         """Modification of local records."""
741         # Add local record
742         dn = "cn=test,dc=idealx,dc=org"
743         self.ldb.add({"dn": dn, 
744                  "cn": "test",
745                  "foo": "bar",
746                  "revision": "1",
747                  "description": "test"})
748         # Check it's there
749         attrs = ["foo", "revision", "description"]
750         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
751         self.assertEquals(len(res), 1)
752         self.assertEquals(str(res[0].dn), dn)
753         self.assertEquals(res[0]["foo"], "bar")
754         self.assertEquals(res[0]["revision"], "1")
755         self.assertEquals(res[0]["description"], "test")
756         # Check it's not in the local db
757         res = self.samba4.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs)
758         self.assertEquals(len(res), 0)
759         # Check it's not in the remote db
760         res = self.samba3.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs)
761         self.assertEquals(len(res), 0)
762
763         # Modify local record
764         ldif = """
765 dn: """ + dn + """
766 replace: foo
767 foo: baz
768 replace: description
769 description: foo
770 """
771         self.ldb.modify_ldif(ldif)
772         # Check in local db
773         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
774         self.assertEquals(len(res), 1)
775         self.assertEquals(str(res[0].dn), dn)
776         self.assertEquals(res[0]["foo"], "baz")
777         self.assertEquals(res[0]["revision"], "1")
778         self.assertEquals(res[0]["description"], "foo")
779
780         # Rename local record
781         dn2 = "cn=toast,dc=idealx,dc=org"
782         self.ldb.rename(dn, dn2)
783         # Check in local db
784         res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
785         self.assertEquals(len(res), 1)
786         self.assertEquals(str(res[0].dn), dn2)
787         self.assertEquals(res[0]["foo"], "baz")
788         self.assertEquals(res[0]["revision"], "1")
789         self.assertEquals(res[0]["description"], "foo")
790
791         # Delete local record
792         self.ldb.delete(dn2)
793         # Check it's gone
794         res = self.ldb.search(dn2, scope=SCOPE_BASE)
795         self.assertEquals(len(res), 0)
796
797     def test_map_modify_remote_remote(self):
798         """Modification of remote data of remote records"""
799         # Add remote record
800         dn = self.samba4.dn("cn=test")
801         dn2 = self.samba3.dn("cn=test")
802         self.samba3.db.add({"dn": dn2, 
803                    "cn": "test",
804                    "description": "foo",
805                    "sambaBadPasswordCount": "3",
806                    "sambaNextRid": "1001"})
807         # Check it's there
808         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
809         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
810         self.assertEquals(len(res), 1)
811         self.assertEquals(str(res[0].dn), dn2)
812         self.assertEquals(res[0]["description"], "foo")
813         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
814         self.assertEquals(res[0]["sambaNextRid"], "1001")
815         # Check in mapped db
816         attrs = ["description", "badPwdCount", "nextRid"]
817         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
818         self.assertEquals(len(res), 1)
819         self.assertEquals(str(res[0].dn), dn)
820         self.assertEquals(res[0]["description"], "foo")
821         self.assertEquals(res[0]["badPwdCount"], "3")
822         self.assertEquals(res[0]["nextRid"], "1001")
823         # Check in local db
824         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
825         self.assertEquals(len(res), 0)
826
827         # Modify remote data of remote record
828         ldif = """
829 dn: """ + dn + """
830 replace: description
831 description: test
832 replace: badPwdCount
833 badPwdCount: 4
834 """
835         self.ldb.modify_ldif(ldif)
836         # Check in mapped db
837         attrs = ["description", "badPwdCount", "nextRid"]
838         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
839         self.assertEquals(len(res), 1)
840         self.assertEquals(str(res[0].dn), dn)
841         self.assertEquals(res[0]["description"], "test")
842         self.assertEquals(res[0]["badPwdCount"], "4")
843         self.assertEquals(res[0]["nextRid"], "1001")
844         # Check in remote db
845         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
846         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
847         self.assertEquals(len(res), 1)
848         self.assertEquals(str(res[0].dn), dn2)
849         self.assertEquals(res[0]["description"], "test")
850         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
851         self.assertEquals(res[0]["sambaNextRid"], "1001")
852
853         # Rename remote record
854         dn2 = self.samba4.dn("cn=toast")
855         self.ldb.rename(dn, dn2)
856         # Check in mapped db
857         dn = dn2
858         attrs = ["description", "badPwdCount", "nextRid"]
859         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
860         self.assertEquals(len(res), 1)
861         self.assertEquals(str(res[0].dn), dn)
862         self.assertEquals(res[0]["description"], "test")
863         self.assertEquals(res[0]["badPwdCount"], "4")
864         self.assertEquals(res[0]["nextRid"], "1001")
865         # Check in remote db 
866         dn2 = self.samba3.dn("cn=toast")
867         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
868         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
869         self.assertEquals(len(res), 1)
870         self.assertEquals(str(res[0].dn), dn2)
871         self.assertEquals(res[0]["description"], "test")
872         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
873         self.assertEquals(res[0]["sambaNextRid"], "1001")
874
875         # Delete remote record
876         self.ldb.delete(dn)
877         # Check in mapped db
878         res = self.ldb.search(dn, scope=SCOPE_BASE)
879         self.assertEquals(len(res), 0)
880         # Check in remote db
881         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
882         self.assertEquals(len(res), 0)
883
884     def test_map_modify_remote_local(self):
885         """Modification of local data of remote records"""
886         # Add remote record (same as before)
887         dn = self.samba4.dn("cn=test")
888         dn2 = self.samba3.dn("cn=test")
889         self.samba3.db.add({"dn": dn2, 
890                    "cn": "test",
891                    "description": "foo",
892                    "sambaBadPasswordCount": "3",
893                    "sambaNextRid": "1001"})
894
895         # Modify local data of remote record
896         ldif = """
897 dn: """ + dn + """
898 add: revision
899 revision: 1
900 replace: description
901 description: test
902 """
903         self.ldb.modify_ldif(ldif)
904         # Check in mapped db
905         attrs = ["revision", "description"]
906         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
907         self.assertEquals(len(res), 1)
908         self.assertEquals(str(res[0].dn), dn)
909         self.assertEquals(res[0]["description"], "test")
910         self.assertEquals(res[0]["revision"], "1")
911         # Check in remote db
912         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
913         self.assertEquals(len(res), 1)
914         self.assertEquals(str(res[0].dn), dn2)
915         self.assertEquals(res[0]["description"], "test")
916         self.assertTrue(not "revision" in res[0])
917         # Check in local db
918         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
919         self.assertEquals(len(res), 1)
920         self.assertEquals(str(res[0].dn), dn)
921         self.assertTrue(not "description" in res[0])
922         self.assertEquals(res[0]["revision"], "1")
923
924         # Delete (newly) split record
925         self.ldb.delete(dn)
926
927     def test_map_modify_split(self):
928         """Testing modification of split records"""
929         # Add split record
930         dn = self.samba4.dn("cn=test")
931         dn2 = self.samba3.dn("cn=test")
932         self.ldb.add({
933             "dn": dn,
934             "cn": "test",
935             "description": "foo",
936             "badPwdCount": "3",
937             "nextRid": "1001",
938             "revision": "1"})
939         # Check it's there
940         attrs = ["description", "badPwdCount", "nextRid", "revision"]
941         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
942         self.assertEquals(len(res), 1)
943         self.assertEquals(str(res[0].dn), dn)
944         self.assertEquals(res[0]["description"], "foo")
945         self.assertEquals(res[0]["badPwdCount"], "3")
946         self.assertEquals(res[0]["nextRid"], "1001")
947         self.assertEquals(res[0]["revision"], "1")
948         # Check in local db
949         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
950         self.assertEquals(len(res), 1)
951         self.assertEquals(str(res[0].dn), dn)
952         self.assertTrue(not "description" in res[0])
953         self.assertTrue(not "badPwdCount" in res[0])
954         self.assertTrue(not "nextRid" in res[0])
955         self.assertEquals(res[0]["revision"], "1")
956         # Check in remote db
957         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
958         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
959         self.assertEquals(len(res), 1)
960         self.assertEquals(str(res[0].dn), dn2)
961         self.assertEquals(res[0]["description"], "foo")
962         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
963         self.assertEquals(res[0]["sambaNextRid"], "1001")
964         self.assertTrue(not "revision" in res[0])
965
966         # Modify of split record
967         ldif = """
968 dn: """ + dn + """
969 replace: description
970 description: test
971 replace: badPwdCount
972 badPwdCount: 4
973 replace: revision
974 revision: 2
975 """
976         self.ldb.modify_ldif(ldif)
977         # Check in mapped db
978         attrs = ["description", "badPwdCount", "nextRid", "revision"]
979         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
980         self.assertEquals(len(res), 1)
981         self.assertEquals(str(res[0].dn), dn)
982         self.assertEquals(res[0]["description"], "test")
983         self.assertEquals(res[0]["badPwdCount"], "4")
984         self.assertEquals(res[0]["nextRid"], "1001")
985         self.assertEquals(res[0]["revision"], "2")
986         # Check in local db
987         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
988         self.assertEquals(len(res), 1)
989         self.assertEquals(str(res[0].dn), dn)
990         self.assertTrue(not "description" in res[0])
991         self.assertTrue(not "badPwdCount" in res[0])
992         self.assertTrue(not "nextRid" in res[0])
993         self.assertEquals(res[0]["revision"], "2")
994         # Check in remote db
995         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
996         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
997         self.assertEquals(len(res), 1)
998         self.assertEquals(str(res[0].dn), dn2)
999         self.assertEquals(res[0]["description"], "test")
1000         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1001         self.assertEquals(res[0]["sambaNextRid"], "1001")
1002         self.assertTrue(not "revision" in res[0])
1003
1004         # Rename split record
1005         dn2 = self.samba4.dn("cn=toast")
1006         self.ldb.rename(dn, dn2)
1007         # Check in mapped db
1008         dn = dn2
1009         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1010         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1011         self.assertEquals(len(res), 1)
1012         self.assertEquals(str(res[0].dn), dn)
1013         self.assertEquals(res[0]["description"], "test")
1014         self.assertEquals(res[0]["badPwdCount"], "4")
1015         self.assertEquals(res[0]["nextRid"], "1001")
1016         self.assertEquals(res[0]["revision"], "2")
1017         # Check in local db
1018         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1019         self.assertEquals(len(res), 1)
1020         self.assertEquals(str(res[0].dn), dn)
1021         self.assertTrue(not "description" in res[0])
1022         self.assertTrue(not "badPwdCount" in res[0])
1023         self.assertTrue(not "nextRid" in res[0])
1024         self.assertEquals(res[0]["revision"], "2")
1025         # Check in remote db
1026         dn2 = self.samba3.dn("cn=toast")
1027         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
1028         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1029         self.assertEquals(len(res), 1)
1030         self.assertEquals(str(res[0].dn), dn2)
1031         self.assertEquals(res[0]["description"], "test")
1032         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1033         self.assertEquals(res[0]["sambaNextRid"], "1001")
1034         self.assertTrue(not "revision" in res[0])
1035
1036         # Delete split record
1037         self.ldb.delete(dn)
1038         # Check in mapped db
1039         res = self.ldb.search(dn, scope=SCOPE_BASE)
1040         self.assertEquals(len(res), 0)
1041         # Check in local db
1042         res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1043         self.assertEquals(len(res), 0)
1044         # Check in remote db
1045         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1046         self.assertEquals(len(res), 0)