s4:dsdb Add default modules list to samba3sam
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
1 #!/usr/bin/python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6 #
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
8 #   
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #   
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #   
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
24
25 import os
26 import ldb
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
28 from samba import Ldb, substitute_var
29 from samba.tests import LdbTestCase, TestCaseInTempDir, cmdline_loadparm
30 import samba.dcerpc.security
31 import samba.ndr
32
33 datadir = os.path.join(os.path.dirname(__file__), 
34                        "../../../../../testdata/samba3")
35
def read_datafile(filename):
    """Return the full contents of a test data file.

    :param filename: Name of the file, relative to the module-level
        ``datadir`` directory.
    :return: Everything read from the file, opened in text mode.
    """
    f = open(os.path.join(datadir, filename), 'r')
    try:
        return f.read()
    finally:
        # Close explicitly instead of leaving the handle to the garbage
        # collector.
        f.close()
38
def ldb_debug(l, text):
    """Write a debug message to standard output.

    :param l: Not used by this function; presumably the LDB handle supplied
        by whoever invokes the callback -- TODO confirm.
    :param text: Message to print.
    """
    # The parenthesised form produces the same output under the Python 2
    # print statement and the Python 3 print function.
    print(text)
41
42
class MapBaseTestCase(TestCaseInTempDir):
    """Shared fixture for the mapping tests.

    setUp creates two Target helpers (one per base DN), each with its own
    tdb file inside the temporary directory, and connects them.  tearDown
    removes those files together with the main test database file.
    """

    def setup_modules(self, ldb, s3, s4):
        """Add the @MAP, @MODULES and @PARTITION records to a database.

        :param ldb: Database the three records are added to.
        :param s3: Target whose base DN is used for the "@TO" side.
        :param s4: Target whose base DN is used for the "@FROM" side.
        """
        map_record = {
            "dn": "@MAP=samba3sam",
            "@FROM": s4.basedn,
            "@TO": "sambaDomainName=TESTS," + s3.basedn,
        }
        ldb.add(map_record)

        modules_record = {
            "dn": "@MODULES",
            "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition",
        }
        ldb.add(modules_record)

        partition_names = [
            "%s" % (s4.basedn_casefold),
            "%s" % (s3.basedn_casefold),
        ]
        partition_record = {
            "dn": "@PARTITION",
            "partition": partition_names,
            "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"],
            "modules": "*:",
        }
        ldb.add(partition_record)

    def setUp(self):
        """Create and connect the samba3 and samba4 Target helpers."""
        super(MapBaseTestCase, self).setUp()

        def domain_dn(base, rdn):
            # DN below the sambaDomainName=TESTS container of ``base``.
            return "%s,sambaDomainName=TESTS,%s" % (rdn, base)

        def plain_dn(base, rdn):
            # DN directly below ``base``.
            return "%s,%s" % (rdn, base)

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        # Captured by the Target methods below.
        workdir = self.tempdir

        class Target:
            """Bundle of the data belonging to one SAM connection."""

            def __init__(self, basedn, dn):
                self.basedn = basedn
                self._dn = dn
                self.substvars = {"BASEDN": self.basedn}
                self.db = Ldb(lp=cmdline_loadparm)
                casefolded = ldb.Dn(self.db, basedn).get_casefold()
                self.basedn_casefold = casefolded
                # One database file per target, named after its base DN.
                self.file = os.path.join(workdir, "%s.ldb" % casefolded)
                self.url = "tdb://" + self.file

            def dn(self, rdn):
                """Build a DN for ``rdn`` using this target's DN helper."""
                builder = self._dn
                return builder(self.basedn, rdn)

            def connect(self):
                """Connect the database handle to this target's URL."""
                return self.db.connect(self.url)

            def setup_data(self, path):
                """Load the named data file into this target's database."""
                contents = read_datafile(path)
                self.add_ldif(contents)

            def subst(self, text):
                """Substitute this target's variables into ``text``."""
                return substitute_var(text, self.substvars)

            def add_ldif(self, ldif):
                """Add LDIF after variable substitution."""
                expanded = self.subst(ldif)
                self.db.add_ldif(expanded)

            def modify_ldif(self, ldif):
                """Apply modification LDIF after variable substitution."""
                expanded = self.subst(ldif)
                self.db.modify_ldif(expanded)

        self.samba4 = Target("dc=vernstok,dc=nl", plain_dn)
        self.samba3 = Target("cn=Samba3Sam", domain_dn)

        self.samba3.connect()
        self.samba4.connect()

    def tearDown(self):
        """Delete the database files, then run the base class tearDown."""
        for path in (self.ldbfile, self.samba3.file, self.samba4.file):
            os.unlink(path)
        super(MapBaseTestCase, self).tearDown()

    def assertSidEquals(self, text, ndr_sid):
        """Assert that an NDR-packed SID equals a SID given as text.

        :param text: SID in string form.
        :param ndr_sid: Indexable value whose first element, converted with
            str(), is unpacked as an NDR-encoded dom_sid.
        """
        dom_sid = samba.dcerpc.security.dom_sid
        unpacked = samba.ndr.ndr_unpack(dom_sid, str(ndr_sid[0]))
        from_text = samba.dcerpc.security.dom_sid(text)
        self.assertEquals(unpacked, from_text)
