Fix last samba3sam.py test.
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
1 #!/usr/bin/python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6 #
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
8 #   
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #   
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #   
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
24
25 import os
26 import ldb
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
28 from samba import Ldb, substitute_var
29 from samba.tests import LdbTestCase, TestCaseInTempDir
30
31 datadir = os.path.join(os.path.dirname(__file__), 
32                        "../../../../../testdata/samba3")
33
34 def read_datafile(filename):
35     return open(os.path.join(datadir, filename), 'r').read()
36
37 def ldb_debug(l, text):
38     print text
39
40
41 class MapBaseTestCase(TestCaseInTempDir):
42     """Base test case for mapping tests."""
43
44     def setup_modules(self, ldb, s3, s4):
45         ldb.add({"dn": "@MAP=samba3sam",
46                  "@FROM": s4.basedn,
47                  "@TO": "sambaDomainName=TESTS," + s3.basedn})
48
49         ldb.add({"dn": "@MODULES",
50                  "@LIST": "rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})
51
52         ldb.add({"dn": "@PARTITION",
53             "partition": ["%s:%s" % (s4.basedn, s4.url), 
54                           "%s:%s" % (s3.basedn, s3.url)],
55             "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]})
56
57     def setUp(self):
58         super(MapBaseTestCase, self).setUp()
59
60         def make_dn(basedn, rdn):
61             return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)
62
63         def make_s4dn(basedn, rdn):
64             return "%s,%s" % (rdn, basedn)
65
66         self.ldbfile = os.path.join(self.tempdir, "test.ldb")
67         self.ldburl = "tdb://" + self.ldbfile
68
69         tempdir = self.tempdir
70
71         class Target:
72             """Simple helper class that contains data for a specific SAM 
73             connection."""
74             def __init__(self, file, basedn, dn):
75                 self.file = os.path.join(tempdir, file)
76                 self.url = "tdb://" + self.file
77                 self.basedn = basedn
78                 self.substvars = {"BASEDN": self.basedn}
79                 self.db = Ldb()
80                 self._dn = dn
81
82             def dn(self, rdn):
83                 return self._dn(self.basedn, rdn)
84
85             def connect(self):
86                 return self.db.connect(self.url)
87
88             def setup_data(self, path):
89                 self.add_ldif(read_datafile(path))
90
91             def subst(self, text):
92                 return substitute_var(text, self.substvars)
93
94             def add_ldif(self, ldif):
95                 self.db.add_ldif(self.subst(ldif))
96
97             def modify_ldif(self, ldif):
98                 self.db.modify_ldif(self.subst(ldif))
99
100         self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
101         self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
102         self.templates = Target("templates.ldb", "cn=templates", None)
103
104         self.samba3.connect()
105         self.templates.connect()
106         self.samba4.connect()
107
108     def tearDown(self):
109         os.unlink(self.ldbfile)
110         os.unlink(self.samba3.file)
111         os.unlink(self.templates.file)
112         os.unlink(self.samba4.file)
113         super(MapBaseTestCase, self).tearDown()
114
115
116 class Samba3SamTestCase(MapBaseTestCase):
117
118     def setUp(self):
119         super(Samba3SamTestCase, self).setUp()
120         ldb = Ldb(self.ldburl)
121         self.samba3.setup_data("samba3.ldif")
122         self.templates.setup_data("provision_samba3sam_templates.ldif")
123         ldif = read_datafile("provision_samba3sam.ldif")
124         ldb.add_ldif(self.samba4.subst(ldif))
125         self.setup_modules(ldb, self.samba3, self.samba4)
126         del ldb
127         self.ldb = Ldb(self.ldburl)
128
129     def test_search_non_mapped(self):
130         """Looking up by non-mapped attribute"""
131         msg = self.ldb.search(expression="(cn=Administrator)")
132         self.assertEquals(len(msg), 1)
133         self.assertEquals(msg[0]["cn"], "Administrator")
134
135     def test_search_non_mapped(self):
136         """Looking up by mapped attribute"""
137         msg = self.ldb.search(expression="(name=Backup Operators)")
138         self.assertEquals(len(msg), 1)
139         self.assertEquals(msg[0]["name"], "Backup Operators")
140
141     def test_old_name_of_renamed(self):
142         """Looking up by old name of renamed attribute"""
143         msg = self.ldb.search(expression="(displayName=Backup Operators)")
144         self.assertEquals(len(msg), 0)
145
146     def test_mapped_containing_sid(self):
147         """Looking up mapped entry containing SID"""
148         msg = self.ldb.search(expression="(cn=Replicator)")
149         self.assertEquals(len(msg), 1)
150         self.assertEquals(str(msg[0].dn), 
151                           "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
152         self.assertTrue("objectSid" in msg[0]) 
153         # FIXME: NDR unpack msg[0]["objectSid"] before comparing:
154         # self.assertEquals(msg[0]["objectSid"], 
155         #                   "S-1-5-21-4231626423-2410014848-2360679739-552")
156         # Check mapping of objectClass
157         oc = set(msg[0]["objectClass"])
158         self.assertEquals(oc, set(["group"]))
159
160     def test_search_by_objclass(self):
161         """Looking up by objectClass"""
162         msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
163         self.assertEquals(set([str(m.dn) for m in msg]), 
164                 set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl", 
165                      "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))
166
167     def test_s3sam_modify(self):
168         # Adding a record that will be fallbacked
169         self.ldb.add({"dn": "cn=Foo", 
170             "foo": "bar", 
171             "blah": "Blie", 
172             "cn": "Foo", 
173             "showInAdvancedViewOnly": "TRUE"}
174             )
175
176         # Checking for existence of record (local)
177         # TODO: This record must be searched in the local database, which is 
178         # currently only supported for base searches
179         # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
180         # TODO: Actually, this version should work as well but doesn't...
181         # 
182         #    
183         msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo", 
184                 scope=SCOPE_BASE, 
185                 attrs=['foo','blah','cn','showInAdvancedViewOnly'])
186         self.assertEquals(len(msg), 1)
187         self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE")
188         self.assertEquals(msg[0]["foo"], "bar")
189         self.assertEquals(msg[0]["blah"], "Blie")
190
191         # Adding record that will be mapped
192         self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
193                  "objectClass": "user",
194                  "unixName": "bin",
195                  "sambaUnicodePwd": "geheim",
196                  "cn": "Niemand"})
197
198         # Checking for existence of record (remote)
199         msg = self.ldb.search(expression="(unixName=bin)", 
200                               attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
201         self.assertEquals(len(msg), 1)
202         self.assertEquals(msg[0]["cn"], "Niemand")
203         self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
204
205         # Checking for existence of record (local && remote)
206         msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))", 
207                          attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
208         self.assertEquals(len(msg), 1)           # TODO: should check with more records
209         self.assertEquals(msg[0]["cn"], "Niemand")
210         self.assertEquals(msg[0]["unixName"], "bin")
211         self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
212
213         # Checking for existence of record (local || remote)
214         msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))", 
215                          attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
216         #print "got %d replies" % len(msg)
217         self.assertEquals(len(msg), 1)        # TODO: should check with more records
218         self.assertEquals(msg[0]["cn"], "Niemand")
219         self.assertEquals(msg[0]["unixName"], "bin")
220         self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
221
222         # Checking for data in destination database
223         msg = self.samba3.db.search(expression="(cn=Niemand)")
224         self.assertTrue(len(msg) >= 1)
225         self.assertEquals(msg[0]["sambaSID"], 
226                 "S-1-5-21-4231626423-2410014848-2360679739-2001")
227         self.assertEquals(msg[0]["displayName"], "Niemand")
228
229         # Adding attribute...
230         self.ldb.modify_ldif("""
231 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
232 changetype: modify
233 add: description
234 description: Blah
235 """)
236
237         # Checking whether changes are still there...
238         msg = self.ldb.search(expression="(cn=Niemand)")
239         self.assertTrue(len(msg) >= 1)
240         self.assertEquals(msg[0]["cn"], "Niemand")
241         self.assertEquals(msg[0]["description"], "Blah")
242
243         # Modifying attribute...
244         self.ldb.modify_ldif("""
245 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
246 changetype: modify
247 replace: description
248 description: Blie
249 """)
250
251         # Checking whether changes are still there...
252         msg = self.ldb.search(expression="(cn=Niemand)")
253         self.assertTrue(len(msg) >= 1)
254         self.assertEquals(msg[0]["description"], "Blie")
255
256         # Deleting attribute...
257         self.ldb.modify_ldif("""
258 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
259 changetype: modify
260 delete: description
261 """)
262
263         # Checking whether changes are no longer there...
264         msg = self.ldb.search(expression="(cn=Niemand)")
265         self.assertTrue(len(msg) >= 1)
266         self.assertTrue(not "description" in msg[0])
267
268         # Renaming record...
269         self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", 
270                         "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
271
272         # Checking whether DN has changed...
273         msg = self.ldb.search(expression="(cn=Niemand2)")
274         self.assertEquals(len(msg), 1)
275         self.assertEquals(str(msg[0].dn), 
276                           "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
277
278         # Deleting record...
279         self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
280
281         # Checking whether record is gone...
282         msg = self.ldb.search(expression="(cn=Niemand2)")
283         self.assertEquals(len(msg), 0)
284
285
286 class MapTestCase(MapBaseTestCase):
287
288     def setUp(self):
289         super(MapTestCase, self).setUp()
290         ldb = Ldb(self.ldburl)
291         self.templates.setup_data("provision_samba3sam_templates.ldif")
292         ldif = read_datafile("provision_samba3sam.ldif")
293         ldb.add_ldif(self.samba4.subst(ldif))
294         self.setup_modules(ldb, self.samba3, self.samba4)
295         del ldb
296         self.ldb = Ldb(self.ldburl)
297
298     def test_map_search(self):
299         """Running search tests on mapped data."""
300         self.samba3.db.add({
301             "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
302             "objectclass": ["sambaDomain", "top"],
303             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
304             "sambaNextRid": "2000",
305             "sambaDomainName": "TESTS"
306             })
307
308         # Add a set of split records
309         self.ldb.add_ldif("""
310 dn: """+ self.samba4.dn("cn=X") + """
311 objectClass: user
312 cn: X
313 codePage: x
314 revision: x
315 dnsHostName: x
316 nextRid: y
317 lastLogon: x
318 description: x
319 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
320 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512
321
322 """)
323
324         self.ldb.add({
325             "dn": self.samba4.dn("cn=Y"),
326             "objectClass": "top",
327             "cn": "Y",
328             "codePage": "x",
329             "revision": "x",
330             "dnsHostName": "y",
331             "nextRid": "y",
332             "lastLogon": "y",
333             "description": "x"})
334
335         self.ldb.add({
336             "dn": self.samba4.dn("cn=Z"),
337             "objectClass": "top",
338             "cn": "Z",
339             "codePage": "x",
340             "revision": "y",
341             "dnsHostName": "z",
342             "nextRid": "y",
343             "lastLogon": "z",
344             "description": "y"})
345
346         # Add a set of remote records
347
348         self.samba3.db.add({
349             "dn": self.samba3.dn("cn=A"),
350             "objectClass": "posixAccount",
351             "cn": "A",
352             "sambaNextRid": "x",
353             "sambaBadPasswordCount": "x", 
354             "sambaLogonTime": "x",
355             "description": "x",
356             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
357             "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
358
359         self.samba3.db.add({
360             "dn": self.samba3.dn("cn=B"),
361             "objectClass": "top",
362             "cn": "B",
363             "sambaNextRid": "x",
364             "sambaBadPasswordCount": "x",
365             "sambaLogonTime": "y",
366             "description": "x"})
367
368         self.samba3.db.add({
369             "dn": self.samba3.dn("cn=C"),
370             "objectClass": "top",
371             "cn": "C",
372             "sambaNextRid": "x",
373             "sambaBadPasswordCount": "y",
374             "sambaLogonTime": "z",
375             "description": "y"})
376
377         # Testing search by DN
378
379         # Search remote record by local DN
380         dn = self.samba4.dn("cn=A")
381         res = self.ldb.search(dn, scope=SCOPE_BASE, 
382                 attrs=["dnsHostName", "lastLogon"])
383         self.assertEquals(len(res), 1)
384         self.assertEquals(str(res[0].dn), dn)
385         self.assertTrue(not "dnsHostName" in res[0])
386         self.assertEquals(res[0]["lastLogon"], "x")
387
388         # Search remote record by remote DN
389         dn = self.samba3.dn("cn=A")
390         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
391                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
392         self.assertEquals(len(res), 1)
393         self.assertEquals(str(res[0].dn), dn)
394         self.assertTrue(not "dnsHostName" in res[0])
395         self.assertTrue(not "lastLogon" in res[0])
396         self.assertEquals(res[0]["sambaLogonTime"], "x")
397
398         # Search split record by local DN
399         dn = self.samba4.dn("cn=X")
400         res = self.ldb.search(dn, scope=SCOPE_BASE, 
401                 attrs=["dnsHostName", "lastLogon"])
402         self.assertEquals(len(res), 1)
403         self.assertEquals(str(res[0].dn), dn)
404         self.assertEquals(res[0]["dnsHostName"], "x")
405         self.assertEquals(res[0]["lastLogon"], "x")
406
407         # Search split record by remote DN
408         dn = self.samba3.dn("cn=X")
409         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
410                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
411         self.assertEquals(len(res), 1)
412         self.assertEquals(str(res[0].dn), dn)
413         self.assertTrue(not "dnsHostName" in res[0])
414         self.assertTrue(not "lastLogon" in res[0])
415         self.assertEquals(res[0]["sambaLogonTime"], "x")
416
417         # Testing search by attribute
418
419         # Search by ignored attribute
420         res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, 
421                 attrs=["dnsHostName", "lastLogon"])
422         self.assertEquals(len(res), 2)
423         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
424         self.assertEquals(res[0]["dnsHostName"], "y")
425         self.assertEquals(res[0]["lastLogon"], "y")
426         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
427         self.assertEquals(res[1]["dnsHostName"], "x")
428         self.assertEquals(res[1]["lastLogon"], "x")
429
430         # Search by kept attribute
431         res = self.ldb.search(expression="(description=y)", 
432                 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
433         self.assertEquals(len(res), 2)
434         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
435         self.assertEquals(res[0]["dnsHostName"], "z")
436         self.assertEquals(res[0]["lastLogon"], "z")
437         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
438         self.assertTrue(not "dnsHostName" in res[1])
439         self.assertEquals(res[1]["lastLogon"], "z")
440
441         # Search by renamed attribute
442         res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
443                               attrs=["dnsHostName", "lastLogon"])
444         self.assertEquals(len(res), 2)
445         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
446         self.assertTrue(not "dnsHostName" in res[0])
447         self.assertEquals(res[0]["lastLogon"], "y")
448         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
449         self.assertTrue(not "dnsHostName" in res[1])
450         self.assertEquals(res[1]["lastLogon"], "x")
451
452         # Search by converted attribute
453         # TODO:
454         #   Using the SID directly in the parse tree leads to conversion
455         #   errors, letting the search fail with no results.
456         #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
457         res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
458         self.assertEquals(len(res), 3)
459         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
460         self.assertEquals(res[0]["dnsHostName"], "x")
461         self.assertEquals(res[0]["lastLogon"], "x")
462         # FIXME:Properly compare sid,requires converting between NDR encoding 
463         # and string
464         #self.assertEquals(res[0]["objectSid"], 
465         #                  "S-1-5-21-4231626423-2410014848-2360679739-552")
466         self.assertTrue("objectSid" in res[0])
467         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
468         self.assertTrue(not "dnsHostName" in res[1])
469         self.assertEquals(res[1]["lastLogon"], "x")
470         # FIXME: Properly compare sid,see above
471         #self.assertEquals(res[1]["objectSid"], 
472         #                  "S-1-5-21-4231626423-2410014848-2360679739-552")
473         self.assertTrue("objectSid" in res[1])
474
475         # Search by generated attribute 
476         # In most cases, this even works when the mapping is missing
477         # a `convert_operator' by enumerating the remote db.
478         res = self.ldb.search(expression="(primaryGroupID=512)", 
479                            attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
480         self.assertEquals(len(res), 1)
481         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
482         self.assertTrue(not "dnsHostName" in res[0])
483         self.assertEquals(res[0]["lastLogon"], "x")
484         self.assertEquals(res[0]["primaryGroupID"], "512")
485
486         # TODO: There should actually be two results, A and X.  The
487         # primaryGroupID of X seems to get corrupted somewhere, and the
488         # objectSid isn't available during the generation of remote (!) data,
489         # which can be observed with the following search.  Also note that Xs
490         # objectSid seems to be fine in the previous search for objectSid... */
491         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
492         #print len(res) + " results found"
493         #for i in range(len(res)):
494         #    for (obj in res[i]) {
495         #        print obj + ": " + res[i][obj]
496         #    }
497         #    print "---"
498         #    
499
500         # Search by remote name of renamed attribute */
501         res = self.ldb.search(expression="(sambaBadPasswordCount=*)", 
502                               attrs=["dnsHostName", "lastLogon"])
503         self.assertEquals(len(res), 0)
504
505         # Search by objectClass
506         attrs = ["dnsHostName", "lastLogon", "objectClass"]
507         res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
508         self.assertEquals(len(res), 2)
509         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
510         self.assertEquals(res[0]["dnsHostName"], "x")
511         self.assertEquals(res[0]["lastLogon"], "x")
512         self.assertEquals(res[0]["objectClass"][0], "user")
513         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
514         self.assertTrue(not "dnsHostName" in res[1])
515         self.assertEquals(res[1]["lastLogon"], "x")
516         self.assertEquals(res[1]["objectClass"][0], "user")
517
518         # Prove that the objectClass is actually used for the search
519         res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
520                               attrs=attrs)
521         self.assertEquals(len(res), 3)
522         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
523         self.assertTrue(not "dnsHostName" in res[0])
524         self.assertEquals(res[0]["lastLogon"], "y")
525         self.assertEquals(set(res[0]["objectClass"]), set(["top"]))
526         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
527         self.assertEquals(res[1]["dnsHostName"], "x")
528         self.assertEquals(res[1]["lastLogon"], "x")
529         self.assertEquals(res[1]["objectClass"][0], "user")
530         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
531         self.assertTrue(not "dnsHostName" in res[2])
532         self.assertEquals(res[2]["lastLogon"], "x")
533         self.assertEquals(res[2]["objectClass"][0], "user")
534
535         # Testing search by parse tree
536
537         # Search by conjunction of local attributes
538         res = self.ldb.search(expression="(&(codePage=x)(revision=x))", 
539                               attrs=["dnsHostName", "lastLogon"])
540         self.assertEquals(len(res), 2)
541         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
542         self.assertEquals(res[0]["dnsHostName"], "y")
543         self.assertEquals(res[0]["lastLogon"], "y")
544         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
545         self.assertEquals(res[1]["dnsHostName"], "x")
546         self.assertEquals(res[1]["lastLogon"], "x")
547
548         # Search by conjunction of remote attributes
549         res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", 
550                               attrs=["dnsHostName", "lastLogon"])
551         self.assertEquals(len(res), 2)
552         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
553         self.assertEquals(res[0]["dnsHostName"], "x")
554         self.assertEquals(res[0]["lastLogon"], "x")
555         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
556         self.assertTrue(not "dnsHostName" in res[1])
557         self.assertEquals(res[1]["lastLogon"], "x")
558         
559         # Search by conjunction of local and remote attribute 
560         res = self.ldb.search(expression="(&(codePage=x)(description=x))", 
561                               attrs=["dnsHostName", "lastLogon"])
562         self.assertEquals(len(res), 2)
563         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
564         self.assertEquals(res[0]["dnsHostName"], "y")
565         self.assertEquals(res[0]["lastLogon"], "y")
566         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
567         self.assertEquals(res[1]["dnsHostName"], "x")
568         self.assertEquals(res[1]["lastLogon"], "x")
569
570         # Search by conjunction of local and remote attribute w/o match
571         attrs = ["dnsHostName", "lastLogon"]
572         res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", 
573                               attrs=attrs)
574         self.assertEquals(len(res), 0)
575         res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", 
576                               attrs=attrs)
577         self.assertEquals(len(res), 0)
578
579         # Search by disjunction of local attributes
580         res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", 
581                               attrs=["dnsHostName", "lastLogon"])
582         self.assertEquals(len(res), 2)
583         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
584         self.assertEquals(res[0]["dnsHostName"], "y")
585         self.assertEquals(res[0]["lastLogon"], "y")
586         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
587         self.assertEquals(res[1]["dnsHostName"], "x")
588         self.assertEquals(res[1]["lastLogon"], "x")
589
590         # Search by disjunction of remote attributes
591         res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", 
592                               attrs=["dnsHostName", "lastLogon"])
593         self.assertEquals(len(res), 3)
594         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
595         self.assertFalse("dnsHostName" in res[0])
596         self.assertEquals(res[0]["lastLogon"], "y")
597         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
598         self.assertEquals(res[1]["dnsHostName"], "x")
599         self.assertEquals(res[1]["lastLogon"], "x")
600         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
601         self.assertFalse("dnsHostName" in res[2])
602         self.assertEquals(res[2]["lastLogon"], "x")
603
604         # Search by disjunction of local and remote attribute
605         res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", 
606                               attrs=["dnsHostName", "lastLogon"])
607         self.assertEquals(len(res), 3)
608         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
609         self.assertEquals(res[0]["dnsHostName"], "y")
610         self.assertEquals(res[0]["lastLogon"], "y")
611         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
612         self.assertFalse("dnsHostName" in res[1])
613         self.assertEquals(res[1]["lastLogon"], "y")
614         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
615         self.assertEquals(res[2]["dnsHostName"], "x")
616         self.assertEquals(res[2]["lastLogon"], "x")
617
618         # Search by disjunction of local and remote attribute w/o match
619         res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", 
620                               attrs=["dnsHostName", "lastLogon"])
621         self.assertEquals(len(res), 0)
622
623         # Search by negated local attribute
624         res = self.ldb.search(expression="(!(revision=x))", 
625                               attrs=["dnsHostName", "lastLogon"])
626         self.assertEquals(len(res), 5)
627         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
628         self.assertTrue(not "dnsHostName" in res[0])
629         self.assertEquals(res[0]["lastLogon"], "y")
630         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
631         self.assertTrue(not "dnsHostName" in res[1])
632         self.assertEquals(res[1]["lastLogon"], "x")
633         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
634         self.assertEquals(res[2]["dnsHostName"], "z")
635         self.assertEquals(res[2]["lastLogon"], "z")
636         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
637         self.assertTrue(not "dnsHostName" in res[3])
638         self.assertEquals(res[3]["lastLogon"], "z")
639
640         # Search by negated remote attribute
641         res = self.ldb.search(expression="(!(description=x))", 
642                               attrs=["dnsHostName", "lastLogon"])
643         self.assertEquals(len(res), 3)
644         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
645         self.assertEquals(res[0]["dnsHostName"], "z")
646         self.assertEquals(res[0]["lastLogon"], "z")
647         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
648         self.assertTrue(not "dnsHostName" in res[1])
649         self.assertEquals(res[1]["lastLogon"], "z")
650
651         # Search by negated conjunction of local attributes
652         res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", 
653                               attrs=["dnsHostName", "lastLogon"])
654         self.assertEquals(len(res), 5)
655         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
656         self.assertTrue(not "dnsHostName" in res[0])
657         self.assertEquals(res[0]["lastLogon"], "y")
658         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
659         self.assertTrue(not "dnsHostName" in res[1])
660         self.assertEquals(res[1]["lastLogon"], "x")
661         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
662         self.assertEquals(res[2]["dnsHostName"], "z")
663         self.assertEquals(res[2]["lastLogon"], "z")
664         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
665         self.assertTrue(not "dnsHostName" in res[3])
666         self.assertEquals(res[3]["lastLogon"], "z")
667
668         # Search by negated conjunction of remote attributes
669         res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", 
670                               attrs=["dnsHostName", "lastLogon"])
671         self.assertEquals(len(res), 5)
672         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
673         self.assertEquals(res[0]["dnsHostName"], "y")
674         self.assertEquals(res[0]["lastLogon"], "y")
675         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
676         self.assertTrue(not "dnsHostName" in res[1])
677         self.assertEquals(res[1]["lastLogon"], "y")
678         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
679         self.assertEquals(res[2]["dnsHostName"], "z")
680         self.assertEquals(res[2]["lastLogon"], "z")
681         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
682         self.assertTrue(not "dnsHostName" in res[3])
683         self.assertEquals(res[3]["lastLogon"], "z")
684
685         # Search by negated conjunction of local and remote attribute
686         res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", 
687                               attrs=["dnsHostName", "lastLogon"])
688         self.assertEquals(len(res), 5)
689         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
690         self.assertTrue(not "dnsHostName" in res[0])
691         self.assertEquals(res[0]["lastLogon"], "y")
692         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
693         self.assertTrue(not "dnsHostName" in res[1])
694         self.assertEquals(res[1]["lastLogon"], "x")
695         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
696         self.assertEquals(res[2]["dnsHostName"], "z")
697         self.assertEquals(res[2]["lastLogon"], "z")
698         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
699         self.assertTrue(not "dnsHostName" in res[3])
700         self.assertEquals(res[3]["lastLogon"], "z")
701
702         # Search by negated disjunction of local attributes
703         res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", 
704                               attrs=["dnsHostName", "lastLogon"])
705         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
706         self.assertTrue(not "dnsHostName" in res[0])
707         self.assertEquals(res[0]["lastLogon"], "y")
708         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
709         self.assertTrue(not "dnsHostName" in res[1])
710         self.assertEquals(res[1]["lastLogon"], "x")
711         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
712         self.assertEquals(res[2]["dnsHostName"], "z")
713         self.assertEquals(res[2]["lastLogon"], "z")
714         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
715         self.assertTrue(not "dnsHostName" in res[3])
716         self.assertEquals(res[3]["lastLogon"], "z")
717
718         # Search by negated disjunction of remote attributes
719         res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", 
720                               attrs=["dnsHostName", "lastLogon"])
721         self.assertEquals(len(res), 4)
722         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
723         self.assertEquals(res[0]["dnsHostName"], "y")
724         self.assertEquals(res[0]["lastLogon"], "y")
725         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
726         self.assertEquals(res[1]["dnsHostName"], "z")
727         self.assertEquals(res[1]["lastLogon"], "z")
728         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
729         self.assertTrue(not "dnsHostName" in res[2])
730         self.assertEquals(res[2]["lastLogon"], "z")
731
732         # Search by negated disjunction of local and remote attribute
733         res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", 
734                               attrs=["dnsHostName", "lastLogon"])
735         self.assertEquals(len(res), 4)
736         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
737         self.assertTrue(not "dnsHostName" in res[0])
738         self.assertEquals(res[0]["lastLogon"], "x")
739         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
740         self.assertEquals(res[1]["dnsHostName"], "z")
741         self.assertEquals(res[1]["lastLogon"], "z")
742         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
743         self.assertTrue(not "dnsHostName" in res[2])
744         self.assertEquals(res[2]["lastLogon"], "z")
745
746         # Search by complex parse tree
747         res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
748         self.assertEquals(len(res), 6)
749         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
750         self.assertTrue(not "dnsHostName" in res[0])
751         self.assertEquals(res[0]["lastLogon"], "y")
752         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
753         self.assertEquals(res[1]["dnsHostName"], "x")
754         self.assertEquals(res[1]["lastLogon"], "x")
755         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
756         self.assertTrue(not "dnsHostName" in res[2])
757         self.assertEquals(res[2]["lastLogon"], "x")
758         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
759         self.assertEquals(res[3]["dnsHostName"], "z")
760         self.assertEquals(res[3]["lastLogon"], "z")
761         self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
762         self.assertTrue(not "dnsHostName" in res[4])
763         self.assertEquals(res[4]["lastLogon"], "z")
764
765         # Clean up
766         dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
767         for dn in dns:
768             self.ldb.delete(dn)
769
770     def test_map_modify_local(self):
771         """Modification of local records."""
772         # Add local record
773         dn = "cn=test,dc=idealx,dc=org"
774         self.ldb.add({"dn": dn, 
775                  "cn": "test",
776                  "foo": "bar",
777                  "revision": "1",
778                  "description": "test"})
779         # Check it's there
780         attrs = ["foo", "revision", "description"]
781         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
782         self.assertEquals(len(res), 1)
783         self.assertEquals(str(res[0].dn), dn)
784         self.assertEquals(res[0]["foo"], "bar")
785         self.assertEquals(res[0]["revision"], "1")
786         self.assertEquals(res[0]["description"], "test")
787         # Check it's not in the local db
788         res = self.samba4.db.search(expression="(cn=test)", 
789                                     scope=SCOPE_DEFAULT, attrs=attrs)
790         self.assertEquals(len(res), 0)
791         # Check it's not in the remote db
792         res = self.samba3.db.search(expression="(cn=test)", 
793                                     scope=SCOPE_DEFAULT, attrs=attrs)
794         self.assertEquals(len(res), 0)
795
796         # Modify local record
797         ldif = """
798 dn: """ + dn + """
799 replace: foo
800 foo: baz
801 replace: description
802 description: foo
803 """
804         self.ldb.modify_ldif(ldif)
805         # Check in local db
806         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
807         self.assertEquals(len(res), 1)
808         self.assertEquals(str(res[0].dn), dn)
809         self.assertEquals(res[0]["foo"], "baz")
810         self.assertEquals(res[0]["revision"], "1")
811         self.assertEquals(res[0]["description"], "foo")
812
813         # Rename local record
814         dn2 = "cn=toast,dc=idealx,dc=org"
815         self.ldb.rename(dn, dn2)
816         # Check in local db
817         res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
818         self.assertEquals(len(res), 1)
819         self.assertEquals(str(res[0].dn), dn2)
820         self.assertEquals(res[0]["foo"], "baz")
821         self.assertEquals(res[0]["revision"], "1")
822         self.assertEquals(res[0]["description"], "foo")
823
824         # Delete local record
825         self.ldb.delete(dn2)
826         # Check it's gone
827         res = self.ldb.search(dn2, scope=SCOPE_BASE)
828         self.assertEquals(len(res), 0)
829
830     def test_map_modify_remote_remote(self):
831         """Modification of remote data of remote records"""
832         # Add remote record
833         dn = self.samba4.dn("cn=test")
834         dn2 = self.samba3.dn("cn=test")
835         self.samba3.db.add({"dn": dn2, 
836                    "cn": "test",
837                    "description": "foo",
838                    "sambaBadPasswordCount": "3",
839                    "sambaNextRid": "1001"})
840         # Check it's there
841         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
842                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
843         self.assertEquals(len(res), 1)
844         self.assertEquals(str(res[0].dn), dn2)
845         self.assertEquals(res[0]["description"], "foo")
846         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
847         self.assertEquals(res[0]["sambaNextRid"], "1001")
848         # Check in mapped db
849         attrs = ["description", "badPwdCount", "nextRid"]
850         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
851         self.assertEquals(len(res), 1)
852         self.assertEquals(str(res[0].dn), dn)
853         self.assertEquals(res[0]["description"], "foo")
854         self.assertEquals(res[0]["badPwdCount"], "3")
855         self.assertEquals(res[0]["nextRid"], "1001")
856         # Check in local db
857         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
858         self.assertEquals(len(res), 0)
859
860         # Modify remote data of remote record
861         ldif = """
862 dn: """ + dn + """
863 replace: description
864 description: test
865 replace: badPwdCount
866 badPwdCount: 4
867 """
868         self.ldb.modify_ldif(ldif)
869         # Check in mapped db
870         res = self.ldb.search(dn, scope=SCOPE_BASE, 
871                 attrs=["description", "badPwdCount", "nextRid"])
872         self.assertEquals(len(res), 1)
873         self.assertEquals(str(res[0].dn), dn)
874         self.assertEquals(res[0]["description"], "test")
875         self.assertEquals(res[0]["badPwdCount"], "4")
876         self.assertEquals(res[0]["nextRid"], "1001")
877         # Check in remote db
878         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
879                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
880         self.assertEquals(len(res), 1)
881         self.assertEquals(str(res[0].dn), dn2)
882         self.assertEquals(res[0]["description"], "test")
883         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
884         self.assertEquals(res[0]["sambaNextRid"], "1001")
885
886         # Rename remote record
887         dn2 = self.samba4.dn("cn=toast")
888         self.ldb.rename(dn, dn2)
889         # Check in mapped db
890         dn = dn2
891         res = self.ldb.search(dn, scope=SCOPE_BASE, 
892                 attrs=["description", "badPwdCount", "nextRid"])
893         self.assertEquals(len(res), 1)
894         self.assertEquals(str(res[0].dn), dn)
895         self.assertEquals(res[0]["description"], "test")
896         self.assertEquals(res[0]["badPwdCount"], "4")
897         self.assertEquals(res[0]["nextRid"], "1001")
898         # Check in remote db 
899         dn2 = self.samba3.dn("cn=toast")
900         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
901                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
902         self.assertEquals(len(res), 1)
903         self.assertEquals(str(res[0].dn), dn2)
904         self.assertEquals(res[0]["description"], "test")
905         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
906         self.assertEquals(res[0]["sambaNextRid"], "1001")
907
908         # Delete remote record
909         self.ldb.delete(dn)
910         # Check in mapped db that it's removed
911         res = self.ldb.search(dn, scope=SCOPE_BASE)
912         self.assertEquals(len(res), 0)
913         # Check in remote db
914         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
915         self.assertEquals(len(res), 0)
916
917     def test_map_modify_remote_local(self):
918         """Modification of local data of remote records"""
919         # Add remote record (same as before)
920         dn = self.samba4.dn("cn=test")
921         dn2 = self.samba3.dn("cn=test")
922         self.samba3.db.add({"dn": dn2, 
923                    "cn": "test",
924                    "description": "foo",
925                    "sambaBadPasswordCount": "3",
926                    "sambaNextRid": "1001"})
927
928         # Modify local data of remote record
929         ldif = """
930 dn: """ + dn + """
931 add: revision
932 revision: 1
933 replace: description
934 description: test
935
936 """
937         self.ldb.modify_ldif(ldif)
938         # Check in mapped db
939         attrs = ["revision", "description"]
940         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
941         self.assertEquals(len(res), 1)
942         self.assertEquals(str(res[0].dn), dn)
943         self.assertEquals(res[0]["description"], "test")
944         self.assertEquals(res[0]["revision"], "1")
945         # Check in remote db
946         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
947         self.assertEquals(len(res), 1)
948         self.assertEquals(str(res[0].dn), dn2)
949         self.assertEquals(res[0]["description"], "test")
950         self.assertTrue(not "revision" in res[0])
951         # Check in local db
952         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
953         self.assertEquals(len(res), 1)
954         self.assertEquals(str(res[0].dn), dn)
955         self.assertTrue(not "description" in res[0])
956         self.assertEquals(res[0]["revision"], "1")
957
958         # Delete (newly) split record
959         self.ldb.delete(dn)
960
961     def test_map_modify_split(self):
962         """Testing modification of split records"""
963         # Add split record
964         dn = self.samba4.dn("cn=test")
965         dn2 = self.samba3.dn("cn=test")
966         self.ldb.add({
967             "dn": dn,
968             "cn": "test",
969             "description": "foo",
970             "badPwdCount": "3",
971             "nextRid": "1001",
972             "revision": "1"})
973         # Check it's there
974         attrs = ["description", "badPwdCount", "nextRid", "revision"]
975         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
976         self.assertEquals(len(res), 1)
977         self.assertEquals(str(res[0].dn), dn)
978         self.assertEquals(res[0]["description"], "foo")
979         self.assertEquals(res[0]["badPwdCount"], "3")
980         self.assertEquals(res[0]["nextRid"], "1001")
981         self.assertEquals(res[0]["revision"], "1")
982         # Check in local db
983         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
984         self.assertEquals(len(res), 1)
985         self.assertEquals(str(res[0].dn), dn)
986         self.assertTrue(not "description" in res[0])
987         self.assertTrue(not "badPwdCount" in res[0])
988         self.assertTrue(not "nextRid" in res[0])
989         self.assertEquals(res[0]["revision"], "1")
990         # Check in remote db
991         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
992                  "revision"]
993         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
994         self.assertEquals(len(res), 1)
995         self.assertEquals(str(res[0].dn), dn2)
996         self.assertEquals(res[0]["description"], "foo")
997         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
998         self.assertEquals(res[0]["sambaNextRid"], "1001")
999         self.assertTrue(not "revision" in res[0])
1000
1001         # Modify of split record
1002         ldif = """
1003 dn: """ + dn + """
1004 replace: description
1005 description: test
1006 replace: badPwdCount
1007 badPwdCount: 4
1008 replace: revision
1009 revision: 2
1010 """
1011         self.ldb.modify_ldif(ldif)
1012         # Check in mapped db
1013         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1014         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1015         self.assertEquals(len(res), 1)
1016         self.assertEquals(str(res[0].dn), dn)
1017         self.assertEquals(res[0]["description"], "test")
1018         self.assertEquals(res[0]["badPwdCount"], "4")
1019         self.assertEquals(res[0]["nextRid"], "1001")
1020         self.assertEquals(res[0]["revision"], "2")
1021         # Check in local db
1022         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1023         self.assertEquals(len(res), 1)
1024         self.assertEquals(str(res[0].dn), dn)
1025         self.assertTrue(not "description" in res[0])
1026         self.assertTrue(not "badPwdCount" in res[0])
1027         self.assertTrue(not "nextRid" in res[0])
1028         self.assertEquals(res[0]["revision"], "2")
1029         # Check in remote db
1030         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
1031                  "revision"]
1032         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1033         self.assertEquals(len(res), 1)
1034         self.assertEquals(str(res[0].dn), dn2)
1035         self.assertEquals(res[0]["description"], "test")
1036         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1037         self.assertEquals(res[0]["sambaNextRid"], "1001")
1038         self.assertTrue(not "revision" in res[0])
1039
1040         # Rename split record
1041         dn2 = self.samba4.dn("cn=toast")
1042         self.ldb.rename(dn, dn2)
1043         # Check in mapped db
1044         dn = dn2
1045         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1046         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1047         self.assertEquals(len(res), 1)
1048         self.assertEquals(str(res[0].dn), dn)
1049         self.assertEquals(res[0]["description"], "test")
1050         self.assertEquals(res[0]["badPwdCount"], "4")
1051         self.assertEquals(res[0]["nextRid"], "1001")
1052         self.assertEquals(res[0]["revision"], "2")
1053         # Check in local db
1054         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1055         self.assertEquals(len(res), 1)
1056         self.assertEquals(str(res[0].dn), dn)
1057         self.assertTrue(not "description" in res[0])
1058         self.assertTrue(not "badPwdCount" in res[0])
1059         self.assertTrue(not "nextRid" in res[0])
1060         self.assertEquals(res[0]["revision"], "2")
1061         # Check in remote db
1062         dn2 = self.samba3.dn("cn=toast")
1063         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
1064           attrs=["description", "sambaBadPasswordCount", "sambaNextRid", 
1065                  "revision"])
1066         self.assertEquals(len(res), 1)
1067         self.assertEquals(str(res[0].dn), dn2)
1068         self.assertEquals(res[0]["description"], "test")
1069         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1070         self.assertEquals(res[0]["sambaNextRid"], "1001")
1071         self.assertTrue(not "revision" in res[0])
1072
1073         # Delete split record
1074         self.ldb.delete(dn)
1075         # Check in mapped db
1076         res = self.ldb.search(dn, scope=SCOPE_BASE)
1077         self.assertEquals(len(res), 0)
1078         # Check in local db
1079         res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1080         self.assertEquals(len(res), 0)
1081         # Check in remote db
1082         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1083         self.assertEquals(len(res), 0)