8d4047b0b8cd6dc36f87b3149296945c5920c332
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
1 #!/usr/bin/python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6 #
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
8 #   
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #   
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #   
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
24
25 import os
26 import ldb
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
28 from samba import Ldb, substitute_var
29 from samba.tests import LdbTestCase, TestCaseInTempDir, cmdline_loadparm
30 import samba.dcerpc.security
31 import samba.ndr
32 from samba.auth import system_session
33 from samba import param
34
# Directory holding the Samba3 LDIF fixtures, expressed relative to the
# directory that contains this test module.
datadir = os.path.join(
    os.path.dirname(__file__),
    "../../../../../testdata/samba3")
37
def read_datafile(filename):
    """Return the full text of *filename* from the test data directory.

    filename -- name of the file, joined onto the module-level ``datadir``.

    Raises IOError if the file cannot be opened or read.
    """
    f = open(os.path.join(datadir, filename), 'r')
    # Close the handle explicitly instead of relying on garbage collection.
    try:
        return f.read()
    finally:
        f.close()
40
def ldb_debug(l, text):
    """Debug callback that writes *text* to standard output.

    l    -- not used by this function (presumably the handle the debug
            hook passes in -- TODO confirm against the caller)
    text -- message to print
    """
    # Parenthesised form behaves the same under the Python 2 print statement
    # and is also valid syntax on Python 3.
    print(text)
43
44
class MapBaseTestCase(TestCaseInTempDir):
    """Shared fixture for the mapping test cases.

    setUp() creates two ``Target`` helpers, ``self.samba4`` and
    ``self.samba3``, each connected to its own tdb file inside the test's
    temporary directory, and records the path and URL of a third database
    (``test.ldb``) in ``self.ldbfile`` / ``self.ldburl``.
    """

    def setup_modules(self, ldb, s3, s4):
        """Add the @MAP, @MODULES and @PARTITION records to a database.

        ldb -- database handle the three records are added to (this
               parameter shadows the ``ldb`` module inside the method)
        s3  -- object providing ``basedn`` and ``basedn_casefold`` for the
               "@TO" side of the mapping
        s4  -- object providing ``basedn`` and ``basedn_casefold`` for the
               "@FROM" side of the mapping
        """
        map_record = {
            "dn": "@MAP=samba3sam",
            "@FROM": s4.basedn,
            "@TO": "sambaDomainName=TESTS," + s3.basedn,
        }
        ldb.add(map_record)

        modules_record = {
            "dn": "@MODULES",
            "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,samba3sid,partition",
        }
        ldb.add(modules_record)

        # One partition per target, samba4 first, keyed by casefolded base DN.
        partitions = ["%s" % target.basedn_casefold for target in (s4, s3)]
        partition_record = {
            "dn": "@PARTITION",
            "partition": partitions,
            "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"],
            "modules": "*:",
        }
        ldb.add(partition_record)

    def setUp(self):
        """Create and connect the samba3 and samba4 target databases."""
        cmdline_loadparm.set("sid generator", "backend")
        super(MapBaseTestCase, self).setUp()

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        # Captured by the Target class below, which has no access to self.
        workdir = self.tempdir

        def samba3_dn(basedn, rdn):
            # Samba3 entries live below the sambaDomainName=TESTS container.
            return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)

        def samba4_dn(basedn, rdn):
            # Samba4 entries sit directly below the base DN.
            return "%s,%s" % (rdn, basedn)

        class Target:
            """Bundle of a database handle and the naming data that goes
            with one SAM connection."""

            def __init__(self, basedn, dn):
                # dn is a callable taking (basedn, rdn) and returning a DN.
                self._dn = dn
                self.basedn = basedn
                self.db = Ldb(lp=cmdline_loadparm,
                              session_info=system_session())
                self.basedn_casefold = ldb.Dn(self.db, basedn).get_casefold()
                self.substvars = {"BASEDN": self.basedn}
                # The backing file is named after the casefolded base DN.
                self.file = os.path.join(workdir,
                                         "%s.ldb" % self.basedn_casefold)
                self.url = "tdb://" + self.file

            def dn(self, rdn):
                """Build a full DN for *rdn* using this target's DN maker."""
                make = self._dn
                return make(self.basedn, rdn)

            def connect(self):
                """Connect the handle to this target's tdb URL."""
                return self.db.connect(self.url)

            def setup_data(self, path):
                """Read the data file *path* and add it as LDIF."""
                contents = read_datafile(path)
                self.add_ldif(contents)

            def subst(self, text):
                """Apply this target's substitution variables to *text*."""
                return substitute_var(text, self.substvars)

            def add_ldif(self, ldif):
                """Substitute variables in *ldif*, then add it."""
                expanded = self.subst(ldif)
                self.db.add_ldif(expanded)

            def modify_ldif(self, ldif):
                """Substitute variables in *ldif*, then apply it as a
                modification."""
                expanded = self.subst(ldif)
                self.db.modify_ldif(expanded)

        self.samba4 = Target("dc=vernstok,dc=nl", samba4_dn)
        self.samba3 = Target("cn=Samba3Sam", samba3_dn)

        self.samba3.connect()
        self.samba4.connect()

    def tearDown(self):
        """Remove the three database files, then run the base tearDown."""
        for path in (self.ldbfile, self.samba3.file, self.samba4.file):
            os.unlink(path)
        super(MapBaseTestCase, self).tearDown()

    def assertSidEquals(self, text, ndr_sid):
        """Assert that an NDR-packed SID equals the SID written as *text*.

        text    -- SID in string form
        ndr_sid -- indexable value whose first element is the packed SID
        """
        security = samba.dcerpc.security
        unpacked = samba.ndr.ndr_unpack(security.dom_sid, str(ndr_sid[0]))
        expected = security.dom_sid(text)
        self.assertEquals(unpacked, expected)
