8ea566c7b3599d554f73b54dfb2e7fdd9cf9528e
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
1 #!/usr/bin/python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6 #
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
8 #   
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #   
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #   
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
24
25 import os
26 import ldb
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
28 from samba import Ldb, substitute_var
29 from samba.tests import LdbTestCase, TestCaseInTempDir
30
# Directory holding the Samba3 test fixtures, resolved relative to this file.
datadir = os.path.join(os.path.dirname(__file__),
                       "../../../../../testdata/samba3")


def read_datafile(filename):
    """Return the full contents of a fixture file from `datadir`.

    :param filename: Name of the file, joined onto `datadir`.
    :return: The file's contents as read in text mode.
    """
    f = open(os.path.join(datadir, filename), 'r')
    # try/finally (rather than a `with` block) keeps this usable on older
    # interpreters while still guaranteeing the handle is closed.
    try:
        return f.read()
    finally:
        f.close()
36
def ldb_debug(l, text):
    """Debug callback that writes `text` to standard output.

    :param l: Unused by this function.
    :param text: Message to print.
    """
    # Parenthesised single-argument form prints identically whether `print`
    # is a statement or a function.
    print(text)
39
40
class MapBaseTestCase(TestCaseInTempDir):
    """Shared fixture for the mapping tests.

    setUp() creates three Target helpers (samba4, samba3 and templates), each
    pointing at its own tdb file inside the test's temporary directory, and
    connects them. tearDown() unlinks those files and the main test.ldb.
    """

    def setup_modules(self, ldb, s3, s4):
        """Add the @MAP, @MODULES and @PARTITION records to `ldb`.

        :param ldb: Database handle the three records are added to.
        :param s3: Target supplying the Samba3 basedn and url.
        :param s4: Target supplying the Samba4 basedn and url.
        """
        map_record = {
            "dn": "@MAP=samba3sam",
            "@FROM": s4.basedn,
            "@TO": "sambaDomainName=TESTS," + s3.basedn,
        }
        ldb.add(map_record)

        modules_record = {
            "dn": "@MODULES",
            "@LIST": "rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition",
        }
        ldb.add(modules_record)

        # One "<basedn>:<url>" entry per backend, Samba4 first.
        partition_record = {
            "dn": "@PARTITION",
            "partition": ["%s:%s" % (target.basedn, target.url)
                          for target in (s4, s3)],
            "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"],
        }
        ldb.add(partition_record)

    def setUp(self):
        """Create and connect the samba4, samba3 and templates targets."""
        super(MapBaseTestCase, self).setUp()

        def samba3_dn(basedn, rdn):
            # Samba3 entries sit below the sambaDomainName=TESTS container.
            return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)

        def samba4_dn(basedn, rdn):
            return "%s,%s" % (rdn, basedn)

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        # Captured by the Target class below, which has no access to `self`.
        workdir = self.tempdir

        class Target:
            """Bundle of file, url, base DN and database handle for one SAM
            connection."""

            def __init__(self, filename, basedn, dn):
                self.file = os.path.join(workdir, filename)
                self.url = "tdb://" + self.file
                self.basedn = basedn
                self.substvars = {"BASEDN": self.basedn}
                self.db = Ldb()
                self._dn = dn

            def dn(self, rdn):
                """Build a DN for `rdn` with this target's DN function."""
                return self._dn(self.basedn, rdn)

            def connect(self):
                """Connect this target's database handle to its url."""
                return self.db.connect(self.url)

            def setup_data(self, path):
                """Load the named data file into this target's database."""
                contents = read_datafile(path)
                self.add_ldif(contents)

            def subst(self, text):
                """Apply this target's substitution variables to `text`."""
                return substitute_var(text, self.substvars)

            def add_ldif(self, ldif):
                """Substitute variables in `ldif`, then add it."""
                expanded = self.subst(ldif)
                self.db.add_ldif(expanded)

            def modify_ldif(self, ldif):
                """Substitute variables in `ldif`, then apply it as a
                modification."""
                expanded = self.subst(ldif)
                self.db.modify_ldif(expanded)

        self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", samba4_dn)
        self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", samba3_dn)
        # The templates target is created without a DN function.
        self.templates = Target("templates.ldb", "cn=templates", None)

        for target in (self.samba3, self.templates, self.samba4):
            target.connect()

    def tearDown(self):
        """Unlink the database files, then run the parent tearDown."""
        db_files = (self.ldbfile,
                    self.samba3.file,
                    self.templates.file,
                    self.samba4.file)
        for path in db_files:
            os.unlink(path)
        super(MapBaseTestCase, self).tearDown()
