1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
3 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
5 # This is a Python port of the original in testprogs/ejs/samba3sam.js
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
# The docstring has to be the first statement in the file to become the
# module's __doc__; a __future__ import is still allowed to follow it.
from __future__ import print_function
26 from ldb import SCOPE_DEFAULT, SCOPE_BASE
27 from samba import Ldb, substitute_var
28 from samba.tests import TestCaseInTempDir, env_loadparm
29 import samba.dcerpc.security
31 from samba.auth import system_session
32 from operator import attrgetter
35 def read_datafile(filename):
36 paths = [ "../../../../../testdata/samba3",
37 "../../../../testdata/samba3" ]
39 datadir = os.path.join(os.path.dirname(__file__), p)
40 if os.path.exists(datadir):
42 return open(os.path.join(datadir, filename), 'r').read()
44 def ldb_debug(l, text):
48 class MapBaseTestCase(TestCaseInTempDir):
49 """Base test case for mapping tests."""
51 def setup_modules(self, ldb, s3, s4):
52 ldb.add({"dn": "@MAP=samba3sam",
54 "@TO": "sambaDomainName=TESTS," + s3.basedn})
56 ldb.add({"dn": "@MODULES",
57 "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,samba3sid,show_deleted_ignore,dsdb_flags_ignore,partition"})
59 ldb.add({"dn": "@PARTITION",
60 "partition": ["%s" % (s4.basedn_casefold),
61 "%s" % (s3.basedn_casefold)],
62 "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"],
66 self.lp = env_loadparm()
67 self.lp.set("workgroup", "TESTS")
68 self.lp.set("netbios name", "TESTS")
69 super(MapBaseTestCase, self).setUp()
def make_dn(basedn, rdn):
    """Build a DN for ``rdn`` below the ``sambaDomainName=TESTS`` entry.

    :param basedn: base DN that ends the resulting DN
    :param rdn: relative DN that starts the resulting DN
    :return: ``"<rdn>,sambaDomainName=TESTS,<basedn>"``
    """
    # Note the argument order: the RDN comes first in the result.
    pieces = (rdn, basedn)
    return "%s,sambaDomainName=TESTS,%s" % pieces
def make_s4dn(basedn, rdn):
    """Build a DN by putting ``rdn`` directly in front of ``basedn``.

    :param basedn: base DN that ends the resulting DN
    :param rdn: relative DN that starts the resulting DN
    :return: ``"<rdn>,<basedn>"``
    """
    pieces = (rdn, basedn)
    return "%s,%s" % pieces
77 self.ldbfile = os.path.join(self.tempdir, "sam.ldb")
78 self.ldburl = "tdb://" + self.ldbfile
80 tempdir = self.tempdir
83 """Simple helper class that contains data for a specific SAM
86 def __init__(self, basedn, dn, lp):
87 self.db = Ldb(lp=lp, session_info=system_session())
88 self.db.set_opaque("skip_allocate_sids", "true");
90 self.basedn_casefold = ldb.Dn(self.db, basedn).get_casefold()
91 self.substvars = {"BASEDN": self.basedn}
92 self.file = os.path.join(tempdir, "%s.ldb" % self.basedn_casefold)
93 self.url = "tdb://" + self.file
97 return self._dn(self.basedn, rdn)
100 return self.db.connect(self.url)
def setup_data(self, path):
    """Load the data file ``path`` and add its content via add_ldif().

    :param path: file name handed to read_datafile()
    """
    ldif_text = read_datafile(path)
    self.add_ldif(ldif_text)
def subst(self, text):
    """Return ``text`` after substitute_var() has applied ``self.substvars``.

    :param text: text to run the substitution on
    :return: whatever substitute_var() returns for ``text``
    """
    variables = self.substvars
    return substitute_var(text, variables)
def add_ldif(self, ldif):
    """Pass ``ldif`` through subst() and add the result to ``self.db``.

    :param ldif: LDIF text, run through subst() before being added
    """
    add = self.db.add_ldif
    add(self.subst(ldif))
def modify_ldif(self, ldif):
    """Pass ``ldif`` through subst() and apply it to ``self.db`` as a modify.

    :param ldif: LDIF text, run through subst() before being applied
    """
    modify = self.db.modify_ldif
    modify(self.subst(ldif))
114 self.samba4 = Target("dc=vernstok,dc=nl", make_s4dn, self.lp)
115 self.samba3 = Target("cn=Samba3Sam", make_dn, self.lp)
117 self.samba3.connect()
118 self.samba4.connect()
121 os.unlink(self.ldbfile)
122 os.unlink(self.samba3.file)
123 os.unlink(self.samba4.file)
124 pdir = "%s.d" % self.ldbfile
125 mdata = os.path.join(pdir, "metadata.tdb")
126 if os.path.exists(mdata):
129 super(MapBaseTestCase, self).tearDown()
131 def assertSidEquals(self, text, ndr_sid):
132 sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid,
134 sid_obj2 = samba.dcerpc.security.dom_sid(text)
135 self.assertEquals(sid_obj1, sid_obj2)
138 class Samba3SamTestCase(MapBaseTestCase):
141 super(Samba3SamTestCase, self).setUp()
142 ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
143 ldb.set_opaque("skip_allocate_sids", "true");
144 self.samba3.setup_data("samba3.ldif")
145 ldif = read_datafile("provision_samba3sam.ldif")
146 ldb.add_ldif(self.samba4.subst(ldif))
147 self.setup_modules(ldb, self.samba3, self.samba4)
149 self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
150 self.ldb.set_opaque("skip_allocate_sids", "true");
152 def test_search_non_mapped(self):
153 """Looking up by non-mapped attribute"""
154 msg = self.ldb.search(expression="(cn=Administrator)")
155 self.assertEquals(len(msg), 1)
156 self.assertEquals(msg[0]["cn"], "Administrator")
158 def test_search_non_mapped(self):
159 """Looking up by mapped attribute"""
160 msg = self.ldb.search(expression="(name=Backup Operators)")
161 self.assertEquals(len(msg), 1)
162 self.assertEquals(str(msg[0]["name"]), "Backup Operators")
164 def test_old_name_of_renamed(self):
165 """Looking up by old name of renamed attribute"""
166 msg = self.ldb.search(expression="(displayName=Backup Operators)")
167 self.assertEquals(len(msg), 0)
169 def test_mapped_containing_sid(self):
170 """Looking up mapped entry containing SID"""
171 msg = self.ldb.search(expression="(cn=Replicator)")
172 self.assertEquals(len(msg), 1)
173 self.assertEquals(str(msg[0].dn),
174 "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
175 self.assertTrue("objectSid" in msg[0])
176 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
178 oc = set(msg[0]["objectClass"])
179 self.assertEquals(oc, set(["group"]))
181 def test_search_by_objclass(self):
182 """Looking up by objectClass"""
183 msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
184 self.assertEquals(set([str(m.dn) for m in msg]),
185 set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
186 "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))
188 def test_s3sam_modify(self):
189 # Adding a record that will fall back to the local database
195 "showInAdvancedViewOnly": "TRUE"})
197 # Checking for existence of record (local)
198 # TODO: This record must be searched in the local database, which is
199 # currently only supported for base searches
200 # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
201 # TODO: Actually, this version should work as well but doesn't...
204 msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
206 attrs=['foo','blah','cn','showInAdvancedViewOnly'])
207 self.assertEquals(len(msg), 1)
208 self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
209 self.assertEquals(str(msg[0]["foo"]), "bar")
210 self.assertEquals(str(msg[0]["blah"]), "Blie")
212 # Adding record that will be mapped
213 self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
214 "objectClass": "user",
216 "sambaUnicodePwd": "geheim",
219 # Checking for existence of record (remote)
220 msg = self.ldb.search(expression="(unixName=bin)",
221 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
222 self.assertEquals(len(msg), 1)
223 self.assertEquals(str(msg[0]["cn"]), "Niemand")
224 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
226 # Checking for existence of record (local && remote)
227 msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
228 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
229 self.assertEquals(len(msg), 1) # TODO: should check with more records
230 self.assertEquals(str(msg[0]["cn"]), "Niemand")
231 self.assertEquals(str(msg[0]["unixName"]), "bin")
232 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
234 # Checking for existence of record (local || remote)
235 msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
236 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
237 #print "got %d replies" % len(msg)
238 self.assertEquals(len(msg), 1) # TODO: should check with more records
239 self.assertEquals(str(msg[0]["cn"]), "Niemand")
240 self.assertEquals(str(msg[0]["unixName"]), "bin")
241 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
243 # Checking for data in destination database
244 msg = self.samba3.db.search(expression="(cn=Niemand)")
245 self.assertTrue(len(msg) >= 1)
246 self.assertEquals(str(msg[0]["sambaSID"]),
247 "S-1-5-21-4231626423-2410014848-2360679739-2001")
248 self.assertEquals(str(msg[0]["displayName"]), "Niemand")
250 # Adding attribute...
251 self.ldb.modify_ldif("""
252 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
258 # Checking whether changes are still there...
259 msg = self.ldb.search(expression="(cn=Niemand)")
260 self.assertTrue(len(msg) >= 1)
261 self.assertEquals(str(msg[0]["cn"]), "Niemand")
262 self.assertEquals(str(msg[0]["description"]), "Blah")
264 # Modifying attribute...
265 self.ldb.modify_ldif("""
266 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
272 # Checking whether changes are still there...
273 msg = self.ldb.search(expression="(cn=Niemand)")
274 self.assertTrue(len(msg) >= 1)
275 self.assertEquals(str(msg[0]["description"]), "Blie")
277 # Deleting attribute...
278 self.ldb.modify_ldif("""
279 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
284 # Checking whether changes are no longer there...
285 msg = self.ldb.search(expression="(cn=Niemand)")
286 self.assertTrue(len(msg) >= 1)
287 self.assertTrue(not "description" in msg[0])
290 self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
291 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
293 # Checking whether DN has changed...
294 msg = self.ldb.search(expression="(cn=Niemand2)")
295 self.assertEquals(len(msg), 1)
296 self.assertEquals(str(msg[0].dn),
297 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
300 self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
302 # Checking whether record is gone...
303 msg = self.ldb.search(expression="(cn=Niemand2)")
304 self.assertEquals(len(msg), 0)
307 class MapTestCase(MapBaseTestCase):
310 super(MapTestCase, self).setUp()
311 ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
312 ldb.set_opaque("skip_allocate_sids", "true");
313 ldif = read_datafile("provision_samba3sam.ldif")
314 ldb.add_ldif(self.samba4.subst(ldif))
315 self.setup_modules(ldb, self.samba3, self.samba4)
317 self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
318 self.ldb.set_opaque("skip_allocate_sids", "true");
320 def test_map_search(self):
321 """Running search tests on mapped data."""
323 "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
324 "objectclass": ["sambaDomain", "top"],
325 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
326 "sambaNextRid": "2000",
327 "sambaDomainName": "TESTS"
330 # Add a set of split records
331 self.ldb.add_ldif("""
332 dn: """+ self.samba4.dn("cn=Domain Users") + """
335 objectSid: S-1-5-21-4231626423-2410014848-2360679739-513
338 # Add a set of split records
339 self.ldb.add_ldif("""
340 dn: """+ self.samba4.dn("cn=X") + """
349 objectSid: S-1-5-21-4231626423-2410014848-2360679739-1052
353 "dn": self.samba4.dn("cn=Y"),
354 "objectClass": "top",
364 "dn": self.samba4.dn("cn=Z"),
365 "objectClass": "top",
374 # Add a set of remote records
377 "dn": self.samba3.dn("cn=A"),
378 "objectClass": "posixAccount",
381 "sambaBadPasswordCount": "x",
382 "sambaLogonTime": "x",
384 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-1052",
385 "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
388 "dn": self.samba3.dn("cn=B"),
389 "objectClass": "top",
392 "sambaBadPasswordCount": "x",
393 "sambaLogonTime": "y",
397 "dn": self.samba3.dn("cn=C"),
398 "objectClass": "top",
401 "sambaBadPasswordCount": "y",
402 "sambaLogonTime": "z",
405 # Testing search by DN
407 # Search remote record by local DN
408 dn = self.samba4.dn("cn=A")
409 res = self.ldb.search(dn, scope=SCOPE_BASE,
410 attrs=["dnsHostName", "lastLogon"])
411 self.assertEquals(len(res), 1)
412 self.assertEquals(str(res[0].dn), dn)
413 self.assertTrue(not "dnsHostName" in res[0])
414 self.assertEquals(str(res[0]["lastLogon"]), "x")
416 # Search remote record by remote DN
417 dn = self.samba3.dn("cn=A")
418 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
419 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
420 self.assertEquals(len(res), 1)
421 self.assertEquals(str(res[0].dn), dn)
422 self.assertTrue(not "dnsHostName" in res[0])
423 self.assertTrue(not "lastLogon" in res[0])
424 self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
426 # Search split record by local DN
427 dn = self.samba4.dn("cn=X")
428 res = self.ldb.search(dn, scope=SCOPE_BASE,
429 attrs=["dnsHostName", "lastLogon"])
430 self.assertEquals(len(res), 1)
431 self.assertEquals(str(res[0].dn), dn)
432 self.assertEquals(str(res[0]["dnsHostName"]), "x")
433 self.assertEquals(str(res[0]["lastLogon"]), "x")
435 # Search split record by remote DN
436 dn = self.samba3.dn("cn=X")
437 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
438 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
439 self.assertEquals(len(res), 1)
440 self.assertEquals(str(res[0].dn), dn)
441 self.assertTrue(not "dnsHostName" in res[0])
442 self.assertTrue(not "lastLogon" in res[0])
443 self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
445 # Testing search by attribute
447 # Search by ignored attribute
448 res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
449 attrs=["dnsHostName", "lastLogon"])
450 self.assertEquals(len(res), 2)
451 res = sorted(res, key=attrgetter('dn'))
452 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
453 self.assertEquals(str(res[0]["dnsHostName"]), "x")
454 self.assertEquals(str(res[0]["lastLogon"]), "x")
455 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
456 self.assertEquals(str(res[1]["dnsHostName"]), "y")
457 self.assertEquals(str(res[1]["lastLogon"]), "y")
459 # Search by kept attribute
460 res = self.ldb.search(expression="(description=y)",
461 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
462 self.assertEquals(len(res), 2)
463 res = sorted(res, key=attrgetter('dn'))
464 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
465 self.assertTrue(not "dnsHostName" in res[0])
466 self.assertEquals(str(res[0]["lastLogon"]), "z")
467 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
468 self.assertEquals(str(res[1]["dnsHostName"]), "z")
469 self.assertEquals(str(res[1]["lastLogon"]), "z")
471 # Search by renamed attribute
472 res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
473 attrs=["dnsHostName", "lastLogon"])
474 self.assertEquals(len(res), 2)
475 res = sorted(res, key=attrgetter('dn'))
476 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
477 self.assertTrue(not "dnsHostName" in res[0])
478 self.assertEquals(str(res[0]["lastLogon"]), "x")
479 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
480 self.assertTrue(not "dnsHostName" in res[1])
481 self.assertEquals(str(res[1]["lastLogon"]), "y")
483 # Search by converted attribute
485 # Using the SID directly in the parse tree leads to conversion
486 # errors, letting the search fail with no results.
487 #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-1052)", scope=SCOPE_DEFAULT, attrs)
488 res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
489 self.assertEquals(len(res), 4)
490 res = sorted(res, key=attrgetter('dn'))
491 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
492 self.assertEquals(str(res[1]["dnsHostName"]), "x")
493 self.assertEquals(str(res[1]["lastLogon"]), "x")
494 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
496 self.assertTrue("objectSid" in res[1])
497 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
498 self.assertTrue(not "dnsHostName" in res[0])
499 self.assertEquals(str(res[0]["lastLogon"]), "x")
500 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
502 self.assertTrue("objectSid" in res[0])
504 # Search by generated attribute
505 # In most cases, this even works when the mapping is missing
506 # a `convert_operator' by enumerating the remote db.
507 res = self.ldb.search(expression="(primaryGroupID=512)",
508 attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
509 self.assertEquals(len(res), 1)
510 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
511 self.assertTrue(not "dnsHostName" in res[0])
512 self.assertEquals(str(res[0]["lastLogon"]), "x")
513 self.assertEquals(str(res[0]["primaryGroupID"]), "512")
515 # Note that Xs "objectSid" seems to be fine in the previous search for
517 #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
518 #print len(res) + " results found"
519 #for i in range(len(res)):
520 # for (obj in res[i]) {
521 # print obj + ": " + res[i][obj]
526 # Search by remote name of renamed attribute
527 res = self.ldb.search(expression="(sambaBadPasswordCount=*)",
528 attrs=["dnsHostName", "lastLogon"])
529 self.assertEquals(len(res), 0)
531 # Search by objectClass
532 attrs = ["dnsHostName", "lastLogon", "objectClass"]
533 res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
534 self.assertEquals(len(res), 2)
535 res = sorted(res, key=attrgetter('dn'))
536 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
537 self.assertTrue(not "dnsHostName" in res[0])
538 self.assertEquals(str(res[0]["lastLogon"]), "x")
539 self.assertEquals(str(res[0]["objectClass"][0]), "user")
540 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
541 self.assertEquals(str(res[1]["dnsHostName"]), "x")
542 self.assertEquals(str(res[1]["lastLogon"]), "x")
543 self.assertEquals(str(res[1]["objectClass"][0]), "user")
545 # Prove that the objectClass is actually used for the search
546 res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
548 self.assertEquals(len(res), 3)
549 res = sorted(res, key=attrgetter('dn'))
550 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
551 self.assertTrue(not "dnsHostName" in res[0])
552 self.assertEquals(str(res[0]["lastLogon"]), "x")
553 self.assertEquals(res[0]["objectClass"][0], "user")
554 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
555 self.assertTrue(not "dnsHostName" in res[1])
556 self.assertEquals(str(res[1]["lastLogon"]), "y")
557 self.assertEquals(set(res[1]["objectClass"]), set(["top"]))
558 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
559 self.assertEquals(str(res[2]["dnsHostName"]), "x")
560 self.assertEquals(str(res[2]["lastLogon"]), "x")
561 self.assertEquals(str(res[2]["objectClass"][0]), "user")
563 # Testing search by parse tree
565 # Search by conjunction of local attributes
566 res = self.ldb.search(expression="(&(codePage=x)(revision=x))",
567 attrs=["dnsHostName", "lastLogon"])
568 self.assertEquals(len(res), 2)
569 res = sorted(res, key=attrgetter('dn'))
570 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
571 self.assertEquals(str(res[0]["dnsHostName"]), "x")
572 self.assertEquals(str(res[0]["lastLogon"]), "x")
573 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
574 self.assertEquals(str(res[1]["dnsHostName"]), "y")
575 self.assertEquals(str(res[1]["lastLogon"]), "y")
577 # Search by conjunction of remote attributes
578 res = self.ldb.search(expression="(&(lastLogon=x)(description=x))",
579 attrs=["dnsHostName", "lastLogon"])
580 self.assertEquals(len(res), 2)
581 res = sorted(res, key=attrgetter('dn'))
582 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
583 self.assertTrue(not "dnsHostName" in res[0])
584 self.assertEquals(str(res[0]["lastLogon"]), "x")
585 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
586 self.assertEquals(str(res[1]["dnsHostName"]), "x")
587 self.assertEquals(str(res[1]["lastLogon"]), "x")
589 # Search by conjunction of local and remote attribute
590 res = self.ldb.search(expression="(&(codePage=x)(description=x))",
591 attrs=["dnsHostName", "lastLogon"])
592 self.assertEquals(len(res), 2)
593 res = sorted(res, key=attrgetter('dn'))
594 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
595 self.assertEquals(str(res[0]["dnsHostName"]), "x")
596 self.assertEquals(str(res[0]["lastLogon"]), "x")
597 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
598 self.assertEquals(str(res[1]["dnsHostName"]), "y")
599 self.assertEquals(str(res[1]["lastLogon"]), "y")
601 # Search by conjunction of local and remote attribute w/o match
602 attrs = ["dnsHostName", "lastLogon"]
603 res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))",
605 self.assertEquals(len(res), 0)
606 res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))",
608 self.assertEquals(len(res), 0)
610 # Search by disjunction of local attributes
611 res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))",
612 attrs=["dnsHostName", "lastLogon"])
613 self.assertEquals(len(res), 2)
614 res = sorted(res, key=attrgetter('dn'))
615 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
616 self.assertEquals(str(res[0]["dnsHostName"]), "x")
617 self.assertEquals(str(res[0]["lastLogon"]), "x")
618 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
619 self.assertEquals(str(res[1]["dnsHostName"]), "y")
620 self.assertEquals(str(res[1]["lastLogon"]), "y")
622 # Search by disjunction of remote attributes
623 res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))",
624 attrs=["dnsHostName", "lastLogon"])
625 self.assertEquals(len(res), 3)
626 res = sorted(res, key=attrgetter('dn'))
627 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
628 self.assertFalse("dnsHostName" in res[0])
629 self.assertEquals(str(res[0]["lastLogon"]), "x")
630 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
631 self.assertFalse("dnsHostName" in res[1])
632 self.assertEquals(str(res[1]["lastLogon"]), "y")
633 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
634 self.assertEquals(str(res[2]["dnsHostName"]), "x")
635 self.assertEquals(str(res[2]["lastLogon"]), "x")
637 # Search by disjunction of local and remote attribute
638 res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))",
639 attrs=["dnsHostName", "lastLogon"])
640 self.assertEquals(len(res), 3)
641 res = sorted(res, key=attrgetter('dn'))
642 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
643 self.assertFalse("dnsHostName" in res[0])
644 self.assertEquals(str(res[0]["lastLogon"]), "y")
645 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
646 self.assertEquals(str(res[1]["dnsHostName"]), "x")
647 self.assertEquals(str(res[1]["lastLogon"]), "x")
648 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Y"))
649 self.assertEquals(str(res[2]["dnsHostName"]), "y")
650 self.assertEquals(str(res[2]["lastLogon"]), "y")
652 # Search by disjunction of local and remote attribute w/o match
653 res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))",
654 attrs=["dnsHostName", "lastLogon"])
655 self.assertEquals(len(res), 0)
657 # Search by negated local attribute
658 res = self.ldb.search(expression="(!(revision=x))",
659 attrs=["dnsHostName", "lastLogon"])
660 self.assertEquals(len(res), 6)
661 res = sorted(res, key=attrgetter('dn'))
662 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
663 self.assertTrue(not "dnsHostName" in res[0])
664 self.assertEquals(str(res[0]["lastLogon"]), "x")
665 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
666 self.assertTrue(not "dnsHostName" in res[1])
667 self.assertEquals(str(res[1]["lastLogon"]), "y")
668 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
669 self.assertTrue(not "dnsHostName" in res[2])
670 self.assertEquals(str(res[2]["lastLogon"]), "z")
671 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
672 self.assertEquals(str(res[3]["dnsHostName"]), "z")
673 self.assertEquals(str(res[3]["lastLogon"]), "z")
675 # Search by negated remote attribute
676 res = self.ldb.search(expression="(!(description=x))",
677 attrs=["dnsHostName", "lastLogon"])
678 self.assertEquals(len(res), 4)
679 res = sorted(res, key=attrgetter('dn'))
680 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
681 self.assertTrue(not "dnsHostName" in res[0])
682 self.assertEquals(str(res[0]["lastLogon"]), "z")
683 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
684 self.assertEquals(str(res[1]["dnsHostName"]), "z")
685 self.assertEquals(str(res[1]["lastLogon"]), "z")
687 # Search by negated conjunction of local attributes
688 res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))",
689 attrs=["dnsHostName", "lastLogon"])
690 self.assertEquals(len(res), 6)
691 res = sorted(res, key=attrgetter('dn'))
692 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
693 self.assertTrue(not "dnsHostName" in res[0])
694 self.assertEquals(str(res[0]["lastLogon"]), "x")
695 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
696 self.assertTrue(not "dnsHostName" in res[1])
697 self.assertEquals(str(res[1]["lastLogon"]), "y")
698 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
699 self.assertTrue(not "dnsHostName" in res[2])
700 self.assertEquals(str(res[2]["lastLogon"]), "z")
701 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
702 self.assertEquals(str(res[3]["dnsHostName"]), "z")
703 self.assertEquals(str(res[3]["lastLogon"]), "z")
705 # Search by negated conjunction of remote attributes
706 res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))",
707 attrs=["dnsHostName", "lastLogon"])
708 self.assertEquals(len(res), 6)
709 res = sorted(res, key=attrgetter('dn'))
710 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
711 self.assertTrue(not "dnsHostName" in res[0])
712 self.assertEquals(str(res[0]["lastLogon"]), "y")
713 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
714 self.assertTrue(not "dnsHostName" in res[1])
715 self.assertEquals(str(res[1]["lastLogon"]), "z")
716 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Y"))
717 self.assertEquals(str(res[2]["dnsHostName"]), "y")
718 self.assertEquals(str(res[2]["lastLogon"]), "y")
719 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
720 self.assertEquals(str(res[3]["dnsHostName"]), "z")
721 self.assertEquals(str(res[3]["lastLogon"]), "z")
723 # Search by negated conjunction of local and remote attribute
724 res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))",
725 attrs=["dnsHostName", "lastLogon"])
726 self.assertEquals(len(res), 6)
727 res = sorted(res, key=attrgetter('dn'))
728 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
729 self.assertTrue(not "dnsHostName" in res[0])
730 self.assertEquals(str(res[0]["lastLogon"]), "x")
731 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
732 self.assertTrue(not "dnsHostName" in res[1])
733 self.assertEquals(str(res[1]["lastLogon"]), "y")
734 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
735 self.assertTrue(not "dnsHostName" in res[2])
736 self.assertEquals(str(res[2]["lastLogon"]), "z")
737 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
738 self.assertEquals(str(res[3]["dnsHostName"]), "z")
739 self.assertEquals(str(res[3]["lastLogon"]), "z")
741 # Search by negated disjunction of local attributes
742 res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))",
743 attrs=["dnsHostName", "lastLogon"])
744 res = sorted(res, key=attrgetter('dn'))
745 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
746 self.assertTrue(not "dnsHostName" in res[0])
747 self.assertEquals(str(res[0]["lastLogon"]), "x")
748 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
749 self.assertTrue(not "dnsHostName" in res[1])
750 self.assertEquals(str(res[1]["lastLogon"]), "y")
751 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
752 self.assertTrue(not "dnsHostName" in res[2])
753 self.assertEquals(str(res[2]["lastLogon"]), "z")
754 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
755 self.assertEquals(str(res[3]["dnsHostName"]), "z")
756 self.assertEquals(str(res[3]["lastLogon"]), "z")
758 # Search by negated disjunction of remote attributes
759 res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))",
760 attrs=["dnsHostName", "lastLogon"])
761 self.assertEquals(len(res), 5)
762 res = sorted(res, key=attrgetter('dn'))
763 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
764 self.assertTrue(not "dnsHostName" in res[0])
765 self.assertEquals(str(res[0]["lastLogon"]), "z")
766 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
767 self.assertEquals(str(res[1]["dnsHostName"]), "y")
768 self.assertEquals(str(res[1]["lastLogon"]), "y")
769 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
770 self.assertEquals(str(res[2]["dnsHostName"]), "z")
771 self.assertEquals(str(res[2]["lastLogon"]), "z")
773 # Search by negated disjunction of local and remote attribute
774 res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
775 attrs=["dnsHostName", "lastLogon"])
776 self.assertEquals(len(res), 5)
777 res = sorted(res, key=attrgetter('dn'))
778 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
779 self.assertTrue(not "dnsHostName" in res[0])
780 self.assertEquals(str(res[0]["lastLogon"]), "x")
781 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
782 self.assertTrue(not "dnsHostName" in res[1])
783 self.assertEquals(str(res[1]["lastLogon"]), "z")
784 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
785 self.assertEquals(str(res[2]["dnsHostName"]), "z")
786 self.assertEquals(str(res[2]["lastLogon"]), "z")
788 # Search by complex parse tree
789 res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
790 self.assertEquals(len(res), 7)
791 res = sorted(res, key=attrgetter('dn'))
792 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
793 self.assertTrue(not "dnsHostName" in res[0])
794 self.assertEquals(str(res[0]["lastLogon"]), "x")
795 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
796 self.assertTrue(not "dnsHostName" in res[1])
797 self.assertEquals(str(res[1]["lastLogon"]), "y")
798 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
799 self.assertTrue(not "dnsHostName" in res[2])
800 self.assertEquals(str(res[2]["lastLogon"]), "z")
801 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=X"))
802 self.assertEquals(str(res[3]["dnsHostName"]), "x")
803 self.assertEquals(str(res[3]["lastLogon"]), "x")
804 self.assertEquals(str(res[4].dn), self.samba4.dn("cn=Z"))
805 self.assertEquals(str(res[4]["dnsHostName"]), "z")
806 self.assertEquals(str(res[4]["lastLogon"]), "z")
809 dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
def test_map_modify_local(self):
    """Modification of local records.

    Adds a record through ``self.ldb``, then modifies, renames and
    deletes it, checking after every step what a base-scope search on
    ``self.ldb`` returns.  Right after the add it also asserts that a
    ``(cn=test)`` search finds nothing in ``self.samba4.db`` or in
    ``self.samba3.db``.

    NOTE(review): the middle entries of the added record, the LDIF text
    and the ``delete`` call were missing from the reviewed text.  They
    are reconstructed here from the values the assertions expect --
    confirm against the upstream test.
    """
    attrs = ["foo", "revision", "description"]

    def check_record(record_dn, foo, description):
        """Assert ``record_dn`` is visible via self.ldb with these values."""
        res = self.ldb.search(record_dn, scope=SCOPE_BASE, attrs=attrs)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), record_dn)
        self.assertEqual(str(res[0]["foo"]), foo)
        # "revision" is never touched by this test, so it stays at "1".
        self.assertEqual(str(res[0]["revision"]), "1")
        self.assertEqual(str(res[0]["description"]), description)

    # Add local record
    dn = "cn=test,dc=idealx,dc=org"
    self.ldb.add({"dn": dn,
                  "cn": "test",
                  "foo": "bar",
                  "revision": "1",
                  "description": "test"})
    # Check it's there
    check_record(dn, "bar", "test")
    # Check it's not in the local db
    res = self.samba4.db.search(expression="(cn=test)",
                                scope=SCOPE_DEFAULT, attrs=attrs)
    self.assertEqual(len(res), 0)
    # Check it's not in the remote db
    res = self.samba3.db.search(expression="(cn=test)",
                                scope=SCOPE_DEFAULT, attrs=attrs)
    self.assertEqual(len(res), 0)

    # Modify local record
    ldif = "\n".join([
        "dn: " + dn,
        "replace: foo",
        "foo: baz",
        "replace: description",
        "description: foo",
        "",
    ])
    self.ldb.modify_ldif(ldif)
    check_record(dn, "baz", "foo")

    # Rename local record
    dn2 = "cn=toast,dc=idealx,dc=org"
    self.ldb.rename(dn, dn2)
    check_record(dn2, "baz", "foo")

    # Delete local record
    self.ldb.delete(dn2)
    res = self.ldb.search(dn2, scope=SCOPE_BASE)
    self.assertEqual(len(res), 0)
