traffic: new version of model with packet_rate, version number
[samba.git] / python / samba / tests / samba3sam.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
3 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
4 #
5 # This is a Python port of the original in testprogs/ejs/samba3sam.js
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20
21 from __future__ import print_function
22 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
23
24 import os
25 import ldb
26 from ldb import SCOPE_DEFAULT, SCOPE_BASE
27 from samba import Ldb, substitute_var
28 from samba.tests import TestCaseInTempDir, env_loadparm
29 import samba.dcerpc.security
30 import samba.ndr
31 from samba.auth import system_session
32 from operator import attrgetter
33
34
35 def read_datafile(filename):
36     paths = ["../../../../../testdata/samba3",
37              "../../../../testdata/samba3"]
38     for p in paths:
39         datadir = os.path.join(os.path.dirname(__file__), p)
40         if os.path.exists(datadir):
41             break
42     return open(os.path.join(datadir, filename), 'r').read()
43
44
45 def ldb_debug(l, text):
46     print(text)
47
48
49 class MapBaseTestCase(TestCaseInTempDir):
50     """Base test case for mapping tests."""
51
52     def setup_modules(self, ldb, s3, s4):
53         ldb.add({"dn": "@MAP=samba3sam",
54                  "@FROM": s4.basedn,
55                  "@TO": "sambaDomainName=TESTS," + s3.basedn})
56
57         ldb.add({"dn": "@MODULES",
58                  "@LIST": "rootdse,dsdb_paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,samba3sid,show_deleted_ignore,dsdb_flags_ignore,partition"})
59
60         ldb.add({"dn": "@PARTITION",
61                  "partition": ["%s" % (s4.basedn_casefold),
62                                "%s" % (s3.basedn_casefold)],
63                  "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"],
64                  "modules": "*:"})
65
66     def setUp(self):
67         self.lp = env_loadparm()
68         self.lp.set("workgroup", "TESTS")
69         self.lp.set("netbios name", "TESTS")
70         super(MapBaseTestCase, self).setUp()
71
72         def make_dn(basedn, rdn):
73             return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)
74
75         def make_s4dn(basedn, rdn):
76             return "%s,%s" % (rdn, basedn)
77
78         self.ldbfile = os.path.join(self.tempdir, "sam.ldb")
79         self.ldburl = "tdb://" + self.ldbfile
80
81         tempdir = self.tempdir
82
83         class Target:
84             """Simple helper class that contains data for a specific SAM
85             connection."""
86
87             def __init__(self, basedn, dn, lp):
88                 self.db = Ldb(lp=lp, session_info=system_session())
89                 self.db.set_opaque("skip_allocate_sids", "true")
90                 self.basedn = basedn
91                 self.basedn_casefold = ldb.Dn(self.db, basedn).get_casefold()
92                 self.substvars = {"BASEDN": self.basedn}
93                 self.file = os.path.join(tempdir, "%s.ldb" % self.basedn_casefold)
94                 self.url = "tdb://" + self.file
95                 self._dn = dn
96
97             def dn(self, rdn):
98                 return self._dn(self.basedn, rdn)
99
100             def connect(self):
101                 return self.db.connect(self.url)
102
103             def setup_data(self, path):
104                 self.add_ldif(read_datafile(path))
105
106             def subst(self, text):
107                 return substitute_var(text, self.substvars)
108
109             def add_ldif(self, ldif):
110                 self.db.add_ldif(self.subst(ldif))
111
112             def modify_ldif(self, ldif):
113                 self.db.modify_ldif(self.subst(ldif))
114
115         self.samba4 = Target("dc=vernstok,dc=nl", make_s4dn, self.lp)
116         self.samba3 = Target("cn=Samba3Sam", make_dn, self.lp)
117
118         self.samba3.connect()
119         self.samba4.connect()
120
121     def tearDown(self):
122         os.unlink(self.ldbfile)
123         os.unlink(self.samba3.file)
124         os.unlink(self.samba4.file)
125         pdir = "%s.d" % self.ldbfile
126         mdata = os.path.join(pdir, "metadata.tdb")
127         if os.path.exists(mdata):
128             os.unlink(mdata)
129             os.rmdir(pdir)
130         super(MapBaseTestCase, self).tearDown()
131
132     def assertSidEquals(self, text, ndr_sid):
133         sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid,
134                                         ndr_sid[0])
135         sid_obj2 = samba.dcerpc.security.dom_sid(text)
136         self.assertEquals(sid_obj1, sid_obj2)
137
138
139 class Samba3SamTestCase(MapBaseTestCase):
140
141     def setUp(self):
142         super(Samba3SamTestCase, self).setUp()
143         ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
144         ldb.set_opaque("skip_allocate_sids", "true")
145         self.samba3.setup_data("samba3.ldif")
146         ldif = read_datafile("provision_samba3sam.ldif")
147         ldb.add_ldif(self.samba4.subst(ldif))
148         self.setup_modules(ldb, self.samba3, self.samba4)
149         del ldb
150         self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
151         self.ldb.set_opaque("skip_allocate_sids", "true")
152
153     def test_search_non_mapped(self):
154         """Looking up by non-mapped attribute"""
155         msg = self.ldb.search(expression="(cn=Administrator)")
156         self.assertEquals(len(msg), 1)
157         self.assertEquals(str(msg[0]["cn"]), "Administrator")
158
159     def test_search_mapped(self):
160         """Looking up by mapped attribute"""
161         msg = self.ldb.search(expression="(name=Backup Operators)")
162         self.assertEquals(len(msg), 1)
163         self.assertEquals(str(msg[0]["name"]), "Backup Operators")
164
165     def test_old_name_of_renamed(self):
166         """Looking up by old name of renamed attribute"""
167         msg = self.ldb.search(expression="(displayName=Backup Operators)")
168         self.assertEquals(len(msg), 0)
169
170     def test_mapped_containing_sid(self):
171         """Looking up mapped entry containing SID"""
172         msg = self.ldb.search(expression="(cn=Replicator)")
173         self.assertEquals(len(msg), 1)
174         self.assertEquals(str(msg[0].dn),
175                           "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
176         self.assertTrue("objectSid" in msg[0])
177         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
178                              msg[0]["objectSid"])
179         oc = set(msg[0]["objectClass"])
180         self.assertEquals(oc, set([b"group"]))
181
182     def test_search_by_objclass(self):
183         """Looking up by objectClass"""
184         msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
185         self.assertEquals(set([str(m.dn) for m in msg]),
186                           set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
187                                "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))
188
189     def test_s3sam_modify(self):
190         # Adding a record that will be fallbacked
191         self.ldb.add({
192             "dn": "cn=Foo",
193             "foo": "bar",
194             "blah": "Blie",
195             "cn": "Foo",
196             "showInAdvancedViewOnly": "TRUE"})
197
198         # Checking for existence of record (local)
199         # TODO: This record must be searched in the local database, which is
200         # currently only supported for base searches
201         # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
202         # TODO: Actually, this version should work as well but doesn't...
203         #
204         #
205         msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
206                               scope=SCOPE_BASE,
207                               attrs=['foo', 'blah', 'cn', 'showInAdvancedViewOnly'])
208         self.assertEquals(len(msg), 1)
209         self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
210         self.assertEquals(str(msg[0]["foo"]), "bar")
211         self.assertEquals(str(msg[0]["blah"]), "Blie")
212
213         # Adding record that will be mapped
214         self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
215                       "objectClass": "user",
216                       "unixName": "bin",
217                       "sambaUnicodePwd": "geheim",
218                       "cn": "Niemand"})
219
220         # Checking for existence of record (remote)
221         msg = self.ldb.search(expression="(unixName=bin)",
222                               attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
223         self.assertEquals(len(msg), 1)
224         self.assertEquals(str(msg[0]["cn"]), "Niemand")
225         self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
226
227         # Checking for existence of record (local && remote)
228         msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
229                               attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
230         self.assertEquals(len(msg), 1)           # TODO: should check with more records
231         self.assertEquals(str(msg[0]["cn"]), "Niemand")
232         self.assertEquals(str(msg[0]["unixName"]), "bin")
233         self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
234
235         # Checking for existence of record (local || remote)
236         msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
237                               attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
238         # print "got %d replies" % len(msg)
239         self.assertEquals(len(msg), 1)        # TODO: should check with more records
240         self.assertEquals(str(msg[0]["cn"]), "Niemand")
241         self.assertEquals(str(msg[0]["unixName"]), "bin")
242         self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
243
244         # Checking for data in destination database
245         msg = self.samba3.db.search(expression="(cn=Niemand)")
246         self.assertTrue(len(msg) >= 1)
247         self.assertEquals(str(msg[0]["sambaSID"]),
248                           "S-1-5-21-4231626423-2410014848-2360679739-2001")
249         self.assertEquals(str(msg[0]["displayName"]), "Niemand")
250
251         # Adding attribute...
252         self.ldb.modify_ldif("""
253 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
254 changetype: modify
255 add: description
256 description: Blah
257 """)
258
259         # Checking whether changes are still there...
260         msg = self.ldb.search(expression="(cn=Niemand)")
261         self.assertTrue(len(msg) >= 1)
262         self.assertEquals(str(msg[0]["cn"]), "Niemand")
263         self.assertEquals(str(msg[0]["description"]), "Blah")
264
265         # Modifying attribute...
266         self.ldb.modify_ldif("""
267 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
268 changetype: modify
269 replace: description
270 description: Blie
271 """)
272
273         # Checking whether changes are still there...
274         msg = self.ldb.search(expression="(cn=Niemand)")
275         self.assertTrue(len(msg) >= 1)
276         self.assertEquals(str(msg[0]["description"]), "Blie")
277
278         # Deleting attribute...
279         self.ldb.modify_ldif("""
280 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
281 changetype: modify
282 delete: description
283 """)
284
285         # Checking whether changes are no longer there...
286         msg = self.ldb.search(expression="(cn=Niemand)")
287         self.assertTrue(len(msg) >= 1)
288         self.assertTrue("description" not in msg[0])
289
290         # Renaming record...
291         self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
292                         "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
293
294         # Checking whether DN has changed...
295         msg = self.ldb.search(expression="(cn=Niemand2)")
296         self.assertEquals(len(msg), 1)
297         self.assertEquals(str(msg[0].dn),
298                           "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
299
300         # Deleting record...
301         self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
302
303         # Checking whether record is gone...
304         msg = self.ldb.search(expression="(cn=Niemand2)")
305         self.assertEquals(len(msg), 0)
306
307
308 class MapTestCase(MapBaseTestCase):
309
310     def setUp(self):
311         super(MapTestCase, self).setUp()
312         ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
313         ldb.set_opaque("skip_allocate_sids", "true")
314         ldif = read_datafile("provision_samba3sam.ldif")
315         ldb.add_ldif(self.samba4.subst(ldif))
316         self.setup_modules(ldb, self.samba3, self.samba4)
317         del ldb
318         self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
319         self.ldb.set_opaque("skip_allocate_sids", "true")
320
321     def test_map_search(self):
322         """Running search tests on mapped data."""
323         self.samba3.db.add({
324             "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
325             "objectclass": ["sambaDomain", "top"],
326             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
327             "sambaNextRid": "2000",
328             "sambaDomainName": "TESTS"
329         })
330
331         # Add a set of split records
332         self.ldb.add_ldif("""
333 dn: """ + self.samba4.dn("cn=Domain Users") + """
334 objectClass: group
335 cn: Domain Users
336 objectSid: S-1-5-21-4231626423-2410014848-2360679739-513
337 """)
338
339         # Add a set of split records
340         self.ldb.add_ldif("""
341 dn: """ + self.samba4.dn("cn=X") + """
342 objectClass: user
343 cn: X
344 codePage: x
345 revision: x
346 dnsHostName: x
347 nextRid: y
348 lastLogon: x
349 description: x
350 objectSid: S-1-5-21-4231626423-2410014848-2360679739-1052
351 """)
352
353         self.ldb.add({
354             "dn": self.samba4.dn("cn=Y"),
355             "objectClass": "top",
356             "cn": "Y",
357             "codePage": "x",
358             "revision": "x",
359             "dnsHostName": "y",
360             "nextRid": "y",
361             "lastLogon": "y",
362             "description": "x"})
363
364         self.ldb.add({
365             "dn": self.samba4.dn("cn=Z"),
366             "objectClass": "top",
367             "cn": "Z",
368             "codePage": "x",
369             "revision": "y",
370             "dnsHostName": "z",
371             "nextRid": "y",
372             "lastLogon": "z",
373             "description": "y"})
374
375         # Add a set of remote records
376
377         self.samba3.db.add({
378             "dn": self.samba3.dn("cn=A"),
379             "objectClass": "posixAccount",
380             "cn": "A",
381             "sambaNextRid": "x",
382             "sambaBadPasswordCount": "x",
383             "sambaLogonTime": "x",
384             "description": "x",
385             "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-1052",
386             "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
387
388         self.samba3.db.add({
389             "dn": self.samba3.dn("cn=B"),
390             "objectClass": "top",
391             "cn": "B",
392             "sambaNextRid": "x",
393             "sambaBadPasswordCount": "x",
394             "sambaLogonTime": "y",
395             "description": "x"})
396
397         self.samba3.db.add({
398             "dn": self.samba3.dn("cn=C"),
399             "objectClass": "top",
400             "cn": "C",
401             "sambaNextRid": "x",
402             "sambaBadPasswordCount": "y",
403             "sambaLogonTime": "z",
404             "description": "y"})
405
406         # Testing search by DN
407
408         # Search remote record by local DN
409         dn = self.samba4.dn("cn=A")
410         res = self.ldb.search(dn, scope=SCOPE_BASE,
411                               attrs=["dnsHostName", "lastLogon"])
412         self.assertEquals(len(res), 1)
413         self.assertEquals(str(res[0].dn), dn)
414         self.assertTrue("dnsHostName" not in res[0])
415         self.assertEquals(str(res[0]["lastLogon"]), "x")
416
417         # Search remote record by remote DN
418         dn = self.samba3.dn("cn=A")
419         res = self.samba3.db.search(dn, scope=SCOPE_BASE,
420                                     attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
421         self.assertEquals(len(res), 1)
422         self.assertEquals(str(res[0].dn), dn)
423         self.assertTrue("dnsHostName" not in res[0])
424         self.assertTrue("lastLogon" not in res[0])
425         self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
426
427         # Search split record by local DN
428         dn = self.samba4.dn("cn=X")
429         res = self.ldb.search(dn, scope=SCOPE_BASE,
430                               attrs=["dnsHostName", "lastLogon"])
431         self.assertEquals(len(res), 1)
432         self.assertEquals(str(res[0].dn), dn)
433         self.assertEquals(str(res[0]["dnsHostName"]), "x")
434         self.assertEquals(str(res[0]["lastLogon"]), "x")
435
436         # Search split record by remote DN
437         dn = self.samba3.dn("cn=X")
438         res = self.samba3.db.search(dn, scope=SCOPE_BASE,
439                                     attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
440         self.assertEquals(len(res), 1)
441         self.assertEquals(str(res[0].dn), dn)
442         self.assertTrue("dnsHostName" not in res[0])
443         self.assertTrue("lastLogon" not in res[0])
444         self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
445
446         # Testing search by attribute
447
448         # Search by ignored attribute
449         res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
450                               attrs=["dnsHostName", "lastLogon"])
451         self.assertEquals(len(res), 2)
452         res = sorted(res, key=attrgetter('dn'))
453         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
454         self.assertEquals(str(res[0]["dnsHostName"]), "x")
455         self.assertEquals(str(res[0]["lastLogon"]), "x")
456         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
457         self.assertEquals(str(res[1]["dnsHostName"]), "y")
458         self.assertEquals(str(res[1]["lastLogon"]), "y")
459
460         # Search by kept attribute
461         res = self.ldb.search(expression="(description=y)",
462                               scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
463         self.assertEquals(len(res), 2)
464         res = sorted(res, key=attrgetter('dn'))
465         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
466         self.assertTrue("dnsHostName" not in res[0])
467         self.assertEquals(str(res[0]["lastLogon"]), "z")
468         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
469         self.assertEquals(str(res[1]["dnsHostName"]), "z")
470         self.assertEquals(str(res[1]["lastLogon"]), "z")
471
472         # Search by renamed attribute
473         res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
474                               attrs=["dnsHostName", "lastLogon"])
475         self.assertEquals(len(res), 2)
476         res = sorted(res, key=attrgetter('dn'))
477         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
478         self.assertTrue("dnsHostName" not in res[0])
479         self.assertEquals(str(res[0]["lastLogon"]), "x")
480         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
481         self.assertTrue("dnsHostName" not in res[1])
482         self.assertEquals(str(res[1]["lastLogon"]), "y")
483
484         # Search by converted attribute
485         # TODO:
486         #   Using the SID directly in the parse tree leads to conversion
487         #   errors, letting the search fail with no results.
488         # res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-1052)", scope=SCOPE_DEFAULT, attrs)
489         res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
490         self.assertEquals(len(res), 4)
491         res = sorted(res, key=attrgetter('dn'))
492         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
493         self.assertEquals(str(res[1]["dnsHostName"]), "x")
494         self.assertEquals(str(res[1]["lastLogon"]), "x")
495         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
496                              res[1]["objectSid"])
497         self.assertTrue("objectSid" in res[1])
498         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
499         self.assertTrue("dnsHostName" not in res[0])
500         self.assertEquals(str(res[0]["lastLogon"]), "x")
501         self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
502                              res[0]["objectSid"])
503         self.assertTrue("objectSid" in res[0])
504
505         # Search by generated attribute
506         # In most cases, this even works when the mapping is missing
507         # a `convert_operator' by enumerating the remote db.
508         res = self.ldb.search(expression="(primaryGroupID=512)",
509                               attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
510         self.assertEquals(len(res), 1)
511         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
512         self.assertTrue("dnsHostName" not in res[0])
513         self.assertEquals(str(res[0]["lastLogon"]), "x")
514         self.assertEquals(str(res[0]["primaryGroupID"]), "512")
515
516         # Note that Xs "objectSid" seems to be fine in the previous search for
517         # "objectSid"...
518         # res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
519         # print len(res) + " results found"
520         # for i in range(len(res)):
521         #    for (obj in res[i]) {
522         #        print obj + ": " + res[i][obj]
523         #    }
524         #    print "---"
525         #
526
527         # Search by remote name of renamed attribute */
528         res = self.ldb.search(expression="(sambaBadPasswordCount=*)",
529                               attrs=["dnsHostName", "lastLogon"])
530         self.assertEquals(len(res), 0)
531
532         # Search by objectClass
533         attrs = ["dnsHostName", "lastLogon", "objectClass"]
534         res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
535         self.assertEquals(len(res), 2)
536         res = sorted(res, key=attrgetter('dn'))
537         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
538         self.assertTrue("dnsHostName" not in res[0])
539         self.assertEquals(str(res[0]["lastLogon"]), "x")
540         self.assertEquals(str(res[0]["objectClass"][0]), "user")
541         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
542         self.assertEquals(str(res[1]["dnsHostName"]), "x")
543         self.assertEquals(str(res[1]["lastLogon"]), "x")
544         self.assertEquals(str(res[1]["objectClass"][0]), "user")
545
546         # Prove that the objectClass is actually used for the search
547         res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
548                               attrs=attrs)
549         self.assertEquals(len(res), 3)
550         res = sorted(res, key=attrgetter('dn'))
551         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
552         self.assertTrue("dnsHostName" not in res[0])
553         self.assertEquals(str(res[0]["lastLogon"]), "x")
554         self.assertEquals(str(res[0]["objectClass"][0]), "user")
555         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
556         self.assertTrue("dnsHostName" not in res[1])
557         self.assertEquals(str(res[1]["lastLogon"]), "y")
558         self.assertEquals(set(res[1]["objectClass"]), set([b"top"]))
559         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
560         self.assertEquals(str(res[2]["dnsHostName"]), "x")
561         self.assertEquals(str(res[2]["lastLogon"]), "x")
562         self.assertEquals(str(res[2]["objectClass"][0]), "user")
563
564         # Testing search by parse tree
565
566         # Search by conjunction of local attributes
567         res = self.ldb.search(expression="(&(codePage=x)(revision=x))",
568                               attrs=["dnsHostName", "lastLogon"])
569         self.assertEquals(len(res), 2)
570         res = sorted(res, key=attrgetter('dn'))
571         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
572         self.assertEquals(str(res[0]["dnsHostName"]), "x")
573         self.assertEquals(str(res[0]["lastLogon"]), "x")
574         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
575         self.assertEquals(str(res[1]["dnsHostName"]), "y")
576         self.assertEquals(str(res[1]["lastLogon"]), "y")
577
578         # Search by conjunction of remote attributes
579         res = self.ldb.search(expression="(&(lastLogon=x)(description=x))",
580                               attrs=["dnsHostName", "lastLogon"])
581         self.assertEquals(len(res), 2)
582         res = sorted(res, key=attrgetter('dn'))
583         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
584         self.assertTrue("dnsHostName" not in res[0])
585         self.assertEquals(str(res[0]["lastLogon"]), "x")
586         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
587         self.assertEquals(str(res[1]["dnsHostName"]), "x")
588         self.assertEquals(str(res[1]["lastLogon"]), "x")
589
590         # Search by conjunction of local and remote attribute
591         res = self.ldb.search(expression="(&(codePage=x)(description=x))",
592                               attrs=["dnsHostName", "lastLogon"])
593         self.assertEquals(len(res), 2)
594         res = sorted(res, key=attrgetter('dn'))
595         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
596         self.assertEquals(str(res[0]["dnsHostName"]), "x")
597         self.assertEquals(str(res[0]["lastLogon"]), "x")
598         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
599         self.assertEquals(str(res[1]["dnsHostName"]), "y")
600         self.assertEquals(str(res[1]["lastLogon"]), "y")
601
602         # Search by conjunction of local and remote attribute w/o match
603         attrs = ["dnsHostName", "lastLogon"]
604         res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))",
605                               attrs=attrs)
606         self.assertEquals(len(res), 0)
607         res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))",
608                               attrs=attrs)
609         self.assertEquals(len(res), 0)
610
611         # Search by disjunction of local attributes
612         res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))",
613                               attrs=["dnsHostName", "lastLogon"])
614         self.assertEquals(len(res), 2)
615         res = sorted(res, key=attrgetter('dn'))
616         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
617         self.assertEquals(str(res[0]["dnsHostName"]), "x")
618         self.assertEquals(str(res[0]["lastLogon"]), "x")
619         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
620         self.assertEquals(str(res[1]["dnsHostName"]), "y")
621         self.assertEquals(str(res[1]["lastLogon"]), "y")
622
623         # Search by disjunction of remote attributes
624         res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))",
625                               attrs=["dnsHostName", "lastLogon"])
626         self.assertEquals(len(res), 3)
627         res = sorted(res, key=attrgetter('dn'))
628         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
629         self.assertFalse("dnsHostName" in res[0])
630         self.assertEquals(str(res[0]["lastLogon"]), "x")
631         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
632         self.assertFalse("dnsHostName" in res[1])
633         self.assertEquals(str(res[1]["lastLogon"]), "y")
634         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
635         self.assertEquals(str(res[2]["dnsHostName"]), "x")
636         self.assertEquals(str(res[2]["lastLogon"]), "x")
637
638         # Search by disjunction of local and remote attribute
639         res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))",
640                               attrs=["dnsHostName", "lastLogon"])
641         self.assertEquals(len(res), 3)
642         res = sorted(res, key=attrgetter('dn'))
643         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
644         self.assertFalse("dnsHostName" in res[0])
645         self.assertEquals(str(res[0]["lastLogon"]), "y")
646         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
647         self.assertEquals(str(res[1]["dnsHostName"]), "x")
648         self.assertEquals(str(res[1]["lastLogon"]), "x")
649         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Y"))
650         self.assertEquals(str(res[2]["dnsHostName"]), "y")
651         self.assertEquals(str(res[2]["lastLogon"]), "y")
652
653         # Search by disjunction of local and remote attribute w/o match
654         res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))",
655                               attrs=["dnsHostName", "lastLogon"])
656         self.assertEquals(len(res), 0)
657
658         # Search by negated local attribute
659         res = self.ldb.search(expression="(!(revision=x))",
660                               attrs=["dnsHostName", "lastLogon"])
661         self.assertEquals(len(res), 6)
662         res = sorted(res, key=attrgetter('dn'))
663         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
664         self.assertTrue("dnsHostName" not in res[0])
665         self.assertEquals(str(res[0]["lastLogon"]), "x")
666         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
667         self.assertTrue("dnsHostName" not in res[1])
668         self.assertEquals(str(res[1]["lastLogon"]), "y")
669         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
670         self.assertTrue("dnsHostName" not in res[2])
671         self.assertEquals(str(res[2]["lastLogon"]), "z")
672         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
673         self.assertEquals(str(res[3]["dnsHostName"]), "z")
674         self.assertEquals(str(res[3]["lastLogon"]), "z")
675
676         # Search by negated remote attribute
677         res = self.ldb.search(expression="(!(description=x))",
678                               attrs=["dnsHostName", "lastLogon"])
679         self.assertEquals(len(res), 4)
680         res = sorted(res, key=attrgetter('dn'))
681         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
682         self.assertTrue("dnsHostName" not in res[0])
683         self.assertEquals(str(res[0]["lastLogon"]), "z")
684         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
685         self.assertEquals(str(res[1]["dnsHostName"]), "z")
686         self.assertEquals(str(res[1]["lastLogon"]), "z")
687
688         # Search by negated conjunction of local attributes
689         res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))",
690                               attrs=["dnsHostName", "lastLogon"])
691         self.assertEquals(len(res), 6)
692         res = sorted(res, key=attrgetter('dn'))
693         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
694         self.assertTrue("dnsHostName" not in res[0])
695         self.assertEquals(str(res[0]["lastLogon"]), "x")
696         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
697         self.assertTrue("dnsHostName" not in res[1])
698         self.assertEquals(str(res[1]["lastLogon"]), "y")
699         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
700         self.assertTrue("dnsHostName" not in res[2])
701         self.assertEquals(str(res[2]["lastLogon"]), "z")
702         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
703         self.assertEquals(str(res[3]["dnsHostName"]), "z")
704         self.assertEquals(str(res[3]["lastLogon"]), "z")
705
706         # Search by negated conjunction of remote attributes
707         res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))",
708                               attrs=["dnsHostName", "lastLogon"])
709         self.assertEquals(len(res), 6)
710         res = sorted(res, key=attrgetter('dn'))
711         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
712         self.assertTrue("dnsHostName" not in res[0])
713         self.assertEquals(str(res[0]["lastLogon"]), "y")
714         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
715         self.assertTrue("dnsHostName" not in res[1])
716         self.assertEquals(str(res[1]["lastLogon"]), "z")
717         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Y"))
718         self.assertEquals(str(res[2]["dnsHostName"]), "y")
719         self.assertEquals(str(res[2]["lastLogon"]), "y")
720         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
721         self.assertEquals(str(res[3]["dnsHostName"]), "z")
722         self.assertEquals(str(res[3]["lastLogon"]), "z")
723
724         # Search by negated conjunction of local and remote attribute
725         res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))",
726                               attrs=["dnsHostName", "lastLogon"])
727         self.assertEquals(len(res), 6)
728         res = sorted(res, key=attrgetter('dn'))
729         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
730         self.assertTrue("dnsHostName" not in res[0])
731         self.assertEquals(str(res[0]["lastLogon"]), "x")
732         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
733         self.assertTrue("dnsHostName" not in res[1])
734         self.assertEquals(str(res[1]["lastLogon"]), "y")
735         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
736         self.assertTrue("dnsHostName" not in res[2])
737         self.assertEquals(str(res[2]["lastLogon"]), "z")
738         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
739         self.assertEquals(str(res[3]["dnsHostName"]), "z")
740         self.assertEquals(str(res[3]["lastLogon"]), "z")
741
742         # Search by negated disjunction of local attributes
743         res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))",
744                               attrs=["dnsHostName", "lastLogon"])
745         res = sorted(res, key=attrgetter('dn'))
746         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
747         self.assertTrue("dnsHostName" not in res[0])
748         self.assertEquals(str(res[0]["lastLogon"]), "x")
749         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
750         self.assertTrue("dnsHostName" not in res[1])
751         self.assertEquals(str(res[1]["lastLogon"]), "y")
752         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
753         self.assertTrue("dnsHostName" not in res[2])
754         self.assertEquals(str(res[2]["lastLogon"]), "z")
755         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
756         self.assertEquals(str(res[3]["dnsHostName"]), "z")
757         self.assertEquals(str(res[3]["lastLogon"]), "z")
758
759         # Search by negated disjunction of remote attributes
760         res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))",
761                               attrs=["dnsHostName", "lastLogon"])
762         self.assertEquals(len(res), 5)
763         res = sorted(res, key=attrgetter('dn'))
764         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
765         self.assertTrue("dnsHostName" not in res[0])
766         self.assertEquals(str(res[0]["lastLogon"]), "z")
767         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
768         self.assertEquals(str(res[1]["dnsHostName"]), "y")
769         self.assertEquals(str(res[1]["lastLogon"]), "y")
770         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
771         self.assertEquals(str(res[2]["dnsHostName"]), "z")
772         self.assertEquals(str(res[2]["lastLogon"]), "z")
773
774         # Search by negated disjunction of local and remote attribute
775         res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
776                               attrs=["dnsHostName", "lastLogon"])
777         self.assertEquals(len(res), 5)
778         res = sorted(res, key=attrgetter('dn'))
779         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
780         self.assertTrue("dnsHostName" not in res[0])
781         self.assertEquals(str(res[0]["lastLogon"]), "x")
782         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
783         self.assertTrue("dnsHostName" not in res[1])
784         self.assertEquals(str(res[1]["lastLogon"]), "z")
785         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
786         self.assertEquals(str(res[2]["dnsHostName"]), "z")
787         self.assertEquals(str(res[2]["lastLogon"]), "z")
788
789         # Search by complex parse tree
790         res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
791         self.assertEquals(len(res), 7)
792         res = sorted(res, key=attrgetter('dn'))
793         self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
794         self.assertTrue("dnsHostName" not in res[0])
795         self.assertEquals(str(res[0]["lastLogon"]), "x")
796         self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
797         self.assertTrue("dnsHostName" not in res[1])
798         self.assertEquals(str(res[1]["lastLogon"]), "y")
799         self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
800         self.assertTrue("dnsHostName" not in res[2])
801         self.assertEquals(str(res[2]["lastLogon"]), "z")
802         self.assertEquals(str(res[3].dn), self.samba4.dn("cn=X"))
803         self.assertEquals(str(res[3]["dnsHostName"]), "x")
804         self.assertEquals(str(res[3]["lastLogon"]), "x")
805         self.assertEquals(str(res[4].dn), self.samba4.dn("cn=Z"))
806         self.assertEquals(str(res[4]["dnsHostName"]), "z")
807         self.assertEquals(str(res[4]["lastLogon"]), "z")
808
809         # Clean up
810         dns = [self.samba4.dn("cn=%s" % n) for n in ["A", "B", "C", "X", "Y", "Z"]]
811         for dn in dns:
812             self.ldb.delete(dn)
813
814     def test_map_modify_local(self):
815         """Modification of local records."""
816         # Add local record
817         dn = "cn=test,dc=idealx,dc=org"
818         self.ldb.add({"dn": dn,
819                       "cn": "test",
820                       "foo": "bar",
821                       "revision": "1",
822                       "description": "test"})
823         # Check it's there
824         attrs = ["foo", "revision", "description"]
825         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
826         self.assertEquals(len(res), 1)
827         self.assertEquals(str(res[0].dn), dn)
828         self.assertEquals(str(res[0]["foo"]), "bar")
829         self.assertEquals(str(res[0]["revision"]), "1")
830         self.assertEquals(str(res[0]["description"]), "test")
831         # Check it's not in the local db
832         res = self.samba4.db.search(expression="(cn=test)",
833                                     scope=SCOPE_DEFAULT, attrs=attrs)
834         self.assertEquals(len(res), 0)
835         # Check it's not in the remote db
836         res = self.samba3.db.search(expression="(cn=test)",
837                                     scope=SCOPE_DEFAULT, attrs=attrs)
838         self.assertEquals(len(res), 0)
839
840         # Modify local record
841         ldif = """
842 dn: """ + dn + """
843 replace: foo
844 foo: baz
845 replace: description
846 description: foo
847 """
848         self.ldb.modify_ldif(ldif)
849         # Check in local db
850         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
851         self.assertEquals(len(res), 1)
852         self.assertEquals(str(res[0].dn), dn)
853         self.assertEquals(str(res[0]["foo"]), "baz")
854         self.assertEquals(str(res[0]["revision"]), "1")
855         self.assertEquals(str(res[0]["description"]), "foo")
856
857         # Rename local record
858         dn2 = "cn=toast,dc=idealx,dc=org"
859         self.ldb.rename(dn, dn2)
860         # Check in local db
861         res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
862         self.assertEquals(len(res), 1)
863         self.assertEquals(str(res[0].dn), dn2)
864         self.assertEquals(str(res[0]["foo"]), "baz")
865         self.assertEquals(str(res[0]["revision"]), "1")
866         self.assertEquals(str(res[0]["description"]), "foo")
867
868         # Delete local record
869         self.ldb.delete(dn2)
870         # Check it's gone
871         res = self.ldb.search(dn2, scope=SCOPE_BASE)
872         self.assertEquals(len(res), 0)
873
874     def test_map_modify_remote_remote(self):
875         """Modification of remote data of remote records"""
876         # Add remote record
877         dn = self.samba4.dn("cn=test")
878         dn2 = self.samba3.dn("cn=test")
879         self.samba3.db.add({"dn": dn2,
880                             "cn": "test",
881                             "description": "foo",
882                             "sambaBadPasswordCount": "3",
883                             "sambaNextRid": "1001"})
884         # Check it's there
885         res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
886                                     attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
887         self.assertEquals(len(res), 1)
888         self.assertEquals(str(res[0].dn), dn2)
889         self.assertEquals(str(res[0]["description"]), "foo")
890         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
891         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
892         # Check in mapped db
893         attrs = ["description", "badPwdCount", "nextRid"]
894         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
895         self.assertEquals(len(res), 1)
896         self.assertEquals(str(res[0].dn), dn)
897         self.assertEquals(str(res[0]["description"]), "foo")
898         self.assertEquals(str(res[0]["badPwdCount"]), "3")
899         self.assertEquals(str(res[0]["nextRid"]), "1001")
900         # Check in local db
901         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
902         self.assertEquals(len(res), 0)
903
904         # Modify remote data of remote record
905         ldif = """
906 dn: """ + dn + """
907 replace: description
908 description: test
909 replace: badPwdCount
910 badPwdCount: 4
911 """
912         self.ldb.modify_ldif(ldif)
913         # Check in mapped db
914         res = self.ldb.search(dn, scope=SCOPE_BASE,
915                               attrs=["description", "badPwdCount", "nextRid"])
916         self.assertEquals(len(res), 1)
917         self.assertEquals(str(res[0].dn), dn)
918         self.assertEquals(str(res[0]["description"]), "test")
919         self.assertEquals(str(res[0]["badPwdCount"]), "4")
920         self.assertEquals(str(res[0]["nextRid"]), "1001")
921         # Check in remote db
922         res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
923                                     attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
924         self.assertEquals(len(res), 1)
925         self.assertEquals(str(res[0].dn), dn2)
926         self.assertEquals(str(res[0]["description"]), "test")
927         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
928         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
929
930         # Rename remote record
931         dn2 = self.samba4.dn("cn=toast")
932         self.ldb.rename(dn, dn2)
933         # Check in mapped db
934         dn = dn2
935         res = self.ldb.search(dn, scope=SCOPE_BASE,
936                               attrs=["description", "badPwdCount", "nextRid"])
937         self.assertEquals(len(res), 1)
938         self.assertEquals(str(res[0].dn), dn)
939         self.assertEquals(str(res[0]["description"]), "test")
940         self.assertEquals(str(res[0]["badPwdCount"]), "4")
941         self.assertEquals(str(res[0]["nextRid"]), "1001")
942         # Check in remote db
943         dn2 = self.samba3.dn("cn=toast")
944         res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
945                                     attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
946         self.assertEquals(len(res), 1)
947         self.assertEquals(str(res[0].dn), dn2)
948         self.assertEquals(str(res[0]["description"]), "test")
949         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
950         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
951
952         # Delete remote record
953         self.ldb.delete(dn)
954         # Check in mapped db that it's removed
955         res = self.ldb.search(dn, scope=SCOPE_BASE)
956         self.assertEquals(len(res), 0)
957         # Check in remote db
958         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
959         self.assertEquals(len(res), 0)
960
961     def test_map_modify_remote_local(self):
962         """Modification of local data of remote records"""
963         # Add remote record (same as before)
964         dn = self.samba4.dn("cn=test")
965         dn2 = self.samba3.dn("cn=test")
966         self.samba3.db.add({"dn": dn2,
967                             "cn": "test",
968                             "description": "foo",
969                             "sambaBadPasswordCount": "3",
970                             "sambaNextRid": "1001"})
971
972         # Modify local data of remote record
973         ldif = """
974 dn: """ + dn + """
975 add: revision
976 revision: 1
977 replace: description
978 description: test
979
980 """
981         self.ldb.modify_ldif(ldif)
982         # Check in mapped db
983         attrs = ["revision", "description"]
984         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
985         self.assertEquals(len(res), 1)
986         self.assertEquals(str(res[0].dn), dn)
987         self.assertEquals(str(res[0]["description"]), "test")
988         self.assertEquals(str(res[0]["revision"]), "1")
989         # Check in remote db
990         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
991         self.assertEquals(len(res), 1)
992         self.assertEquals(str(res[0].dn), dn2)
993         self.assertEquals(str(res[0]["description"]), "test")
994         self.assertTrue("revision" not in res[0])
995         # Check in local db
996         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
997         self.assertEquals(len(res), 1)
998         self.assertEquals(str(res[0].dn), dn)
999         self.assertTrue("description" not in res[0])
1000         self.assertEquals(str(res[0]["revision"]), "1")
1001
1002         # Delete (newly) split record
1003         self.ldb.delete(dn)
1004
1005     def test_map_modify_split(self):
1006         """Testing modification of split records"""
1007         # Add split record
1008         dn = self.samba4.dn("cn=test")
1009         dn2 = self.samba3.dn("cn=test")
1010         self.ldb.add({
1011             "dn": dn,
1012             "cn": "test",
1013             "description": "foo",
1014             "badPwdCount": "3",
1015             "nextRid": "1001",
1016             "revision": "1"})
1017         # Check it's there
1018         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1019         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1020         self.assertEquals(len(res), 1)
1021         self.assertEquals(str(res[0].dn), dn)
1022         self.assertEquals(str(res[0]["description"]), "foo")
1023         self.assertEquals(str(res[0]["badPwdCount"]), "3")
1024         self.assertEquals(str(res[0]["nextRid"]), "1001")
1025         self.assertEquals(str(res[0]["revision"]), "1")
1026         # Check in local db
1027         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1028         self.assertEquals(len(res), 1)
1029         self.assertEquals(str(res[0].dn), dn)
1030         self.assertTrue("description" not in res[0])
1031         self.assertTrue("badPwdCount" not in res[0])
1032         self.assertTrue("nextRid" not in res[0])
1033         self.assertEquals(str(res[0]["revision"]), "1")
1034         # Check in remote db
1035         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1036                  "revision"]
1037         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1038         self.assertEquals(len(res), 1)
1039         self.assertEquals(str(res[0].dn), dn2)
1040         self.assertEquals(str(res[0]["description"]), "foo")
1041         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
1042         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1043         self.assertTrue("revision" not in res[0])
1044
1045         # Modify of split record
1046         ldif = """
1047 dn: """ + dn + """
1048 replace: description
1049 description: test
1050 replace: badPwdCount
1051 badPwdCount: 4
1052 replace: revision
1053 revision: 2
1054 """
1055         self.ldb.modify_ldif(ldif)
1056         # Check in mapped db
1057         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1058         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1059         self.assertEquals(len(res), 1)
1060         self.assertEquals(str(res[0].dn), dn)
1061         self.assertEquals(str(res[0]["description"]), "test")
1062         self.assertEquals(str(res[0]["badPwdCount"]), "4")
1063         self.assertEquals(str(res[0]["nextRid"]), "1001")
1064         self.assertEquals(str(res[0]["revision"]), "2")
1065         # Check in local db
1066         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1067         self.assertEquals(len(res), 1)
1068         self.assertEquals(str(res[0].dn), dn)
1069         self.assertTrue("description" not in res[0])
1070         self.assertTrue("badPwdCount" not in res[0])
1071         self.assertTrue("nextRid" not in res[0])
1072         self.assertEquals(str(res[0]["revision"]), "2")
1073         # Check in remote db
1074         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1075                  "revision"]
1076         res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1077         self.assertEquals(len(res), 1)
1078         self.assertEquals(str(res[0].dn), dn2)
1079         self.assertEquals(str(res[0]["description"]), "test")
1080         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1081         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1082         self.assertTrue("revision" not in res[0])
1083
1084         # Rename split record
1085         dn2 = self.samba4.dn("cn=toast")
1086         self.ldb.rename(dn, dn2)
1087         # Check in mapped db
1088         dn = dn2
1089         attrs = ["description", "badPwdCount", "nextRid", "revision"]
1090         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1091         self.assertEquals(len(res), 1)
1092         self.assertEquals(str(res[0].dn), dn)
1093         self.assertEquals(str(res[0]["description"]), "test")
1094         self.assertEquals(str(res[0]["badPwdCount"]), "4")
1095         self.assertEquals(str(res[0]["nextRid"]), "1001")
1096         self.assertEquals(str(res[0]["revision"]), "2")
1097         # Check in local db
1098         res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1099         self.assertEquals(len(res), 1)
1100         self.assertEquals(str(res[0].dn), dn)
1101         self.assertTrue("description" not in res[0])
1102         self.assertTrue("badPwdCount" not in res[0])
1103         self.assertTrue("nextRid" not in res[0])
1104         self.assertEquals(str(res[0]["revision"]), "2")
1105         # Check in remote db
1106         dn2 = self.samba3.dn("cn=toast")
1107         res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
1108                                     attrs=["description", "sambaBadPasswordCount", "sambaNextRid",
1109                                            "revision"])
1110         self.assertEquals(len(res), 1)
1111         self.assertEquals(str(res[0].dn), dn2)
1112         self.assertEquals(str(res[0]["description"]), "test")
1113         self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1114         self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1115         self.assertTrue("revision" not in res[0])
1116
1117         # Delete split record
1118         self.ldb.delete(dn)
1119         # Check in mapped db
1120         res = self.ldb.search(dn, scope=SCOPE_BASE)
1121         self.assertEquals(len(res), 0)
1122         # Check in local db
1123         res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1124         self.assertEquals(len(res), 0)
1125         # Check in remote db
1126         res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1127         self.assertEquals(len(res), 0)