114
115
class Samba3SamTestCase(MapBaseTestCase):
    """Tests run against a database provisioned from the samba3.ldif,
    provision_samba3sam_templates.ldif and provision_samba3sam.ldif data
    files."""

    def setUp(self):
        """Provision the databases and open the handle used by the tests."""
        super(Samba3SamTestCase, self).setUp()
        ldb = Ldb(self.ldburl)
        self.samba3.setup_data("samba3.ldif")
        self.templates.setup_data("provision_samba3sam_templates.ldif")
        ldif = read_datafile("provision_samba3sam.ldif")
        ldb.add_ldif(self.samba4.subst(ldif))
        self.setup_modules(ldb, self.samba3, self.samba4)
        # Release the provisioning handle before opening the handle the
        # tests use.
        del ldb
        self.ldb = Ldb(self.ldburl)

    def test_search_non_mapped(self):
        """Looking up by non-mapped attribute"""
        msg = self.ldb.search(expression="(cn=Administrator)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["cn"], "Administrator")

    # This method used to share the name test_search_non_mapped with the
    # test above, which replaced that test in the class namespace so it
    # never ran.
    def test_search_mapped(self):
        """Looking up by mapped attribute"""
        msg = self.ldb.search(expression="(name=Backup Operators)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["name"], "Backup Operators")

    def test_old_name_of_renamed(self):
        """Looking up by old name of renamed attribute"""
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
        self.assertEquals(len(msg), 0)

    def test_mapped_containing_sid(self):
        """Looking up mapped entry containing SID"""
        msg = self.ldb.search(expression="(cn=Replicator)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn),
                          "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertTrue("objectSid" in msg[0])
        # FIXME: NDR unpack msg[0]["objectSid"] before comparing:
        # self.assertEquals(msg[0]["objectSid"],
        #                   "S-1-5-21-4231626423-2410014848-2360679739-552")
        # Check mapping of objectClass
        oc = set(msg[0]["objectClass"])
        self.assertEquals(oc, set(["group"]))

    def test_search_by_objclass(self):
        """Looking up by objectClass"""
        msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEquals(set([str(m.dn) for m in msg]),
                set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                     "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))

    def test_s3sam_modify(self):
        """Add, search, modify, rename and delete records through the map."""
        # Adding a record that will be fallbacked
        self.ldb.add({"dn": "cn=Foo",
            "foo": "bar",
            "blah": "Blie",
            "cn": "Foo",
            "showInAdvancedViewOnly": "TRUE"}
            )

        # Checking for existence of record (local)
        # TODO: This record must be searched in the local database, which is
        # currently only supported for base searches
        # msg = ldb.search(expression="(cn=Foo)",
        #                  attrs=['foo','blah','cn','showInAdvancedViewOnly'])
        # TODO: Actually, this version should work as well but doesn't...
        msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
                scope=SCOPE_BASE,
                attrs=['foo','blah','cn','showInAdvancedViewOnly'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE")
        self.assertEquals(msg[0]["foo"], "bar")
        self.assertEquals(msg[0]["blah"], "Blie")

        # Adding record that will be mapped
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                 "objectClass": "user",
                 "unixName": "bin",
                 "sambaUnicodePwd": "geheim",
                 "cn": "Niemand"})

        # Checking for existence of record (remote)
        msg = self.ldb.search(expression="(unixName=bin)",
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")

        # Checking for existence of record (local && remote)
        msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)           # TODO: should check with more records
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["unixName"], "bin")
        self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")

        # Checking for existence of record (local || remote)
        msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEquals(len(msg), 1)        # TODO: should check with more records
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["unixName"], "bin")
        self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")

        # Checking for data in destination database
        msg = self.samba3.db.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(msg[0]["sambaSID"],
                "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEquals(msg[0]["displayName"], "Niemand")

        # Adding attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(msg[0]["cn"], "Niemand")
        self.assertEquals(msg[0]["description"], "Blah")

        # Modifying attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEquals(msg[0]["description"], "Blie")

        # Deleting attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        # Checking whether changes are no longer there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertTrue("description" not in msg[0])

        # Renaming record...
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                        "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether DN has changed...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 1)
        self.assertEquals(str(msg[0].dn),
                          "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Deleting record...
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether record is gone...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEquals(len(msg), 0)
284
285
286 class MapTestCase(MapBaseTestCase):
287
288     def setUp(self):
289         super(MapTestCase, self).setUp()
290         ldb = Ldb(self.ldburl)
291         self.templates.setup_data("provision_samba3sam_templates.ldif")
292         ldif = read_datafile("provision_samba3sam.ldif")
293         ldb.add_ldif(self.samba4.subst(ldif))
294         self.setup_modules(ldb, self.samba3, self.samba4)
295         del ldb
296         self.ldb = Ldb(self.ldburl)
297
298     def test_map_search(self):
299         """Running search tests on mapped data."""
300         ldif = """
301 """
302         self.samba3.db.add({
303             "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
304             "objectclass": ["sambaDomain", "top"],
305             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
306             "sambaNextRid": "2000",
307             "sambaDomainName": "TESTS"
308             })
309
310         # Add a set of split records
311         self.ldb.add({
312             "dn": self.samba4.dn("cn=X"),
313             "objectClass": "user",
314             "cn": "X",
315             "codePage": "x",
316             "revision": "x",
317             "dnsHostName": "x",
318             "nextRid": "y",
319             "lastLogon": "x",
320             "description": "x",
321             "objectSid": "S-1-5-21-4231626423-2410014848-2360679739-552",
322             "primaryGroupID": "1-5-21-4231626423-2410014848-2360679739-512"})
323
324         self.ldb.add({
325             "dn": self.samba4.dn("cn=Y"),
326             "objectClass": "top",
327             "cn": "Y",
328             "codePage": "x",
329             "revision": "x",
330             "dnsHostName": "y",
331             "nextRid": "y",
332             "lastLogon": "y",
333             "description": "x"})
334
335         self.ldb.add({
336             "dn": self.samba4.dn("cn=Z"),
337             "objectClass": "top",
338             "cn": "Z",
339             "codePage": "x",
340             "revision": "y",
341             "dnsHostName": "z",
342             "nextRid": "y",
343             "lastLogon": "z",
344             "description": "y"})
345
346         # Add a set of remote records
347
348         self.samba3.db.add({
349             "dn": self.samba3.dn("cn=A"),
350             "objectClass": "posixAccount",
351             "cn": "A",
352             "sambaNextRid": "x",
353             "sambaBadPasswordCount": "x", 
354             "sambaLogonTime": "x",
355             "description": "x",
356             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
357             "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
358
359         self.samba3.db.add({
360             "dn": self.samba3.dn("cn=B"),
361             "objectClass": "top",
362             "cn": "B",
363             "sambaNextRid": "x",
364             "sambaBadPasswordCount": "x",
365             "sambaLogonTime": "y",
366             "description": "x"})
367
368         self.samba3.db.add({
369             "dn": self.samba3.dn("cn=C"),
370             "objectClass": "top",
371             "cn": "C",
372             "sambaNextRid": "x",
373             "sambaBadPasswordCount": "y",
374             "sambaLogonTime": "z",
375             "description": "y"})
376
377         # Testing search by DN
378
379         # Search remote record by local DN
380         dn = self.samba4.dn("cn=A")
381         res = self.ldb.search(dn, scope=SCOPE_BASE, 
382                 attrs=["dnsHostName", "lastLogon"])
383         self.assertEquals(len(res), 1)
384         self.assertEquals(str(res[0].dn), dn)
385         self.assertTrue(not "dnsHostName" in res[0])
386         self.assertEquals(res[0]["lastLogon"], "x")
387
388         # Search remote record by remote DN
389         dn = self.samba3.dn("cn=A")
390         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
391                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
392         self.assertEquals(len(res), 1)
393         self.assertEquals(str(res[0].dn), dn)
394         self.assertTrue(not "dnsHostName" in res[0])
395         self.assertTrue(not "lastLogon" in res[0])
396         self.assertEquals(res[0]["sambaLogonTime"], "x")
397
398         # Search split record by local DN
399         dn = self.samba4.dn("cn=X")
400         res = self.ldb.search(dn, scope=SCOPE_BASE, 
401                 attrs=["dnsHostName", "lastLogon"])
402         self.assertEquals(len(res), 1)
403         self.assertEquals(str(res[0].dn), dn)
404         self.assertEquals(res[0]["dnsHostName"], "x")
405         self.assertEquals(res[0]["lastLogon"], "x")
406
407         # Search split record by remote DN
408         dn = self.samba3.dn("cn=X")
409         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
410                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
411         self.assertEquals(len(res), 1)
412         self.assertEquals(str(res[0].dn), dn)
413         self.assertTrue(not "dnsHostName" in res[0])
414         self.assertTrue(not "lastLogon" in res[0])
415         self.assertEquals(res[0]["sambaLogonTime"], "x")
416
417         # Testing search by attribute
418
419         # Search by ignored attribute
420         res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, 
421                 attrs=["dnsHostName", "lastLogon"])
422         self.assertEquals(len(res), 2)
423         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
424         self.assertEquals(res[0]["dnsHostName"], "y")
425         self.assertEquals(res[0]["lastLogon"], "y")
426         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
427         self.assertEquals(res[1]["dnsHostName"], "x")
428         self.assertEquals(res[1]["lastLogon"], "x")
429
430         # Search by kept attribute
431         res = self.ldb.search(expression="(description=y)", 
432                 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
433         self.assertEquals(len(res), 2)
434         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
435         self.assertEquals(res[0]["dnsHostName"], "z")
436         self.assertEquals(res[0]["lastLogon"], "z")
437         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
438         self.assertTrue(not "dnsHostName" in res[1])
439         self.assertEquals(res[1]["lastLogon"], "z")
440
441         # Search by renamed attribute
442         res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
443                               attrs=["dnsHostName", "lastLogon"])
444         self.assertEquals(len(res), 2)
445         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
446         self.assertTrue(not "dnsHostName" in res[0])
447         self.assertEquals(res[0]["lastLogon"], "y")
448         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
449         self.assertTrue(not "dnsHostName" in res[1])
450         self.assertEquals(res[1]["lastLogon"], "x")
451
452         # Search by converted attribute
453         # TODO:
454         #   Using the SID directly in the parse tree leads to conversion
455         #   errors, letting the search fail with no results.
456         #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
457         res = self.ldb.search(expression="(objectSid=*)", base=None,
458                               attrs=["dnsHostName", "lastLogon", "objectSid"])
459         self.assertEquals(len(res), 3)
460         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
461         self.assertEquals(res[0]["dnsHostName"], "x")
462         self.assertEquals(res[0]["lastLogon"], "x")
463         self.assertEquals(res[0]["objectSid"], 
464                           "S-1-5-21-4231626423-2410014848-2360679739-552")
465         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
466         self.assertTrue(not "dnsHostName" in res[1])
467         self.assertEquals(res[1]["lastLogon"], "x")
468         self.assertEquals(res[1]["objectSid"], 
469                           "S-1-5-21-4231626423-2410014848-2360679739-552")
470
471         # Search by generated attribute 
472         # In most cases, this even works when the mapping is missing
473         # a `convert_operator' by enumerating the remote db.
474         res = self.ldb.search(expression="(primaryGroupID=512)", 
475                            attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
476         self.assertEquals(len(res), 1)
477         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
478         self.assertTrue(not "dnsHostName" in res[0])
479         self.assertEquals(res[0]["lastLogon"], "x")
480         self.assertEquals(res[0]["primaryGroupID"], "512")
481
482         # TODO: There should actually be two results, A and X.  The
483         # primaryGroupID of X seems to get corrupted somewhere, and the
484         # objectSid isn't available during the generation of remote (!) data,
485         # which can be observed with the following search.  Also note that Xs
486         # objectSid seems to be fine in the previous search for objectSid... */
487         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
488         #print len(res) + " results found"
489         #for i in range(len(res)):
490         #    for (obj in res[i]) {
491         #        print obj + ": " + res[i][obj]
492         #    }
493         #    print "---"
494         #    
495
496         # Search by remote name of renamed attribute */
497         res = self.ldb.search(expression="(sambaBadPasswordCount=*)", 
498                               attrs=["dnsHostName", "lastLogon"])
499         self.assertEquals(len(res), 0)
500
501         # Search by objectClass
502         attrs = ["dnsHostName", "lastLogon", "objectClass"]
503         res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
504         self.assertEquals(len(res), 2)
505         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
506         self.assertEquals(res[0]["dnsHostName"], "x")
507         self.assertEquals(res[0]["lastLogon"], "x")
508         self.assertEquals(res[0]["objectClass"][0], "user")
509         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
510         self.assertTrue(not "dnsHostName" in res[1])
511         self.assertEquals(res[1]["lastLogon"], "x")
512         self.assertEquals(res[1]["objectClass"][0], "user")
513
514         # Prove that the objectClass is actually used for the search
515         res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
516                               attrs=attrs)
517         self.assertEquals(len(res), 3)
518         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
519         self.assertTrue(not "dnsHostName" in res[0])
520         self.assertEquals(res[0]["lastLogon"], "y")
521         self.assertEquals(set(res[0]["objectClass"]), set(["user"]))
522         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
523         self.assertEquals(res[1]["dnsHostName"], "x")
524         self.assertEquals(res[1]["lastLogon"], "x")
525         self.assertEquals(res[1]["objectClass"][0], "user")
526         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
527         self.assertTrue(not "dnsHostName" in res[2])
528         self.assertEquals(res[2]["lastLogon"], "x")
529         self.assertEquals(res[2]["objectClass"][0], "user")
530
531         # Testing search by parse tree
532
533         # Search by conjunction of local attributes
534         res = self.ldb.search(expression="(&(codePage=x)(revision=x))", 
535                               attrs=["dnsHostName", "lastLogon"])
536         self.assertEquals(len(res), 2)
537         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
538         self.assertEquals(res[0]["dnsHostName"], "y")
539         self.assertEquals(res[0]["lastLogon"], "y")
540         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
541         self.assertEquals(res[1]["dnsHostName"], "x")
542         self.assertEquals(res[1]["lastLogon"], "x")
543
544         # Search by conjunction of remote attributes
545         res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", 
546                               attrs=["dnsHostName", "lastLogon"])
547         self.assertEquals(len(res), 2)
548         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
549         self.assertEquals(res[0]["dnsHostName"], "x")
550         self.assertEquals(res[0]["lastLogon"], "x")
551         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
552         self.assertTrue(not "dnsHostName" in res[1])
553         self.assertEquals(res[1]["lastLogon"], "x")
554         
555         # Search by conjunction of local and remote attribute 
556         res = self.ldb.search(expression="(&(codePage=x)(description=x))", 
557                               attrs=["dnsHostName", "lastLogon"])
558         self.assertEquals(len(res), 2)
559         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
560         self.assertEquals(res[0]["dnsHostName"], "y")
561         self.assertEquals(res[0]["lastLogon"], "y")
562         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
563         self.assertEquals(res[1]["dnsHostName"], "x")
564         self.assertEquals(res[1]["lastLogon"], "x")
565
566         # Search by conjunction of local and remote attribute w/o match
567         attrs = ["dnsHostName", "lastLogon"]
568         res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", 
569                               attrs=attrs)
570         self.assertEquals(len(res), 0)
571         res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", 
572                               attrs=attrs)
573         self.assertEquals(len(res), 0)
574
575         # Search by disjunction of local attributes
576         res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", 
577                               attrs=["dnsHostName", "lastLogon"])
578         self.assertEquals(len(res), 2)
579         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
580         self.assertEquals(res[0]["dnsHostName"], "y")
581         self.assertEquals(res[0]["lastLogon"], "y")
582         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
583         self.assertEquals(res[1]["dnsHostName"], "x")
584         self.assertEquals(res[1]["lastLogon"], "x")
585
586         # Search by disjunction of remote attributes
587         res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", 
588                               attrs=["dnsHostName", "lastLogon"])
589         self.assertEquals(len(res), 3)
590         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
591         self.assertTrue("dnsHostName" in res[0])
592         self.assertEquals(res[0]["lastLogon"], "y")
593         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
594         self.assertEquals(res[1]["dnsHostName"], "x")
595         self.assertEquals(res[1]["lastLogon"], "x")
596         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
597         self.assertTrue("dnsHostName" in res[2])
598         self.assertEquals(res[2]["lastLogon"], "x")
599
600         # Search by disjunction of local and remote attribute
601         res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", 
602                               attrs=["dnsHostName", "lastLogon"])
603         self.assertEquals(len(res), 3)
604         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
605         self.assertEquals(res[0]["dnsHostName"], "y")
606         self.assertEquals(res[0]["lastLogon"], "y")
607         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
608         self.assertTrue("dnsHostName" in res[1])
609         self.assertEquals(res[1]["lastLogon"], "y")
610         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
611         self.assertEquals(res[2]["dnsHostName"], "x")
612         self.assertEquals(res[2]["lastLogon"], "x")
613
614         # Search by disjunction of local and remote attribute w/o match
615         res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", 
616                               attrs=["dnsHostName", "lastLogon"])
617         self.assertEquals(len(res), 0)
618
619         # Search by negated local attribute
620         res = self.ldb.search(expression="(!(revision=x))", 
621                               attrs=["dnsHostName", "lastLogon"])
622         self.assertEquals(len(res), 5)
623         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
624         self.assertTrue(not "dnsHostName" in res[0])
625         self.assertEquals(res[0]["lastLogon"], "y")
626         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
627         self.assertTrue(not "dnsHostName" in res[1])
628         self.assertEquals(res[1]["lastLogon"], "x")
629         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
630         self.assertEquals(res[2]["dnsHostName"], "z")
631         self.assertEquals(res[2]["lastLogon"], "z")
632         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
633         self.assertTrue(not "dnsHostName" in res[3])
634         self.assertEquals(res[3]["lastLogon"], "z")
635
636         # Search by negated remote attribute
637         res = self.ldb.search(expression="(!(description=x))", 
638                               attrs=["dnsHostName", "lastLogon"])
639         self.assertEquals(len(res), 3)
640         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
641         self.assertEquals(res[0]["dnsHostName"], "z")
642         self.assertEquals(res[0]["lastLogon"], "z")
643         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
644         self.assertTrue(not "dnsHostName" in res[1])
645         self.assertEquals(res[1]["lastLogon"], "z")
646
647         # Search by negated conjunction of local attributes
648         res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", 
649                               attrs=["dnsHostName", "lastLogon"])
650         self.assertEquals(len(res), 5)
651         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
652         self.assertTrue(not "dnsHostName" in res[0])
653         self.assertEquals(res[0]["lastLogon"], "y")
654         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
655         self.assertTrue(not "dnsHostName" in res[1])
656         self.assertEquals(res[1]["lastLogon"], "x")
657         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
658         self.assertEquals(res[2]["dnsHostName"], "z")
659         self.assertEquals(res[2]["lastLogon"], "z")
660         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
661         self.assertTrue(not "dnsHostName" in res[3])
662         self.assertEquals(res[3]["lastLogon"], "z")
663
664         # Search by negated conjunction of remote attributes
665         res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", 
666                               attrs=["dnsHostName", "lastLogon"])
667         self.assertEquals(len(res), 5)
668         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
669         self.assertEquals(res[0]["dnsHostName"], "y")
670         self.assertEquals(res[0]["lastLogon"], "y")
671         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
672         self.assertTrue(not "dnsHostName" in res[1])
673         self.assertEquals(res[1]["lastLogon"], "y")
674         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
675         self.assertEquals(res[2]["dnsHostName"], "z")
676         self.assertEquals(res[2]["lastLogon"], "z")
677         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
678         self.assertTrue(not "dnsHostName" in res[3])
679         self.assertEquals(res[3]["lastLogon"], "z")
680
681         # Search by negated conjunction of local and remote attribute
682         res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", 
683                               attrs=["dnsHostName", "lastLogon"])
684         self.assertEquals(len(res), 5)
685         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
686         self.assertTrue(not "dnsHostName" in res[0])
687         self.assertEquals(res[0]["lastLogon"], "y")
688         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
689         self.assertTrue(not "dnsHostName" in res[1])
690         self.assertEquals(res[1]["lastLogon"], "x")
691         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
692         self.assertEquals(res[2]["dnsHostName"], "z")
693         self.assertEquals(res[2]["lastLogon"], "z")
694         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
695         self.assertTrue(not "dnsHostName" in res[3])
696         self.assertEquals(res[3]["lastLogon"], "z")
697
698         # Search by negated disjunction of local attributes
699         res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", 
700                               attrs=["dnsHostName", "lastLogon"])
701         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
702         self.assertTrue(not "dnsHostName" in res[0])
703         self.assertEquals(res[0]["lastLogon"], "y")
704         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
705         self.assertTrue(not "dnsHostName" in res[1])
706         self.assertEquals(res[1]["lastLogon"], "x")
707         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
708         self.assertEquals(res[2]["dnsHostName"], "z")
709         self.assertEquals(res[2]["lastLogon"], "z")
710         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
711         self.assertTrue(not "dnsHostName" in res[3])
712         self.assertEquals(res[3]["lastLogon"], "z")
713
714         # Search by negated disjunction of remote attributes
715         res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", 
716                               attrs=["dnsHostName", "lastLogon"])
717         self.assertEquals(len(res), 4)
718         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
719         self.assertEquals(res[0]["dnsHostName"], "y")
720         self.assertEquals(res[0]["lastLogon"], "y")
721         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
722         self.assertEquals(res[1]["dnsHostName"], "z")
723         self.assertEquals(res[1]["lastLogon"], "z")
724         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
725         self.assertTrue(not "dnsHostName" in res[2])
726         self.assertEquals(res[2]["lastLogon"], "z")
727
728         # Search by negated disjunction of local and remote attribute
729         res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", 
730                               attrs=["dnsHostName", "lastLogon"])
731         self.assertEquals(len(res), 4)
732         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
733         self.assertTrue(not "dnsHostName" in res[0])
734         self.assertEquals(res[0]["lastLogon"], "x")
735         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
736         self.assertEquals(res[1]["dnsHostName"], "z")
737         self.assertEquals(res[1]["lastLogon"], "z")
738         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
739         self.assertTrue(not "dnsHostName" in res[2])
740         self.assertEquals(res[2]["lastLogon"], "z")
741
742         # Search by complex parse tree
743         res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
744         self.assertEquals(len(res), 6)
745         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
746         self.assertTrue(not "dnsHostName" in res[0])
747         self.assertEquals(res[0]["lastLogon"], "y")
748         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
749         self.assertEquals(res[1]["dnsHostName"], "x")
750         self.assertEquals(res[1]["lastLogon"], "x")
751         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
752         self.assertTrue(not "dnsHostName" in res[2])
753         self.assertEquals(res[2]["lastLogon"], "x")
754         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
755         self.assertEquals(res[3]["dnsHostName"], "z")
756         self.assertEquals(res[3]["lastLogon"], "z")
757         self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
758         self.assertTrue(not "dnsHostName" in res[4])
759         self.assertEquals(res[4]["lastLogon"], "z")
760
761         # Clean up
762         dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
763         for dn in dns:
764             self.ldb.delete(dn)
765
766     def test_map_modify_local(self):
767         """Modification of local records."""
768         # Add local record
769         dn = "cn=test,dc=idealx,dc=org"
770         self.ldb.add({"dn": dn, 
771                  "cn": "test",
772                  "foo": "bar",
773                  "revision": "1",
774                  "description": "test"})
775         # Check it's there
776         attrs = ["foo", "revision", "description"]
777         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
778         self.assertEquals(len(res), 1)
779         self.assertEquals(str(res[0].dn), dn)
780         self.assertEquals(res[0]["foo"], "bar")
781         self.assertEquals(res[0]["revision"], "1")
782         self.assertEquals(res[0]["description"], "test")
783         # Check it's not in the local db
784         res = self.samba4.db.search(expression="(cn=test)", 
785                                     scope=SCOPE_DEFAULT, attrs=attrs)
786         self.assertEquals(len(res), 0)
787         # Check it's not in the remote db
788         res = self.samba3.db.search(expression="(cn=test)", 
789                                     scope=SCOPE_DEFAULT, attrs=attrs)
790         self.assertEquals(len(res), 0)
791
792         # Modify local record
793         ldif = """
794 dn: """ + dn + """
795 replace: foo
796 foo: baz
797 replace: description
798 description: foo
799 """
800         self.ldb.modify_ldif(ldif)
801         # Check in local db
802         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
803         self.assertEquals(len(res), 1)
804         self.assertEquals(str(res[0].dn), dn)
805         self.assertEquals(res[0]["foo"], "baz")
806         self.assertEquals(res[0]["revision"], "1")
807         self.assertEquals(res[0]["description"], "foo")
808
809         # Rename local record
810         dn2 = "cn=toast,dc=idealx,dc=org"
811         self.ldb.rename(dn, dn2)
812         # Check in local db
813         res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
814         self.assertEquals(len(res), 1)
815         self.assertEquals(str(res[0].dn), dn2)
816         self.assertEquals(res[0]["foo"], "baz")
817         self.assertEquals(res[0]["revision"], "1")
818         self.assertEquals(res[0]["description"], "foo")
819
820         # Delete local record
821         self.ldb.delete(dn2)
822         # Check it's gone
823         res = self.ldb.search(dn2, scope=SCOPE_BASE)
824         self.assertEquals(len(res), 0)
825
826     def test_map_modify_remote_remote(self):
827         """Modification of remote data of remote records"""
828         # Add remote record
829         dn = self.samba4.dn("cn=test")
830         dn2 = self.samba3.dn("cn=test")
831         self.samba3.db.add({"dn": dn2, 
832                    "cn": "test",
833                    "description": "foo",
834                    "sambaBadPasswordCount": "3",
835                    "sambaNextRid": "1001"})
836         # Check it's there
837         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
838                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
839         self.assertEquals(len(res), 1)
840         self.assertEquals(str(res[0].dn), dn2)
841         self.assertEquals(res[0]["description"], "foo")
842         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
843         self.assertEquals(res[0]["sambaNextRid"], "1001")
844         # Check in mapped db
845         attrs = ["description", "badPwdCount", "nextRid"]
846         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
847         self.assertEquals(len(res), 1)
848         self.assertEquals(str(res[0].dn), dn)
849         self.assertEquals(res[0]["description"], "foo")
850         self.assertEquals(res[0]["badPwdCount"], "3")
851         self.assertEquals(res[0]["nextRid"], "1001")
852         # Check in local db
853         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
854         self.assertEquals(len(res), 0)
855
856         # Modify remote data of remote record
857         ldif = """
858 dn: """ + dn + """
859 replace: description
860 description: test
861 replace: badPwdCount
862 badPwdCount: 4
863 """
864         self.ldb.modify_ldif(ldif)
865         # Check in mapped db
866         res = self.ldb.search(dn, scope=SCOPE_BASE, 
867                 attrs=["description", "badPwdCount", "nextRid"])
868         self.assertEquals(len(res), 1)
869         self.assertEquals(str(res[0].dn), dn)
870         self.assertEquals(res[0]["description"], "test")
871         self.assertEquals(res[0]["badPwdCount"], "4")
872         self.assertEquals(res[0]["nextRid"], "1001")
873         # Check in remote db
874         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
875                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
876         self.assertEquals(len(res), 1)
877         self.assertEquals(str(res[0].dn), dn2)
878         self.assertEquals(res[0]["description"], "test")
879         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
880         self.assertEquals(res[0]["sambaNextRid"], "1001")
881
882         # Rename remote record
883         dn2 = self.samba4.dn("cn=toast")
884         self.ldb.rename(dn, dn2)
885         # Check in mapped db
886         dn = dn2
887         res = self.ldb.search(dn, scope=SCOPE_BASE, 
888                 attrs=["description", "badPwdCount", "nextRid"])
889         self.assertEquals(len(res), 1)
890         self.assertEquals(str(res[0].dn), dn)
891         self.assertEquals(res[0]["description"], "test")
892         self.assertEquals(res[0]["badPwdCount"], "4")
893         self.assertEquals(res[0]["nextRid"], "1001")
894         # Check in remote db 
895         dn2 = self.samba3.dn("cn=toast")
896         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
897                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
898         self.assertEquals(len(res), 1)
899         self.assertEquals(str(res[0].dn), dn2)
900         self.assertEquals(res[0]["description"], "test")
901         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
902         self.assertEquals(res[0]["sambaNextRid"], "1001")
903
904         # Delete remote record
905         self.ldb.delete(dn)
906         # Check in mapped db that it's removed
907         res = self.ldb.search(dn, scope=SCOPE_BASE)
908         self.assertEquals(len(res), 0)
909         # Check in remote db
910         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
911         self.assertEquals(len(res), 0)
912
913     def test_map_modify_remote_local(self):
914         """Modification of local data of remote records"""
915         # Add remote record (same as before)
916         dn = self.samba4.dn("cn=test")
917         dn2 = self.samba3.dn("cn=test")
918         self.samba3.db.add({"dn": dn2, 
919                    "cn": "test",
920                    "description": "foo",
921                    "sambaBadPasswordCount": "3",
922                    "sambaNextRid": "1001"})
923
924         # Modify local data of remote record
925         ldif = """
926 dn: """ + dn + """
927 add: revision
928 revision: 1
929 replace: description
930 description: test
931
932 """
933         self.ldb.modify_ldif(ldif)
934         # Check in mapped db
935         attrs = ["revision", "description"]
936         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
937         self.assertEquals(len(res), 1)
938         self.assertEquals(str(res[0].dn), dn)
939         self.assertEquals(res[0]["description"], "test")
940         self.assertEquals(res[0]["revision"], "1")
941         # Check in remote db
942         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
943         self.assertEquals(len(res), 1)
944         self.assertEquals(str(res[0].dn), dn2)
945         self.assertEquals(res[0]["description"], "test")
946         self.assertTrue(not "revision" in res[0])
947         # Check in local db
948         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
949         self.assertEquals(len(res), 1)
950         self.assertEquals(str(res[0].dn), dn)
951         self.assertTrue(not "description" in res[0])
952         self.assertEquals(res[0]["revision"], "1")
953
954         # Delete (newly) split record
955         self.ldb.delete(dn)
956
957     def test_map_modify_split(self):
958         """Testing modification of split records"""
959         # Add split record
960         dn = self.samba4.dn("cn=test")
961         dn2 = self.samba3.dn("cn=test")
962         self.ldb.add({
963             "dn": dn,
964             "cn": "test",
965             "description": "foo",
966             "badPwdCount": "3",
967             "nextRid": "1001",
968             "revision": "1"})
969         # Check it's there
970         attrs = ["description", "badPwdCount", "nextRid", "revision"]
971         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
972         self.assertEquals(len(res), 1)
973         self.assertEquals(str(res[0].dn), dn)
974         self.assertEquals(res[0]["description"], "foo")
975         self.assertEquals(res[0]["badPwdCount"], "3")
976         self.assertEquals(res[0]["nextRid"], "1001")
977         self.assertEquals(res[0]["revision"], "1")
978         # Check in local db
979         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
980         self.assertEquals(len(res), 1)
981         self.assertEquals(str(res[0].dn), dn)
982         self.assertTrue(not "description" in res[0])
983         self.assertTrue(not "badPwdCount" in res[0])
984         self.assertTrue(not "nextRid" in res[0])
985         self.assertEquals(res[0]["revision"], "1")
986         # Check in remote db
987         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
988                  "revision"]
989         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
990         self.assertEquals(len(res), 1)
991         self.assertEquals(str(res[0].dn), dn2)
992         self.assertEquals(res[0]["description"], "foo")
993         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
994         self.assertEquals(res[0]["sambaNextRid"], "1001")
995         self.assertTrue(not "revision" in res[0])
996
997         # Modify of split record
998         ldif = """
999 dn: """ + dn + """
1000 replace: description
1001 description: test
1002 replace: badPwdCount
1003 badPwdCount: 4
1004 replace: revision
1005 revision: 2
1006 """
1007         self.ldb.modify_ldif(ldif)
1008         # Check in mapped db
1009         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1010         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1011         self.assertEquals(len(res), 1)
1012         self.assertEquals(str(res[0].dn), dn)
1013         self.assertEquals(res[0]["description"], "test")
1014         self.assertEquals(res[0]["badPwdCount"], "4")
1015         self.assertEquals(res[0]["nextRid"], "1001")
1016         self.assertEquals(res[0]["revision"], "2")
1017         # Check in local db
1018         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1019         self.assertEquals(len(res), 1)
1020         self.assertEquals(str(res[0].dn), dn)
1021         self.assertTrue(not "description" in res[0])
1022         self.assertTrue(not "badPwdCount" in res[0])
1023         self.assertTrue(not "nextRid" in res[0])
1024         self.assertEquals(res[0]["revision"], "2")
1025         # Check in remote db
1026         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
1027                  "revision"]
1028         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1029         self.assertEquals(len(res), 1)
1030         self.assertEquals(str(res[0].dn), dn2)
1031         self.assertEquals(res[0]["description"], "test")
1032         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1033         self.assertEquals(res[0]["sambaNextRid"], "1001")
1034         self.assertTrue(not "revision" in res[0])
1035
1036         # Rename split record
1037         dn2 = self.samba4.dn("cn=toast")
1038         self.ldb.rename(dn, dn2)
1039         # Check in mapped db
1040         dn = dn2
1041         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1042         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1043         self.assertEquals(len(res), 1)
1044         self.assertEquals(str(res[0].dn), dn)
1045         self.assertEquals(res[0]["description"], "test")
1046         self.assertEquals(res[0]["badPwdCount"], "4")
1047         self.assertEquals(res[0]["nextRid"], "1001")
1048         self.assertEquals(res[0]["revision"], "2")
1049         # Check in local db
1050         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1051         self.assertEquals(len(res), 1)
1052         self.assertEquals(str(res[0].dn), dn)
1053         self.assertTrue(not "description" in res[0])
1054         self.assertTrue(not "badPwdCount" in res[0])
1055         self.assertTrue(not "nextRid" in res[0])
1056         self.assertEquals(res[0]["revision"], "2")
1057         # Check in remote db
1058         dn2 = self.samba3.dn("cn=toast")
1059         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
1060           attrs=["description", "sambaBadPasswordCount", "sambaNextRid", 
1061                  "revision"])
1062         self.assertEquals(len(res), 1)
1063         self.assertEquals(str(res[0].dn), dn2)
1064         self.assertEquals(res[0]["description"], "test")
1065         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1066         self.assertEquals(res[0]["sambaNextRid"], "1001")
1067         self.assertTrue(not "revision" in res[0])
1068
1069         # Delete split record
1070         self.ldb.delete(dn)
1071         # Check in mapped db
1072         res = self.ldb.search(dn, scope=SCOPE_BASE)
1073         self.assertEquals(len(res), 0)
1074         # Check in local db
1075         res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1076         self.assertEquals(len(res), 0)
1077         # Check in remote db
1078         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1079         self.assertEquals(len(res), 0)