s4-samba3samtest: we need to force netbios name as well
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
1 #!/usr/bin/python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6 #
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
8 #   
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #   
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #   
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
24
25 import os
26 import ldb
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
28 from samba import Ldb, substitute_var
29 from samba.tests import LdbTestCase, TestCaseInTempDir, cmdline_loadparm
30 import samba.dcerpc.security
31 import samba.ndr
32 from samba.auth import system_session
33 from samba import param
34
# Directory holding the Samba3 test data files, expressed relative to the
# directory that contains this module.
datadir = os.path.join(
    os.path.dirname(__file__),
    "../../../../../testdata/samba3")
37
def read_datafile(filename):
    """Return the complete contents of a file from the test data directory.

    :param filename: Name of the file; it is joined onto the module-level
        ``datadir`` path.
    :return: Everything read from the file, which is opened in text
        mode (``'r'``).

    Errors raised by ``open()`` or ``read()`` propagate to the caller.
    """
    # Read inside a ``with`` block so the file handle is closed as soon as
    # the data has been read, rather than whenever the file object happens
    # to be garbage collected.
    with open(os.path.join(datadir, filename), 'r') as datafile:
        return datafile.read()
40
def ldb_debug(l, text):
    """Print *text* to standard output.

    :param l: Not used by this function. Presumably the first argument a
        debug hook is called with -- TODO confirm against the caller.
    :param text: The message to print.
    """
    # Parenthesised form: for a single argument this produces exactly the
    # same output as the Python 2 print statement, and it is also valid
    # syntax on Python 3 (where "print text" is a SyntaxError).
    print(text)
43
44
class MapBaseTestCase(TestCaseInTempDir):
    """Shared fixture for the mapping tests.

    setUp() creates two Target helpers, ``self.samba3`` and ``self.samba4``,
    each pointing at its own tdb file inside the temporary directory, and
    records the path and URL of a third database in ``self.ldbfile`` and
    ``self.ldburl``.
    """

    def setup_modules(self, ldb, s3, s4):
        """Add the @MAP, @MODULES and @PARTITION records to a database.

        :param ldb: Database handle the three records are added to.
        :param s3: Target whose base DN is the destination of the mapping.
        :param s4: Target whose base DN is the source of the mapping.
        """
        mapping_record = {
            "dn": "@MAP=samba3sam",
            "@FROM": s4.basedn,
            "@TO": "sambaDomainName=TESTS," + s3.basedn,
        }
        ldb.add(mapping_record)

        modules_record = {
            "dn": "@MODULES",
            "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,samba3sid,partition",
        }
        ldb.add(modules_record)

        # One partition entry per side, Samba4 first and Samba3 second.
        partition_record = {
            "dn": "@PARTITION",
            "partition": ["%s" % (side.basedn_casefold)
                          for side in (s4, s3)],
            "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"],
            "modules": "*:",
        }
        ldb.add(partition_record)

    def setUp(self):
        """Force the loadparm settings and build the two Target helpers."""
        forced_options = (
            ("sid generator", "backend"),
            ("workgroup", "TESTS"),
            ("netbios name", "TESTS"),
        )
        for option, value in forced_options:
            cmdline_loadparm.set(option, value)
        super(MapBaseTestCase, self).setUp()

        def samba3_dn(basedn, rdn):
            # Samba3 entries sit below the sambaDomainName=TESTS container.
            return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)

        def samba4_dn(basedn, rdn):
            # Samba4 entries sit directly below the base DN.
            return "%s,%s" % (rdn, basedn)

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        # Captured by the Target class below, which cannot see ``self``.
        workdir = self.tempdir

        class Target:
            """Everything needed to work with one SAM database: the Ldb
            handle, its base DN, the backing file and its URL."""

            def __init__(self, basedn, dn):
                self.db = Ldb(lp=cmdline_loadparm,
                              session_info=system_session())
                self.basedn = basedn
                self.basedn_casefold = ldb.Dn(self.db, basedn).get_casefold()
                self.substvars = {"BASEDN": self.basedn}
                # The database file is named after the casefolded base DN.
                self.file = os.path.join(
                    workdir, "%s.ldb" % self.basedn_casefold)
                self.url = "tdb://" + self.file
                self._dn = dn

            def dn(self, rdn):
                """Return the full DN for *rdn* below this base DN."""
                return self._dn(self.basedn, rdn)

            def connect(self):
                """Connect the handle to this target's URL."""
                return self.db.connect(self.url)

            def setup_data(self, path):
                """Load the named data file into this database."""
                contents = read_datafile(path)
                self.add_ldif(contents)

            def subst(self, text):
                """Expand this target's substitution variables in *text*."""
                return substitute_var(text, self.substvars)

            def add_ldif(self, ldif):
                """Add *ldif* after variable substitution."""
                expanded = self.subst(ldif)
                self.db.add_ldif(expanded)

            def modify_ldif(self, ldif):
                """Apply *ldif* as a modification after substitution."""
                expanded = self.subst(ldif)
                self.db.modify_ldif(expanded)

        self.samba4 = Target("dc=vernstok,dc=nl", samba4_dn)
        self.samba3 = Target("cn=Samba3Sam", samba3_dn)

        self.samba3.connect()
        self.samba4.connect()

    def tearDown(self):
        """Delete the three database files, then run the base tearDown."""
        remove = os.unlink
        remove(self.ldbfile)
        remove(self.samba3.file)
        remove(self.samba4.file)
        super(MapBaseTestCase, self).tearDown()

    def assertSidEquals(self, text, ndr_sid):
        """Assert that an NDR-packed SID equals a SID given as text.

        :param text: Expected SID in string form.
        :param ndr_sid: Indexable value whose first item is the packed SID.
        """
        dom_sid = samba.dcerpc.security.dom_sid
        unpacked = samba.ndr.ndr_unpack(dom_sid, str(ndr_sid[0]))
        expected = dom_sid(text)
        self.assertEquals(unpacked, expected)
