fe96b8822133ea7a8b1bf6f22cf5bbdaa0ba4552
[samba.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
1 #!/usr/bin/python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6 #
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
8 #   
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #   
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #   
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
24
25 import os
26 import ldb
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
28 from samba import Ldb, substitute_var
29 from samba.tests import LdbTestCase, TestCaseInTempDir, cmdline_loadparm
30 import samba.dcerpc.security
31 import samba.ndr
32
33 datadir = os.path.join(os.path.dirname(__file__), 
34                        "../../../../../testdata/samba3")
35
def read_datafile(filename):
    """Return the contents of a file from the samba3 test data directory.

    :param filename: Name of the file, relative to the module-level
        ``datadir``.
    :return: The complete file contents, read in text mode.
    """
    # Use a context manager so the handle is closed as soon as the data has
    # been read, instead of relying on garbage collection.
    with open(os.path.join(datadir, filename), 'r') as f:
        return f.read()
38
def ldb_debug(l, text):
    """Print a debug message to standard output.

    :param l: Not used by this function.
    :param text: The message to print.
    """
    # With a single argument the parenthesised form prints exactly the same
    # output on Python 2, and it is also valid Python 3 syntax.
    print(text)
41
42
class MapBaseTestCase(TestCaseInTempDir):
    """Base test case for mapping tests.

    setUp() creates three ``Target`` helpers (samba4, samba3 and templates),
    each backed by its own tdb file inside the temporary directory, and
    connects them.  tearDown() removes those files and the main test
    database again.
    """

    def setup_modules(self, ldb, s3, s4):
        """Add the control records that configure the samba3sam mapping.

        :param ldb: Database handle the control records are added to.
        :param s3: Target describing the Samba3 side (``basedn``, ``url``).
        :param s4: Target describing the Samba4 side (``basedn``, ``url``).
        """
        # Map the Samba4 base DN onto the TESTS domain below the Samba3 base.
        ldb.add({"dn": "@MAP=samba3sam",
                 "@FROM": s4.basedn,
                 "@TO": "sambaDomainName=TESTS," + s3.basedn})

        ldb.add({"dn": "@MODULES",
                 "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})

        # One partition per target, each stored in its own database file.
        ldb.add({"dn": "@PARTITION",
                 "partition": ["%s:%s" % (s4.basedn, s4.url),
                               "%s:%s" % (s3.basedn, s3.url)],
                 "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]})

    def setUp(self):
        """Create and connect the samba4, samba3 and templates targets."""
        super(MapBaseTestCase, self).setUp()

        def make_dn(basedn, rdn):
            # Samba3 entries live below the TESTS domain object.
            return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)

        def make_s4dn(basedn, rdn):
            return "%s,%s" % (rdn, basedn)

        self.ldbfile = os.path.join(self.tempdir, "test.ldb")
        self.ldburl = "tdb://" + self.ldbfile

        # Captured by Target.__init__ below, which has no access to ``self``
        # of the test case.
        tempdir = self.tempdir

        class Target(object):
            """Simple helper class that contains data for a specific SAM
            connection."""

            def __init__(self, filename, basedn, dn):
                """Describe a database stored as ``filename`` in the tempdir.

                :param filename: File name of the database, relative to the
                    test's temporary directory.
                :param basedn: Base DN of this target.
                :param dn: Callable ``(basedn, rdn)`` that builds a full DN,
                    or None when dn() is never used on this target.
                """
                # The parameter is called ``filename`` to avoid shadowing
                # the ``file`` builtin; the attribute name stays ``file``
                # because tearDown() reads it.
                self.file = os.path.join(tempdir, filename)
                self.url = "tdb://" + self.file
                self.basedn = basedn
                self.substvars = {"BASEDN": self.basedn}
                self.db = Ldb(lp=cmdline_loadparm)
                self._dn = dn

            def dn(self, rdn):
                """Return the full DN for ``rdn`` inside this target."""
                return self._dn(self.basedn, rdn)

            def connect(self):
                """Connect this target's database handle to its URL."""
                return self.db.connect(self.url)

            def setup_data(self, path):
                """Load the LDIF data file ``path`` into this target."""
                self.add_ldif(read_datafile(path))

            def subst(self, text):
                """Return ``text`` with this target's variables substituted."""
                return substitute_var(text, self.substvars)

            def add_ldif(self, ldif):
                """Substitute variables in ``ldif`` and add the records."""
                self.db.add_ldif(self.subst(ldif))

            def modify_ldif(self, ldif):
                """Substitute variables in ``ldif`` and apply the changes."""
                self.db.modify_ldif(self.subst(ldif))

        self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
        self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
        self.templates = Target("templates.ldb", "cn=templates", None)

        self.samba3.connect()
        self.templates.connect()
        self.samba4.connect()

    def tearDown(self):
        """Remove the database files created for the test."""
        os.unlink(self.ldbfile)
        os.unlink(self.samba3.file)
        os.unlink(self.templates.file)
        os.unlink(self.samba4.file)
        super(MapBaseTestCase, self).tearDown()

    def assertSidEquals(self, text, ndr_sid):
        """Assert that an NDR-packed SID equals its string form.

        :param text: Expected SID in string form.
        :param ndr_sid: Indexable value whose first item is the NDR-packed
            SID to check.
        """
        sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid,
                                        str(ndr_sid[0]))
        sid_obj2 = samba.dcerpc.security.dom_sid(text)
        self.assertEqual(sid_obj1, sid_obj2)
122
123
class Samba3SamTestCase(MapBaseTestCase):
    """Tests that run against the provisioned samba3 test data."""

    def setUp(self):
        """Load the test data and reopen the database with mapping set up."""
        super(Samba3SamTestCase, self).setUp()
        db = Ldb(self.ldburl, lp=cmdline_loadparm)
        self.samba3.setup_data("samba3.ldif")
        self.templates.setup_data("provision_samba3sam_templates.ldif")
        ldif = read_datafile("provision_samba3sam.ldif")
        db.add_ldif(self.samba4.subst(ldif))
        self.setup_modules(db, self.samba3, self.samba4)
        # Drop the provisioning handle before opening the database again;
        # presumably the module list is only read on open -- TODO confirm.
        del db
        self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)

    def test_search_non_mapped(self):
        """Looking up by non-mapped attribute"""
        msg = self.ldb.search(expression="(cn=Administrator)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0]["cn"]), "Administrator")

    # This test used to be named test_search_non_mapped as well, which
    # replaced the method above so that the non-mapped lookup never ran.
    def test_search_mapped(self):
        """Looking up by mapped attribute"""
        msg = self.ldb.search(expression="(name=Backup Operators)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0]["name"]), "Backup Operators")

    def test_old_name_of_renamed(self):
        """Looking up by old name of renamed attribute"""
        msg = self.ldb.search(expression="(displayName=Backup Operators)")
        self.assertEqual(len(msg), 0)

    def test_mapped_containing_sid(self):
        """Looking up mapped entry containing SID"""
        msg = self.ldb.search(expression="(cn=Replicator)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0].dn),
                         "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
        self.assertTrue("objectSid" in msg[0])
        self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
                             msg[0]["objectSid"])
        oc = set(msg[0]["objectClass"])
        self.assertEqual(oc, set(["group"]))

    def test_search_by_objclass(self):
        """Looking up by objectClass"""
        msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
        self.assertEqual(set([str(m.dn) for m in msg]),
                set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                     "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))

    def test_s3sam_modify(self):
        """Add, search, modify, rename and delete records through the map."""
        # Adding a record that is stored locally (fallback, not mapped)
        self.ldb.add({"dn": "cn=Foo",
            "foo": "bar",
            "blah": "Blie",
            "cn": "Foo",
            "showInAdvancedViewOnly": "TRUE"}
            )

        # Checking for existence of record (local)
        # TODO: This record must be searched in the local database, which is
        # currently only supported for base searches.  A search by
        # expression alone, without base and scope, should work as well but
        # doesn't.
        msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
                scope=SCOPE_BASE,
                attrs=['foo','blah','cn','showInAdvancedViewOnly'])
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
        self.assertEqual(str(msg[0]["foo"]), "bar")
        self.assertEqual(str(msg[0]["blah"]), "Blie")

        # Adding record that will be mapped
        self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                 "objectClass": "user",
                 "unixName": "bin",
                 "sambaUnicodePwd": "geheim",
                 "cn": "Niemand"})

        # Checking for existence of record (remote)
        msg = self.ldb.search(expression="(unixName=bin)",
                              attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0]["cn"]), "Niemand")
        self.assertEqual(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for existence of record (local && remote)
        msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)           # TODO: should check with more records
        self.assertEqual(str(msg[0]["cn"]), "Niemand")
        self.assertEqual(str(msg[0]["unixName"]), "bin")
        self.assertEqual(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for existence of record (local || remote)
        msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
                         attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
        self.assertEqual(len(msg), 1)        # TODO: should check with more records
        self.assertEqual(str(msg[0]["cn"]), "Niemand")
        self.assertEqual(str(msg[0]["unixName"]), "bin")
        self.assertEqual(str(msg[0]["sambaUnicodePwd"]), "geheim")

        # Checking for data in destination database
        msg = self.samba3.db.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(str(msg[0]["sambaSID"]),
                "S-1-5-21-4231626423-2410014848-2360679739-2001")
        self.assertEqual(str(msg[0]["displayName"]), "Niemand")

        # Adding attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
add: description
description: Blah
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(str(msg[0]["cn"]), "Niemand")
        self.assertEqual(str(msg[0]["description"]), "Blah")

        # Modifying attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
replace: description
description: Blie
""")

        # Checking whether changes are still there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertEqual(str(msg[0]["description"]), "Blie")

        # Deleting attribute...
        self.ldb.modify_ldif("""
dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
changetype: modify
delete: description
""")

        # Checking whether changes are no longer there...
        msg = self.ldb.search(expression="(cn=Niemand)")
        self.assertTrue(len(msg) >= 1)
        self.assertFalse("description" in msg[0])

        # Renaming record...
        self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
                        "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether DN has changed...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEqual(len(msg), 1)
        self.assertEqual(str(msg[0].dn),
                         "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Deleting record...
        self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")

        # Checking whether record is gone...
        msg = self.ldb.search(expression="(cn=Niemand2)")
        self.assertEqual(len(msg), 0)
290
291
292 class MapTestCase(MapBaseTestCase):
293
294     def setUp(self):
295         super(MapTestCase, self).setUp()
296         ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
297         self.templates.setup_data("provision_samba3sam_templates.ldif")
298         ldif = read_datafile("provision_samba3sam.ldif")
299         ldb.add_ldif(self.samba4.subst(ldif))
300         self.setup_modules(ldb, self.samba3, self.samba4)
301         del ldb
302         self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
303
304     def test_map_search(self):
305         """Running search tests on mapped data."""
306         self.samba3.db.add({
307             "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
308             "objectclass": ["sambaDomain", "top"],
309             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
310             "sambaNextRid": "2000",
311             "sambaDomainName": "TESTS"
312             })
313
314         # Add a set of split records
315         self.ldb.add_ldif("""
316 dn: """+ self.samba4.dn("cn=X") + """
317 objectClass: user
318 cn: X
319 codePage: x
320 revision: x
321 dnsHostName: x
322 nextRid: y
323 lastLogon: x
324 description: x
325 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
326 """)
327
328         self.ldb.add({
329             "dn": self.samba4.dn("cn=Y"),
330             "objectClass": "top",
331             "cn": "Y",
332             "codePage": "x",
333             "revision": "x",
334             "dnsHostName": "y",
335             "nextRid": "y",
336             "lastLogon": "y",
337             "description": "x"})
338
339         self.ldb.add({
340             "dn": self.samba4.dn("cn=Z"),
341             "objectClass": "top",
342             "cn": "Z",
343             "codePage": "x",
344             "revision": "y",
345             "dnsHostName": "z",
346             "nextRid": "y",
347             "lastLogon": "z",
348             "description": "y"})
349
350         # Add a set of remote records
351
352         self.samba3.db.add({
353             "dn": self.samba3.dn("cn=A"),
354             "objectClass": "posixAccount",
355             "cn": "A",
356             "sambaNextRid": "x",
357             "sambaBadPasswordCount": "x", 
358             "sambaLogonTime": "x",
359             "description": "x",
360             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
361             "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
362
363         self.samba3.db.add({
364             "dn": self.samba3.dn("cn=B"),
365             "objectClass": "top",
366             "cn": "B",
367             "sambaNextRid": "x",
368             "sambaBadPasswordCount": "x",
369             "sambaLogonTime": "y",
370             "description": "x"})
371
372         self.samba3.db.add({
373             "dn": self.samba3.dn("cn=C"),
374             "objectClass": "top",
375             "cn": "C",
376             "sambaNextRid": "x",
377             "sambaBadPasswordCount": "y",
378             "sambaLogonTime": "z",
379             "description": "y"})
380
381         # Testing search by DN
382
383         # Search remote record by local DN
384         dn = self.samba4.dn("cn=A")
385         res = self.ldb.search(dn, scope=SCOPE_BASE, 
386                 attrs=["dnsHostName", "lastLogon"])
387         self.assertEquals(len(res), 1)
388         self.assertEquals(str(res[0].dn), dn)
389         self.assertTrue(not "dnsHostName" in res[0])
390         self.assertEquals(str(res[0]["lastLogon"]), "x")
391
392         # Search remote record by remote DN
393         dn = self.samba3.dn("cn=A")
394         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
395                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
396         self.assertEquals(len(res), 1)
397         self.assertEquals(str(res[0].dn), dn)
398         self.assertTrue(not "dnsHostName" in res[0])
399         self.assertTrue(not "lastLogon" in res[0])
400         self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
401
402         # Search split record by local DN
403         dn = self.samba4.dn("cn=X")
404         res = self.ldb.search(dn, scope=SCOPE_BASE, 
405                 attrs=["dnsHostName", "lastLogon"])
406         self.assertEquals(len(res), 1)
407         self.assertEquals(str(res[0].dn), dn)
408         self.assertEquals(str(res[0]["dnsHostName"]), "x")
409         self.assertEquals(str(res[0]["lastLogon"]), "x")
410
411         # Search split record by remote DN
412         dn = self.samba3.dn("cn=X")
413         res = self.samba3.db.search(dn, scope=SCOPE_BASE, 
414                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
415         self.assertEquals(len(res), 1)
416         self.assertEquals(str(res[0].dn), dn)
417         self.assertTrue(not "dnsHostName" in res[0])
418         self.assertTrue(not "lastLogon" in res[0])
419         self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
420
421         # Testing search by attribute
422
423         # Search by ignored attribute
424         res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, 
425                 attrs=["dnsHostName", "lastLogon"])
426         self.assertEquals(len(res), 2)
427         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
428         self.assertEquals(str(res[0]["dnsHostName"]), "y")
429         self.assertEquals(str(res[0]["lastLogon"]), "y")
430         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
431         self.assertEquals(str(res[1]["dnsHostName"]), "x")
432         self.assertEquals(str(res[1]["lastLogon"]), "x")
433
434         # Search by kept attribute
435         res = self.ldb.search(expression="(description=y)", 
436                 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
437         self.assertEquals(len(res), 2)
438         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
439         self.assertEquals(str(res[0]["dnsHostName"]), "z")
440         self.assertEquals(str(res[0]["lastLogon"]), "z")
441         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
442         self.assertTrue(not "dnsHostName" in res[1])
443         self.assertEquals(str(res[1]["lastLogon"]), "z")
444
445         # Search by renamed attribute
446         res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
447                               attrs=["dnsHostName", "lastLogon"])
448         self.assertEquals(len(res), 2)
449         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
450         self.assertTrue(not "dnsHostName" in res[0])
451         self.assertEquals(str(res[0]["lastLogon"]), "y")
452         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
453         self.assertTrue(not "dnsHostName" in res[1])
454         self.assertEquals(str(res[1]["lastLogon"]), "x")
455
456         # Search by converted attribute
457         # TODO:
458         #   Using the SID directly in the parse tree leads to conversion
459         #   errors, letting the search fail with no results.
460         #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
461         res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
462         self.assertEquals(len(res), 3)
463         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
464         self.assertEquals(str(res[0]["dnsHostName"]), "x")
465         self.assertEquals(str(res[0]["lastLogon"]), "x")
466         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552", 
467                              res[0]["objectSid"])
468         self.assertTrue("objectSid" in res[0])
469         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
470         self.assertTrue(not "dnsHostName" in res[1])
471         self.assertEquals(str(res[1]["lastLogon"]), "x")
472         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
473                              res[1]["objectSid"])
474         self.assertTrue("objectSid" in res[1])
475
476         # Search by generated attribute 
477         # In most cases, this even works when the mapping is missing
478         # a `convert_operator' by enumerating the remote db.
479         res = self.ldb.search(expression="(primaryGroupID=512)", 
480                            attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
481         self.assertEquals(len(res), 1)
482         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
483         self.assertTrue(not "dnsHostName" in res[0])
484         self.assertEquals(str(res[0]["lastLogon"]), "x")
485         self.assertEquals(str(res[0]["primaryGroupID"]), "512")
486
487         # Note that Xs "objectSid" seems to be fine in the previous search for
488         # "objectSid"...
489         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
490         #print len(res) + " results found"
491         #for i in range(len(res)):
492         #    for (obj in res[i]) {
493         #        print obj + ": " + res[i][obj]
494         #    }
495         #    print "---"
496         #    
497
498         # Search by remote name of renamed attribute */
499         res = self.ldb.search(expression="(sambaBadPasswordCount=*)", 
500                               attrs=["dnsHostName", "lastLogon"])
501         self.assertEquals(len(res), 0)
502
503         # Search by objectClass
504         attrs = ["dnsHostName", "lastLogon", "objectClass"]
505         res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
506         self.assertEquals(len(res), 2)
507         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
508         self.assertEquals(str(res[0]["dnsHostName"]), "x")
509         self.assertEquals(str(res[0]["lastLogon"]), "x")
510         self.assertEquals(str(res[0]["objectClass"][0]), "user")
511         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
512         self.assertTrue(not "dnsHostName" in res[1])
513         self.assertEquals(str(res[1]["lastLogon"]), "x")
514         self.assertEquals(str(res[1]["objectClass"][0]), "user")
515
516         # Prove that the objectClass is actually used for the search
517         res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
518                               attrs=attrs)
519         self.assertEquals(len(res), 3)
520         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
521         self.assertTrue(not "dnsHostName" in res[0])
522         self.assertEquals(str(res[0]["lastLogon"]), "y")
523         self.assertEquals(set(res[0]["objectClass"]), set(["top"]))
524         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
525         self.assertEquals(str(res[1]["dnsHostName"]), "x")
526         self.assertEquals(str(res[1]["lastLogon"]), "x")
527         self.assertEquals(str(res[1]["objectClass"][0]), "user")
528         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
529         self.assertTrue(not "dnsHostName" in res[2])
530         self.assertEquals(str(res[2]["lastLogon"]), "x")
531         self.assertEquals(res[2]["objectClass"][0], "user")
532
533         # Testing search by parse tree
534
535         # Search by conjunction of local attributes
536         res = self.ldb.search(expression="(&(codePage=x)(revision=x))", 
537                               attrs=["dnsHostName", "lastLogon"])
538         self.assertEquals(len(res), 2)
539         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
540         self.assertEquals(str(res[0]["dnsHostName"]), "y")
541         self.assertEquals(str(res[0]["lastLogon"]), "y")
542         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
543         self.assertEquals(str(res[1]["dnsHostName"]), "x")
544         self.assertEquals(str(res[1]["lastLogon"]), "x")
545
546         # Search by conjunction of remote attributes
547         res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", 
548                               attrs=["dnsHostName", "lastLogon"])
549         self.assertEquals(len(res), 2)
550         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
551         self.assertEquals(str(res[0]["dnsHostName"]), "x")
552         self.assertEquals(str(res[0]["lastLogon"]), "x")
553         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
554         self.assertTrue(not "dnsHostName" in res[1])
555         self.assertEquals(str(res[1]["lastLogon"]), "x")
556         
557         # Search by conjunction of local and remote attribute 
558         res = self.ldb.search(expression="(&(codePage=x)(description=x))", 
559                               attrs=["dnsHostName", "lastLogon"])
560         self.assertEquals(len(res), 2)
561         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
562         self.assertEquals(str(res[0]["dnsHostName"]), "y")
563         self.assertEquals(str(res[0]["lastLogon"]), "y")
564         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
565         self.assertEquals(str(res[1]["dnsHostName"]), "x")
566         self.assertEquals(str(res[1]["lastLogon"]), "x")
567
568         # Search by conjunction of local and remote attribute w/o match
569         attrs = ["dnsHostName", "lastLogon"]
570         res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", 
571                               attrs=attrs)
572         self.assertEquals(len(res), 0)
573         res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", 
574                               attrs=attrs)
575         self.assertEquals(len(res), 0)
576
577         # Search by disjunction of local attributes
578         res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", 
579                               attrs=["dnsHostName", "lastLogon"])
580         self.assertEquals(len(res), 2)
581         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
582         self.assertEquals(str(res[0]["dnsHostName"]), "y")
583         self.assertEquals(str(res[0]["lastLogon"]), "y")
584         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
585         self.assertEquals(str(res[1]["dnsHostName"]), "x")
586         self.assertEquals(str(res[1]["lastLogon"]), "x")
587
588         # Search by disjunction of remote attributes
589         res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", 
590                               attrs=["dnsHostName", "lastLogon"])
591         self.assertEquals(len(res), 3)
592         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
593         self.assertFalse("dnsHostName" in res[0])
594         self.assertEquals(str(res[0]["lastLogon"]), "y")
595         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
596         self.assertEquals(str(res[1]["dnsHostName"]), "x")
597         self.assertEquals(str(res[1]["lastLogon"]), "x")
598         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
599         self.assertFalse("dnsHostName" in res[2])
600         self.assertEquals(str(res[2]["lastLogon"]), "x")
601
602         # Search by disjunction of local and remote attribute
603         res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", 
604                               attrs=["dnsHostName", "lastLogon"])
605         self.assertEquals(len(res), 3)
606         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
607         self.assertEquals(str(res[0]["dnsHostName"]), "y")
608         self.assertEquals(str(res[0]["lastLogon"]), "y")
609         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
610         self.assertFalse("dnsHostName" in res[1])
611         self.assertEquals(str(res[1]["lastLogon"]), "y")
612         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
613         self.assertEquals(str(res[2]["dnsHostName"]), "x")
614         self.assertEquals(str(res[2]["lastLogon"]), "x")
615
616         # Search by disjunction of local and remote attribute w/o match
617         res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", 
618                               attrs=["dnsHostName", "lastLogon"])
619         self.assertEquals(len(res), 0)
620
621         # Search by negated local attribute
622         res = self.ldb.search(expression="(!(revision=x))", 
623                               attrs=["dnsHostName", "lastLogon"])
624         self.assertEquals(len(res), 5)
625         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
626         self.assertTrue(not "dnsHostName" in res[0])
627         self.assertEquals(str(res[0]["lastLogon"]), "y")
628         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
629         self.assertTrue(not "dnsHostName" in res[1])
630         self.assertEquals(str(res[1]["lastLogon"]), "x")
631         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
632         self.assertEquals(str(res[2]["dnsHostName"]), "z")
633         self.assertEquals(str(res[2]["lastLogon"]), "z")
634         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
635         self.assertTrue(not "dnsHostName" in res[3])
636         self.assertEquals(str(res[3]["lastLogon"]), "z")
637
638         # Search by negated remote attribute
639         res = self.ldb.search(expression="(!(description=x))", 
640                               attrs=["dnsHostName", "lastLogon"])
641         self.assertEquals(len(res), 3)
642         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
643         self.assertEquals(str(res[0]["dnsHostName"]), "z")
644         self.assertEquals(str(res[0]["lastLogon"]), "z")
645         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
646         self.assertTrue(not "dnsHostName" in res[1])
647         self.assertEquals(str(res[1]["lastLogon"]), "z")
648
649         # Search by negated conjunction of local attributes
650         res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", 
651                               attrs=["dnsHostName", "lastLogon"])
652         self.assertEquals(len(res), 5)
653         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
654         self.assertTrue(not "dnsHostName" in res[0])
655         self.assertEquals(str(res[0]["lastLogon"]), "y")
656         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
657         self.assertTrue(not "dnsHostName" in res[1])
658         self.assertEquals(str(res[1]["lastLogon"]), "x")
659         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
660         self.assertEquals(str(res[2]["dnsHostName"]), "z")
661         self.assertEquals(str(res[2]["lastLogon"]), "z")
662         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
663         self.assertTrue(not "dnsHostName" in res[3])
664         self.assertEquals(str(res[3]["lastLogon"]), "z")
665
666         # Search by negated conjunction of remote attributes
667         res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", 
668                               attrs=["dnsHostName", "lastLogon"])
669         self.assertEquals(len(res), 5)
670         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
671         self.assertEquals(str(res[0]["dnsHostName"]), "y")
672         self.assertEquals(str(res[0]["lastLogon"]), "y")
673         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
674         self.assertTrue(not "dnsHostName" in res[1])
675         self.assertEquals(str(res[1]["lastLogon"]), "y")
676         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
677         self.assertEquals(str(res[2]["dnsHostName"]), "z")
678         self.assertEquals(str(res[2]["lastLogon"]), "z")
679         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
680         self.assertTrue(not "dnsHostName" in res[3])
681         self.assertEquals(str(res[3]["lastLogon"]), "z")
682
683         # Search by negated conjunction of local and remote attribute
684         res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", 
685                               attrs=["dnsHostName", "lastLogon"])
686         self.assertEquals(len(res), 5)
687         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
688         self.assertTrue(not "dnsHostName" in res[0])
689         self.assertEquals(str(res[0]["lastLogon"]), "y")
690         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
691         self.assertTrue(not "dnsHostName" in res[1])
692         self.assertEquals(str(res[1]["lastLogon"]), "x")
693         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
694         self.assertEquals(str(res[2]["dnsHostName"]), "z")
695         self.assertEquals(str(res[2]["lastLogon"]), "z")
696         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
697         self.assertTrue(not "dnsHostName" in res[3])
698         self.assertEquals(str(res[3]["lastLogon"]), "z")
699
700         # Search by negated disjunction of local attributes
701         res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", 
702                               attrs=["dnsHostName", "lastLogon"])
703         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
704         self.assertTrue(not "dnsHostName" in res[0])
705         self.assertEquals(str(res[0]["lastLogon"]), "y")
706         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
707         self.assertTrue(not "dnsHostName" in res[1])
708         self.assertEquals(str(res[1]["lastLogon"]), "x")
709         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
710         self.assertEquals(str(res[2]["dnsHostName"]), "z")
711         self.assertEquals(str(res[2]["lastLogon"]), "z")
712         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
713         self.assertTrue(not "dnsHostName" in res[3])
714         self.assertEquals(str(res[3]["lastLogon"]), "z")
715
716         # Search by negated disjunction of remote attributes
717         res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", 
718                               attrs=["dnsHostName", "lastLogon"])
719         self.assertEquals(len(res), 4)
720         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
721         self.assertEquals(str(res[0]["dnsHostName"]), "y")
722         self.assertEquals(str(res[0]["lastLogon"]), "y")
723         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
724         self.assertEquals(str(res[1]["dnsHostName"]), "z")
725         self.assertEquals(str(res[1]["lastLogon"]), "z")
726         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
727         self.assertTrue(not "dnsHostName" in res[2])
728         self.assertEquals(str(res[2]["lastLogon"]), "z")
729
730         # Search by negated disjunction of local and remote attribute
731         res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", 
732                               attrs=["dnsHostName", "lastLogon"])
733         self.assertEquals(len(res), 4)
734         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
735         self.assertTrue(not "dnsHostName" in res[0])
736         self.assertEquals(str(res[0]["lastLogon"]), "x")
737         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
738         self.assertEquals(str(res[1]["dnsHostName"]), "z")
739         self.assertEquals(str(res[1]["lastLogon"]), "z")
740         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
741         self.assertTrue(not "dnsHostName" in res[2])
742         self.assertEquals(str(res[2]["lastLogon"]), "z")
743
744         # Search by complex parse tree
745         res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
746         self.assertEquals(len(res), 6)
747         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
748         self.assertTrue(not "dnsHostName" in res[0])
749         self.assertEquals(str(res[0]["lastLogon"]), "y")
750         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
751         self.assertEquals(str(res[1]["dnsHostName"]), "x")
752         self.assertEquals(str(res[1]["lastLogon"]), "x")
753         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
754         self.assertTrue(not "dnsHostName" in res[2])
755         self.assertEquals(str(res[2]["lastLogon"]), "x")
756         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
757         self.assertEquals(str(res[3]["dnsHostName"]), "z")
758         self.assertEquals(str(res[3]["lastLogon"]), "z")
759         self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
760         self.assertTrue(not "dnsHostName" in res[4])
761         self.assertEquals(str(res[4]["lastLogon"]), "z")
762
763         # Clean up
764         dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
765         for dn in dns:
766             self.ldb.delete(dn)
767
768     def test_map_modify_local(self):
769         """Modification of local records."""
770         # Add local record
771         dn = "cn=test,dc=idealx,dc=org"
772         self.ldb.add({"dn": dn, 
773                  "cn": "test",
774                  "foo": "bar",
775                  "revision": "1",
776                  "description": "test"})
777         # Check it's there
778         attrs = ["foo", "revision", "description"]
779         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
780         self.assertEquals(len(res), 1)
781         self.assertEquals(str(res[0].dn), dn)
782         self.assertEquals(str(res[0]["foo"]), "bar")
783         self.assertEquals(str(res[0]["revision"]), "1")
784         self.assertEquals(str(res[0]["description"]), "test")
785         # Check it's not in the local db
786         res = self.samba4.db.search(expression="(cn=test)", 
787                                     scope=SCOPE_DEFAULT, attrs=attrs)
788         self.assertEquals(len(res), 0)
789         # Check it's not in the remote db
790         res = self.samba3.db.search(expression="(cn=test)", 
791                                     scope=SCOPE_DEFAULT, attrs=attrs)
792         self.assertEquals(len(res), 0)
793
794         # Modify local record
795         ldif = """
796 dn: """ + dn + """
797 replace: foo
798 foo: baz
799 replace: description
800 description: foo
801 """
802         self.ldb.modify_ldif(ldif)
803         # Check in local db
804         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
805         self.assertEquals(len(res), 1)
806         self.assertEquals(str(res[0].dn), dn)
807         self.assertEquals(str(res[0]["foo"]), "baz")
808         self.assertEquals(str(res[0]["revision"]), "1")
809         self.assertEquals(str(res[0]["description"]), "foo")
810
811         # Rename local record
812         dn2 = "cn=toast,dc=idealx,dc=org"
813         self.ldb.rename(dn, dn2)
814         # Check in local db
815         res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
816         self.assertEquals(len(res), 1)
817         self.assertEquals(str(res[0].dn), dn2)
818         self.assertEquals(str(res[0]["foo"]), "baz")
819         self.assertEquals(str(res[0]["revision"]), "1")
820         self.assertEquals(str(res[0]["description"]), "foo")
821
822         # Delete local record
823         self.ldb.delete(dn2)
824         # Check it's gone
825         res = self.ldb.search(dn2, scope=SCOPE_BASE)
826         self.assertEquals(len(res), 0)
827
828     def test_map_modify_remote_remote(self):
829         """Modification of remote data of remote records"""
830         # Add remote record
831         dn = self.samba4.dn("cn=test")
832         dn2 = self.samba3.dn("cn=test")
833         self.samba3.db.add({"dn": dn2, 
834                    "cn": "test",
835                    "description": "foo",
836                    "sambaBadPasswordCount": "3",
837                    "sambaNextRid": "1001"})
838         # Check it's there
839         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
840                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
841         self.assertEquals(len(res), 1)
842         self.assertEquals(str(res[0].dn), dn2)
843         self.assertEquals(str(res[0]["description"]), "foo")
844         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
845         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
846         # Check in mapped db
847         attrs = ["description", "badPwdCount", "nextRid"]
848         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
849         self.assertEquals(len(res), 1)
850         self.assertEquals(str(res[0].dn), dn)
851         self.assertEquals(str(res[0]["description"]), "foo")
852         self.assertEquals(str(res[0]["badPwdCount"]), "3")
853         self.assertEquals(str(res[0]["nextRid"]), "1001")
854         # Check in local db
855         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
856         self.assertEquals(len(res), 0)
857
858         # Modify remote data of remote record
859         ldif = """
860 dn: """ + dn + """
861 replace: description
862 description: test
863 replace: badPwdCount
864 badPwdCount: 4
865 """
866         self.ldb.modify_ldif(ldif)
867         # Check in mapped db
868         res = self.ldb.search(dn, scope=SCOPE_BASE, 
869                 attrs=["description", "badPwdCount", "nextRid"])
870         self.assertEquals(len(res), 1)
871         self.assertEquals(str(res[0].dn), dn)
872         self.assertEquals(str(res[0]["description"]), "test")
873         self.assertEquals(str(res[0]["badPwdCount"]), "4")
874         self.assertEquals(str(res[0]["nextRid"]), "1001")
875         # Check in remote db
876         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
877                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
878         self.assertEquals(len(res), 1)
879         self.assertEquals(str(res[0].dn), dn2)
880         self.assertEquals(str(res[0]["description"]), "test")
881         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
882         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
883
884         # Rename remote record
885         dn2 = self.samba4.dn("cn=toast")
886         self.ldb.rename(dn, dn2)
887         # Check in mapped db
888         dn = dn2
889         res = self.ldb.search(dn, scope=SCOPE_BASE, 
890                 attrs=["description", "badPwdCount", "nextRid"])
891         self.assertEquals(len(res), 1)
892         self.assertEquals(str(res[0].dn), dn)
893         self.assertEquals(str(res[0]["description"]), "test")
894         self.assertEquals(str(res[0]["badPwdCount"]), "4")
895         self.assertEquals(str(res[0]["nextRid"]), "1001")
896         # Check in remote db 
897         dn2 = self.samba3.dn("cn=toast")
898         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
899                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
900         self.assertEquals(len(res), 1)
901         self.assertEquals(str(res[0].dn), dn2)
902         self.assertEquals(str(res[0]["description"]), "test")
903         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
904         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
905
906         # Delete remote record
907         self.ldb.delete(dn)
908         # Check in mapped db that it's removed
909         res = self.ldb.search(dn, scope=SCOPE_BASE)
910         self.assertEquals(len(res), 0)
911         # Check in remote db
912         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
913         self.assertEquals(len(res), 0)
914
915     def test_map_modify_remote_local(self):
916         """Modification of local data of remote records"""
917         # Add remote record (same as before)
918         dn = self.samba4.dn("cn=test")
919         dn2 = self.samba3.dn("cn=test")
920         self.samba3.db.add({"dn": dn2, 
921                    "cn": "test",
922                    "description": "foo",
923                    "sambaBadPasswordCount": "3",
924                    "sambaNextRid": "1001"})
925
926         # Modify local data of remote record
927         ldif = """
928 dn: """ + dn + """
929 add: revision
930 revision: 1
931 replace: description
932 description: test
933
934 """
935         self.ldb.modify_ldif(ldif)
936         # Check in mapped db
937         attrs = ["revision", "description"]
938         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
939         self.assertEquals(len(res), 1)
940         self.assertEquals(str(res[0].dn), dn)
941         self.assertEquals(str(res[0]["description"]), "test")
942         self.assertEquals(str(res[0]["revision"]), "1")
943         # Check in remote db
944         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
945         self.assertEquals(len(res), 1)
946         self.assertEquals(str(res[0].dn), dn2)
947         self.assertEquals(str(res[0]["description"]), "test")
948         self.assertTrue(not "revision" in res[0])
949         # Check in local db
950         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
951         self.assertEquals(len(res), 1)
952         self.assertEquals(str(res[0].dn), dn)
953         self.assertTrue(not "description" in res[0])
954         self.assertEquals(str(res[0]["revision"]), "1")
955
956         # Delete (newly) split record
957         self.ldb.delete(dn)
958
959     def test_map_modify_split(self):
960         """Testing modification of split records"""
961         # Add split record
962         dn = self.samba4.dn("cn=test")
963         dn2 = self.samba3.dn("cn=test")
964         self.ldb.add({
965             "dn": dn,
966             "cn": "test",
967             "description": "foo",
968             "badPwdCount": "3",
969             "nextRid": "1001",
970             "revision": "1"})
971         # Check it's there
972         attrs = ["description", "badPwdCount", "nextRid", "revision"]
973         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
974         self.assertEquals(len(res), 1)
975         self.assertEquals(str(res[0].dn), dn)
976         self.assertEquals(str(res[0]["description"]), "foo")
977         self.assertEquals(str(res[0]["badPwdCount"]), "3")
978         self.assertEquals(str(res[0]["nextRid"]), "1001")
979         self.assertEquals(str(res[0]["revision"]), "1")
980         # Check in local db
981         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
982         self.assertEquals(len(res), 1)
983         self.assertEquals(str(res[0].dn), dn)
984         self.assertTrue(not "description" in res[0])
985         self.assertTrue(not "badPwdCount" in res[0])
986         self.assertTrue(not "nextRid" in res[0])
987         self.assertEquals(str(res[0]["revision"]), "1")
988         # Check in remote db
989         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
990                  "revision"]
991         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
992         self.assertEquals(len(res), 1)
993         self.assertEquals(str(res[0].dn), dn2)
994         self.assertEquals(str(res[0]["description"]), "foo")
995         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
996         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
997         self.assertTrue(not "revision" in res[0])
998
999         # Modify of split record
1000         ldif = """
1001 dn: """ + dn + """
1002 replace: description
1003 description: test
1004 replace: badPwdCount
1005 badPwdCount: 4
1006 replace: revision
1007 revision: 2
1008 """
1009         self.ldb.modify_ldif(ldif)
1010         # Check in mapped db
1011         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1012         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1013         self.assertEquals(len(res), 1)
1014         self.assertEquals(str(res[0].dn), dn)
1015         self.assertEquals(str(res[0]["description"]), "test")
1016         self.assertEquals(str(res[0]["badPwdCount"]), "4")
1017         self.assertEquals(str(res[0]["nextRid"]), "1001")
1018         self.assertEquals(str(res[0]["revision"]), "2")
1019         # Check in local db
1020         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1021         self.assertEquals(len(res), 1)
1022         self.assertEquals(str(res[0].dn), dn)
1023         self.assertTrue(not "description" in res[0])
1024         self.assertTrue(not "badPwdCount" in res[0])
1025         self.assertTrue(not "nextRid" in res[0])
1026         self.assertEquals(str(res[0]["revision"]), "2")
1027         # Check in remote db
1028         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", 
1029                  "revision"]
1030         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1031         self.assertEquals(len(res), 1)
1032         self.assertEquals(str(res[0].dn), dn2)
1033         self.assertEquals(str(res[0]["description"]), "test")
1034         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1035         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1036         self.assertTrue(not "revision" in res[0])
1037
1038         # Rename split record
1039         dn2 = self.samba4.dn("cn=toast")
1040         self.ldb.rename(dn, dn2)
1041         # Check in mapped db
1042         dn = dn2
1043         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1044         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1045         self.assertEquals(len(res), 1)
1046         self.assertEquals(str(res[0].dn), dn)
1047         self.assertEquals(str(res[0]["description"]), "test")
1048         self.assertEquals(str(res[0]["badPwdCount"]), "4")
1049         self.assertEquals(str(res[0]["nextRid"]), "1001")
1050         self.assertEquals(str(res[0]["revision"]), "2")
1051         # Check in local db
1052         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1053         self.assertEquals(len(res), 1)
1054         self.assertEquals(str(res[0].dn), dn)
1055         self.assertTrue(not "description" in res[0])
1056         self.assertTrue(not "badPwdCount" in res[0])
1057         self.assertTrue(not "nextRid" in res[0])
1058         self.assertEquals(str(res[0]["revision"]), "2")
1059         # Check in remote db
1060         dn2 = self.samba3.dn("cn=toast")
1061         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, 
1062           attrs=["description", "sambaBadPasswordCount", "sambaNextRid", 
1063                  "revision"])
1064         self.assertEquals(len(res), 1)
1065         self.assertEquals(str(res[0].dn), dn2)
1066         self.assertEquals(str(res[0]["description"]), "test")
1067         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1068         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1069         self.assertTrue(not "revision" in res[0])
1070
1071         # Delete split record
1072         self.ldb.delete(dn)
1073         # Check in mapped db
1074         res = self.ldb.search(dn, scope=SCOPE_BASE)
1075         self.assertEquals(len(res), 0)
1076         # Check in local db
1077         res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1078         self.assertEquals(len(res), 0)
1079         # Check in remote db
1080         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1081         self.assertEquals(len(res), 0)