def test_map_modify_remote_remote(self):
    """Modification of remote data of remote records.

    Adds a record directly to ``self.samba3.db``, then modifies, renames
    and deletes it through ``self.ldb``.  After each step the assertions
    compare the view through ``self.ldb`` (``badPwdCount`` / ``nextRid``)
    with the stored record in ``self.samba3.db``
    (``sambaBadPasswordCount`` / ``sambaNextRid``).

    NOTE(review): the ``"cn"`` entry of the added record, the LDIF text,
    the switch to the renamed DN and the ``delete`` call were missing
    from the reviewed text.  They are reconstructed here from the values
    the assertions expect -- confirm against the upstream test.
    """
    mapped_attrs = ["description", "badPwdCount", "nextRid"]
    remote_attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]

    def check_remote(remote_dn, description, bad_pwd_count):
        """Assert the record as stored in self.samba3.db."""
        res = self.samba3.db.search(remote_dn, scope=SCOPE_BASE,
                                    attrs=remote_attrs)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), remote_dn)
        self.assertEqual(str(res[0]["description"]), description)
        self.assertEqual(str(res[0]["sambaBadPasswordCount"]),
                         bad_pwd_count)
        self.assertEqual(str(res[0]["sambaNextRid"]), "1001")

    def check_mapped(mapped_dn, description, bad_pwd_count, **search_args):
        """Assert the record as seen through self.ldb."""
        res = self.ldb.search(mapped_dn, scope=SCOPE_BASE,
                              attrs=mapped_attrs, **search_args)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), mapped_dn)
        self.assertEqual(str(res[0]["description"]), description)
        self.assertEqual(str(res[0]["badPwdCount"]), bad_pwd_count)
        self.assertEqual(str(res[0]["nextRid"]), "1001")

    # Add remote record
    dn = self.samba4.dn("cn=test")
    dn2 = self.samba3.dn("cn=test")
    self.samba3.db.add({"dn": dn2,
                        "cn": "test",
                        "description": "foo",
                        "sambaBadPasswordCount": "3",
                        "sambaNextRid": "1001"})
    # Check in remote db
    check_remote(dn2, "foo", "3")
    # Check in mapped db (this first lookup passes an empty expression,
    # exactly as the original call did)
    check_mapped(dn, "foo", "3", expression="")
    # Check in local db
    res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=mapped_attrs)
    self.assertEqual(len(res), 0)

    # Modify remote data of remote record
    ldif = "\n".join([
        "dn: " + dn,
        "replace: description",
        "description: test",
        "replace: badPwdCount",
        "badPwdCount: 4",
        "",
    ])
    self.ldb.modify_ldif(ldif)
    check_mapped(dn, "test", "4")
    check_remote(dn2, "test", "4")

    # Rename remote record
    new_dn = self.samba4.dn("cn=toast")
    self.ldb.rename(dn, new_dn)
    # From here on the record is addressed by its new DNs.
    dn = new_dn
    dn2 = self.samba3.dn("cn=toast")
    check_mapped(dn, "test", "4")
    check_remote(dn2, "test", "4")

    # Delete remote record
    self.ldb.delete(dn)
    # Check in mapped db that it's removed
    res = self.ldb.search(dn, scope=SCOPE_BASE)
    self.assertEqual(len(res), 0)
    # Check in remote db
    res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
    self.assertEqual(len(res), 0)