126
127
class Samba3SamTestCase(MapBaseTestCase):
    """Search and modify tests run through the samba3sam mapping.

    setUp() loads ``samba3.ldif`` into the Samba3 target before the tests
    run.
    """

    def setUp(self):
        """Load the data files and open ``self.ldb`` on the test database."""
        super(Samba3SamTestCase, self).setUp()
        # Called "setup_db" rather than "ldb" so that the imported ldb
        # module is not shadowed inside this method.
        setup_db = Ldb(self.ldburl, lp=cmdline_loadparm,
                       session_info=system_session())
        self.samba3.setup_data("samba3.ldif")
        ldif = read_datafile("provision_samba3sam.ldif")
        setup_db.add_ldif(self.samba4.subst(ldif))
        self.setup_modules(setup_db, self.samba3, self.samba4)
        # Drop the handle used for provisioning and open a fresh one for
        # the tests. Presumably the module configuration written above is
        # only picked up by a new connection -- TODO confirm.
        del setup_db
        self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm,
                       session_info=system_session())

    def test_search_non_mapped(self):
        """Looking up by non-mapped attribute"""
        msg = self.ldb.search(expression="(cn=Administrator)")
        self.assertEquals(len(msg), 1)
        # Compare the string form of the attribute, as the other attribute
        # checks in this class do.
        self.assertEquals(str(msg[0]["cn"]), "Administrator")

    def test_search_mapped(self):
        """Looking up by mapped attribute"""
        # This method was previously a second definition named
        # test_search_non_mapped. It replaced the method above in the class
        # namespace, so the non-mapped lookup was never executed.
        msg = self.ldb.search(expression="(name=Backup Operators)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0]["name"]), "Backup Operators")

    def test_old_name_of_renamed(self):
        """Looking up by old name of renamed attribute"""
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
        self.assertEquals(len(msg), 0)

    def test_mapped_containing_sid(self):
        """Looking up mapped entry containing SID"""
        msg = self.ldb.search(expression="(cn=Replicator)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn),
                          "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertTrue("objectSid" in msg[0])
        self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
                             msg[0]["objectSid"])
        oc = set(msg[0]["objectClass"])
        self.assertEquals(oc, set(["group"]))

    def test_search_by_objclass(self):
        """Looking up by objectClass"""
        msg = self.ldb.search(
            expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEquals(
            set([str(m.dn) for m in msg]),
            set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                 "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))

    def test_s3sam_modify(self):
        """Add, search, modify, rename and delete records via the mapping."""
        # Adding a record that falls back to the local database
        self.ldb.add({"dn": "cn=Foo",
                      "foo": "bar",
                      "blah": "Blie",
                      "cn": "Foo",
                      "showInAdvancedViewOnly": "TRUE"})

        # Checking for existence of record (local)
        # TODO: This record must be searched in the local database, which is
        # currently only supported for base searches
        # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
        # TODO: Actually, this version should work as well but doesn't...
        msg = self.ldb.search(
            expression="(cn=Foo)", base="cn=Foo",
            scope=SCOPE_BASE,
            attrs=['foo', 'blah', 'cn', 'showInAdvancedViewOnly'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
        self.assertEquals(str(msg[0]["foo"]), "bar")
        self.assertEquals(str(msg[0]["blah"]), "Blie")

        # Adding record that will be mapped
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                      "objectClass": "user",
                      "unixName": "bin",
                      "sambaUnicodePwd": "geheim",
                      "cn": "Niemand"})

        # Checking for existence of record (remote)
        msg = self.ldb.search(
            expression="(unixName=bin)",
            attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
        self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for existence of record (local && remote)
        msg = self.ldb.search(
            expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
            attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        # TODO: should check with more records
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
        self.assertEquals(str(msg[0]["unixName"]), "bin")
        self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for existence of record (local || remote)
        msg = self.ldb.search(
            expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
            attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
        # TODO: should check with more records
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
        self.assertEquals(str(msg[0]["unixName"]), "bin")
        self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for data in destination database
        msg = self.samba3.db.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(str(msg[0]["sambaSID"]),
                          "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEquals(str(msg[0]["displayName"]), "Niemand")

        # Adding attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
        self.assertEquals(str(msg[0]["description"]), "Blah")

        # Modifying attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(str(msg[0]["description"]), "Blie")

        # Deleting attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        # Checking whether changes are no longer there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertFalse("description" in msg[0])

        # Renaming record...
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                        "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether DN has changed...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn),
                          "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Deleting record...
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether record is gone...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 0)
293
294
295 class MapTestCase(MapBaseTestCase):
296
297     def setUp(self):
298         super(MapTestCase, self).setUp()
299         ldb = Ldb(self.ldburl, lp=cmdline_loadparm, session_info=system_session())
300         ldif = read_datafile("provision_samba3sam.ldif")
301         ldb.add_ldif(self.samba4.subst(ldif))
302         self.setup_modules(ldb, self.samba3, self.samba4)
303         del ldb
304         self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm, session_info=system_session())
305
306     def test_map_search(self):
307         """Running search tests on mapped data."""
308         self.samba3.db.add({
309             "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
310             "objectclass": ["sambaDomain", "top"],
311             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
312             "sambaNextRid": "2000",
313             "sambaDomainName": "TESTS"
314             })
315
316         # Add a set of split records
317         self.ldb.add_ldif("""
318 dn: """+ self.samba4.dn("cn=Domain Users") + """
319 objectClass: group
320 cn: Domain Users
321 objectSid: S-1-5-21-4231626423-2410014848-2360679739-513
322 """)
323
324         # Add a set of split records
325         self.ldb.add_ldif("""
326 dn: """+ self.samba4.dn("cn=X") + """
327 objectClass: user
328 cn: X
329 codePage: x
330 revision: x
331 dnsHostName: x
332 nextRid: y
333 lastLogon: x
334 description: x
335 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
336 """)
337
338         self.ldb.add({
339             "dn": self.samba4.dn("cn=Y"),
340             "objectClass": "top",
341             "cn": "Y",
342             "codePage": "x",
343             "revision": "x",
344             "dnsHostName": "y",
345             "nextRid": "y",
346             "lastLogon": "y",
347             "description": "x"})
348
349         self.ldb.add({
350             "dn": self.samba4.dn("cn=Z"),
351             "objectClass": "top",
352             "cn": "Z",
353             "codePage": "x",
354             "revision": "y",
355             "dnsHostName": "z",
356             "nextRid": "y",
357             "lastLogon": "z",
358             "description": "y"})
359
360         # Add a set of remote records
361
362         self.samba3.db.add({
363             "dn": self.samba3.dn("cn=A"),
364             "objectClass": "posixAccount",
365             "cn": "A",
366             "sambaNextRid": "x",
367             "sambaBadPasswordCount": "x", 
368             "sambaLogonTime": "x",
369             "description": "x",
370             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
371             "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
372
373         self.samba3.db.add({
374             "dn": self.samba3.dn("cn=B"),
375             "objectClass": "top",
376             "cn": "B",
377             "sambaNextRid": "x",
378             "sambaBadPasswordCount": "x",
379             "sambaLogonTime": "y",
380             "description": "x"})
381
382         self.samba3.db.add({
383             "dn": self.samba3.dn("cn=C"),
384             "objectClass": "top",
385             "cn": "C",
386             "sambaNextRid": "x",
387             "sambaBadPasswordCount": "y",
388             "sambaLogonTime": "z",
389             "description": "y"})
390
391         # Testing search by DN
392
393         # Search remote record by local DN
394         dn = self.samba4.dn("cn=A")
395         res = self.ldb.search(dn, scope=SCOPE_BASE, 
396                 attrs=["dnsHostName", "lastLogon"])
397         self.assertEquals(len(res), 1)
398         self.assertEquals(str(res[0].dn), dn)
399         self.assertTrue(not "dnsHostName" in res[0])
400         self.assertEquals(str(res[0]["lastLogon"]), "x")
401
402         # Search remote record by remote DN
403         dn = self.samba3.dn("cn=A")
404         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
405                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
406         self.assertEquals(len(res), 1)
407         self.assertEquals(str(res[0].dn), dn)
408         self.assertTrue(not "dnsHostName" in res[0])
409         self.assertTrue(not "lastLogon" in res[0])
410         self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
411
412         # Search split record by local DN
413         dn = self.samba4.dn("cn=X")
414         res = self.ldb.search(dn, scope=SCOPE_BASE, 
415                 attrs=["dnsHostName", "lastLogon"])
416         self.assertEquals(len(res), 1)
417         self.assertEquals(str(res[0].dn), dn)
418         self.assertEquals(str(res[0]["dnsHostName"]), "x")
419         self.assertEquals(str(res[0]["lastLogon"]), "x")
420
421         # Search split record by remote DN
422         dn = self.samba3.dn("cn=X")
423         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
424                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
425         self.assertEquals(len(res), 1)
426         self.assertEquals(str(res[0].dn), dn)
427         self.assertTrue(not "dnsHostName" in res[0])
428         self.assertTrue(not "lastLogon" in res[0])
429         self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
430
431         # Testing search by attribute
432
433         # Search by ignored attribute
434         res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, 
435                 attrs=["dnsHostName", "lastLogon"])
436         self.assertEquals(len(res), 2)
437         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
438         self.assertEquals(str(res[0]["dnsHostName"]), "y")
439         self.assertEquals(str(res[0]["lastLogon"]), "y")
440         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
441         self.assertEquals(str(res[1]["dnsHostName"]), "x")
442         self.assertEquals(str(res[1]["lastLogon"]), "x")
443
444         # Search by kept attribute
445         res = self.ldb.search(expression="(description=y)", 
446                 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
447         self.assertEquals(len(res), 2)
448         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
449         self.assertEquals(str(res[0]["dnsHostName"]), "z")
450         self.assertEquals(str(res[0]["lastLogon"]), "z")
451         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
452         self.assertTrue(not "dnsHostName" in res[1])
453         self.assertEquals(str(res[1]["lastLogon"]), "z")
454
455         # Search by renamed attribute
456         res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
457                               attrs=["dnsHostName", "lastLogon"])
458         self.assertEquals(len(res), 2)
459         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
460         self.assertTrue(not "dnsHostName" in res[0])
461         self.assertEquals(str(res[0]["lastLogon"]), "y")
462         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
463         self.assertTrue(not "dnsHostName" in res[1])
464         self.assertEquals(str(res[1]["lastLogon"]), "x")
465
466         # Search by converted attribute
467         # TODO:
468         #   Using the SID directly in the parse tree leads to conversion
469         #   errors, letting the search fail with no results.
470         #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
471         res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
472         self.assertEquals(len(res), 4)
473         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
474         self.assertEquals(str(res[0]["dnsHostName"]), "x")
475         self.assertEquals(str(res[0]["lastLogon"]), "x")
476         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552", 
477                              res[0]["objectSid"])
478         self.assertTrue("objectSid" in res[0])
479         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
480         self.assertTrue(not "dnsHostName" in res[1])
481         self.assertEquals(str(res[1]["lastLogon"]), "x")
482         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
483                              res[1]["objectSid"])
484         self.assertTrue("objectSid" in res[1])
485
486         # Search by generated attribute 
487         # In most cases, this even works when the mapping is missing
488         # a `convert_operator' by enumerating the remote db.
489         res = self.ldb.search(expression="(primaryGroupID=512)", 
490                            attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
491         self.assertEquals(len(res), 1)
492         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
493         self.assertTrue(not "dnsHostName" in res[0])
494         self.assertEquals(str(res[0]["lastLogon"]), "x")
495         self.assertEquals(str(res[0]["primaryGroupID"]), "512")
496
497         # Note that Xs "objectSid" seems to be fine in the previous search for
498         # "objectSid"...
499         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
500         #print len(res) + " results found"
501         #for i in range(len(res)):
502         #    for (obj in res[i]) {
503         #        print obj + ": " + res[i][obj]
504         #    }
505         #    print "---"
506         #    
507
508         # Search by remote name of renamed attribute */
509         res = self.ldb.search(expression="(sambaBadPasswordCount=*)", 
510                               attrs=["dnsHostName", "lastLogon"])
511         self.assertEquals(len(res), 0)
512
513         # Search by objectClass
514         attrs = ["dnsHostName", "lastLogon", "objectClass"]
515         res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
516         self.assertEquals(len(res), 2)
517         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
518         self.assertEquals(str(res[0]["dnsHostName"]), "x")
519         self.assertEquals(str(res[0]["lastLogon"]), "x")
520         self.assertEquals(str(res[0]["objectClass"][0]), "user")
521         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
522         self.assertTrue(not "dnsHostName" in res[1])
523         self.assertEquals(str(res[1]["lastLogon"]), "x")
524         self.assertEquals(str(res[1]["objectClass"][0]), "user")
525
526         # Prove that the objectClass is actually used for the search
527         res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
528                               attrs=attrs)
529         self.assertEquals(len(res), 3)
530         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
531         self.assertTrue(not "dnsHostName" in res[0])
532         self.assertEquals(str(res[0]["lastLogon"]), "y")
533         self.assertEquals(set(res[0]["objectClass"]), set(["top"]))
534         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
535         self.assertEquals(str(res[1]["dnsHostName"]), "x")
536         self.assertEquals(str(res[1]["lastLogon"]), "x")
537         self.assertEquals(str(res[1]["objectClass"][0]), "user")
538         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
539         self.assertTrue(not "dnsHostName" in res[2])
540         self.assertEquals(str(res[2]["lastLogon"]), "x")
541         self.assertEquals(res[2]["objectClass"][0], "user")
542
543         # Testing search by parse tree
544
545         # Search by conjunction of local attributes
546         res = self.ldb.search(expression="(&(codePage=x)(revision=x))", 
547                               attrs=["dnsHostName", "lastLogon"])
548         self.assertEquals(len(res), 2)
549         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
550         self.assertEquals(str(res[0]["dnsHostName"]), "y")
551         self.assertEquals(str(res[0]["lastLogon"]), "y")
552         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
553         self.assertEquals(str(res[1]["dnsHostName"]), "x")
554         self.assertEquals(str(res[1]["lastLogon"]), "x")
555
556         # Search by conjunction of remote attributes
557         res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", 
558                               attrs=["dnsHostName", "lastLogon"])
559         self.assertEquals(len(res), 2)
560         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
561         self.assertEquals(str(res[0]["dnsHostName"]), "x")
562         self.assertEquals(str(res[0]["lastLogon"]), "x")
563         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
564         self.assertTrue(not "dnsHostName" in res[1])
565         self.assertEquals(str(res[1]["lastLogon"]), "x")
566         
567         # Search by conjunction of local and remote attribute 
568         res = self.ldb.search(expression="(&(codePage=x)(description=x))", 
569                               attrs=["dnsHostName", "lastLogon"])
570         self.assertEquals(len(res), 2)
571         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
572         self.assertEquals(str(res[0]["dnsHostName"]), "y")
573         self.assertEquals(str(res[0]["lastLogon"]), "y")
574         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
575         self.assertEquals(str(res[1]["dnsHostName"]), "x")
576         self.assertEquals(str(res[1]["lastLogon"]), "x")
577
578         # Search by conjunction of local and remote attribute w/o match
579         attrs = ["dnsHostName", "lastLogon"]
580         res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", 
581                               attrs=attrs)
582         self.assertEquals(len(res), 0)
583         res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", 
584                               attrs=attrs)
585         self.assertEquals(len(res), 0)
586
587         # Search by disjunction of local attributes
588         res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", 
589                               attrs=["dnsHostName", "lastLogon"])
590         self.assertEquals(len(res), 2)
591         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
592         self.assertEquals(str(res[0]["dnsHostName"]), "y")
593         self.assertEquals(str(res[0]["lastLogon"]), "y")
594         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
595         self.assertEquals(str(res[1]["dnsHostName"]), "x")
596         self.assertEquals(str(res[1]["lastLogon"]), "x")
597
598         # Search by disjunction of remote attributes
599         res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", 
600                               attrs=["dnsHostName", "lastLogon"])
601         self.assertEquals(len(res), 3)
602         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
603         self.assertFalse("dnsHostName" in res[0])
604         self.assertEquals(str(res[0]["lastLogon"]), "y")
605         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
606         self.assertEquals(str(res[1]["dnsHostName"]), "x")
607         self.assertEquals(str(res[1]["lastLogon"]), "x")
608         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
609         self.assertFalse("dnsHostName" in res[2])
610         self.assertEquals(str(res[2]["lastLogon"]), "x")
611
612         # Search by disjunction of local and remote attribute
613         res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", 
614                               attrs=["dnsHostName", "lastLogon"])
615         self.assertEquals(len(res), 3)
616         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
617         self.assertEquals(str(res[0]["dnsHostName"]), "y")
618         self.assertEquals(str(res[0]["lastLogon"]), "y")
619         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
620         self.assertFalse("dnsHostName" in res[1])
621         self.assertEquals(str(res[1]["lastLogon"]), "y")
622         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
623         self.assertEquals(str(res[2]["dnsHostName"]), "x")
624         self.assertEquals(str(res[2]["lastLogon"]), "x")
625
626         # Search by disjunction of local and remote attribute w/o match
627         res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", 
628                               attrs=["dnsHostName", "lastLogon"])
629         self.assertEquals(len(res), 0)
630
631         # Search by negated local attribute
632         res = self.ldb.search(expression="(!(revision=x))", 
633                               attrs=["dnsHostName", "lastLogon"])
634         self.assertEquals(len(res), 6)
635         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
636         self.assertTrue(not "dnsHostName" in res[0])
637         self.assertEquals(str(res[0]["lastLogon"]), "y")
638         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
639         self.assertTrue(not "dnsHostName" in res[1])
640         self.assertEquals(str(res[1]["lastLogon"]), "x")
641         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
642         self.assertEquals(str(res[2]["dnsHostName"]), "z")
643         self.assertEquals(str(res[2]["lastLogon"]), "z")
644         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
645         self.assertTrue(not "dnsHostName" in res[3])
646         self.assertEquals(str(res[3]["lastLogon"]), "z")
647
648         # Search by negated remote attribute
649         res = self.ldb.search(expression="(!(description=x))", 
650                               attrs=["dnsHostName", "lastLogon"])
651         self.assertEquals(len(res), 4)
652         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
653         self.assertEquals(str(res[0]["dnsHostName"]), "z")
654         self.assertEquals(str(res[0]["lastLogon"]), "z")
655         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
656         self.assertTrue(not "dnsHostName" in res[1])
657         self.assertEquals(str(res[1]["lastLogon"]), "z")
658
659         # Search by negated conjunction of local attributes
660         res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", 
661                               attrs=["dnsHostName", "lastLogon"])
662         self.assertEquals(len(res), 6)
663         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
664         self.assertTrue(not "dnsHostName" in res[0])
665         self.assertEquals(str(res[0]["lastLogon"]), "y")
666         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
667         self.assertTrue(not "dnsHostName" in res[1])
668         self.assertEquals(str(res[1]["lastLogon"]), "x")
669         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
670         self.assertEquals(str(res[2]["dnsHostName"]), "z")
671         self.assertEquals(str(res[2]["lastLogon"]), "z")
672         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
673         self.assertTrue(not "dnsHostName" in res[3])
674         self.assertEquals(str(res[3]["lastLogon"]), "z")
675
676         # Search by negated conjunction of remote attributes
677         res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", 
678                               attrs=["dnsHostName", "lastLogon"])
679         self.assertEquals(len(res), 6)
680         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
681         self.assertEquals(str(res[0]["dnsHostName"]), "y")
682         self.assertEquals(str(res[0]["lastLogon"]), "y")
683         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
684         self.assertTrue(not "dnsHostName" in res[1])
685         self.assertEquals(str(res[1]["lastLogon"]), "y")
686         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
687         self.assertEquals(str(res[2]["dnsHostName"]), "z")
688         self.assertEquals(str(res[2]["lastLogon"]), "z")
689         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
690         self.assertTrue(not "dnsHostName" in res[3])
691         self.assertEquals(str(res[3]["lastLogon"]), "z")
692
693         # Search by negated conjunction of local and remote attribute
694         res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", 
695                               attrs=["dnsHostName", "lastLogon"])
696         self.assertEquals(len(res), 6)
697         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
698         self.assertTrue(not "dnsHostName" in res[0])
699         self.assertEquals(str(res[0]["lastLogon"]), "y")
700         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
701         self.assertTrue(not "dnsHostName" in res[1])
702         self.assertEquals(str(res[1]["lastLogon"]), "x")
703         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
704         self.assertEquals(str(res[2]["dnsHostName"]), "z")
705         self.assertEquals(str(res[2]["lastLogon"]), "z")
706         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
707         self.assertTrue(not "dnsHostName" in res[3])
708         self.assertEquals(str(res[3]["lastLogon"]), "z")
709
710         # Search by negated disjunction of local attributes
711         res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", 
712                               attrs=["dnsHostName", "lastLogon"])
713         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
714         self.assertTrue(not "dnsHostName" in res[0])
715         self.assertEquals(str(res[0]["lastLogon"]), "y")
716         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
717         self.assertTrue(not "dnsHostName" in res[1])
718         self.assertEquals(str(res[1]["lastLogon"]), "x")
719         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
720         self.assertEquals(str(res[2]["dnsHostName"]), "z")
721         self.assertEquals(str(res[2]["lastLogon"]), "z")
722         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
723         self.assertTrue(not "dnsHostName" in res[3])
724         self.assertEquals(str(res[3]["lastLogon"]), "z")
725
726         # Search by negated disjunction of remote attributes
727         res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", 
728                               attrs=["dnsHostName", "lastLogon"])
729         self.assertEquals(len(res), 5)
730         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
731         self.assertEquals(str(res[0]["dnsHostName"]), "y")
732         self.assertEquals(str(res[0]["lastLogon"]), "y")
733         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
734         self.assertEquals(str(res[1]["dnsHostName"]), "z")
735         self.assertEquals(str(res[1]["lastLogon"]), "z")
736         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
737         self.assertTrue(not "dnsHostName" in res[2])
738         self.assertEquals(str(res[2]["lastLogon"]), "z")
739
740         # Search by negated disjunction of local and remote attribute
741         res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", 
742                               attrs=["dnsHostName", "lastLogon"])
743         self.assertEquals(len(res), 5)
744         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
745         self.assertTrue(not "dnsHostName" in res[0])
746         self.assertEquals(str(res[0]["lastLogon"]), "x")
747         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
748         self.assertEquals(str(res[1]["dnsHostName"]), "z")
749         self.assertEquals(str(res[1]["lastLogon"]), "z")
750         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
751         self.assertTrue(not "dnsHostName" in res[2])
752         self.assertEquals(str(res[2]["lastLogon"]), "z")
753
754         # Search by complex parse tree
755         res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
756         self.assertEquals(len(res), 7)
757         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
758         self.assertTrue(not "dnsHostName" in res[0])
759         self.assertEquals(str(res[0]["lastLogon"]), "y")
760         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
761         self.assertEquals(str(res[1]["dnsHostName"]), "x")
762         self.assertEquals(str(res[1]["lastLogon"]), "x")
763         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
764         self.assertTrue(not "dnsHostName" in res[2])
765         self.assertEquals(str(res[2]["lastLogon"]), "x")
766         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
767         self.assertEquals(str(res[3]["dnsHostName"]), "z")
768         self.assertEquals(str(res[3]["lastLogon"]), "z")
769         self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
770         self.assertTrue(not "dnsHostName" in res[4])
771         self.assertEquals(str(res[4]["lastLogon"]), "z")
772
773         # Clean up
774         dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
775         for dn in dns:
776             self.ldb.delete(dn)
777
778     def test_map_modify_local(self):
779         """Modification of local records."""
780         # Add local record
781         dn = "cn=test,dc=idealx,dc=org"
782         self.ldb.add({"dn": dn, 
783                  "cn": "test",
784                  "foo": "bar",
785                  "revision": "1",
786                  "description": "test"})
787         # Check it's there
788         attrs = ["foo", "revision", "description"]
789         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
790         self.assertEquals(len(res), 1)
791         self.assertEquals(str(res[0].dn), dn)
792         self.assertEquals(str(res[0]["foo"]), "bar")
793         self.assertEquals(str(res[0]["revision"]), "1")
794         self.assertEquals(str(res[0]["description"]), "test")
795         # Check it's not in the local db
796         res = self.samba4.db.search(expression="(cn=test)", 
797                                     scope=SCOPE_DEFAULT, attrs=attrs)
798         self.assertEquals(len(res), 0)
799         # Check it's not in the remote db
800         res = self.samba3.db.search(expression="(cn=test)", 
801                                     scope=SCOPE_DEFAULT, attrs=attrs)
802         self.assertEquals(len(res), 0)
803
804         # Modify local record
805         ldif = """
806 dn: """ + dn + """
807 replace: foo
808 foo: baz
809 replace: description
810 description: foo
811 """
812         self.ldb.modify_ldif(ldif)
813         # Check in local db
814         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
815         self.assertEquals(len(res), 1)
816         self.assertEquals(str(res[0].dn), dn)
817         self.assertEquals(str(res[0]["foo"]), "baz")
818         self.assertEquals(str(res[0]["revision"]), "1")
819         self.assertEquals(str(res[0]["description"]), "foo")
820
821         # Rename local record
822         dn2 = "cn=toast,dc=idealx,dc=org"
823         self.ldb.rename(dn, dn2)
824         # Check in local db
825         res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
826         self.assertEquals(len(res), 1)
827         self.assertEquals(str(res[0].dn), dn2)
828         self.assertEquals(str(res[0]["foo"]), "baz")
829         self.assertEquals(str(res[0]["revision"]), "1")
830         self.assertEquals(str(res[0]["description"]), "foo")
831
832         # Delete local record
833         self.ldb.delete(dn2)
834         # Check it's gone
835         res = self.ldb.search(dn2, scope=SCOPE_BASE)
836         self.assertEquals(len(res), 0)
837
838     def test_map_modify_remote_remote(self):
839         """Modification of remote data of remote records"""
840         # Add remote record
841         dn = self.samba4.dn("cn=test")
842         dn2 = self.samba3.dn("cn=test")
843         self.samba3.db.add({"dn": dn2, 
844                    "cn": "test",
845                    "description": "foo",
846                    "sambaBadPasswordCount": "3",
847                    "sambaNextRid": "1001"})
848         # Check it's there
849         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
850                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
851         self.assertEquals(len(res), 1)
852         self.assertEquals(str(res[0].dn), dn2)
853         self.assertEquals(str(res[0]["description"]), "foo")
854         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
855         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
856         # Check in mapped db
857         attrs = ["description", "badPwdCount", "nextRid"]
858         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
859         self.assertEquals(len(res), 1)
860         self.assertEquals(str(res[0].dn), dn)
861         self.assertEquals(str(res[0]["description"]), "foo")
862         self.assertEquals(str(res[0]["badPwdCount"]), "3")
863         self.assertEquals(str(res[0]["nextRid"]), "1001")
864         # Check in local db
865         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
866         self.assertEquals(len(res), 0)
867
868         # Modify remote data of remote record
869         ldif = """
870 dn: """ + dn + """
871 replace: description
872 description: test
873 replace: badPwdCount
874 badPwdCount: 4
875 """
876         self.ldb.modify_ldif(ldif)
877         # Check in mapped db
878         res = self.ldb.search(dn, scope=SCOPE_BASE, 
879                 attrs=["description", "badPwdCount", "nextRid"])
880         self.assertEquals(len(res), 1)
881         self.assertEquals(str(res[0].dn), dn)
882         self.assertEquals(str(res[0]["description"]), "test")
883         self.assertEquals(str(res[0]["badPwdCount"]), "4")
884         self.assertEquals(str(res[0]["nextRid"]), "1001")
885         # Check in remote db
886         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
887                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
888         self.assertEquals(len(res), 1)
889         self.assertEquals(str(res[0].dn), dn2)
890         self.assertEquals(str(res[0]["description"]), "test")
891         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
892         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
893
894         # Rename remote record
895         dn2 = self.samba4.dn("cn=toast")
896         self.ldb.rename(dn, dn2)
897         # Check in mapped db
898         dn = dn2
899         res = self.ldb.search(dn, scope=SCOPE_BASE, 
900                 attrs=["description", "badPwdCount", "nextRid"])
901         self.assertEquals(len(res), 1)
902         self.assertEquals(str(res[0].dn), dn)
903         self.assertEquals(str(res[0]["description"]), "test")
904         self.assertEquals(str(res[0]["badPwdCount"]), "4")
905         self.assertEquals(str(res[0]["nextRid"]), "1001")
906         # Check in remote db 
907         dn2 = self.samba3.dn("cn=toast")
908         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
909                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
910         self.assertEquals(len(res), 1)
911         self.assertEquals(str(res[0].dn), dn2)
912         self.assertEquals(str(res[0]["description"]), "test")
913         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
914         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
915
916         # Delete remote record
917         self.ldb.delete(dn)
918         # Check in mapped db that it's removed
919         res = self.ldb.search(dn, scope=SCOPE_BASE)
920         self.assertEquals(len(res), 0)
921         # Check in remote db
922         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
923         self.assertEquals(len(res), 0)
924
925     def test_map_modify_remote_local(self):
926         """Modification of local data of remote records"""
927         # Add remote record (same as before)
928         dn = self.samba4.dn("cn=test")
929         dn2 = self.samba3.dn("cn=test")
930         self.samba3.db.add({"dn": dn2, 
931                    "cn": "test",
932                    "description": "foo",
933                    "sambaBadPasswordCount": "3",
934                    "sambaNextRid": "1001"})
935
936         # Modify local data of remote record
937         ldif = """
938 dn: """ + dn + """
939 add: revision
940 revision: 1
941 replace: description
942 description: test
943
944 """
945         self.ldb.modify_ldif(ldif)
946         # Check in mapped db
947         attrs = ["revision", "description"]
948         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
949         self.assertEquals(len(res), 1)
950         self.assertEquals(str(res[0].dn), dn)
951         self.assertEquals(str(res[0]["description"]), "test")
952         self.assertEquals(str(res[0]["revision"]), "1")
953         # Check in remote db
954         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
955         self.assertEquals(len(res), 1)
956         self.assertEquals(str(res[0].dn), dn2)
957         self.assertEquals(str(res[0]["description"]), "test")
958         self.assertTrue(not "revision" in res[0])
959         # Check in local db
960         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
961         self.assertEquals(len(res), 1)
962         self.assertEquals(str(res[0].dn), dn)
963         self.assertTrue(not "description" in res[0])
964         self.assertEquals(str(res[0]["revision"]), "1")
965
966         # Delete (newly) split record
967         self.ldb.delete(dn)
968
969     def test_map_modify_split(self):
970         """Testing modification of split records"""
971         # Add split record
972         dn = self.samba4.dn("cn=test")
973         dn2 = self.samba3.dn("cn=test")
974         self.ldb.add({
975             "dn": dn,
976             "cn": "test",
977             "description": "foo",
978             "badPwdCount": "3",
979             "nextRid": "1001",
980             "revision": "1"})
981         # Check it's there
982         attrs = ["description", "badPwdCount", "nextRid", "revision"]
983         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
984         self.assertEquals(len(res), 1)
985         self.assertEquals(str(res[0].dn), dn)
986         self.assertEquals(str(res[0]["description"]), "foo")
987         self.assertEquals(str(res[0]["badPwdCount"]), "3")
988         self.assertEquals(str(res[0]["nextRid"]), "1001")
989         self.assertEquals(str(res[0]["revision"]), "1")
990         # Check in local db
991         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
992         self.assertEquals(len(res), 1)
993         self.assertEquals(str(res[0].dn), dn)
994         self.assertTrue(not "description" in res[0])
995         self.assertTrue(not "badPwdCount" in res[0])
996         self.assertTrue(not "nextRid" in res[0])
997         self.assertEquals(str(res[0]["revision"]), "1")
998         # Check in remote db
999         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
1000                  "revision"]
1001         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1002         self.assertEquals(len(res), 1)
1003         self.assertEquals(str(res[0].dn), dn2)
1004         self.assertEquals(str(res[0]["description"]), "foo")
1005         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
1006         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1007         self.assertTrue(not "revision" in res[0])
1008
1009         # Modify of split record
1010         ldif = """
1011 dn: """ + dn + """
1012 replace: description
1013 description: test
1014 replace: badPwdCount
1015 badPwdCount: 4
1016 replace: revision
1017 revision: 2
1018 """
1019         self.ldb.modify_ldif(ldif)
1020         # Check in mapped db
1021         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1022         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1023         self.assertEquals(len(res), 1)
1024         self.assertEquals(str(res[0].dn), dn)
1025         self.assertEquals(str(res[0]["description"]), "test")
1026         self.assertEquals(str(res[0]["badPwdCount"]), "4")
1027         self.assertEquals(str(res[0]["nextRid"]), "1001")
1028         self.assertEquals(str(res[0]["revision"]), "2")
1029         # Check in local db
1030         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1031         self.assertEquals(len(res), 1)
1032         self.assertEquals(str(res[0].dn), dn)
1033         self.assertTrue(not "description" in res[0])
1034         self.assertTrue(not "badPwdCount" in res[0])
1035         self.assertTrue(not "nextRid" in res[0])
1036         self.assertEquals(str(res[0]["revision"]), "2")
1037         # Check in remote db
1038         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
1039                  "revision"]
1040         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1041         self.assertEquals(len(res), 1)
1042         self.assertEquals(str(res[0].dn), dn2)
1043         self.assertEquals(str(res[0]["description"]), "test")
1044         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1045         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1046         self.assertTrue(not "revision" in res[0])
1047
1048         # Rename split record
1049         dn2 = self.samba4.dn("cn=toast")
1050         self.ldb.rename(dn, dn2)
1051         # Check in mapped db
1052         dn = dn2
1053         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1054         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1055         self.assertEquals(len(res), 1)
1056         self.assertEquals(str(res[0].dn), dn)
1057         self.assertEquals(str(res[0]["description"]), "test")
1058         self.assertEquals(str(res[0]["badPwdCount"]), "4")
1059         self.assertEquals(str(res[0]["nextRid"]), "1001")
1060         self.assertEquals(str(res[0]["revision"]), "2")
1061         # Check in local db
1062         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1063         self.assertEquals(len(res), 1)
1064         self.assertEquals(str(res[0].dn), dn)
1065         self.assertTrue(not "description" in res[0])
1066         self.assertTrue(not "badPwdCount" in res[0])
1067         self.assertTrue(not "nextRid" in res[0])
1068         self.assertEquals(str(res[0]["revision"]), "2")
1069         # Check in remote db
1070         dn2 = self.samba3.dn("cn=toast")
1071         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
1072           attrs=["description", "sambaBadPasswordCount", "sambaNextRid", 
1073                  "revision"])
1074         self.assertEquals(len(res), 1)
1075         self.assertEquals(str(res[0].dn), dn2)
1076         self.assertEquals(str(res[0]["description"]), "test")
1077         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1078         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1079         self.assertTrue(not "revision" in res[0])
1080
1081         # Delete split record
1082         self.ldb.delete(dn)
1083         # Check in mapped db
1084         res = self.ldb.search(dn, scope=SCOPE_BASE)
1085         self.assertEquals(len(res), 0)
1086         # Check in local db
1087         res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1088         self.assertEquals(len(res), 0)
1089         # Check in remote db
1090         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1091         self.assertEquals(len(res), 0)