121
122
class Samba3SamTestCase(MapBaseTestCase):
    """Tests run against the Samba3 sample data through the mapping."""

    def setUp(self):
        """Load the sample data, provision the main database and reopen it."""
        super(Samba3SamTestCase, self).setUp()
        db = Ldb(self.ldburl, lp=cmdline_loadparm)
        self.samba3.setup_data("samba3.ldif")
        ldif = read_datafile("provision_samba3sam.ldif")
        db.add_ldif(self.samba4.subst(ldif))
        self.setup_modules(db, self.samba3, self.samba4)
        # Drop the provisioning handle and open a fresh one; presumably so
        # the records written by setup_modules take effect -- TODO confirm.
        del db
        self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)

    def test_search_non_mapped(self):
        """Looking up by non-mapped attribute"""
        msg = self.ldb.search(expression="(cn=Administrator)")
        self.assertEquals(len(msg), 1)
        # Compare the string form, as the other assertions in this class do.
        self.assertEquals(str(msg[0]["cn"]), "Administrator")

    # This test needs its own name: two methods with the same name in one
    # class body leave only the later definition, so the earlier test would
    # never be collected.
    def test_search_mapped(self):
        """Looking up by mapped attribute"""
        msg = self.ldb.search(expression="(name=Backup Operators)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0]["name"]), "Backup Operators")

    def test_old_name_of_renamed(self):
        """Looking up by old name of renamed attribute"""
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
        self.assertEquals(len(msg), 0)

    def test_mapped_containing_sid(self):
        """Looking up mapped entry containing SID"""
        msg = self.ldb.search(expression="(cn=Replicator)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn),
                          "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertTrue("objectSid" in msg[0])
        self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
                             msg[0]["objectSid"])
        oc = set(msg[0]["objectClass"])
        self.assertEquals(oc, set(["group"]))

    def test_search_by_objclass(self):
        """Looking up by objectClass"""
        msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEquals(set([str(m.dn) for m in msg]),
                set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                     "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))

    def test_s3sam_modify(self):
        """Add, search, modify, rename and delete records via the mapping."""
        # Adding a record that falls back to the local database
        self.ldb.add({"dn": "cn=Foo",
            "foo": "bar",
            "blah": "Blie",
            "cn": "Foo",
            "showInAdvancedViewOnly": "TRUE"}
            )

        # Checking for existence of record (local)
        # TODO: This record must be searched in the local database, which is
        # currently only supported for base searches
        # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
        # TODO: Actually, this version should work as well but doesn't...
        msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
                scope=SCOPE_BASE,
                attrs=['foo','blah','cn','showInAdvancedViewOnly'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
        self.assertEquals(str(msg[0]["foo"]), "bar")
        self.assertEquals(str(msg[0]["blah"]), "Blie")

        # Adding record that will be mapped
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                 "objectClass": "user",
                 "unixName": "bin",
                 "sambaUnicodePwd": "geheim",
                 "cn": "Niemand"})

        # Checking for existence of record (remote)
        msg = self.ldb.search(expression="(unixName=bin)",
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
        self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for existence of record (local && remote)
        msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)           # TODO: should check with more records
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
        self.assertEquals(str(msg[0]["unixName"]), "bin")
        self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for existence of record (local || remote)
        msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)        # TODO: should check with more records
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
        self.assertEquals(str(msg[0]["unixName"]), "bin")
        self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for data in destination database
        msg = self.samba3.db.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(str(msg[0]["sambaSID"]),
                "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEquals(str(msg[0]["displayName"]), "Niemand")

        # Adding attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
        self.assertEquals(str(msg[0]["description"]), "Blah")

        # Modifying attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(str(msg[0]["description"]), "Blie")

        # Deleting attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        # Checking whether changes are no longer there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertTrue("description" not in msg[0])

        # Renaming record...
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                        "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether DN has changed...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn),
                          "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Deleting record...
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether record is gone...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 0)
288
289
290 class MapTestCase(MapBaseTestCase):
291
292     def setUp(self):
293         super(MapTestCase, self).setUp()
294         ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
295         ldif = read_datafile("provision_samba3sam.ldif")
296         ldb.add_ldif(self.samba4.subst(ldif))
297         self.setup_modules(ldb, self.samba3, self.samba4)
298         del ldb
299         self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
300
301     def test_map_search(self):
302         """Running search tests on mapped data."""
303         self.samba3.db.add({
304             "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
305             "objectclass": ["sambaDomain", "top"],
306             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
307             "sambaNextRid": "2000",
308             "sambaDomainName": "TESTS"
309             })
310
311         # Add a set of split records
312         self.ldb.add_ldif("""
313 dn: """+ self.samba4.dn("cn=Domain Users") + """
314 objectClass: group
315 cn: Domain Users
316 objectSid: S-1-5-21-4231626423-2410014848-2360679739-513
317 """)
318
319         # Add a set of split records
320         self.ldb.add_ldif("""
321 dn: """+ self.samba4.dn("cn=X") + """
322 objectClass: user
323 cn: X
324 codePage: x
325 revision: x
326 dnsHostName: x
327 nextRid: y
328 lastLogon: x
329 description: x
330 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
331 """)
332
333         self.ldb.add({
334             "dn": self.samba4.dn("cn=Y"),
335             "objectClass": "top",
336             "cn": "Y",
337             "codePage": "x",
338             "revision": "x",
339             "dnsHostName": "y",
340             "nextRid": "y",
341             "lastLogon": "y",
342             "description": "x"})
343
344         self.ldb.add({
345             "dn": self.samba4.dn("cn=Z"),
346             "objectClass": "top",
347             "cn": "Z",
348             "codePage": "x",
349             "revision": "y",
350             "dnsHostName": "z",
351             "nextRid": "y",
352             "lastLogon": "z",
353             "description": "y"})
354
355         # Add a set of remote records
356
357         self.samba3.db.add({
358             "dn": self.samba3.dn("cn=A"),
359             "objectClass": "posixAccount",
360             "cn": "A",
361             "sambaNextRid": "x",
362             "sambaBadPasswordCount": "x", 
363             "sambaLogonTime": "x",
364             "description": "x",
365             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
366             "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
367
368         self.samba3.db.add({
369             "dn": self.samba3.dn("cn=B"),
370             "objectClass": "top",
371             "cn": "B",
372             "sambaNextRid": "x",
373             "sambaBadPasswordCount": "x",
374             "sambaLogonTime": "y",
375             "description": "x"})
376
377         self.samba3.db.add({
378             "dn": self.samba3.dn("cn=C"),
379             "objectClass": "top",
380             "cn": "C",
381             "sambaNextRid": "x",
382             "sambaBadPasswordCount": "y",
383             "sambaLogonTime": "z",
384             "description": "y"})
385
386         # Testing search by DN
387
388         # Search remote record by local DN
389         dn = self.samba4.dn("cn=A")
390         res = self.ldb.search(dn, scope=SCOPE_BASE, 
391                 attrs=["dnsHostName", "lastLogon"])
392         self.assertEquals(len(res), 1)
393         self.assertEquals(str(res[0].dn), dn)
394         self.assertTrue(not "dnsHostName" in res[0])
395         self.assertEquals(str(res[0]["lastLogon"]), "x")
396
397         # Search remote record by remote DN
398         dn = self.samba3.dn("cn=A")
399         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
400                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
401         self.assertEquals(len(res), 1)
402         self.assertEquals(str(res[0].dn), dn)
403         self.assertTrue(not "dnsHostName" in res[0])
404         self.assertTrue(not "lastLogon" in res[0])
405         self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
406
407         # Search split record by local DN
408         dn = self.samba4.dn("cn=X")
409         res = self.ldb.search(dn, scope=SCOPE_BASE, 
410                 attrs=["dnsHostName", "lastLogon"])
411         self.assertEquals(len(res), 1)
412         self.assertEquals(str(res[0].dn), dn)
413         self.assertEquals(str(res[0]["dnsHostName"]), "x")
414         self.assertEquals(str(res[0]["lastLogon"]), "x")
415
416         # Search split record by remote DN
417         dn = self.samba3.dn("cn=X")
418         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
419                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
420         self.assertEquals(len(res), 1)
421         self.assertEquals(str(res[0].dn), dn)
422         self.assertTrue(not "dnsHostName" in res[0])
423         self.assertTrue(not "lastLogon" in res[0])
424         self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
425
426         # Testing search by attribute
427
428         # Search by ignored attribute
429         res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, 
430                 attrs=["dnsHostName", "lastLogon"])
431         self.assertEquals(len(res), 2)
432         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
433         self.assertEquals(str(res[0]["dnsHostName"]), "y")
434         self.assertEquals(str(res[0]["lastLogon"]), "y")
435         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
436         self.assertEquals(str(res[1]["dnsHostName"]), "x")
437         self.assertEquals(str(res[1]["lastLogon"]), "x")
438
439         # Search by kept attribute
440         res = self.ldb.search(expression="(description=y)", 
441                 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
442         self.assertEquals(len(res), 2)
443         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
444         self.assertEquals(str(res[0]["dnsHostName"]), "z")
445         self.assertEquals(str(res[0]["lastLogon"]), "z")
446         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
447         self.assertTrue(not "dnsHostName" in res[1])
448         self.assertEquals(str(res[1]["lastLogon"]), "z")
449
450         # Search by renamed attribute
451         res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
452                               attrs=["dnsHostName", "lastLogon"])
453         self.assertEquals(len(res), 2)
454         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
455         self.assertTrue(not "dnsHostName" in res[0])
456         self.assertEquals(str(res[0]["lastLogon"]), "y")
457         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
458         self.assertTrue(not "dnsHostName" in res[1])
459         self.assertEquals(str(res[1]["lastLogon"]), "x")
460
461         # Search by converted attribute
462         # TODO:
463         #   Using the SID directly in the parse tree leads to conversion
464         #   errors, letting the search fail with no results.
465         #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
466         res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
467         self.assertEquals(len(res), 4)
468         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
469         self.assertEquals(str(res[0]["dnsHostName"]), "x")
470         self.assertEquals(str(res[0]["lastLogon"]), "x")
471         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552", 
472                              res[0]["objectSid"])
473         self.assertTrue("objectSid" in res[0])
474         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
475         self.assertTrue(not "dnsHostName" in res[1])
476         self.assertEquals(str(res[1]["lastLogon"]), "x")
477         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
478                              res[1]["objectSid"])
479         self.assertTrue("objectSid" in res[1])
480
481         # Search by generated attribute 
482         # In most cases, this even works when the mapping is missing
483         # a `convert_operator' by enumerating the remote db.
484         res = self.ldb.search(expression="(primaryGroupID=512)", 
485                            attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
486         self.assertEquals(len(res), 1)
487         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
488         self.assertTrue(not "dnsHostName" in res[0])
489         self.assertEquals(str(res[0]["lastLogon"]), "x")
490         self.assertEquals(str(res[0]["primaryGroupID"]), "512")
491
492         # Note that Xs "objectSid" seems to be fine in the previous search for
493         # "objectSid"...
494         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
495         #print len(res) + " results found"
496         #for i in range(len(res)):
497         #    for (obj in res[i]) {
498         #        print obj + ": " + res[i][obj]
499         #    }
500         #    print "---"
501         #    
502
        # Search by remote name of renamed attribute
504         res = self.ldb.search(expression="(sambaBadPasswordCount=*)", 
505                               attrs=["dnsHostName", "lastLogon"])
506         self.assertEquals(len(res), 0)
507
508         # Search by objectClass
509         attrs = ["dnsHostName", "lastLogon", "objectClass"]
510         res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
511         self.assertEquals(len(res), 2)
512         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
513         self.assertEquals(str(res[0]["dnsHostName"]), "x")
514         self.assertEquals(str(res[0]["lastLogon"]), "x")
515         self.assertEquals(str(res[0]["objectClass"][0]), "user")
516         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
517         self.assertTrue(not "dnsHostName" in res[1])
518         self.assertEquals(str(res[1]["lastLogon"]), "x")
519         self.assertEquals(str(res[1]["objectClass"][0]), "user")
520
521         # Prove that the objectClass is actually used for the search
522         res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
523                               attrs=attrs)
524         self.assertEquals(len(res), 3)
525         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
526         self.assertTrue(not "dnsHostName" in res[0])
527         self.assertEquals(str(res[0]["lastLogon"]), "y")
528         self.assertEquals(set(res[0]["objectClass"]), set(["top"]))
529         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
530         self.assertEquals(str(res[1]["dnsHostName"]), "x")
531         self.assertEquals(str(res[1]["lastLogon"]), "x")
532         self.assertEquals(str(res[1]["objectClass"][0]), "user")
533         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
534         self.assertTrue(not "dnsHostName" in res[2])
535         self.assertEquals(str(res[2]["lastLogon"]), "x")
536         self.assertEquals(res[2]["objectClass"][0], "user")
537
538         # Testing search by parse tree
539
540         # Search by conjunction of local attributes
541         res = self.ldb.search(expression="(&(codePage=x)(revision=x))", 
542                               attrs=["dnsHostName", "lastLogon"])
543         self.assertEquals(len(res), 2)
544         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
545         self.assertEquals(str(res[0]["dnsHostName"]), "y")
546         self.assertEquals(str(res[0]["lastLogon"]), "y")
547         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
548         self.assertEquals(str(res[1]["dnsHostName"]), "x")
549         self.assertEquals(str(res[1]["lastLogon"]), "x")
550
551         # Search by conjunction of remote attributes
552         res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", 
553                               attrs=["dnsHostName", "lastLogon"])
554         self.assertEquals(len(res), 2)
555         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
556         self.assertEquals(str(res[0]["dnsHostName"]), "x")
557         self.assertEquals(str(res[0]["lastLogon"]), "x")
558         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
559         self.assertTrue(not "dnsHostName" in res[1])
560         self.assertEquals(str(res[1]["lastLogon"]), "x")
561         
562         # Search by conjunction of local and remote attribute 
563         res = self.ldb.search(expression="(&(codePage=x)(description=x))", 
564                               attrs=["dnsHostName", "lastLogon"])
565         self.assertEquals(len(res), 2)
566         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
567         self.assertEquals(str(res[0]["dnsHostName"]), "y")
568         self.assertEquals(str(res[0]["lastLogon"]), "y")
569         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
570         self.assertEquals(str(res[1]["dnsHostName"]), "x")
571         self.assertEquals(str(res[1]["lastLogon"]), "x")
572
573         # Search by conjunction of local and remote attribute w/o match
574         attrs = ["dnsHostName", "lastLogon"]
575         res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", 
576                               attrs=attrs)
577         self.assertEquals(len(res), 0)
578         res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", 
579                               attrs=attrs)
580         self.assertEquals(len(res), 0)
581
582         # Search by disjunction of local attributes
583         res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", 
584                               attrs=["dnsHostName", "lastLogon"])
585         self.assertEquals(len(res), 2)
586         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
587         self.assertEquals(str(res[0]["dnsHostName"]), "y")
588         self.assertEquals(str(res[0]["lastLogon"]), "y")
589         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
590         self.assertEquals(str(res[1]["dnsHostName"]), "x")
591         self.assertEquals(str(res[1]["lastLogon"]), "x")
592
593         # Search by disjunction of remote attributes
594         res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", 
595                               attrs=["dnsHostName", "lastLogon"])
596         self.assertEquals(len(res), 3)
597         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
598         self.assertFalse("dnsHostName" in res[0])
599         self.assertEquals(str(res[0]["lastLogon"]), "y")
600         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
601         self.assertEquals(str(res[1]["dnsHostName"]), "x")
602         self.assertEquals(str(res[1]["lastLogon"]), "x")
603         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
604         self.assertFalse("dnsHostName" in res[2])
605         self.assertEquals(str(res[2]["lastLogon"]), "x")
606
607         # Search by disjunction of local and remote attribute
608         res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", 
609                               attrs=["dnsHostName", "lastLogon"])
610         self.assertEquals(len(res), 3)
611         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
612         self.assertEquals(str(res[0]["dnsHostName"]), "y")
613         self.assertEquals(str(res[0]["lastLogon"]), "y")
614         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
615         self.assertFalse("dnsHostName" in res[1])
616         self.assertEquals(str(res[1]["lastLogon"]), "y")
617         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
618         self.assertEquals(str(res[2]["dnsHostName"]), "x")
619         self.assertEquals(str(res[2]["lastLogon"]), "x")
620
621         # Search by disjunction of local and remote attribute w/o match
622         res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", 
623                               attrs=["dnsHostName", "lastLogon"])
624         self.assertEquals(len(res), 0)
625
626         # Search by negated local attribute
627         res = self.ldb.search(expression="(!(revision=x))", 
628                               attrs=["dnsHostName", "lastLogon"])
629         self.assertEquals(len(res), 6)
630         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
631         self.assertTrue(not "dnsHostName" in res[0])
632         self.assertEquals(str(res[0]["lastLogon"]), "y")
633         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
634         self.assertTrue(not "dnsHostName" in res[1])
635         self.assertEquals(str(res[1]["lastLogon"]), "x")
636         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
637         self.assertEquals(str(res[2]["dnsHostName"]), "z")
638         self.assertEquals(str(res[2]["lastLogon"]), "z")
639         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
640         self.assertTrue(not "dnsHostName" in res[3])
641         self.assertEquals(str(res[3]["lastLogon"]), "z")
642
643         # Search by negated remote attribute
644         res = self.ldb.search(expression="(!(description=x))", 
645                               attrs=["dnsHostName", "lastLogon"])
646         self.assertEquals(len(res), 4)
647         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
648         self.assertEquals(str(res[0]["dnsHostName"]), "z")
649         self.assertEquals(str(res[0]["lastLogon"]), "z")
650         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
651         self.assertTrue(not "dnsHostName" in res[1])
652         self.assertEquals(str(res[1]["lastLogon"]), "z")
653
654         # Search by negated conjunction of local attributes
655         res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", 
656                               attrs=["dnsHostName", "lastLogon"])
657         self.assertEquals(len(res), 6)
658         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
659         self.assertTrue(not "dnsHostName" in res[0])
660         self.assertEquals(str(res[0]["lastLogon"]), "y")
661         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
662         self.assertTrue(not "dnsHostName" in res[1])
663         self.assertEquals(str(res[1]["lastLogon"]), "x")
664         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
665         self.assertEquals(str(res[2]["dnsHostName"]), "z")
666         self.assertEquals(str(res[2]["lastLogon"]), "z")
667         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
668         self.assertTrue(not "dnsHostName" in res[3])
669         self.assertEquals(str(res[3]["lastLogon"]), "z")
670
671         # Search by negated conjunction of remote attributes
672         res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", 
673                               attrs=["dnsHostName", "lastLogon"])
674         self.assertEquals(len(res), 6)
675         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
676         self.assertEquals(str(res[0]["dnsHostName"]), "y")
677         self.assertEquals(str(res[0]["lastLogon"]), "y")
678         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
679         self.assertTrue(not "dnsHostName" in res[1])
680         self.assertEquals(str(res[1]["lastLogon"]), "y")
681         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
682         self.assertEquals(str(res[2]["dnsHostName"]), "z")
683         self.assertEquals(str(res[2]["lastLogon"]), "z")
684         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
685         self.assertTrue(not "dnsHostName" in res[3])
686         self.assertEquals(str(res[3]["lastLogon"]), "z")
687
688         # Search by negated conjunction of local and remote attribute
689         res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", 
690                               attrs=["dnsHostName", "lastLogon"])
691         self.assertEquals(len(res), 6)
692         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
693         self.assertTrue(not "dnsHostName" in res[0])
694         self.assertEquals(str(res[0]["lastLogon"]), "y")
695         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
696         self.assertTrue(not "dnsHostName" in res[1])
697         self.assertEquals(str(res[1]["lastLogon"]), "x")
698         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
699         self.assertEquals(str(res[2]["dnsHostName"]), "z")
700         self.assertEquals(str(res[2]["lastLogon"]), "z")
701         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
702         self.assertTrue(not "dnsHostName" in res[3])
703         self.assertEquals(str(res[3]["lastLogon"]), "z")
704
705         # Search by negated disjunction of local attributes
706         res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", 
707                               attrs=["dnsHostName", "lastLogon"])
708         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
709         self.assertTrue(not "dnsHostName" in res[0])
710         self.assertEquals(str(res[0]["lastLogon"]), "y")
711         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
712         self.assertTrue(not "dnsHostName" in res[1])
713         self.assertEquals(str(res[1]["lastLogon"]), "x")
714         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
715         self.assertEquals(str(res[2]["dnsHostName"]), "z")
716         self.assertEquals(str(res[2]["lastLogon"]), "z")
717         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
718         self.assertTrue(not "dnsHostName" in res[3])
719         self.assertEquals(str(res[3]["lastLogon"]), "z")
720
721         # Search by negated disjunction of remote attributes
722         res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", 
723                               attrs=["dnsHostName", "lastLogon"])
724         self.assertEquals(len(res), 5)
725         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
726         self.assertEquals(str(res[0]["dnsHostName"]), "y")
727         self.assertEquals(str(res[0]["lastLogon"]), "y")
728         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
729         self.assertEquals(str(res[1]["dnsHostName"]), "z")
730         self.assertEquals(str(res[1]["lastLogon"]), "z")
731         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
732         self.assertTrue(not "dnsHostName" in res[2])
733         self.assertEquals(str(res[2]["lastLogon"]), "z")
734
735         # Search by negated disjunction of local and remote attribute
736         res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", 
737                               attrs=["dnsHostName", "lastLogon"])
738         self.assertEquals(len(res), 5)
739         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
740         self.assertTrue(not "dnsHostName" in res[0])
741         self.assertEquals(str(res[0]["lastLogon"]), "x")
742         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
743         self.assertEquals(str(res[1]["dnsHostName"]), "z")
744         self.assertEquals(str(res[1]["lastLogon"]), "z")
745         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
746         self.assertTrue(not "dnsHostName" in res[2])
747         self.assertEquals(str(res[2]["lastLogon"]), "z")
748
749         # Search by complex parse tree
750         res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
751         self.assertEquals(len(res), 7)
752         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
753         self.assertTrue(not "dnsHostName" in res[0])
754         self.assertEquals(str(res[0]["lastLogon"]), "y")
755         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
756         self.assertEquals(str(res[1]["dnsHostName"]), "x")
757         self.assertEquals(str(res[1]["lastLogon"]), "x")
758         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
759         self.assertTrue(not "dnsHostName" in res[2])
760         self.assertEquals(str(res[2]["lastLogon"]), "x")
761         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
762         self.assertEquals(str(res[3]["dnsHostName"]), "z")
763         self.assertEquals(str(res[3]["lastLogon"]), "z")
764         self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
765         self.assertTrue(not "dnsHostName" in res[4])
766         self.assertEquals(str(res[4]["lastLogon"]), "z")
767
768         # Clean up
769         dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
770         for dn in dns:
771             self.ldb.delete(dn)
772
773     def test_map_modify_local(self):
774         """Modification of local records."""
775         # Add local record
776         dn = "cn=test,dc=idealx,dc=org"
777         self.ldb.add({"dn": dn, 
778                  "cn": "test",
779                  "foo": "bar",
780                  "revision": "1",
781                  "description": "test"})
782         # Check it's there
783         attrs = ["foo", "revision", "description"]
784         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
785         self.assertEquals(len(res), 1)
786         self.assertEquals(str(res[0].dn), dn)
787         self.assertEquals(str(res[0]["foo"]), "bar")
788         self.assertEquals(str(res[0]["revision"]), "1")
789         self.assertEquals(str(res[0]["description"]), "test")
790         # Check it's not in the local db
791         res = self.samba4.db.search(expression="(cn=test)", 
792                                     scope=SCOPE_DEFAULT, attrs=attrs)
793         self.assertEquals(len(res), 0)
794         # Check it's not in the remote db
795         res = self.samba3.db.search(expression="(cn=test)", 
796                                     scope=SCOPE_DEFAULT, attrs=attrs)
797         self.assertEquals(len(res), 0)
798
799         # Modify local record
800         ldif = """
801 dn: """ + dn + """
802 replace: foo
803 foo: baz
804 replace: description
805 description: foo
806 """
807         self.ldb.modify_ldif(ldif)
808         # Check in local db
809         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
810         self.assertEquals(len(res), 1)
811         self.assertEquals(str(res[0].dn), dn)
812         self.assertEquals(str(res[0]["foo"]), "baz")
813         self.assertEquals(str(res[0]["revision"]), "1")
814         self.assertEquals(str(res[0]["description"]), "foo")
815
816         # Rename local record
817         dn2 = "cn=toast,dc=idealx,dc=org"
818         self.ldb.rename(dn, dn2)
819         # Check in local db
820         res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
821         self.assertEquals(len(res), 1)
822         self.assertEquals(str(res[0].dn), dn2)
823         self.assertEquals(str(res[0]["foo"]), "baz")
824         self.assertEquals(str(res[0]["revision"]), "1")
825         self.assertEquals(str(res[0]["description"]), "foo")
826
827         # Delete local record
828         self.ldb.delete(dn2)
829         # Check it's gone
830         res = self.ldb.search(dn2, scope=SCOPE_BASE)
831         self.assertEquals(len(res), 0)
832
833     def test_map_modify_remote_remote(self):
834         """Modification of remote data of remote records"""
835         # Add remote record
836         dn = self.samba4.dn("cn=test")
837         dn2 = self.samba3.dn("cn=test")
838         self.samba3.db.add({"dn": dn2, 
839                    "cn": "test",
840                    "description": "foo",
841                    "sambaBadPasswordCount": "3",
842                    "sambaNextRid": "1001"})
843         # Check it's there
844         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
845                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
846         self.assertEquals(len(res), 1)
847         self.assertEquals(str(res[0].dn), dn2)
848         self.assertEquals(str(res[0]["description"]), "foo")
849         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
850         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
851         # Check in mapped db
852         attrs = ["description", "badPwdCount", "nextRid"]
853         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
854         self.assertEquals(len(res), 1)
855         self.assertEquals(str(res[0].dn), dn)
856         self.assertEquals(str(res[0]["description"]), "foo")
857         self.assertEquals(str(res[0]["badPwdCount"]), "3")
858         self.assertEquals(str(res[0]["nextRid"]), "1001")
859         # Check in local db
860         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
861         self.assertEquals(len(res), 0)
862
863         # Modify remote data of remote record
864         ldif = """
865 dn: """ + dn + """
866 replace: description
867 description: test
868 replace: badPwdCount
869 badPwdCount: 4
870 """
871         self.ldb.modify_ldif(ldif)
872         # Check in mapped db
873         res = self.ldb.search(dn, scope=SCOPE_BASE, 
874                 attrs=["description", "badPwdCount", "nextRid"])
875         self.assertEquals(len(res), 1)
876         self.assertEquals(str(res[0].dn), dn)
877         self.assertEquals(str(res[0]["description"]), "test")
878         self.assertEquals(str(res[0]["badPwdCount"]), "4")
879         self.assertEquals(str(res[0]["nextRid"]), "1001")
880         # Check in remote db
881         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
882                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
883         self.assertEquals(len(res), 1)
884         self.assertEquals(str(res[0].dn), dn2)
885         self.assertEquals(str(res[0]["description"]), "test")
886         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
887         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
888
889         # Rename remote record
890         dn2 = self.samba4.dn("cn=toast")
891         self.ldb.rename(dn, dn2)
892         # Check in mapped db
893         dn = dn2
894         res = self.ldb.search(dn, scope=SCOPE_BASE, 
895                 attrs=["description", "badPwdCount", "nextRid"])
896         self.assertEquals(len(res), 1)
897         self.assertEquals(str(res[0].dn), dn)
898         self.assertEquals(str(res[0]["description"]), "test")
899         self.assertEquals(str(res[0]["badPwdCount"]), "4")
900         self.assertEquals(str(res[0]["nextRid"]), "1001")
901         # Check in remote db 
902         dn2 = self.samba3.dn("cn=toast")
903         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
904                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
905         self.assertEquals(len(res), 1)
906         self.assertEquals(str(res[0].dn), dn2)
907         self.assertEquals(str(res[0]["description"]), "test")
908         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
909         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
910
911         # Delete remote record
912         self.ldb.delete(dn)
913         # Check in mapped db that it's removed
914         res = self.ldb.search(dn, scope=SCOPE_BASE)
915         self.assertEquals(len(res), 0)
916         # Check in remote db
917         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
918         self.assertEquals(len(res), 0)
919
920     def test_map_modify_remote_local(self):
921         """Modification of local data of remote records"""
922         # Add remote record (same as before)
923         dn = self.samba4.dn("cn=test")
924         dn2 = self.samba3.dn("cn=test")
925         self.samba3.db.add({"dn": dn2, 
926                    "cn": "test",
927                    "description": "foo",
928                    "sambaBadPasswordCount": "3",
929                    "sambaNextRid": "1001"})
930
931         # Modify local data of remote record
932         ldif = """
933 dn: """ + dn + """
934 add: revision
935 revision: 1
936 replace: description
937 description: test
938
939 """
940         self.ldb.modify_ldif(ldif)
941         # Check in mapped db
942         attrs = ["revision", "description"]
943         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
944         self.assertEquals(len(res), 1)
945         self.assertEquals(str(res[0].dn), dn)
946         self.assertEquals(str(res[0]["description"]), "test")
947         self.assertEquals(str(res[0]["revision"]), "1")
948         # Check in remote db
949         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
950         self.assertEquals(len(res), 1)
951         self.assertEquals(str(res[0].dn), dn2)
952         self.assertEquals(str(res[0]["description"]), "test")
953         self.assertTrue(not "revision" in res[0])
954         # Check in local db
955         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
956         self.assertEquals(len(res), 1)
957         self.assertEquals(str(res[0].dn), dn)
958         self.assertTrue(not "description" in res[0])
959         self.assertEquals(str(res[0]["revision"]), "1")
960
961         # Delete (newly) split record
962         self.ldb.delete(dn)
963
964     def test_map_modify_split(self):
965         """Testing modification of split records"""
966         # Add split record
967         dn = self.samba4.dn("cn=test")
968         dn2 = self.samba3.dn("cn=test")
969         self.ldb.add({
970             "dn": dn,
971             "cn": "test",
972             "description": "foo",
973             "badPwdCount": "3",
974             "nextRid": "1001",
975             "revision": "1"})
976         # Check it's there
977         attrs = ["description", "badPwdCount", "nextRid", "revision"]
978         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
979         self.assertEquals(len(res), 1)
980         self.assertEquals(str(res[0].dn), dn)
981         self.assertEquals(str(res[0]["description"]), "foo")
982         self.assertEquals(str(res[0]["badPwdCount"]), "3")
983         self.assertEquals(str(res[0]["nextRid"]), "1001")
984         self.assertEquals(str(res[0]["revision"]), "1")
985         # Check in local db
986         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
987         self.assertEquals(len(res), 1)
988         self.assertEquals(str(res[0].dn), dn)
989         self.assertTrue(not "description" in res[0])
990         self.assertTrue(not "badPwdCount" in res[0])
991         self.assertTrue(not "nextRid" in res[0])
992         self.assertEquals(str(res[0]["revision"]), "1")
993         # Check in remote db
994         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
995                  "revision"]
996         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
997         self.assertEquals(len(res), 1)
998         self.assertEquals(str(res[0].dn), dn2)
999         self.assertEquals(str(res[0]["description"]), "foo")
1000         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
1001         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1002         self.assertTrue(not "revision" in res[0])
1003
1004         # Modify of split record
1005         ldif = """
1006 dn: """ + dn + """
1007 replace: description
1008 description: test
1009 replace: badPwdCount
1010 badPwdCount: 4
1011 replace: revision
1012 revision: 2
1013 """
1014         self.ldb.modify_ldif(ldif)
1015         # Check in mapped db
1016         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1017         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1018         self.assertEquals(len(res), 1)
1019         self.assertEquals(str(res[0].dn), dn)
1020         self.assertEquals(str(res[0]["description"]), "test")
1021         self.assertEquals(str(res[0]["badPwdCount"]), "4")
1022         self.assertEquals(str(res[0]["nextRid"]), "1001")
1023         self.assertEquals(str(res[0]["revision"]), "2")
1024         # Check in local db
1025         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1026         self.assertEquals(len(res), 1)
1027         self.assertEquals(str(res[0].dn), dn)
1028         self.assertTrue(not "description" in res[0])
1029         self.assertTrue(not "badPwdCount" in res[0])
1030         self.assertTrue(not "nextRid" in res[0])
1031         self.assertEquals(str(res[0]["revision"]), "2")
1032         # Check in remote db
1033         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
1034                  "revision"]
1035         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1036         self.assertEquals(len(res), 1)
1037         self.assertEquals(str(res[0].dn), dn2)
1038         self.assertEquals(str(res[0]["description"]), "test")
1039         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1040         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1041         self.assertTrue(not "revision" in res[0])
1042
1043         # Rename split record
1044         dn2 = self.samba4.dn("cn=toast")
1045         self.ldb.rename(dn, dn2)
1046         # Check in mapped db
1047         dn = dn2
1048         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1049         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1050         self.assertEquals(len(res), 1)
1051         self.assertEquals(str(res[0].dn), dn)
1052         self.assertEquals(str(res[0]["description"]), "test")
1053         self.assertEquals(str(res[0]["badPwdCount"]), "4")
1054         self.assertEquals(str(res[0]["nextRid"]), "1001")
1055         self.assertEquals(str(res[0]["revision"]), "2")
1056         # Check in local db
1057         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1058         self.assertEquals(len(res), 1)
1059         self.assertEquals(str(res[0].dn), dn)
1060         self.assertTrue(not "description" in res[0])
1061         self.assertTrue(not "badPwdCount" in res[0])
1062         self.assertTrue(not "nextRid" in res[0])
1063         self.assertEquals(str(res[0]["revision"]), "2")
1064         # Check in remote db
1065         dn2 = self.samba3.dn("cn=toast")
1066         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
1067           attrs=["description", "sambaBadPasswordCount", "sambaNextRid", 
1068                  "revision"])
1069         self.assertEquals(len(res), 1)
1070         self.assertEquals(str(res[0].dn), dn2)
1071         self.assertEquals(str(res[0]["description"]), "test")
1072         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1073         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1074         self.assertTrue(not "revision" in res[0])
1075
1076         # Delete split record
1077         self.ldb.delete(dn)
1078         # Check in mapped db
1079         res = self.ldb.search(dn, scope=SCOPE_BASE)
1080         self.assertEquals(len(res), 0)
1081         # Check in local db
1082         res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1083         self.assertEquals(len(res), 0)
1084         # Check in remote db
1085         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1086         self.assertEquals(len(res), 0)