def test_map_modify_remote_local(self):
    """Modification of local data of remote records.

    Adds a record directly to ``self.samba3.db``, then modifies it
    through ``self.ldb``.  The assertions afterwards expect
    ``description`` to be present in ``self.samba3.db`` but not in
    ``self.samba4.db``, and ``revision`` to be present in
    ``self.samba4.db`` but not in ``self.samba3.db``.

    NOTE(review): the ``"cn"`` entry of the added record, the LDIF text
    and the final ``delete`` call were missing from the reviewed text.
    They are reconstructed here from the values the assertions expect
    (``revision`` is added because the record starts without one) --
    confirm against the upstream test.
    """
    # Add remote record (same as before)
    dn = self.samba4.dn("cn=test")
    dn2 = self.samba3.dn("cn=test")
    self.samba3.db.add({"dn": dn2,
                        "cn": "test",
                        "description": "foo",
                        "sambaBadPasswordCount": "3",
                        "sambaNextRid": "1001"})

    # Modify local data of remote record
    ldif = "\n".join([
        "dn: " + dn,
        "add: revision",
        "revision: 1",
        "replace: description",
        "description: test",
        "",
    ])
    self.ldb.modify_ldif(ldif)

    # Check in mapped db
    attrs = ["revision", "description"]
    res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), dn)
    self.assertEqual(str(res[0]["description"]), "test")
    self.assertEqual(str(res[0]["revision"]), "1")

    # Check in remote db
    res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), dn2)
    self.assertEqual(str(res[0]["description"]), "test")
    self.assertNotIn("revision", res[0])

    # Check in local db
    res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
    self.assertEqual(len(res), 1)
    self.assertEqual(str(res[0].dn), dn)
    self.assertNotIn("description", res[0])
    self.assertEqual(str(res[0]["revision"]), "1")

    # Delete (newly) split record
    self.ldb.delete(dn)
