s4-samldb: Do not allow deletion of objects with RID < 1000
[metze/samba/wip.git] / python / samba / tests / samba3sam.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
3 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
4 #
5 # This is a Python port of the original in testprogs/ejs/samba3sam.js
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20
21 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
22
23 import os
24 import ldb
25 from ldb import SCOPE_DEFAULT, SCOPE_BASE
26 from samba import Ldb, substitute_var
27 from samba.tests import TestCaseInTempDir, env_loadparm
28 import samba.dcerpc.security
29 import samba.ndr
30 from samba.auth import system_session
31 from operator import attrgetter
32
33
34 def read_datafile(filename):
35     paths = [ "../../../../../testdata/samba3",
36               "../../../../testdata/samba3" ]
37     for p in paths:
38         datadir = os.path.join(os.path.dirname(__file__), p)
39         if os.path.exists(datadir):
40             break
41     return open(os.path.join(datadir, filename), 'r').read()
42
43 def ldb_debug(l, text):
44     print text
45
46
47 class MapBaseTestCase(TestCaseInTempDir):
48     """Base test case for mapping tests."""
49
50     def setup_modules(self, ldb, s3, s4):
51         ldb.add({"dn": "@MAP=samba3sam",
52                  "@FROM": s4.basedn,
53                  "@TO": "sambaDomainName=TESTS," + s3.basedn})
54
55         ldb.add({"dn": "@MODULES",
56                  "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,samba3sid,show_deleted,partition"})
57
58         ldb.add({"dn": "@PARTITION",
59             "partition": ["%s" % (s4.basedn_casefold),
60                           "%s" % (s3.basedn_casefold)],
61             "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"],
62             "modules": "*:"})
63
64     def setUp(self):
65         self.lp = env_loadparm()
66         self.lp.set("workgroup", "TESTS")
67         self.lp.set("netbios name", "TESTS")
68         super(MapBaseTestCase, self).setUp()
69
70         def make_dn(basedn, rdn):
71             return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)
72
73         def make_s4dn(basedn, rdn):
74             return "%s,%s" % (rdn, basedn)
75
76         self.ldbfile = os.path.join(self.tempdir, "test.ldb")
77         self.ldburl = "tdb://" + self.ldbfile
78
79         tempdir = self.tempdir
80
81         class Target:
82             """Simple helper class that contains data for a specific SAM
83             connection."""
84
85             def __init__(self, basedn, dn, lp):
86                 self.db = Ldb(lp=lp, session_info=system_session())
87                 self.db.set_opaque("skip_allocate_sids", "true");
88                 self.basedn = basedn
89                 self.basedn_casefold = ldb.Dn(self.db, basedn).get_casefold()
90                 self.substvars = {"BASEDN": self.basedn}
91                 self.file = os.path.join(tempdir, "%s.ldb" % self.basedn_casefold)
92                 self.url = "tdb://" + self.file
93                 self._dn = dn
94
95             def dn(self, rdn):
96                 return self._dn(self.basedn, rdn)
97
98             def connect(self):
99                 return self.db.connect(self.url)
100
101             def setup_data(self, path):
102                 self.add_ldif(read_datafile(path))
103
104             def subst(self, text):
105                 return substitute_var(text, self.substvars)
106
107             def add_ldif(self, ldif):
108                 self.db.add_ldif(self.subst(ldif))
109
110             def modify_ldif(self, ldif):
111                 self.db.modify_ldif(self.subst(ldif))
112
113         self.samba4 = Target("dc=vernstok,dc=nl", make_s4dn, self.lp)
114         self.samba3 = Target("cn=Samba3Sam", make_dn, self.lp)
115
116         self.samba3.connect()
117         self.samba4.connect()
118
119     def tearDown(self):
120         os.unlink(self.ldbfile)
121         os.unlink(self.samba3.file)
122         os.unlink(self.samba4.file)
123         pdir = "%s.d" % self.ldbfile
124         mdata = os.path.join(pdir, "metadata.tdb")
125         if os.path.exists(mdata):
126             os.unlink(mdata)
127             os.rmdir(pdir)
128         super(MapBaseTestCase, self).tearDown()
129
130     def assertSidEquals(self, text, ndr_sid):
131         sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid,
132                                         str(ndr_sid[0]))
133         sid_obj2 = samba.dcerpc.security.dom_sid(text)
134         self.assertEquals(sid_obj1, sid_obj2)
135
136
137 class Samba3SamTestCase(MapBaseTestCase):
138
139     def setUp(self):
140         super(Samba3SamTestCase, self).setUp()
141         ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
142         ldb.set_opaque("skip_allocate_sids", "true");
143         self.samba3.setup_data("samba3.ldif")
144         ldif = read_datafile("provision_samba3sam.ldif")
145         ldb.add_ldif(self.samba4.subst(ldif))
146         self.setup_modules(ldb, self.samba3, self.samba4)
147         del ldb
148         self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
149         self.ldb.set_opaque("skip_allocate_sids", "true");
150
151     def test_search_non_mapped(self):
152         """Looking up by non-mapped attribute"""
153         msg = self.ldb.search(expression="(cn=Administrator)")
154         self.assertEquals(len(msg), 1)
155         self.assertEquals(msg[0]["cn"], "Administrator")
156
157     def test_search_non_mapped(self):
158         """Looking up by mapped attribute"""
159         msg = self.ldb.search(expression="(name=Backup Operators)")
160         self.assertEquals(len(msg), 1)
161         self.assertEquals(str(msg[0]["name"]), "Backup Operators")
162
163     def test_old_name_of_renamed(self):
164         """Looking up by old name of renamed attribute"""
165         msg = self.ldb.search(expression="(displayName=Backup Operators)")
166         self.assertEquals(len(msg), 0)
167
168     def test_mapped_containing_sid(self):
169         """Looking up mapped entry containing SID"""
170         msg = self.ldb.search(expression="(cn=Replicator)")
171         self.assertEquals(len(msg), 1)
172         self.assertEquals(str(msg[0].dn),
173                           "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
174         self.assertTrue("objectSid" in msg[0])
175         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
176                              msg[0]["objectSid"])
177         oc = set(msg[0]["objectClass"])
178         self.assertEquals(oc, set(["group"]))
179
180     def test_search_by_objclass(self):
181         """Looking up by objectClass"""
182         msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
183         self.assertEquals(set([str(m.dn) for m in msg]),
184                 set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
185                      "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))
186
187     def test_s3sam_modify(self):
188         # Adding a record that will be fallbacked
189         self.ldb.add({"dn": "cn=Foo",
190             "foo": "bar",
191             "blah": "Blie",
192             "cn": "Foo",
193             "showInAdvancedViewOnly": "TRUE"}
194             )
195
196         # Checking for existence of record (local)
197         # TODO: This record must be searched in the local database, which is
198         # currently only supported for base searches
199         # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
200         # TODO: Actually, this version should work as well but doesn't...
201         #
202         #
203         msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
204                 scope=SCOPE_BASE,
205                 attrs=['foo','blah','cn','showInAdvancedViewOnly'])
206         self.assertEquals(len(msg), 1)
207         self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
208         self.assertEquals(str(msg[0]["foo"]), "bar")
209         self.assertEquals(str(msg[0]["blah"]), "Blie")
210
211         # Adding record that will be mapped
212         self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
213                  "objectClass": "user",
214                  "unixName": "bin",
215                  "sambaUnicodePwd": "geheim",
216                  "cn": "Niemand"})
217
218         # Checking for existence of record (remote)
219         msg = self.ldb.search(expression="(unixName=bin)",
220                               attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
221         self.assertEquals(len(msg), 1)
222         self.assertEquals(str(msg[0]["cn"]), "Niemand")
223         self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
224
225         # Checking for existence of record (local && remote)
226         msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
227                          attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
228         self.assertEquals(len(msg), 1)           # TODO: should check with more records
229         self.assertEquals(str(msg[0]["cn"]), "Niemand")
230         self.assertEquals(str(msg[0]["unixName"]), "bin")
231         self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
232
233         # Checking for existence of record (local || remote)
234         msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
235                          attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
236         #print "got %d replies" % len(msg)
237         self.assertEquals(len(msg), 1)        # TODO: should check with more records
238         self.assertEquals(str(msg[0]["cn"]), "Niemand")
239         self.assertEquals(str(msg[0]["unixName"]), "bin")
240         self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
241
242         # Checking for data in destination database
243         msg = self.samba3.db.search(expression="(cn=Niemand)")
244         self.assertTrue(len(msg) >= 1)
245         self.assertEquals(str(msg[0]["sambaSID"]),
246                 "S-1-5-21-4231626423-2410014848-2360679739-2001")
247         self.assertEquals(str(msg[0]["displayName"]), "Niemand")
248
249         # Adding attribute...
250         self.ldb.modify_ldif("""
251 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
252 changetype: modify
253 add: description
254 description: Blah
255 """)
256
257         # Checking whether changes are still there...
258         msg = self.ldb.search(expression="(cn=Niemand)")
259         self.assertTrue(len(msg) >= 1)
260         self.assertEquals(str(msg[0]["cn"]), "Niemand")
261         self.assertEquals(str(msg[0]["description"]), "Blah")
262
263         # Modifying attribute...
264         self.ldb.modify_ldif("""
265 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
266 changetype: modify
267 replace: description
268 description: Blie
269 """)
270
271         # Checking whether changes are still there...
272         msg = self.ldb.search(expression="(cn=Niemand)")
273         self.assertTrue(len(msg) >= 1)
274         self.assertEquals(str(msg[0]["description"]), "Blie")
275
276         # Deleting attribute...
277         self.ldb.modify_ldif("""
278 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
279 changetype: modify
280 delete: description
281 """)
282
283         # Checking whether changes are no longer there...
284         msg = self.ldb.search(expression="(cn=Niemand)")
285         self.assertTrue(len(msg) >= 1)
286         self.assertTrue(not "description" in msg[0])
287
288         # Renaming record...
289         self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
290                         "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
291
292         # Checking whether DN has changed...
293         msg = self.ldb.search(expression="(cn=Niemand2)")
294         self.assertEquals(len(msg), 1)
295         self.assertEquals(str(msg[0].dn),
296                           "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
297
298         # Deleting record...
299         self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
300
301         # Checking whether record is gone...
302         msg = self.ldb.search(expression="(cn=Niemand2)")
303         self.assertEquals(len(msg), 0)
304
305
306 class MapTestCase(MapBaseTestCase):
307
308     def setUp(self):
309         super(MapTestCase, self).setUp()
310         ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
311         ldb.set_opaque("skip_allocate_sids", "true");
312         ldif = read_datafile("provision_samba3sam.ldif")
313         ldb.add_ldif(self.samba4.subst(ldif))
314         self.setup_modules(ldb, self.samba3, self.samba4)
315         del ldb
316         self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
317         self.ldb.set_opaque("skip_allocate_sids", "true");
318
319     def test_map_search(self):
320         """Running search tests on mapped data."""
321         self.samba3.db.add({
322             "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
323             "objectclass": ["sambaDomain", "top"],
324             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
325             "sambaNextRid": "2000",
326             "sambaDomainName": "TESTS"
327             })
328
329         # Add a set of split records
330         self.ldb.add_ldif("""
331 dn: """+ self.samba4.dn("cn=Domain Users") + """
332 objectClass: group
333 cn: Domain Users
334 objectSid: S-1-5-21-4231626423-2410014848-2360679739-513
335 """)
336
337         # Add a set of split records
338         self.ldb.add_ldif("""
339 dn: """+ self.samba4.dn("cn=X") + """
340 objectClass: user
341 cn: X
342 codePage: x
343 revision: x
344 dnsHostName: x
345 nextRid: y
346 lastLogon: x
347 description: x
348 objectSid: S-1-5-21-4231626423-2410014848-2360679739-1052
349 """)
350
351         self.ldb.add({
352             "dn": self.samba4.dn("cn=Y"),
353             "objectClass": "top",
354             "cn": "Y",
355             "codePage": "x",
356             "revision": "x",
357             "dnsHostName": "y",
358             "nextRid": "y",
359             "lastLogon": "y",
360             "description": "x"})
361
362         self.ldb.add({
363             "dn": self.samba4.dn("cn=Z"),
364             "objectClass": "top",
365             "cn": "Z",
366             "codePage": "x",
367             "revision": "y",
368             "dnsHostName": "z",
369             "nextRid": "y",
370             "lastLogon": "z",
371             "description": "y"})
372
373         # Add a set of remote records
374
375         self.samba3.db.add({
376             "dn": self.samba3.dn("cn=A"),
377             "objectClass": "posixAccount",
378             "cn": "A",
379             "sambaNextRid": "x",
380             "sambaBadPasswordCount": "x",
381             "sambaLogonTime": "x",
382             "description": "x",
383             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-1052",
384             "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
385
386         self.samba3.db.add({
387             "dn": self.samba3.dn("cn=B"),
388             "objectClass": "top",
389             "cn": "B",
390             "sambaNextRid": "x",
391             "sambaBadPasswordCount": "x",
392             "sambaLogonTime": "y",
393             "description": "x"})
394
395         self.samba3.db.add({
396             "dn": self.samba3.dn("cn=C"),
397             "objectClass": "top",
398             "cn": "C",
399             "sambaNextRid": "x",
400             "sambaBadPasswordCount": "y",
401             "sambaLogonTime": "z",
402             "description": "y"})
403
404         # Testing search by DN
405
406         # Search remote record by local DN
407         dn = self.samba4.dn("cn=A")
408         res = self.ldb.search(dn, scope=SCOPE_BASE,
409                 attrs=["dnsHostName", "lastLogon"])
410         self.assertEquals(len(res), 1)
411         self.assertEquals(str(res[0].dn), dn)
412         self.assertTrue(not "dnsHostName" in res[0])
413         self.assertEquals(str(res[0]["lastLogon"]), "x")
414
415         # Search remote record by remote DN
416         dn = self.samba3.dn("cn=A")
417         res = self.samba3.db.search(dn, scope=SCOPE_BASE,
418                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
419         self.assertEquals(len(res), 1)
420         self.assertEquals(str(res[0].dn), dn)
421         self.assertTrue(not "dnsHostName" in res[0])
422         self.assertTrue(not "lastLogon" in res[0])
423         self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
424
425         # Search split record by local DN
426         dn = self.samba4.dn("cn=X")
427         res = self.ldb.search(dn, scope=SCOPE_BASE,
428                 attrs=["dnsHostName", "lastLogon"])
429         self.assertEquals(len(res), 1)
430         self.assertEquals(str(res[0].dn), dn)
431         self.assertEquals(str(res[0]["dnsHostName"]), "x")
432         self.assertEquals(str(res[0]["lastLogon"]), "x")
433
434         # Search split record by remote DN
435         dn = self.samba3.dn("cn=X")
436         res = self.samba3.db.search(dn, scope=SCOPE_BASE,
437                 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
438         self.assertEquals(len(res), 1)
439         self.assertEquals(str(res[0].dn), dn)
440         self.assertTrue(not "dnsHostName" in res[0])
441         self.assertTrue(not "lastLogon" in res[0])
442         self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
443
444         # Testing search by attribute
445
446         # Search by ignored attribute
447         res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
448                 attrs=["dnsHostName", "lastLogon"])
449         self.assertEquals(len(res), 2)
450         res = sorted(res, key=attrgetter('dn'))
451         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
452         self.assertEquals(str(res[0]["dnsHostName"]), "x")
453         self.assertEquals(str(res[0]["lastLogon"]), "x")
454         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
455         self.assertEquals(str(res[1]["dnsHostName"]), "y")
456         self.assertEquals(str(res[1]["lastLogon"]), "y")
457
458         # Search by kept attribute
459         res = self.ldb.search(expression="(description=y)",
460                 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
461         self.assertEquals(len(res), 2)
462         res = sorted(res, key=attrgetter('dn'))
463         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
464         self.assertTrue(not "dnsHostName" in res[0])
465         self.assertEquals(str(res[0]["lastLogon"]), "z")
466         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
467         self.assertEquals(str(res[1]["dnsHostName"]), "z")
468         self.assertEquals(str(res[1]["lastLogon"]), "z")
469
470         # Search by renamed attribute
471         res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
472                               attrs=["dnsHostName", "lastLogon"])
473         self.assertEquals(len(res), 2)
474         res = sorted(res, key=attrgetter('dn'))
475         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
476         self.assertTrue(not "dnsHostName" in res[0])
477         self.assertEquals(str(res[0]["lastLogon"]), "x")
478         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
479         self.assertTrue(not "dnsHostName" in res[1])
480         self.assertEquals(str(res[1]["lastLogon"]), "y")
481
482         # Search by converted attribute
483         # TODO:
484         #   Using the SID directly in the parse tree leads to conversion
485         #   errors, letting the search fail with no results.
486         #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-1052)", scope=SCOPE_DEFAULT, attrs)
487         res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
488         self.assertEquals(len(res), 4)
489         res = sorted(res, key=attrgetter('dn'))
490         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
491         self.assertEquals(str(res[1]["dnsHostName"]), "x")
492         self.assertEquals(str(res[1]["lastLogon"]), "x")
493         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
494                              res[1]["objectSid"])
495         self.assertTrue("objectSid" in res[1])
496         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
497         self.assertTrue(not "dnsHostName" in res[0])
498         self.assertEquals(str(res[0]["lastLogon"]), "x")
499         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
500                              res[0]["objectSid"])
501         self.assertTrue("objectSid" in res[0])
502
503         # Search by generated attribute
504         # In most cases, this even works when the mapping is missing
505         # a `convert_operator' by enumerating the remote db.
506         res = self.ldb.search(expression="(primaryGroupID=512)",
507                            attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
508         self.assertEquals(len(res), 1)
509         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
510         self.assertTrue(not "dnsHostName" in res[0])
511         self.assertEquals(str(res[0]["lastLogon"]), "x")
512         self.assertEquals(str(res[0]["primaryGroupID"]), "512")
513
514         # Note that Xs "objectSid" seems to be fine in the previous search for
515         # "objectSid"...
516         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
517         #print len(res) + " results found"
518         #for i in range(len(res)):
519         #    for (obj in res[i]) {
520         #        print obj + ": " + res[i][obj]
521         #    }
522         #    print "---"
523         #
524
525         # Search by remote name of renamed attribute */
526         res = self.ldb.search(expression="(sambaBadPasswordCount=*)",
527                               attrs=["dnsHostName", "lastLogon"])
528         self.assertEquals(len(res), 0)
529
530         # Search by objectClass
531         attrs = ["dnsHostName", "lastLogon", "objectClass"]
532         res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
533         self.assertEquals(len(res), 2)
534         res = sorted(res, key=attrgetter('dn'))
535         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
536         self.assertTrue(not "dnsHostName" in res[0])
537         self.assertEquals(str(res[0]["lastLogon"]), "x")
538         self.assertEquals(str(res[0]["objectClass"][0]), "user")
539         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
540         self.assertEquals(str(res[1]["dnsHostName"]), "x")
541         self.assertEquals(str(res[1]["lastLogon"]), "x")
542         self.assertEquals(str(res[1]["objectClass"][0]), "user")
543
544         # Prove that the objectClass is actually used for the search
545         res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
546                               attrs=attrs)
547         self.assertEquals(len(res), 3)
548         res = sorted(res, key=attrgetter('dn'))
549         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
550         self.assertTrue(not "dnsHostName" in res[0])
551         self.assertEquals(str(res[0]["lastLogon"]), "x")
552         self.assertEquals(res[0]["objectClass"][0], "user")
553         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
554         self.assertTrue(not "dnsHostName" in res[1])
555         self.assertEquals(str(res[1]["lastLogon"]), "y")
556         self.assertEquals(set(res[1]["objectClass"]), set(["top"]))
557         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
558         self.assertEquals(str(res[2]["dnsHostName"]), "x")
559         self.assertEquals(str(res[2]["lastLogon"]), "x")
560         self.assertEquals(str(res[2]["objectClass"][0]), "user")
561
562         # Testing search by parse tree
563
564         # Search by conjunction of local attributes
565         res = self.ldb.search(expression="(&(codePage=x)(revision=x))",
566                               attrs=["dnsHostName", "lastLogon"])
567         self.assertEquals(len(res), 2)
568         res = sorted(res, key=attrgetter('dn'))
569         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
570         self.assertEquals(str(res[0]["dnsHostName"]), "x")
571         self.assertEquals(str(res[0]["lastLogon"]), "x")
572         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
573         self.assertEquals(str(res[1]["dnsHostName"]), "y")
574         self.assertEquals(str(res[1]["lastLogon"]), "y")
575
576         # Search by conjunction of remote attributes
577         res = self.ldb.search(expression="(&(lastLogon=x)(description=x))",
578                               attrs=["dnsHostName", "lastLogon"])
579         self.assertEquals(len(res), 2)
580         res = sorted(res, key=attrgetter('dn'))
581         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
582         self.assertTrue(not "dnsHostName" in res[0])
583         self.assertEquals(str(res[0]["lastLogon"]), "x")
584         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
585         self.assertEquals(str(res[1]["dnsHostName"]), "x")
586         self.assertEquals(str(res[1]["lastLogon"]), "x")
587
588         # Search by conjunction of local and remote attribute
589         res = self.ldb.search(expression="(&(codePage=x)(description=x))",
590                               attrs=["dnsHostName", "lastLogon"])
591         self.assertEquals(len(res), 2)
592         res = sorted(res, key=attrgetter('dn'))
593         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
594         self.assertEquals(str(res[0]["dnsHostName"]), "x")
595         self.assertEquals(str(res[0]["lastLogon"]), "x")
596         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
597         self.assertEquals(str(res[1]["dnsHostName"]), "y")
598         self.assertEquals(str(res[1]["lastLogon"]), "y")
599
600         # Search by conjunction of local and remote attribute w/o match
601         attrs = ["dnsHostName", "lastLogon"]
602         res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))",
603                               attrs=attrs)
604         self.assertEquals(len(res), 0)
605         res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))",
606                               attrs=attrs)
607         self.assertEquals(len(res), 0)
608
609         # Search by disjunction of local attributes
610         res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))",
611                               attrs=["dnsHostName", "lastLogon"])
612         self.assertEquals(len(res), 2)
613         res = sorted(res, key=attrgetter('dn'))
614         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
615         self.assertEquals(str(res[0]["dnsHostName"]), "x")
616         self.assertEquals(str(res[0]["lastLogon"]), "x")
617         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
618         self.assertEquals(str(res[1]["dnsHostName"]), "y")
619         self.assertEquals(str(res[1]["lastLogon"]), "y")
620
621         # Search by disjunction of remote attributes
622         res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))",
623                               attrs=["dnsHostName", "lastLogon"])
624         self.assertEquals(len(res), 3)
625         res = sorted(res, key=attrgetter('dn'))
626         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
627         self.assertFalse("dnsHostName" in res[0])
628         self.assertEquals(str(res[0]["lastLogon"]), "x")
629         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
630         self.assertFalse("dnsHostName" in res[1])
631         self.assertEquals(str(res[1]["lastLogon"]), "y")
632         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
633         self.assertEquals(str(res[2]["dnsHostName"]), "x")
634         self.assertEquals(str(res[2]["lastLogon"]), "x")
635
636         # Search by disjunction of local and remote attribute
637         res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))",
638                               attrs=["dnsHostName", "lastLogon"])
639         self.assertEquals(len(res), 3)
640         res = sorted(res, key=attrgetter('dn'))
641         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
642         self.assertFalse("dnsHostName" in res[0])
643         self.assertEquals(str(res[0]["lastLogon"]), "y")
644         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
645         self.assertEquals(str(res[1]["dnsHostName"]), "x")
646         self.assertEquals(str(res[1]["lastLogon"]), "x")
647         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Y"))
648         self.assertEquals(str(res[2]["dnsHostName"]), "y")
649         self.assertEquals(str(res[2]["lastLogon"]), "y")
650
651         # Search by disjunction of local and remote attribute w/o match
652         res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))",
653                               attrs=["dnsHostName", "lastLogon"])
654         self.assertEquals(len(res), 0)
655
656         # Search by negated local attribute
657         res = self.ldb.search(expression="(!(revision=x))",
658                               attrs=["dnsHostName", "lastLogon"])
659         self.assertEquals(len(res), 6)
660         res = sorted(res, key=attrgetter('dn'))
661         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
662         self.assertTrue(not "dnsHostName" in res[0])
663         self.assertEquals(str(res[0]["lastLogon"]), "x")
664         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
665         self.assertTrue(not "dnsHostName" in res[1])
666         self.assertEquals(str(res[1]["lastLogon"]), "y")
667         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
668         self.assertTrue(not "dnsHostName" in res[2])
669         self.assertEquals(str(res[2]["lastLogon"]), "z")
670         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
671         self.assertEquals(str(res[3]["dnsHostName"]), "z")
672         self.assertEquals(str(res[3]["lastLogon"]), "z")
673
674         # Search by negated remote attribute
675         res = self.ldb.search(expression="(!(description=x))",
676                               attrs=["dnsHostName", "lastLogon"])
677         self.assertEquals(len(res), 4)
678         res = sorted(res, key=attrgetter('dn'))
679         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
680         self.assertTrue(not "dnsHostName" in res[0])
681         self.assertEquals(str(res[0]["lastLogon"]), "z")
682         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
683         self.assertEquals(str(res[1]["dnsHostName"]), "z")
684         self.assertEquals(str(res[1]["lastLogon"]), "z")
685
686         # Search by negated conjunction of local attributes
687         res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))",
688                               attrs=["dnsHostName", "lastLogon"])
689         self.assertEquals(len(res), 6)
690         res = sorted(res, key=attrgetter('dn'))
691         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
692         self.assertTrue(not "dnsHostName" in res[0])
693         self.assertEquals(str(res[0]["lastLogon"]), "x")
694         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
695         self.assertTrue(not "dnsHostName" in res[1])
696         self.assertEquals(str(res[1]["lastLogon"]), "y")
697         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
698         self.assertTrue(not "dnsHostName" in res[2])
699         self.assertEquals(str(res[2]["lastLogon"]), "z")
700         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
701         self.assertEquals(str(res[3]["dnsHostName"]), "z")
702         self.assertEquals(str(res[3]["lastLogon"]), "z")
703
704         # Search by negated conjunction of remote attributes
705         res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))",
706                               attrs=["dnsHostName", "lastLogon"])
707         self.assertEquals(len(res), 6)
708         res = sorted(res, key=attrgetter('dn'))
709         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
710         self.assertTrue(not "dnsHostName" in res[0])
711         self.assertEquals(str(res[0]["lastLogon"]), "y")
712         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
713         self.assertTrue(not "dnsHostName" in res[1])
714         self.assertEquals(str(res[1]["lastLogon"]), "z")
715         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Y"))
716         self.assertEquals(str(res[2]["dnsHostName"]), "y")
717         self.assertEquals(str(res[2]["lastLogon"]), "y")
718         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
719         self.assertEquals(str(res[3]["dnsHostName"]), "z")
720         self.assertEquals(str(res[3]["lastLogon"]), "z")
721
722         # Search by negated conjunction of local and remote attribute
723         res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))",
724                               attrs=["dnsHostName", "lastLogon"])
725         self.assertEquals(len(res), 6)
726         res = sorted(res, key=attrgetter('dn'))
727         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
728         self.assertTrue(not "dnsHostName" in res[0])
729         self.assertEquals(str(res[0]["lastLogon"]), "x")
730         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
731         self.assertTrue(not "dnsHostName" in res[1])
732         self.assertEquals(str(res[1]["lastLogon"]), "y")
733         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
734         self.assertTrue(not "dnsHostName" in res[2])
735         self.assertEquals(str(res[2]["lastLogon"]), "z")
736         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
737         self.assertEquals(str(res[3]["dnsHostName"]), "z")
738         self.assertEquals(str(res[3]["lastLogon"]), "z")
739
740         # Search by negated disjunction of local attributes
741         res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))",
742                               attrs=["dnsHostName", "lastLogon"])
743         res = sorted(res, key=attrgetter('dn'))
744         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
745         self.assertTrue(not "dnsHostName" in res[0])
746         self.assertEquals(str(res[0]["lastLogon"]), "x")
747         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
748         self.assertTrue(not "dnsHostName" in res[1])
749         self.assertEquals(str(res[1]["lastLogon"]), "y")
750         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
751         self.assertTrue(not "dnsHostName" in res[2])
752         self.assertEquals(str(res[2]["lastLogon"]), "z")
753         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
754         self.assertEquals(str(res[3]["dnsHostName"]), "z")
755         self.assertEquals(str(res[3]["lastLogon"]), "z")
756
757         # Search by negated disjunction of remote attributes
758         res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))",
759                               attrs=["dnsHostName", "lastLogon"])
760         self.assertEquals(len(res), 5)
761         res = sorted(res, key=attrgetter('dn'))
762         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
763         self.assertTrue(not "dnsHostName" in res[0])
764         self.assertEquals(str(res[0]["lastLogon"]), "z")
765         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
766         self.assertEquals(str(res[1]["dnsHostName"]), "y")
767         self.assertEquals(str(res[1]["lastLogon"]), "y")
768         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
769         self.assertEquals(str(res[2]["dnsHostName"]), "z")
770         self.assertEquals(str(res[2]["lastLogon"]), "z")
771
772         # Search by negated disjunction of local and remote attribute
773         res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
774                               attrs=["dnsHostName", "lastLogon"])
775         self.assertEquals(len(res), 5)
776         res = sorted(res, key=attrgetter('dn'))
777         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
778         self.assertTrue(not "dnsHostName" in res[0])
779         self.assertEquals(str(res[0]["lastLogon"]), "x")
780         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
781         self.assertTrue(not "dnsHostName" in res[1])
782         self.assertEquals(str(res[1]["lastLogon"]), "z")
783         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
784         self.assertEquals(str(res[2]["dnsHostName"]), "z")
785         self.assertEquals(str(res[2]["lastLogon"]), "z")
786
787         # Search by complex parse tree
788         res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
789         self.assertEquals(len(res), 7)
790         res = sorted(res, key=attrgetter('dn'))
791         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
792         self.assertTrue(not "dnsHostName" in res[0])
793         self.assertEquals(str(res[0]["lastLogon"]), "x")
794         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
795         self.assertTrue(not "dnsHostName" in res[1])
796         self.assertEquals(str(res[1]["lastLogon"]), "y")
797         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
798         self.assertTrue(not "dnsHostName" in res[2])
799         self.assertEquals(str(res[2]["lastLogon"]), "z")
800         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=X"))
801         self.assertEquals(str(res[3]["dnsHostName"]), "x")
802         self.assertEquals(str(res[3]["lastLogon"]), "x")
803         self.assertEquals(str(res[4].dn), self.samba4.dn("cn=Z"))
804         self.assertEquals(str(res[4]["dnsHostName"]), "z")
805         self.assertEquals(str(res[4]["lastLogon"]), "z")
806
807         # Clean up
808         dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
809         for dn in dns:
810             self.ldb.delete(dn)
811
812     def test_map_modify_local(self):
813         """Modification of local records."""
814         # Add local record
815         dn = "cn=test,dc=idealx,dc=org"
816         self.ldb.add({"dn": dn,
817                  "cn": "test",
818                  "foo": "bar",
819                  "revision": "1",
820                  "description": "test"})
821         # Check it's there
822         attrs = ["foo", "revision", "description"]
823         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
824         self.assertEquals(len(res), 1)
825         self.assertEquals(str(res[0].dn), dn)
826         self.assertEquals(str(res[0]["foo"]), "bar")
827         self.assertEquals(str(res[0]["revision"]), "1")
828         self.assertEquals(str(res[0]["description"]), "test")
829         # Check it's not in the local db
830         res = self.samba4.db.search(expression="(cn=test)",
831                                     scope=SCOPE_DEFAULT, attrs=attrs)
832         self.assertEquals(len(res), 0)
833         # Check it's not in the remote db
834         res = self.samba3.db.search(expression="(cn=test)",
835                                     scope=SCOPE_DEFAULT, attrs=attrs)
836         self.assertEquals(len(res), 0)
837
838         # Modify local record
839         ldif = """
840 dn: """ + dn + """
841 replace: foo
842 foo: baz
843 replace: description
844 description: foo
845 """
846         self.ldb.modify_ldif(ldif)
847         # Check in local db
848         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
849         self.assertEquals(len(res), 1)
850         self.assertEquals(str(res[0].dn), dn)
851         self.assertEquals(str(res[0]["foo"]), "baz")
852         self.assertEquals(str(res[0]["revision"]), "1")
853         self.assertEquals(str(res[0]["description"]), "foo")
854
855         # Rename local record
856         dn2 = "cn=toast,dc=idealx,dc=org"
857         self.ldb.rename(dn, dn2)
858         # Check in local db
859         res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
860         self.assertEquals(len(res), 1)
861         self.assertEquals(str(res[0].dn), dn2)
862         self.assertEquals(str(res[0]["foo"]), "baz")
863         self.assertEquals(str(res[0]["revision"]), "1")
864         self.assertEquals(str(res[0]["description"]), "foo")
865
866         # Delete local record
867         self.ldb.delete(dn2)
868         # Check it's gone
869         res = self.ldb.search(dn2, scope=SCOPE_BASE)
870         self.assertEquals(len(res), 0)
871
872     def test_map_modify_remote_remote(self):
873         """Modification of remote data of remote records"""
874         # Add remote record
875         dn = self.samba4.dn("cn=test")
876         dn2 = self.samba3.dn("cn=test")
877         self.samba3.db.add({"dn": dn2,
878                    "cn": "test",
879                    "description": "foo",
880                    "sambaBadPasswordCount": "3",
881                    "sambaNextRid": "1001"})
882         # Check it's there
883         res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
884                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
885         self.assertEquals(len(res), 1)
886         self.assertEquals(str(res[0].dn), dn2)
887         self.assertEquals(str(res[0]["description"]), "foo")
888         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
889         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
890         # Check in mapped db
891         attrs = ["description", "badPwdCount", "nextRid"]
892         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
893         self.assertEquals(len(res), 1)
894         self.assertEquals(str(res[0].dn), dn)
895         self.assertEquals(str(res[0]["description"]), "foo")
896         self.assertEquals(str(res[0]["badPwdCount"]), "3")
897         self.assertEquals(str(res[0]["nextRid"]), "1001")
898         # Check in local db
899         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
900         self.assertEquals(len(res), 0)
901
902         # Modify remote data of remote record
903         ldif = """
904 dn: """ + dn + """
905 replace: description
906 description: test
907 replace: badPwdCount
908 badPwdCount: 4
909 """
910         self.ldb.modify_ldif(ldif)
911         # Check in mapped db
912         res = self.ldb.search(dn, scope=SCOPE_BASE,
913                 attrs=["description", "badPwdCount", "nextRid"])
914         self.assertEquals(len(res), 1)
915         self.assertEquals(str(res[0].dn), dn)
916         self.assertEquals(str(res[0]["description"]), "test")
917         self.assertEquals(str(res[0]["badPwdCount"]), "4")
918         self.assertEquals(str(res[0]["nextRid"]), "1001")
919         # Check in remote db
920         res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
921                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
922         self.assertEquals(len(res), 1)
923         self.assertEquals(str(res[0].dn), dn2)
924         self.assertEquals(str(res[0]["description"]), "test")
925         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
926         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
927
928         # Rename remote record
929         dn2 = self.samba4.dn("cn=toast")
930         self.ldb.rename(dn, dn2)
931         # Check in mapped db
932         dn = dn2
933         res = self.ldb.search(dn, scope=SCOPE_BASE,
934                 attrs=["description", "badPwdCount", "nextRid"])
935         self.assertEquals(len(res), 1)
936         self.assertEquals(str(res[0].dn), dn)
937         self.assertEquals(str(res[0]["description"]), "test")
938         self.assertEquals(str(res[0]["badPwdCount"]), "4")
939         self.assertEquals(str(res[0]["nextRid"]), "1001")
940         # Check in remote db
941         dn2 = self.samba3.dn("cn=toast")
942         res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
943                 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
944         self.assertEquals(len(res), 1)
945         self.assertEquals(str(res[0].dn), dn2)
946         self.assertEquals(str(res[0]["description"]), "test")
947         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
948         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
949
950         # Delete remote record
951         self.ldb.delete(dn)
952         # Check in mapped db that it's removed
953         res = self.ldb.search(dn, scope=SCOPE_BASE)
954         self.assertEquals(len(res), 0)
955         # Check in remote db
956         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
957         self.assertEquals(len(res), 0)
958
959     def test_map_modify_remote_local(self):
960         """Modification of local data of remote records"""
961         # Add remote record (same as before)
962         dn = self.samba4.dn("cn=test")
963         dn2 = self.samba3.dn("cn=test")
964         self.samba3.db.add({"dn": dn2,
965                    "cn": "test",
966                    "description": "foo",
967                    "sambaBadPasswordCount": "3",
968                    "sambaNextRid": "1001"})
969
970         # Modify local data of remote record
971         ldif = """
972 dn: """ + dn + """
973 add: revision
974 revision: 1
975 replace: description
976 description: test
977
978 """
979         self.ldb.modify_ldif(ldif)
980         # Check in mapped db
981         attrs = ["revision", "description"]
982         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
983         self.assertEquals(len(res), 1)
984         self.assertEquals(str(res[0].dn), dn)
985         self.assertEquals(str(res[0]["description"]), "test")
986         self.assertEquals(str(res[0]["revision"]), "1")
987         # Check in remote db
988         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
989         self.assertEquals(len(res), 1)
990         self.assertEquals(str(res[0].dn), dn2)
991         self.assertEquals(str(res[0]["description"]), "test")
992         self.assertTrue(not "revision" in res[0])
993         # Check in local db
994         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
995         self.assertEquals(len(res), 1)
996         self.assertEquals(str(res[0].dn), dn)
997         self.assertTrue(not "description" in res[0])
998         self.assertEquals(str(res[0]["revision"]), "1")
999
1000         # Delete (newly) split record
1001         self.ldb.delete(dn)
1002
1003     def test_map_modify_split(self):
1004         """Testing modification of split records"""
1005         # Add split record
1006         dn = self.samba4.dn("cn=test")
1007         dn2 = self.samba3.dn("cn=test")
1008         self.ldb.add({
1009             "dn": dn,
1010             "cn": "test",
1011             "description": "foo",
1012             "badPwdCount": "3",
1013             "nextRid": "1001",
1014             "revision": "1"})
1015         # Check it's there
1016         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1017         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1018         self.assertEquals(len(res), 1)
1019         self.assertEquals(str(res[0].dn), dn)
1020         self.assertEquals(str(res[0]["description"]), "foo")
1021         self.assertEquals(str(res[0]["badPwdCount"]), "3")
1022         self.assertEquals(str(res[0]["nextRid"]), "1001")
1023         self.assertEquals(str(res[0]["revision"]), "1")
1024         # Check in local db
1025         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1026         self.assertEquals(len(res), 1)
1027         self.assertEquals(str(res[0].dn), dn)
1028         self.assertTrue(not "description" in res[0])
1029         self.assertTrue(not "badPwdCount" in res[0])
1030         self.assertTrue(not "nextRid" in res[0])
1031         self.assertEquals(str(res[0]["revision"]), "1")
1032         # Check in remote db
1033         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1034                  "revision"]
1035         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1036         self.assertEquals(len(res), 1)
1037         self.assertEquals(str(res[0].dn), dn2)
1038         self.assertEquals(str(res[0]["description"]), "foo")
1039         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
1040         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1041         self.assertTrue(not "revision" in res[0])
1042
1043         # Modify of split record
1044         ldif = """
1045 dn: """ + dn + """
1046 replace: description
1047 description: test
1048 replace: badPwdCount
1049 badPwdCount: 4
1050 replace: revision
1051 revision: 2
1052 """
1053         self.ldb.modify_ldif(ldif)
1054         # Check in mapped db
1055         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1056         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1057         self.assertEquals(len(res), 1)
1058         self.assertEquals(str(res[0].dn), dn)
1059         self.assertEquals(str(res[0]["description"]), "test")
1060         self.assertEquals(str(res[0]["badPwdCount"]), "4")
1061         self.assertEquals(str(res[0]["nextRid"]), "1001")
1062         self.assertEquals(str(res[0]["revision"]), "2")
1063         # Check in local db
1064         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1065         self.assertEquals(len(res), 1)
1066         self.assertEquals(str(res[0].dn), dn)
1067         self.assertTrue(not "description" in res[0])
1068         self.assertTrue(not "badPwdCount" in res[0])
1069         self.assertTrue(not "nextRid" in res[0])
1070         self.assertEquals(str(res[0]["revision"]), "2")
1071         # Check in remote db
1072         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1073                  "revision"]
1074         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1075         self.assertEquals(len(res), 1)
1076         self.assertEquals(str(res[0].dn), dn2)
1077         self.assertEquals(str(res[0]["description"]), "test")
1078         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1079         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1080         self.assertTrue(not "revision" in res[0])
1081
1082         # Rename split record
1083         dn2 = self.samba4.dn("cn=toast")
1084         self.ldb.rename(dn, dn2)
1085         # Check in mapped db
1086         dn = dn2
1087         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1088         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1089         self.assertEquals(len(res), 1)
1090         self.assertEquals(str(res[0].dn), dn)
1091         self.assertEquals(str(res[0]["description"]), "test")
1092         self.assertEquals(str(res[0]["badPwdCount"]), "4")
1093         self.assertEquals(str(res[0]["nextRid"]), "1001")
1094         self.assertEquals(str(res[0]["revision"]), "2")
1095         # Check in local db
1096         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1097         self.assertEquals(len(res), 1)
1098         self.assertEquals(str(res[0].dn), dn)
1099         self.assertTrue(not "description" in res[0])
1100         self.assertTrue(not "badPwdCount" in res[0])
1101         self.assertTrue(not "nextRid" in res[0])
1102         self.assertEquals(str(res[0]["revision"]), "2")
1103         # Check in remote db
1104         dn2 = self.samba3.dn("cn=toast")
1105         res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
1106           attrs=["description", "sambaBadPasswordCount", "sambaNextRid",
1107                  "revision"])
1108         self.assertEquals(len(res), 1)
1109         self.assertEquals(str(res[0].dn), dn2)
1110         self.assertEquals(str(res[0]["description"]), "test")
1111         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1112         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1113         self.assertTrue(not "revision" in res[0])
1114
1115         # Delete split record
1116         self.ldb.delete(dn)
1117         # Check in mapped db
1118         res = self.ldb.search(dn, scope=SCOPE_BASE)
1119         self.assertEquals(len(res), 0)
1120         # Check in local db
1121         res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1122         self.assertEquals(len(res), 0)
1123         # Check in remote db
1124         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1125         self.assertEquals(len(res), 0)