124
125
class Samba3SamTestCase(MapBaseTestCase):
    """Search and modification tests against a database provisioned from
    samba3.ldif and provision_samba3sam.ldif with the mapping modules
    configured."""

    def setUp(self):
        """Load the fixture data, configure the modules and open self.ldb."""
        super(Samba3SamTestCase, self).setUp()
        # Named "db" rather than "ldb" so the ldb module is not shadowed.
        db = Ldb(self.ldburl, lp=cmdline_loadparm,
                 session_info=system_session())
        self.samba3.setup_data("samba3.ldif")
        ldif = read_datafile("provision_samba3sam.ldif")
        db.add_ldif(self.samba4.subst(ldif))
        self.setup_modules(db, self.samba3, self.samba4)
        # Drop this handle before opening the one the tests use (presumably
        # so the records written above take effect on a fresh connection).
        del db
        self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm,
                       session_info=system_session())

    def test_search_non_mapped(self):
        """Looking up by non-mapped attribute"""
        msg = self.ldb.search(expression="(cn=Administrator)")
        self.assertEquals(len(msg), 1)
        # Compare the string form, as the other tests in this class do.
        self.assertEquals(str(msg[0]["cn"]), "Administrator")

    # This method used to share the name test_search_non_mapped with the
    # method above, which replaced it in the class namespace so that the
    # non-mapped test never ran.
    def test_search_mapped(self):
        """Looking up by mapped attribute"""
        msg = self.ldb.search(expression="(name=Backup Operators)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0]["name"]), "Backup Operators")

    def test_old_name_of_renamed(self):
        """Looking up by old name of renamed attribute"""
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
        self.assertEquals(len(msg), 0)

    def test_mapped_containing_sid(self):
        """Looking up mapped entry containing SID"""
        msg = self.ldb.search(expression="(cn=Replicator)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn),
                          "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertTrue("objectSid" in msg[0])
        self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
                             msg[0]["objectSid"])
        oc = set(msg[0]["objectClass"])
        self.assertEquals(oc, set(["group"]))

    def test_search_by_objclass(self):
        """Looking up by objectClass"""
        msg = self.ldb.search(
            expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEquals(set([str(m.dn) for m in msg]),
                set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                     "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))

    def test_s3sam_modify(self):
        """Add, search, modify, rename and delete records via self.ldb."""
        # Adding a record that will be fallbacked
        self.ldb.add({"dn": "cn=Foo",
            "foo": "bar",
            "blah": "Blie",
            "cn": "Foo",
            "showInAdvancedViewOnly": "TRUE"}
            )

        # Checking for existence of record (local)
        # TODO: This record must be searched in the local database, which is
        # currently only supported for base searches. A plain
        # search(expression="(cn=Foo)", attrs=[...]) should work as well
        # but doesn't, hence the explicit base and SCOPE_BASE below.
        msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
                scope=SCOPE_BASE,
                attrs=['foo','blah','cn','showInAdvancedViewOnly'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
        self.assertEquals(str(msg[0]["foo"]), "bar")
        self.assertEquals(str(msg[0]["blah"]), "Blie")

        # Adding record that will be mapped
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                 "objectClass": "user",
                 "unixName": "bin",
                 "sambaUnicodePwd": "geheim",
                 "cn": "Niemand"})

        # Checking for existence of record (remote)
        msg = self.ldb.search(expression="(unixName=bin)",
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
        self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for existence of record (local && remote)
        msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)           # TODO: should check with more records
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
        self.assertEquals(str(msg[0]["unixName"]), "bin")
        self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for existence of record (local || remote)
        msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)        # TODO: should check with more records
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
        self.assertEquals(str(msg[0]["unixName"]), "bin")
        self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for data in destination database
        msg = self.samba3.db.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(str(msg[0]["sambaSID"]),
                "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEquals(str(msg[0]["displayName"]), "Niemand")

        # Adding attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(str(msg[0]["cn"]), "Niemand")
        self.assertEquals(str(msg[0]["description"]), "Blah")

        # Modifying attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(str(msg[0]["description"]), "Blie")

        # Deleting attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        # Checking whether changes are no longer there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertTrue("description" not in msg[0])

        # Renaming record...
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                        "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether DN has changed...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn),
                          "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Deleting record...
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether record is gone...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 0)
291
292
293 class MapTestCase(MapBaseTestCase):
294
295     def setUp(self):
296         super(MapTestCase, self).setUp()
297         ldb = Ldb(self.ldburl, lp=cmdline_loadparm, session_info=system_session())
298         ldif = read_datafile("provision_samba3sam.ldif")
299         ldb.add_ldif(self.samba4.subst(ldif))
300         self.setup_modules(ldb, self.samba3, self.samba4)
301         del ldb
302         self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm, session_info=system_session())
303
304     def test_map_search(self):
305         """Running search tests on mapped data."""
306         self.samba3.db.add({
307             "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
308             "objectclass": ["sambaDomain", "top"],
309             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
310             "sambaNextRid": "2000",
311             "sambaDomainName": "TESTS"
312             })
313
314         # Add a set of split records
315         self.ldb.add_ldif("""
316 dn: """+ self.samba4.dn("cn=Domain Users") + """
317 objectClass: group
318 cn: Domain Users
319 objectSid: S-1-5-21-4231626423-2410014848-2360679739-513
320 """)
321
322         # Add a set of split records
323         self.ldb.add_ldif("""
324 dn: """+ self.samba4.dn("cn=X") + """
325 objectClass: user
326 cn: X
327 codePage: x
328 revision: x
329 dnsHostName: x
330 nextRid: y
331 lastLogon: x
332 description: x
333 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
334 """)
335
336         self.ldb.add({
337             "dn": self.samba4.dn("cn=Y"),
338             "objectClass": "top",
339             "cn": "Y",
340             "codePage": "x",
341             "revision": "x",
342             "dnsHostName": "y",
343             "nextRid": "y",
344             "lastLogon": "y",
345             "description": "x"})
346
347         self.ldb.add({
348             "dn": self.samba4.dn("cn=Z"),
349             "objectClass": "top",
350             "cn": "Z",
351             "codePage": "x",
352             "revision": "y",
353             "dnsHostName": "z",
354             "nextRid": "y",
355             "lastLogon": "z",
356             "description": "y"})
357
358         # Add a set of remote records
359
360         self.samba3.db.add({
361             "dn": self.samba3.dn("cn=A"),
362             "objectClass": "posixAccount",
363             "cn": "A",
364             "sambaNextRid": "x",
365             "sambaBadPasswordCount": "x", 
366             "sambaLogonTime": "x",
367             "description": "x",
368             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
369             "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
370
371         self.samba3.db.add({
372             "dn": self.samba3.dn("cn=B"),
373             "objectClass": "top",
374             "cn": "B",
375             "sambaNextRid": "x",
376             "sambaBadPasswordCount": "x",
377             "sambaLogonTime": "y",
378             "description": "x"})
379
380         self.samba3.db.add({
381             "dn": self.samba3.dn("cn=C"),
382             "objectClass": "top",
383             "cn": "C",
384             "sambaNextRid": "x",
385             "sambaBadPasswordCount": "y",
386             "sambaLogonTime": "z",
387             "description": "y"})
388
389         # Testing search by DN
390
391         # Search remote record by local DN
392         dn = self.samba4.dn("cn=A")
393         res = self.ldb.search(dn, scope=SCOPE_BASE, 
394                 attrs=["dnsHostName", "lastLogon"])
395         self.assertEquals(len(res), 1)
396         self.assertEquals(str(res[0].dn), dn)
397         self.assertTrue(not "dnsHostName" in res[0])
398         self.assertEquals(str(res[0]["lastLogon"]), "x")
399
400         # Search remote record by remote DN
401         dn = self.samba3.dn("cn=A")
402         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
403                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
404         self.assertEquals(len(res), 1)
405         self.assertEquals(str(res[0].dn), dn)
406         self.assertTrue(not "dnsHostName" in res[0])
407         self.assertTrue(not "lastLogon" in res[0])
408         self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
409
410         # Search split record by local DN
411         dn = self.samba4.dn("cn=X")
412         res = self.ldb.search(dn, scope=SCOPE_BASE, 
413                 attrs=["dnsHostName", "lastLogon"])
414         self.assertEquals(len(res), 1)
415         self.assertEquals(str(res[0].dn), dn)
416         self.assertEquals(str(res[0]["dnsHostName"]), "x")
417         self.assertEquals(str(res[0]["lastLogon"]), "x")
418
419         # Search split record by remote DN
420         dn = self.samba3.dn("cn=X")
421         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
422                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
423         self.assertEquals(len(res), 1)
424         self.assertEquals(str(res[0].dn), dn)
425         self.assertTrue(not "dnsHostName" in res[0])
426         self.assertTrue(not "lastLogon" in res[0])
427         self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
428
429         # Testing search by attribute
430
431         # Search by ignored attribute
432         res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, 
433                 attrs=["dnsHostName", "lastLogon"])
434         self.assertEquals(len(res), 2)
435         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
436         self.assertEquals(str(res[0]["dnsHostName"]), "y")
437         self.assertEquals(str(res[0]["lastLogon"]), "y")
438         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
439         self.assertEquals(str(res[1]["dnsHostName"]), "x")
440         self.assertEquals(str(res[1]["lastLogon"]), "x")
441
442         # Search by kept attribute
443         res = self.ldb.search(expression="(description=y)", 
444                 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
445         self.assertEquals(len(res), 2)
446         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
447         self.assertEquals(str(res[0]["dnsHostName"]), "z")
448         self.assertEquals(str(res[0]["lastLogon"]), "z")
449         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
450         self.assertTrue(not "dnsHostName" in res[1])
451         self.assertEquals(str(res[1]["lastLogon"]), "z")
452
453         # Search by renamed attribute
454         res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
455                               attrs=["dnsHostName", "lastLogon"])
456         self.assertEquals(len(res), 2)
457         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
458         self.assertTrue(not "dnsHostName" in res[0])
459         self.assertEquals(str(res[0]["lastLogon"]), "y")
460         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
461         self.assertTrue(not "dnsHostName" in res[1])
462         self.assertEquals(str(res[1]["lastLogon"]), "x")
463
464         # Search by converted attribute
465         # TODO:
466         #   Using the SID directly in the parse tree leads to conversion
467         #   errors, letting the search fail with no results.
468         #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
469         res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
470         self.assertEquals(len(res), 4)
471         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
472         self.assertEquals(str(res[0]["dnsHostName"]), "x")
473         self.assertEquals(str(res[0]["lastLogon"]), "x")
474         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552", 
475                              res[0]["objectSid"])
476         self.assertTrue("objectSid" in res[0])
477         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
478         self.assertTrue(not "dnsHostName" in res[1])
479         self.assertEquals(str(res[1]["lastLogon"]), "x")
480         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
481                              res[1]["objectSid"])
482         self.assertTrue("objectSid" in res[1])
483
484         # Search by generated attribute 
485         # In most cases, this even works when the mapping is missing
486         # a `convert_operator' by enumerating the remote db.
487         res = self.ldb.search(expression="(primaryGroupID=512)", 
488                            attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
489         self.assertEquals(len(res), 1)
490         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
491         self.assertTrue(not "dnsHostName" in res[0])
492         self.assertEquals(str(res[0]["lastLogon"]), "x")
493         self.assertEquals(str(res[0]["primaryGroupID"]), "512")
494
495         # Note that Xs "objectSid" seems to be fine in the previous search for
496         # "objectSid"...
497         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
498         #print len(res) + " results found"
499         #for i in range(len(res)):
500         #    for (obj in res[i]) {
501         #        print obj + ": " + res[i][obj]
502         #    }
503         #    print "---"
504         #    
505
506         # Search by remote name of renamed attribute */
507         res = self.ldb.search(expression="(sambaBadPasswordCount=*)", 
508                               attrs=["dnsHostName", "lastLogon"])
509         self.assertEquals(len(res), 0)
510
511         # Search by objectClass
512         attrs = ["dnsHostName", "lastLogon", "objectClass"]
513         res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
514         self.assertEquals(len(res), 2)
515         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
516         self.assertEquals(str(res[0]["dnsHostName"]), "x")
517         self.assertEquals(str(res[0]["lastLogon"]), "x")
518         self.assertEquals(str(res[0]["objectClass"][0]), "user")
519         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
520         self.assertTrue(not "dnsHostName" in res[1])
521         self.assertEquals(str(res[1]["lastLogon"]), "x")
522         self.assertEquals(str(res[1]["objectClass"][0]), "user")
523
524         # Prove that the objectClass is actually used for the search
525         res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
526                               attrs=attrs)
527         self.assertEquals(len(res), 3)
528         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
529         self.assertTrue(not "dnsHostName" in res[0])
530         self.assertEquals(str(res[0]["lastLogon"]), "y")
531         self.assertEquals(set(res[0]["objectClass"]), set(["top"]))
532         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
533         self.assertEquals(str(res[1]["dnsHostName"]), "x")
534         self.assertEquals(str(res[1]["lastLogon"]), "x")
535         self.assertEquals(str(res[1]["objectClass"][0]), "user")
536         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
537         self.assertTrue(not "dnsHostName" in res[2])
538         self.assertEquals(str(res[2]["lastLogon"]), "x")
539         self.assertEquals(res[2]["objectClass"][0], "user")
540
541         # Testing search by parse tree
542
543         # Search by conjunction of local attributes
544         res = self.ldb.search(expression="(&(codePage=x)(revision=x))", 
545                               attrs=["dnsHostName", "lastLogon"])
546         self.assertEquals(len(res), 2)
547         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
548         self.assertEquals(str(res[0]["dnsHostName"]), "y")
549         self.assertEquals(str(res[0]["lastLogon"]), "y")
550         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
551         self.assertEquals(str(res[1]["dnsHostName"]), "x")
552         self.assertEquals(str(res[1]["lastLogon"]), "x")
553
554         # Search by conjunction of remote attributes
555         res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", 
556                               attrs=["dnsHostName", "lastLogon"])
557         self.assertEquals(len(res), 2)
558         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
559         self.assertEquals(str(res[0]["dnsHostName"]), "x")
560         self.assertEquals(str(res[0]["lastLogon"]), "x")
561         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
562         self.assertTrue(not "dnsHostName" in res[1])
563         self.assertEquals(str(res[1]["lastLogon"]), "x")
564         
565         # Search by conjunction of local and remote attribute 
566         res = self.ldb.search(expression="(&(codePage=x)(description=x))", 
567                               attrs=["dnsHostName", "lastLogon"])
568         self.assertEquals(len(res), 2)
569         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
570         self.assertEquals(str(res[0]["dnsHostName"]), "y")
571         self.assertEquals(str(res[0]["lastLogon"]), "y")
572         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
573         self.assertEquals(str(res[1]["dnsHostName"]), "x")
574         self.assertEquals(str(res[1]["lastLogon"]), "x")
575
576         # Search by conjunction of local and remote attribute w/o match
577         attrs = ["dnsHostName", "lastLogon"]
578         res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", 
579                               attrs=attrs)
580         self.assertEquals(len(res), 0)
581         res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", 
582                               attrs=attrs)
583         self.assertEquals(len(res), 0)
584
585         # Search by disjunction of local attributes
586         res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", 
587                               attrs=["dnsHostName", "lastLogon"])
588         self.assertEquals(len(res), 2)
589         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
590         self.assertEquals(str(res[0]["dnsHostName"]), "y")
591         self.assertEquals(str(res[0]["lastLogon"]), "y")
592         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
593         self.assertEquals(str(res[1]["dnsHostName"]), "x")
594         self.assertEquals(str(res[1]["lastLogon"]), "x")
595
596         # Search by disjunction of remote attributes
597         res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", 
598                               attrs=["dnsHostName", "lastLogon"])
599         self.assertEquals(len(res), 3)
600         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
601         self.assertFalse("dnsHostName" in res[0])
602         self.assertEquals(str(res[0]["lastLogon"]), "y")
603         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
604         self.assertEquals(str(res[1]["dnsHostName"]), "x")
605         self.assertEquals(str(res[1]["lastLogon"]), "x")
606         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
607         self.assertFalse("dnsHostName" in res[2])
608         self.assertEquals(str(res[2]["lastLogon"]), "x")
609
610         # Search by disjunction of local and remote attribute
611         res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", 
612                               attrs=["dnsHostName", "lastLogon"])
613         self.assertEquals(len(res), 3)
614         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
615         self.assertEquals(str(res[0]["dnsHostName"]), "y")
616         self.assertEquals(str(res[0]["lastLogon"]), "y")
617         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
618         self.assertFalse("dnsHostName" in res[1])
619         self.assertEquals(str(res[1]["lastLogon"]), "y")
620         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
621         self.assertEquals(str(res[2]["dnsHostName"]), "x")
622         self.assertEquals(str(res[2]["lastLogon"]), "x")
623
624         # Search by disjunction of local and remote attribute w/o match
625         res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", 
626                               attrs=["dnsHostName", "lastLogon"])
627         self.assertEquals(len(res), 0)
628
629         # Search by negated local attribute
630         res = self.ldb.search(expression="(!(revision=x))", 
631                               attrs=["dnsHostName", "lastLogon"])
632         self.assertEquals(len(res), 6)
633         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
634         self.assertTrue(not "dnsHostName" in res[0])
635         self.assertEquals(str(res[0]["lastLogon"]), "y")
636         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
637         self.assertTrue(not "dnsHostName" in res[1])
638         self.assertEquals(str(res[1]["lastLogon"]), "x")
639         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
640         self.assertEquals(str(res[2]["dnsHostName"]), "z")
641         self.assertEquals(str(res[2]["lastLogon"]), "z")
642         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
643         self.assertTrue(not "dnsHostName" in res[3])
644         self.assertEquals(str(res[3]["lastLogon"]), "z")
645
646         # Search by negated remote attribute
647         res = self.ldb.search(expression="(!(description=x))", 
648                               attrs=["dnsHostName", "lastLogon"])
649         self.assertEquals(len(res), 4)
650         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
651         self.assertEquals(str(res[0]["dnsHostName"]), "z")
652         self.assertEquals(str(res[0]["lastLogon"]), "z")
653         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
654         self.assertTrue(not "dnsHostName" in res[1])
655         self.assertEquals(str(res[1]["lastLogon"]), "z")
656
657         # Search by negated conjunction of local attributes
658         res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", 
659                               attrs=["dnsHostName", "lastLogon"])
660         self.assertEquals(len(res), 6)
661         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
662         self.assertTrue(not "dnsHostName" in res[0])
663         self.assertEquals(str(res[0]["lastLogon"]), "y")
664         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
665         self.assertTrue(not "dnsHostName" in res[1])
666         self.assertEquals(str(res[1]["lastLogon"]), "x")
667         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
668         self.assertEquals(str(res[2]["dnsHostName"]), "z")
669         self.assertEquals(str(res[2]["lastLogon"]), "z")
670         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
671         self.assertTrue(not "dnsHostName" in res[3])
672         self.assertEquals(str(res[3]["lastLogon"]), "z")
673
674         # Search by negated conjunction of remote attributes
675         res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", 
676                               attrs=["dnsHostName", "lastLogon"])
677         self.assertEquals(len(res), 6)
678         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
679         self.assertEquals(str(res[0]["dnsHostName"]), "y")
680         self.assertEquals(str(res[0]["lastLogon"]), "y")
681         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
682         self.assertTrue(not "dnsHostName" in res[1])
683         self.assertEquals(str(res[1]["lastLogon"]), "y")
684         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
685         self.assertEquals(str(res[2]["dnsHostName"]), "z")
686         self.assertEquals(str(res[2]["lastLogon"]), "z")
687         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
688         self.assertTrue(not "dnsHostName" in res[3])
689         self.assertEquals(str(res[3]["lastLogon"]), "z")
690
691         # Search by negated conjunction of local and remote attribute
692         res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", 
693                               attrs=["dnsHostName", "lastLogon"])
694         self.assertEquals(len(res), 6)
695         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
696         self.assertTrue(not "dnsHostName" in res[0])
697         self.assertEquals(str(res[0]["lastLogon"]), "y")
698         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
699         self.assertTrue(not "dnsHostName" in res[1])
700         self.assertEquals(str(res[1]["lastLogon"]), "x")
701         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
702         self.assertEquals(str(res[2]["dnsHostName"]), "z")
703         self.assertEquals(str(res[2]["lastLogon"]), "z")
704         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
705         self.assertTrue(not "dnsHostName" in res[3])
706         self.assertEquals(str(res[3]["lastLogon"]), "z")
707
708         # Search by negated disjunction of local attributes
709         res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", 
710                               attrs=["dnsHostName", "lastLogon"])
711         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
712         self.assertTrue(not "dnsHostName" in res[0])
713         self.assertEquals(str(res[0]["lastLogon"]), "y")
714         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
715         self.assertTrue(not "dnsHostName" in res[1])
716         self.assertEquals(str(res[1]["lastLogon"]), "x")
717         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
718         self.assertEquals(str(res[2]["dnsHostName"]), "z")
719         self.assertEquals(str(res[2]["lastLogon"]), "z")
720         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
721         self.assertTrue(not "dnsHostName" in res[3])
722         self.assertEquals(str(res[3]["lastLogon"]), "z")
723
724         # Search by negated disjunction of remote attributes
725         res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", 
726                               attrs=["dnsHostName", "lastLogon"])
727         self.assertEquals(len(res), 5)
728         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
729         self.assertEquals(str(res[0]["dnsHostName"]), "y")
730         self.assertEquals(str(res[0]["lastLogon"]), "y")
731         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
732         self.assertEquals(str(res[1]["dnsHostName"]), "z")
733         self.assertEquals(str(res[1]["lastLogon"]), "z")
734         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
735         self.assertTrue(not "dnsHostName" in res[2])
736         self.assertEquals(str(res[2]["lastLogon"]), "z")
737
738         # Search by negated disjunction of local and remote attribute
739         res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", 
740                               attrs=["dnsHostName", "lastLogon"])
741         self.assertEquals(len(res), 5)
742         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
743         self.assertTrue(not "dnsHostName" in res[0])
744         self.assertEquals(str(res[0]["lastLogon"]), "x")
745         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
746         self.assertEquals(str(res[1]["dnsHostName"]), "z")
747         self.assertEquals(str(res[1]["lastLogon"]), "z")
748         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
749         self.assertTrue(not "dnsHostName" in res[2])
750         self.assertEquals(str(res[2]["lastLogon"]), "z")
751
752         # Search by complex parse tree
753         res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
754         self.assertEquals(len(res), 7)
755         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
756         self.assertTrue(not "dnsHostName" in res[0])
757         self.assertEquals(str(res[0]["lastLogon"]), "y")
758         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
759         self.assertEquals(str(res[1]["dnsHostName"]), "x")
760         self.assertEquals(str(res[1]["lastLogon"]), "x")
761         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
762         self.assertTrue(not "dnsHostName" in res[2])
763         self.assertEquals(str(res[2]["lastLogon"]), "x")
764         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
765         self.assertEquals(str(res[3]["dnsHostName"]), "z")
766         self.assertEquals(str(res[3]["lastLogon"]), "z")
767         self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
768         self.assertTrue(not "dnsHostName" in res[4])
769         self.assertEquals(str(res[4]["lastLogon"]), "z")
770
771         # Clean up
772         dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
773         for dn in dns:
774             self.ldb.delete(dn)
775
776     def test_map_modify_local(self):
777         """Modification of local records."""
778         # Add local record
779         dn = "cn=test,dc=idealx,dc=org"
780         self.ldb.add({"dn": dn, 
781                  "cn": "test",
782                  "foo": "bar",
783                  "revision": "1",
784                  "description": "test"})
785         # Check it's there
786         attrs = ["foo", "revision", "description"]
787         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
788         self.assertEquals(len(res), 1)
789         self.assertEquals(str(res[0].dn), dn)
790         self.assertEquals(str(res[0]["foo"]), "bar")
791         self.assertEquals(str(res[0]["revision"]), "1")
792         self.assertEquals(str(res[0]["description"]), "test")
793         # Check it's not in the local db
794         res = self.samba4.db.search(expression="(cn=test)", 
795                                     scope=SCOPE_DEFAULT, attrs=attrs)
796         self.assertEquals(len(res), 0)
797         # Check it's not in the remote db
798         res = self.samba3.db.search(expression="(cn=test)", 
799                                     scope=SCOPE_DEFAULT, attrs=attrs)
800         self.assertEquals(len(res), 0)
801
802         # Modify local record
803         ldif = """
804 dn: """ + dn + """
805 replace: foo
806 foo: baz
807 replace: description
808 description: foo
809 """
810         self.ldb.modify_ldif(ldif)
811         # Check in local db
812         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
813         self.assertEquals(len(res), 1)
814         self.assertEquals(str(res[0].dn), dn)
815         self.assertEquals(str(res[0]["foo"]), "baz")
816         self.assertEquals(str(res[0]["revision"]), "1")
817         self.assertEquals(str(res[0]["description"]), "foo")
818
819         # Rename local record
820         dn2 = "cn=toast,dc=idealx,dc=org"
821         self.ldb.rename(dn, dn2)
822         # Check in local db
823         res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
824         self.assertEquals(len(res), 1)
825         self.assertEquals(str(res[0].dn), dn2)
826         self.assertEquals(str(res[0]["foo"]), "baz")
827         self.assertEquals(str(res[0]["revision"]), "1")
828         self.assertEquals(str(res[0]["description"]), "foo")
829
830         # Delete local record
831         self.ldb.delete(dn2)
832         # Check it's gone
833         res = self.ldb.search(dn2, scope=SCOPE_BASE)
834         self.assertEquals(len(res), 0)
835
836     def test_map_modify_remote_remote(self):
837         """Modification of remote data of remote records"""
838         # Add remote record
839         dn = self.samba4.dn("cn=test")
840         dn2 = self.samba3.dn("cn=test")
841         self.samba3.db.add({"dn": dn2, 
842                    "cn": "test",
843                    "description": "foo",
844                    "sambaBadPasswordCount": "3",
845                    "sambaNextRid": "1001"})
846         # Check it's there
847         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
848                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
849         self.assertEquals(len(res), 1)
850         self.assertEquals(str(res[0].dn), dn2)
851         self.assertEquals(str(res[0]["description"]), "foo")
852         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
853         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
854         # Check in mapped db
855         attrs = ["description", "badPwdCount", "nextRid"]
856         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
857         self.assertEquals(len(res), 1)
858         self.assertEquals(str(res[0].dn), dn)
859         self.assertEquals(str(res[0]["description"]), "foo")
860         self.assertEquals(str(res[0]["badPwdCount"]), "3")
861         self.assertEquals(str(res[0]["nextRid"]), "1001")
862         # Check in local db
863         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
864         self.assertEquals(len(res), 0)
865
866         # Modify remote data of remote record
867         ldif = """
868 dn: """ + dn + """
869 replace: description
870 description: test
871 replace: badPwdCount
872 badPwdCount: 4
873 """
874         self.ldb.modify_ldif(ldif)
875         # Check in mapped db
876         res = self.ldb.search(dn, scope=SCOPE_BASE, 
877                 attrs=["description", "badPwdCount", "nextRid"])
878         self.assertEquals(len(res), 1)
879         self.assertEquals(str(res[0].dn), dn)
880         self.assertEquals(str(res[0]["description"]), "test")
881         self.assertEquals(str(res[0]["badPwdCount"]), "4")
882         self.assertEquals(str(res[0]["nextRid"]), "1001")
883         # Check in remote db
884         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
885                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
886         self.assertEquals(len(res), 1)
887         self.assertEquals(str(res[0].dn), dn2)
888         self.assertEquals(str(res[0]["description"]), "test")
889         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
890         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
891
892         # Rename remote record
893         dn2 = self.samba4.dn("cn=toast")
894         self.ldb.rename(dn, dn2)
895         # Check in mapped db
896         dn = dn2
897         res = self.ldb.search(dn, scope=SCOPE_BASE, 
898                 attrs=["description", "badPwdCount", "nextRid"])
899         self.assertEquals(len(res), 1)
900         self.assertEquals(str(res[0].dn), dn)
901         self.assertEquals(str(res[0]["description"]), "test")
902         self.assertEquals(str(res[0]["badPwdCount"]), "4")
903         self.assertEquals(str(res[0]["nextRid"]), "1001")
904         # Check in remote db 
905         dn2 = self.samba3.dn("cn=toast")
906         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
907                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
908         self.assertEquals(len(res), 1)
909         self.assertEquals(str(res[0].dn), dn2)
910         self.assertEquals(str(res[0]["description"]), "test")
911         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
912         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
913
914         # Delete remote record
915         self.ldb.delete(dn)
916         # Check in mapped db that it's removed
917         res = self.ldb.search(dn, scope=SCOPE_BASE)
918         self.assertEquals(len(res), 0)
919         # Check in remote db
920         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
921         self.assertEquals(len(res), 0)
922
923     def test_map_modify_remote_local(self):
924         """Modification of local data of remote records"""
925         # Add remote record (same as before)
926         dn = self.samba4.dn("cn=test")
927         dn2 = self.samba3.dn("cn=test")
928         self.samba3.db.add({"dn": dn2, 
929                    "cn": "test",
930                    "description": "foo",
931                    "sambaBadPasswordCount": "3",
932                    "sambaNextRid": "1001"})
933
934         # Modify local data of remote record
935         ldif = """
936 dn: """ + dn + """
937 add: revision
938 revision: 1
939 replace: description
940 description: test
941
942 """
943         self.ldb.modify_ldif(ldif)
944         # Check in mapped db
945         attrs = ["revision", "description"]
946         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
947         self.assertEquals(len(res), 1)
948         self.assertEquals(str(res[0].dn), dn)
949         self.assertEquals(str(res[0]["description"]), "test")
950         self.assertEquals(str(res[0]["revision"]), "1")
951         # Check in remote db
952         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
953         self.assertEquals(len(res), 1)
954         self.assertEquals(str(res[0].dn), dn2)
955         self.assertEquals(str(res[0]["description"]), "test")
956         self.assertTrue(not "revision" in res[0])
957         # Check in local db
958         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
959         self.assertEquals(len(res), 1)
960         self.assertEquals(str(res[0].dn), dn)
961         self.assertTrue(not "description" in res[0])
962         self.assertEquals(str(res[0]["revision"]), "1")
963
964         # Delete (newly) split record
965         self.ldb.delete(dn)
966
967     def test_map_modify_split(self):
968         """Testing modification of split records"""
969         # Add split record
970         dn = self.samba4.dn("cn=test")
971         dn2 = self.samba3.dn("cn=test")
972         self.ldb.add({
973             "dn": dn,
974             "cn": "test",
975             "description": "foo",
976             "badPwdCount": "3",
977             "nextRid": "1001",
978             "revision": "1"})
979         # Check it's there
980         attrs = ["description", "badPwdCount", "nextRid", "revision"]
981         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
982         self.assertEquals(len(res), 1)
983         self.assertEquals(str(res[0].dn), dn)
984         self.assertEquals(str(res[0]["description"]), "foo")
985         self.assertEquals(str(res[0]["badPwdCount"]), "3")
986         self.assertEquals(str(res[0]["nextRid"]), "1001")
987         self.assertEquals(str(res[0]["revision"]), "1")
988         # Check in local db
989         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
990         self.assertEquals(len(res), 1)
991         self.assertEquals(str(res[0].dn), dn)
992         self.assertTrue(not "description" in res[0])
993         self.assertTrue(not "badPwdCount" in res[0])
994         self.assertTrue(not "nextRid" in res[0])
995         self.assertEquals(str(res[0]["revision"]), "1")
996         # Check in remote db
997         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
998                  "revision"]
999         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1000         self.assertEquals(len(res), 1)
1001         self.assertEquals(str(res[0].dn), dn2)
1002         self.assertEquals(str(res[0]["description"]), "foo")
1003         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
1004         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1005         self.assertTrue(not "revision" in res[0])
1006
1007         # Modify of split record
1008         ldif = """
1009 dn: """ + dn + """
1010 replace: description
1011 description: test
1012 replace: badPwdCount
1013 badPwdCount: 4
1014 replace: revision
1015 revision: 2
1016 """
1017         self.ldb.modify_ldif(ldif)
1018         # Check in mapped db
1019         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1020         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1021         self.assertEquals(len(res), 1)
1022         self.assertEquals(str(res[0].dn), dn)
1023         self.assertEquals(str(res[0]["description"]), "test")
1024         self.assertEquals(str(res[0]["badPwdCount"]), "4")
1025         self.assertEquals(str(res[0]["nextRid"]), "1001")
1026         self.assertEquals(str(res[0]["revision"]), "2")
1027         # Check in local db
1028         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1029         self.assertEquals(len(res), 1)
1030         self.assertEquals(str(res[0].dn), dn)
1031         self.assertTrue(not "description" in res[0])
1032         self.assertTrue(not "badPwdCount" in res[0])
1033         self.assertTrue(not "nextRid" in res[0])
1034         self.assertEquals(str(res[0]["revision"]), "2")
1035         # Check in remote db
1036         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
1037                  "revision"]
1038         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1039         self.assertEquals(len(res), 1)
1040         self.assertEquals(str(res[0].dn), dn2)
1041         self.assertEquals(str(res[0]["description"]), "test")
1042         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1043         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1044         self.assertTrue(not "revision" in res[0])
1045
1046         # Rename split record
1047         dn2 = self.samba4.dn("cn=toast")
1048         self.ldb.rename(dn, dn2)
1049         # Check in mapped db
1050         dn = dn2
1051         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1052         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1053         self.assertEquals(len(res), 1)
1054         self.assertEquals(str(res[0].dn), dn)
1055         self.assertEquals(str(res[0]["description"]), "test")
1056         self.assertEquals(str(res[0]["badPwdCount"]), "4")
1057         self.assertEquals(str(res[0]["nextRid"]), "1001")
1058         self.assertEquals(str(res[0]["revision"]), "2")
1059         # Check in local db
1060         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1061         self.assertEquals(len(res), 1)
1062         self.assertEquals(str(res[0].dn), dn)
1063         self.assertTrue(not "description" in res[0])
1064         self.assertTrue(not "badPwdCount" in res[0])
1065         self.assertTrue(not "nextRid" in res[0])
1066         self.assertEquals(str(res[0]["revision"]), "2")
1067         # Check in remote db
1068         dn2 = self.samba3.dn("cn=toast")
1069         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
1070           attrs=["description", "sambaBadPasswordCount", "sambaNextRid", 
1071                  "revision"])
1072         self.assertEquals(len(res), 1)
1073         self.assertEquals(str(res[0].dn), dn2)
1074         self.assertEquals(str(res[0]["description"]), "test")
1075         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1076         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1077         self.assertTrue(not "revision" in res[0])
1078
1079         # Delete split record
1080         self.ldb.delete(dn)
1081         # Check in mapped db
1082         res = self.ldb.search(dn, scope=SCOPE_BASE)
1083         self.assertEquals(len(res), 0)
1084         # Check in local db
1085         res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1086         self.assertEquals(len(res), 0)
1087         # Check in remote db
1088         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1089         self.assertEquals(len(res), 0)