def test_map_modify_split(self):
    """Testing modification of split records.

    Adds a record through ``self.ldb``, then modifies, renames and
    deletes it.  After each of the first three steps the same three
    views are checked: the full record through ``self.ldb``; only
    ``revision`` in ``self.samba4.db``; and ``description``,
    ``sambaBadPasswordCount`` and ``sambaNextRid`` (but no ``revision``)
    in ``self.samba3.db``.

    NOTE(review): the ``add`` call, parts of the LDIF text, the tail of
    the remote attribute list, the switch to the renamed DN and the
    ``delete`` call were missing from the reviewed text.  They are
    reconstructed here from the values the assertions expect -- confirm
    against the upstream test.
    """
    mapped_attrs = ["description", "badPwdCount", "nextRid", "revision"]
    remote_attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
                    "revision"]

    def check_split(mapped_dn, remote_dn, description, bad_pwd_count,
                    revision):
        """Assert the record's state in the mapped, local and remote db."""
        # Check in mapped db
        res = self.ldb.search(mapped_dn, scope=SCOPE_BASE,
                              attrs=mapped_attrs)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), mapped_dn)
        self.assertEqual(str(res[0]["description"]), description)
        self.assertEqual(str(res[0]["badPwdCount"]), bad_pwd_count)
        self.assertEqual(str(res[0]["nextRid"]), "1001")
        self.assertEqual(str(res[0]["revision"]), revision)
        # Check in local db
        res = self.samba4.db.search(mapped_dn, scope=SCOPE_BASE,
                                    attrs=mapped_attrs)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), mapped_dn)
        self.assertNotIn("description", res[0])
        self.assertNotIn("badPwdCount", res[0])
        self.assertNotIn("nextRid", res[0])
        self.assertEqual(str(res[0]["revision"]), revision)
        # Check in remote db
        res = self.samba3.db.search(remote_dn, scope=SCOPE_BASE,
                                    attrs=remote_attrs)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), remote_dn)
        self.assertEqual(str(res[0]["description"]), description)
        self.assertEqual(str(res[0]["sambaBadPasswordCount"]),
                         bad_pwd_count)
        self.assertEqual(str(res[0]["sambaNextRid"]), "1001")
        self.assertNotIn("revision", res[0])

    # Add split record
    dn = self.samba4.dn("cn=test")
    dn2 = self.samba3.dn("cn=test")
    self.ldb.add({
        "dn": dn,
        "cn": "test",
        "description": "foo",
        "badPwdCount": "3",
        "nextRid": "1001",
        "revision": "1"})
    check_split(dn, dn2, "foo", "3", "1")

    # Modify of split record
    ldif = "\n".join([
        "dn: " + dn,
        "replace: description",
        "description: test",
        "replace: badPwdCount",
        "badPwdCount: 4",
        "replace: revision",
        "revision: 2",
        "",
    ])
    self.ldb.modify_ldif(ldif)
    check_split(dn, dn2, "test", "4", "2")

    # Rename split record
    new_dn = self.samba4.dn("cn=toast")
    self.ldb.rename(dn, new_dn)
    # From here on the record is addressed by its new DNs.
    dn = new_dn
    dn2 = self.samba3.dn("cn=toast")
    check_split(dn, dn2, "test", "4", "2")

    # Delete split record
    self.ldb.delete(dn)
    # Check in mapped db
    res = self.ldb.search(dn, scope=SCOPE_BASE)
    self.assertEqual(len(res), 0)
    # Check in local db
    res = self.samba4.db.search(dn, scope=SCOPE_BASE)
    self.assertEqual(len(res), 0)
    # Check in remote db
    res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
    self.assertEqual(len(res), 0)