1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
3 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
5 # This is a Python port of the original in testprogs/ejs/samba3sam.js
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import print_function
22 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
26 from ldb import SCOPE_DEFAULT, SCOPE_BASE
27 from samba import Ldb, substitute_var
28 from samba.tests import TestCaseInTempDir, env_loadparm
29 import samba.dcerpc.security
31 from samba.auth import system_session
32 from operator import attrgetter
35 def read_datafile(filename):
36 paths = ["../../../../../testdata/samba3",
37 "../../../../testdata/samba3"]
39 datadir = os.path.join(os.path.dirname(__file__), p)
40 if os.path.exists(datadir):
42 return open(os.path.join(datadir, filename), 'r').read()
45 def ldb_debug(l, text):
49 class MapBaseTestCase(TestCaseInTempDir):
50 """Base test case for mapping tests."""
52 def setup_modules(self, ldb, s3, s4):
53 ldb.add({"dn": "@MAP=samba3sam",
55 "@TO": "sambaDomainName=TESTS," + s3.basedn})
57 ldb.add({"dn": "@MODULES",
58 "@LIST": "rootdse,dsdb_paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,samba3sid,show_deleted_ignore,dsdb_flags_ignore,partition"})
60 ldb.add({"dn": "@PARTITION",
61 "partition": ["%s" % (s4.basedn_casefold),
62 "%s" % (s3.basedn_casefold)],
63 "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"],
67 self.lp = env_loadparm()
68 self.lp.set("workgroup", "TESTS")
69 self.lp.set("netbios name", "TESTS")
70 super(MapBaseTestCase, self).setUp()
72 def make_dn(basedn, rdn):
73 return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)
75 def make_s4dn(basedn, rdn):
76 return "%s,%s" % (rdn, basedn)
78 self.ldbfile = os.path.join(self.tempdir, "sam.ldb")
79 self.ldburl = "tdb://" + self.ldbfile
81 tempdir = self.tempdir
84 """Simple helper class that contains data for a specific SAM
87 def __init__(self, basedn, dn, lp):
88 self.db = Ldb(lp=lp, session_info=system_session())
89 self.db.set_opaque("skip_allocate_sids", "true")
91 self.basedn_casefold = ldb.Dn(self.db, basedn).get_casefold()
92 self.substvars = {"BASEDN": self.basedn}
93 self.file = os.path.join(tempdir, "%s.ldb" % self.basedn_casefold)
94 self.url = "tdb://" + self.file
98 return self._dn(self.basedn, rdn)
101 return self.db.connect(self.url)
103 def setup_data(self, path):
104 self.add_ldif(read_datafile(path))
106 def subst(self, text):
107 return substitute_var(text, self.substvars)
109 def add_ldif(self, ldif):
110 self.db.add_ldif(self.subst(ldif))
112 def modify_ldif(self, ldif):
113 self.db.modify_ldif(self.subst(ldif))
115 self.samba4 = Target("dc=vernstok,dc=nl", make_s4dn, self.lp)
116 self.samba3 = Target("cn=Samba3Sam", make_dn, self.lp)
118 self.samba3.connect()
119 self.samba4.connect()
122 os.unlink(self.ldbfile)
123 os.unlink(self.samba3.file)
124 os.unlink(self.samba4.file)
125 pdir = "%s.d" % self.ldbfile
126 mdata = os.path.join(pdir, "metadata.tdb")
127 if os.path.exists(mdata):
130 super(MapBaseTestCase, self).tearDown()
132 def assertSidEquals(self, text, ndr_sid):
133 sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid,
135 sid_obj2 = samba.dcerpc.security.dom_sid(text)
136 self.assertEquals(sid_obj1, sid_obj2)
139 class Samba3SamTestCase(MapBaseTestCase):
142 super(Samba3SamTestCase, self).setUp()
143 ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
144 ldb.set_opaque("skip_allocate_sids", "true")
145 self.samba3.setup_data("samba3.ldif")
146 ldif = read_datafile("provision_samba3sam.ldif")
147 ldb.add_ldif(self.samba4.subst(ldif))
148 self.setup_modules(ldb, self.samba3, self.samba4)
150 self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
151 self.ldb.set_opaque("skip_allocate_sids", "true")
153 def test_search_non_mapped(self):
154 """Looking up by non-mapped attribute"""
155 msg = self.ldb.search(expression="(cn=Administrator)")
156 self.assertEquals(len(msg), 1)
157 self.assertEquals(str(msg[0]["cn"]), "Administrator")
159 def test_search_mapped(self):
160 """Looking up by mapped attribute"""
161 msg = self.ldb.search(expression="(name=Backup Operators)")
162 self.assertEquals(len(msg), 1)
163 self.assertEquals(str(msg[0]["name"]), "Backup Operators")
165 def test_old_name_of_renamed(self):
166 """Looking up by old name of renamed attribute"""
167 msg = self.ldb.search(expression="(displayName=Backup Operators)")
168 self.assertEquals(len(msg), 0)
170 def test_mapped_containing_sid(self):
171 """Looking up mapped entry containing SID"""
172 msg = self.ldb.search(expression="(cn=Replicator)")
173 self.assertEquals(len(msg), 1)
174 self.assertEquals(str(msg[0].dn),
175 "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
176 self.assertTrue("objectSid" in msg[0])
177 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
179 oc = set(msg[0]["objectClass"])
180 self.assertEquals(oc, set([b"group"]))
182 def test_search_by_objclass(self):
183 """Looking up by objectClass"""
184 msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
185 self.assertEquals(set([str(m.dn) for m in msg]),
186 set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
187 "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))
189 def test_s3sam_modify(self):
190 # Adding a record that will be fallbacked
196 "showInAdvancedViewOnly": "TRUE"})
198 # Checking for existence of record (local)
199 # TODO: This record must be searched in the local database, which is
200 # currently only supported for base searches
201 # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
202 # TODO: Actually, this version should work as well but doesn't...
205 msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
207 attrs=['foo', 'blah', 'cn', 'showInAdvancedViewOnly'])
208 self.assertEquals(len(msg), 1)
209 self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
210 self.assertEquals(str(msg[0]["foo"]), "bar")
211 self.assertEquals(str(msg[0]["blah"]), "Blie")
213 # Adding record that will be mapped
214 self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
215 "objectClass": "user",
217 "sambaUnicodePwd": "geheim",
220 # Checking for existence of record (remote)
221 msg = self.ldb.search(expression="(unixName=bin)",
222 attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
223 self.assertEquals(len(msg), 1)
224 self.assertEquals(str(msg[0]["cn"]), "Niemand")
225 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
227 # Checking for existence of record (local && remote)
228 msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
229 attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
230 self.assertEquals(len(msg), 1) # TODO: should check with more records
231 self.assertEquals(str(msg[0]["cn"]), "Niemand")
232 self.assertEquals(str(msg[0]["unixName"]), "bin")
233 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
235 # Checking for existence of record (local || remote)
236 msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
237 attrs=['unixName', 'cn', 'dn', 'sambaUnicodePwd'])
238 # print "got %d replies" % len(msg)
239 self.assertEquals(len(msg), 1) # TODO: should check with more records
240 self.assertEquals(str(msg[0]["cn"]), "Niemand")
241 self.assertEquals(str(msg[0]["unixName"]), "bin")
242 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
244 # Checking for data in destination database
245 msg = self.samba3.db.search(expression="(cn=Niemand)")
246 self.assertTrue(len(msg) >= 1)
247 self.assertEquals(str(msg[0]["sambaSID"]),
248 "S-1-5-21-4231626423-2410014848-2360679739-2001")
249 self.assertEquals(str(msg[0]["displayName"]), "Niemand")
251 # Adding attribute...
252 self.ldb.modify_ldif("""
253 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
259 # Checking whether changes are still there...
260 msg = self.ldb.search(expression="(cn=Niemand)")
261 self.assertTrue(len(msg) >= 1)
262 self.assertEquals(str(msg[0]["cn"]), "Niemand")
263 self.assertEquals(str(msg[0]["description"]), "Blah")
265 # Modifying attribute...
266 self.ldb.modify_ldif("""
267 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
273 # Checking whether changes are still there...
274 msg = self.ldb.search(expression="(cn=Niemand)")
275 self.assertTrue(len(msg) >= 1)
276 self.assertEquals(str(msg[0]["description"]), "Blie")
278 # Deleting attribute...
279 self.ldb.modify_ldif("""
280 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
285 # Checking whether changes are no longer there...
286 msg = self.ldb.search(expression="(cn=Niemand)")
287 self.assertTrue(len(msg) >= 1)
288 self.assertTrue("description" not in msg[0])
291 self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
292 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
294 # Checking whether DN has changed...
295 msg = self.ldb.search(expression="(cn=Niemand2)")
296 self.assertEquals(len(msg), 1)
297 self.assertEquals(str(msg[0].dn),
298 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
301 self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
303 # Checking whether record is gone...
304 msg = self.ldb.search(expression="(cn=Niemand2)")
305 self.assertEquals(len(msg), 0)
308 class MapTestCase(MapBaseTestCase):
311 super(MapTestCase, self).setUp()
312 ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
313 ldb.set_opaque("skip_allocate_sids", "true")
314 ldif = read_datafile("provision_samba3sam.ldif")
315 ldb.add_ldif(self.samba4.subst(ldif))
316 self.setup_modules(ldb, self.samba3, self.samba4)
318 self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
319 self.ldb.set_opaque("skip_allocate_sids", "true")
321 def test_map_search(self):
322 """Running search tests on mapped data."""
324 "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
325 "objectclass": ["sambaDomain", "top"],
326 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
327 "sambaNextRid": "2000",
328 "sambaDomainName": "TESTS"
331 # Add a set of split records
332 self.ldb.add_ldif("""
333 dn: """ + self.samba4.dn("cn=Domain Users") + """
336 objectSid: S-1-5-21-4231626423-2410014848-2360679739-513
339 # Add a set of split records
340 self.ldb.add_ldif("""
341 dn: """ + self.samba4.dn("cn=X") + """
350 objectSid: S-1-5-21-4231626423-2410014848-2360679739-1052
354 "dn": self.samba4.dn("cn=Y"),
355 "objectClass": "top",
365 "dn": self.samba4.dn("cn=Z"),
366 "objectClass": "top",
375 # Add a set of remote records
378 "dn": self.samba3.dn("cn=A"),
379 "objectClass": "posixAccount",
382 "sambaBadPasswordCount": "x",
383 "sambaLogonTime": "x",
385 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-1052",
386 "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
389 "dn": self.samba3.dn("cn=B"),
390 "objectClass": "top",
393 "sambaBadPasswordCount": "x",
394 "sambaLogonTime": "y",
398 "dn": self.samba3.dn("cn=C"),
399 "objectClass": "top",
402 "sambaBadPasswordCount": "y",
403 "sambaLogonTime": "z",
406 # Testing search by DN
408 # Search remote record by local DN
409 dn = self.samba4.dn("cn=A")
410 res = self.ldb.search(dn, scope=SCOPE_BASE,
411 attrs=["dnsHostName", "lastLogon"])
412 self.assertEquals(len(res), 1)
413 self.assertEquals(str(res[0].dn), dn)
414 self.assertTrue("dnsHostName" not in res[0])
415 self.assertEquals(str(res[0]["lastLogon"]), "x")
417 # Search remote record by remote DN
418 dn = self.samba3.dn("cn=A")
419 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
420 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
421 self.assertEquals(len(res), 1)
422 self.assertEquals(str(res[0].dn), dn)
423 self.assertTrue("dnsHostName" not in res[0])
424 self.assertTrue("lastLogon" not in res[0])
425 self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
427 # Search split record by local DN
428 dn = self.samba4.dn("cn=X")
429 res = self.ldb.search(dn, scope=SCOPE_BASE,
430 attrs=["dnsHostName", "lastLogon"])
431 self.assertEquals(len(res), 1)
432 self.assertEquals(str(res[0].dn), dn)
433 self.assertEquals(str(res[0]["dnsHostName"]), "x")
434 self.assertEquals(str(res[0]["lastLogon"]), "x")
436 # Search split record by remote DN
437 dn = self.samba3.dn("cn=X")
438 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
439 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
440 self.assertEquals(len(res), 1)
441 self.assertEquals(str(res[0].dn), dn)
442 self.assertTrue("dnsHostName" not in res[0])
443 self.assertTrue("lastLogon" not in res[0])
444 self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
446 # Testing search by attribute
448 # Search by ignored attribute
449 res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
450 attrs=["dnsHostName", "lastLogon"])
451 self.assertEquals(len(res), 2)
452 res = sorted(res, key=attrgetter('dn'))
453 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
454 self.assertEquals(str(res[0]["dnsHostName"]), "x")
455 self.assertEquals(str(res[0]["lastLogon"]), "x")
456 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
457 self.assertEquals(str(res[1]["dnsHostName"]), "y")
458 self.assertEquals(str(res[1]["lastLogon"]), "y")
460 # Search by kept attribute
461 res = self.ldb.search(expression="(description=y)",
462 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
463 self.assertEquals(len(res), 2)
464 res = sorted(res, key=attrgetter('dn'))
465 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
466 self.assertTrue("dnsHostName" not in res[0])
467 self.assertEquals(str(res[0]["lastLogon"]), "z")
468 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
469 self.assertEquals(str(res[1]["dnsHostName"]), "z")
470 self.assertEquals(str(res[1]["lastLogon"]), "z")
472 # Search by renamed attribute
473 res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
474 attrs=["dnsHostName", "lastLogon"])
475 self.assertEquals(len(res), 2)
476 res = sorted(res, key=attrgetter('dn'))
477 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
478 self.assertTrue("dnsHostName" not in res[0])
479 self.assertEquals(str(res[0]["lastLogon"]), "x")
480 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
481 self.assertTrue("dnsHostName" not in res[1])
482 self.assertEquals(str(res[1]["lastLogon"]), "y")
484 # Search by converted attribute
486 # Using the SID directly in the parse tree leads to conversion
487 # errors, letting the search fail with no results.
488 # res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-1052)", scope=SCOPE_DEFAULT, attrs)
489 res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
490 self.assertEquals(len(res), 4)
491 res = sorted(res, key=attrgetter('dn'))
492 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
493 self.assertEquals(str(res[1]["dnsHostName"]), "x")
494 self.assertEquals(str(res[1]["lastLogon"]), "x")
495 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
497 self.assertTrue("objectSid" in res[1])
498 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
499 self.assertTrue("dnsHostName" not in res[0])
500 self.assertEquals(str(res[0]["lastLogon"]), "x")
501 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-1052",
503 self.assertTrue("objectSid" in res[0])
505 # Search by generated attribute
506 # In most cases, this even works when the mapping is missing
507 # a `convert_operator' by enumerating the remote db.
508 res = self.ldb.search(expression="(primaryGroupID=512)",
509 attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
510 self.assertEquals(len(res), 1)
511 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
512 self.assertTrue("dnsHostName" not in res[0])
513 self.assertEquals(str(res[0]["lastLogon"]), "x")
514 self.assertEquals(str(res[0]["primaryGroupID"]), "512")
516 # Note that Xs "objectSid" seems to be fine in the previous search for
518 # res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
519 # print len(res) + " results found"
520 # for i in range(len(res)):
521 # for (obj in res[i]) {
522 # print obj + ": " + res[i][obj]
527 # Search by remote name of renamed attribute */
528 res = self.ldb.search(expression="(sambaBadPasswordCount=*)",
529 attrs=["dnsHostName", "lastLogon"])
530 self.assertEquals(len(res), 0)
532 # Search by objectClass
533 attrs = ["dnsHostName", "lastLogon", "objectClass"]
534 res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
535 self.assertEquals(len(res), 2)
536 res = sorted(res, key=attrgetter('dn'))
537 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
538 self.assertTrue("dnsHostName" not in res[0])
539 self.assertEquals(str(res[0]["lastLogon"]), "x")
540 self.assertEquals(str(res[0]["objectClass"][0]), "user")
541 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
542 self.assertEquals(str(res[1]["dnsHostName"]), "x")
543 self.assertEquals(str(res[1]["lastLogon"]), "x")
544 self.assertEquals(str(res[1]["objectClass"][0]), "user")
546 # Prove that the objectClass is actually used for the search
547 res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
549 self.assertEquals(len(res), 3)
550 res = sorted(res, key=attrgetter('dn'))
551 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
552 self.assertTrue("dnsHostName" not in res[0])
553 self.assertEquals(str(res[0]["lastLogon"]), "x")
554 self.assertEquals(str(res[0]["objectClass"][0]), "user")
555 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
556 self.assertTrue("dnsHostName" not in res[1])
557 self.assertEquals(str(res[1]["lastLogon"]), "y")
558 self.assertEquals(set(res[1]["objectClass"]), set([b"top"]))
559 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
560 self.assertEquals(str(res[2]["dnsHostName"]), "x")
561 self.assertEquals(str(res[2]["lastLogon"]), "x")
562 self.assertEquals(str(res[2]["objectClass"][0]), "user")
564 # Testing search by parse tree
566 # Search by conjunction of local attributes
567 res = self.ldb.search(expression="(&(codePage=x)(revision=x))",
568 attrs=["dnsHostName", "lastLogon"])
569 self.assertEquals(len(res), 2)
570 res = sorted(res, key=attrgetter('dn'))
571 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
572 self.assertEquals(str(res[0]["dnsHostName"]), "x")
573 self.assertEquals(str(res[0]["lastLogon"]), "x")
574 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
575 self.assertEquals(str(res[1]["dnsHostName"]), "y")
576 self.assertEquals(str(res[1]["lastLogon"]), "y")
578 # Search by conjunction of remote attributes
579 res = self.ldb.search(expression="(&(lastLogon=x)(description=x))",
580 attrs=["dnsHostName", "lastLogon"])
581 self.assertEquals(len(res), 2)
582 res = sorted(res, key=attrgetter('dn'))
583 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
584 self.assertTrue("dnsHostName" not in res[0])
585 self.assertEquals(str(res[0]["lastLogon"]), "x")
586 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
587 self.assertEquals(str(res[1]["dnsHostName"]), "x")
588 self.assertEquals(str(res[1]["lastLogon"]), "x")
590 # Search by conjunction of local and remote attribute
591 res = self.ldb.search(expression="(&(codePage=x)(description=x))",
592 attrs=["dnsHostName", "lastLogon"])
593 self.assertEquals(len(res), 2)
594 res = sorted(res, key=attrgetter('dn'))
595 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
596 self.assertEquals(str(res[0]["dnsHostName"]), "x")
597 self.assertEquals(str(res[0]["lastLogon"]), "x")
598 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
599 self.assertEquals(str(res[1]["dnsHostName"]), "y")
600 self.assertEquals(str(res[1]["lastLogon"]), "y")
602 # Search by conjunction of local and remote attribute w/o match
603 attrs = ["dnsHostName", "lastLogon"]
604 res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))",
606 self.assertEquals(len(res), 0)
607 res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))",
609 self.assertEquals(len(res), 0)
611 # Search by disjunction of local attributes
612 res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))",
613 attrs=["dnsHostName", "lastLogon"])
614 self.assertEquals(len(res), 2)
615 res = sorted(res, key=attrgetter('dn'))
616 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
617 self.assertEquals(str(res[0]["dnsHostName"]), "x")
618 self.assertEquals(str(res[0]["lastLogon"]), "x")
619 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
620 self.assertEquals(str(res[1]["dnsHostName"]), "y")
621 self.assertEquals(str(res[1]["lastLogon"]), "y")
623 # Search by disjunction of remote attributes
624 res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))",
625 attrs=["dnsHostName", "lastLogon"])
626 self.assertEquals(len(res), 3)
627 res = sorted(res, key=attrgetter('dn'))
628 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
629 self.assertFalse("dnsHostName" in res[0])
630 self.assertEquals(str(res[0]["lastLogon"]), "x")
631 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
632 self.assertFalse("dnsHostName" in res[1])
633 self.assertEquals(str(res[1]["lastLogon"]), "y")
634 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
635 self.assertEquals(str(res[2]["dnsHostName"]), "x")
636 self.assertEquals(str(res[2]["lastLogon"]), "x")
638 # Search by disjunction of local and remote attribute
639 res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))",
640 attrs=["dnsHostName", "lastLogon"])
641 self.assertEquals(len(res), 3)
642 res = sorted(res, key=attrgetter('dn'))
643 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
644 self.assertFalse("dnsHostName" in res[0])
645 self.assertEquals(str(res[0]["lastLogon"]), "y")
646 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
647 self.assertEquals(str(res[1]["dnsHostName"]), "x")
648 self.assertEquals(str(res[1]["lastLogon"]), "x")
649 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Y"))
650 self.assertEquals(str(res[2]["dnsHostName"]), "y")
651 self.assertEquals(str(res[2]["lastLogon"]), "y")
653 # Search by disjunction of local and remote attribute w/o match
654 res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))",
655 attrs=["dnsHostName", "lastLogon"])
656 self.assertEquals(len(res), 0)
658 # Search by negated local attribute
659 res = self.ldb.search(expression="(!(revision=x))",
660 attrs=["dnsHostName", "lastLogon"])
661 self.assertEquals(len(res), 6)
662 res = sorted(res, key=attrgetter('dn'))
663 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
664 self.assertTrue("dnsHostName" not in res[0])
665 self.assertEquals(str(res[0]["lastLogon"]), "x")
666 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
667 self.assertTrue("dnsHostName" not in res[1])
668 self.assertEquals(str(res[1]["lastLogon"]), "y")
669 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
670 self.assertTrue("dnsHostName" not in res[2])
671 self.assertEquals(str(res[2]["lastLogon"]), "z")
672 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
673 self.assertEquals(str(res[3]["dnsHostName"]), "z")
674 self.assertEquals(str(res[3]["lastLogon"]), "z")
676 # Search by negated remote attribute
677 res = self.ldb.search(expression="(!(description=x))",
678 attrs=["dnsHostName", "lastLogon"])
679 self.assertEquals(len(res), 4)
680 res = sorted(res, key=attrgetter('dn'))
681 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
682 self.assertTrue("dnsHostName" not in res[0])
683 self.assertEquals(str(res[0]["lastLogon"]), "z")
684 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
685 self.assertEquals(str(res[1]["dnsHostName"]), "z")
686 self.assertEquals(str(res[1]["lastLogon"]), "z")
688 # Search by negated conjunction of local attributes
689 res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))",
690 attrs=["dnsHostName", "lastLogon"])
691 self.assertEquals(len(res), 6)
692 res = sorted(res, key=attrgetter('dn'))
693 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
694 self.assertTrue("dnsHostName" not in res[0])
695 self.assertEquals(str(res[0]["lastLogon"]), "x")
696 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
697 self.assertTrue("dnsHostName" not in res[1])
698 self.assertEquals(str(res[1]["lastLogon"]), "y")
699 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
700 self.assertTrue("dnsHostName" not in res[2])
701 self.assertEquals(str(res[2]["lastLogon"]), "z")
702 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
703 self.assertEquals(str(res[3]["dnsHostName"]), "z")
704 self.assertEquals(str(res[3]["lastLogon"]), "z")
706 # Search by negated conjunction of remote attributes
707 res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))",
708 attrs=["dnsHostName", "lastLogon"])
709 self.assertEquals(len(res), 6)
710 res = sorted(res, key=attrgetter('dn'))
711 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
712 self.assertTrue("dnsHostName" not in res[0])
713 self.assertEquals(str(res[0]["lastLogon"]), "y")
714 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
715 self.assertTrue("dnsHostName" not in res[1])
716 self.assertEquals(str(res[1]["lastLogon"]), "z")
717 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Y"))
718 self.assertEquals(str(res[2]["dnsHostName"]), "y")
719 self.assertEquals(str(res[2]["lastLogon"]), "y")
720 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
721 self.assertEquals(str(res[3]["dnsHostName"]), "z")
722 self.assertEquals(str(res[3]["lastLogon"]), "z")
724 # Search by negated conjunction of local and remote attribute
725 res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))",
726 attrs=["dnsHostName", "lastLogon"])
727 self.assertEquals(len(res), 6)
728 res = sorted(res, key=attrgetter('dn'))
729 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
730 self.assertTrue("dnsHostName" not in res[0])
731 self.assertEquals(str(res[0]["lastLogon"]), "x")
732 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
733 self.assertTrue("dnsHostName" not in res[1])
734 self.assertEquals(str(res[1]["lastLogon"]), "y")
735 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
736 self.assertTrue("dnsHostName" not in res[2])
737 self.assertEquals(str(res[2]["lastLogon"]), "z")
738 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
739 self.assertEquals(str(res[3]["dnsHostName"]), "z")
740 self.assertEquals(str(res[3]["lastLogon"]), "z")
742 # Search by negated disjunction of local attributes
743 res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))",
744 attrs=["dnsHostName", "lastLogon"])
745 res = sorted(res, key=attrgetter('dn'))
746 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
747 self.assertTrue("dnsHostName" not in res[0])
748 self.assertEquals(str(res[0]["lastLogon"]), "x")
749 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
750 self.assertTrue("dnsHostName" not in res[1])
751 self.assertEquals(str(res[1]["lastLogon"]), "y")
752 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
753 self.assertTrue("dnsHostName" not in res[2])
754 self.assertEquals(str(res[2]["lastLogon"]), "z")
755 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
756 self.assertEquals(str(res[3]["dnsHostName"]), "z")
757 self.assertEquals(str(res[3]["lastLogon"]), "z")
759 # Search by negated disjunction of remote attributes
760 res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))",
761 attrs=["dnsHostName", "lastLogon"])
762 self.assertEquals(len(res), 5)
763 res = sorted(res, key=attrgetter('dn'))
764 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
765 self.assertTrue("dnsHostName" not in res[0])
766 self.assertEquals(str(res[0]["lastLogon"]), "z")
767 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
768 self.assertEquals(str(res[1]["dnsHostName"]), "y")
769 self.assertEquals(str(res[1]["lastLogon"]), "y")
770 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
771 self.assertEquals(str(res[2]["dnsHostName"]), "z")
772 self.assertEquals(str(res[2]["lastLogon"]), "z")
774 # Search by negated disjunction of local and remote attribute
775 res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
776 attrs=["dnsHostName", "lastLogon"])
777 self.assertEquals(len(res), 5)
778 res = sorted(res, key=attrgetter('dn'))
779 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
780 self.assertTrue("dnsHostName" not in res[0])
781 self.assertEquals(str(res[0]["lastLogon"]), "x")
782 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
783 self.assertTrue("dnsHostName" not in res[1])
784 self.assertEquals(str(res[1]["lastLogon"]), "z")
785 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
786 self.assertEquals(str(res[2]["dnsHostName"]), "z")
787 self.assertEquals(str(res[2]["lastLogon"]), "z")
789 # Search by complex parse tree
790 res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
791 self.assertEquals(len(res), 7)
792 res = sorted(res, key=attrgetter('dn'))
793 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
794 self.assertTrue("dnsHostName" not in res[0])
795 self.assertEquals(str(res[0]["lastLogon"]), "x")
796 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
797 self.assertTrue("dnsHostName" not in res[1])
798 self.assertEquals(str(res[1]["lastLogon"]), "y")
799 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
800 self.assertTrue("dnsHostName" not in res[2])
801 self.assertEquals(str(res[2]["lastLogon"]), "z")
802 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=X"))
803 self.assertEquals(str(res[3]["dnsHostName"]), "x")
804 self.assertEquals(str(res[3]["lastLogon"]), "x")
805 self.assertEquals(str(res[4].dn), self.samba4.dn("cn=Z"))
806 self.assertEquals(str(res[4]["dnsHostName"]), "z")
807 self.assertEquals(str(res[4]["lastLogon"]), "z")
810 dns = [self.samba4.dn("cn=%s" % n) for n in ["A", "B", "C", "X", "Y", "Z"]]
814 def test_map_modify_local(self):
815 """Modification of local records."""
817 dn = "cn=test,dc=idealx,dc=org"
818 self.ldb.add({"dn": dn,
822 "description": "test"})
824 attrs = ["foo", "revision", "description"]
825 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
826 self.assertEquals(len(res), 1)
827 self.assertEquals(str(res[0].dn), dn)
828 self.assertEquals(str(res[0]["foo"]), "bar")
829 self.assertEquals(str(res[0]["revision"]), "1")
830 self.assertEquals(str(res[0]["description"]), "test")
831 # Check it's not in the local db
832 res = self.samba4.db.search(expression="(cn=test)",
833 scope=SCOPE_DEFAULT, attrs=attrs)
834 self.assertEquals(len(res), 0)
835 # Check it's not in the remote db
836 res = self.samba3.db.search(expression="(cn=test)",
837 scope=SCOPE_DEFAULT, attrs=attrs)
838 self.assertEquals(len(res), 0)
840 # Modify local record
848 self.ldb.modify_ldif(ldif)
850 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
851 self.assertEquals(len(res), 1)
852 self.assertEquals(str(res[0].dn), dn)
853 self.assertEquals(str(res[0]["foo"]), "baz")
854 self.assertEquals(str(res[0]["revision"]), "1")
855 self.assertEquals(str(res[0]["description"]), "foo")
857 # Rename local record
858 dn2 = "cn=toast,dc=idealx,dc=org"
859 self.ldb.rename(dn, dn2)
861 res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
862 self.assertEquals(len(res), 1)
863 self.assertEquals(str(res[0].dn), dn2)
864 self.assertEquals(str(res[0]["foo"]), "baz")
865 self.assertEquals(str(res[0]["revision"]), "1")
866 self.assertEquals(str(res[0]["description"]), "foo")
868 # Delete local record
871 res = self.ldb.search(dn2, scope=SCOPE_BASE)
872 self.assertEquals(len(res), 0)
874 def test_map_modify_remote_remote(self):
875 """Modification of remote data of remote records"""
877 dn = self.samba4.dn("cn=test")
878 dn2 = self.samba3.dn("cn=test")
879 self.samba3.db.add({"dn": dn2,
881 "description": "foo",
882 "sambaBadPasswordCount": "3",
883 "sambaNextRid": "1001"})
885 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
886 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
887 self.assertEquals(len(res), 1)
888 self.assertEquals(str(res[0].dn), dn2)
889 self.assertEquals(str(res[0]["description"]), "foo")
890 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
891 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
893 attrs = ["description", "badPwdCount", "nextRid"]
894 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
895 self.assertEquals(len(res), 1)
896 self.assertEquals(str(res[0].dn), dn)
897 self.assertEquals(str(res[0]["description"]), "foo")
898 self.assertEquals(str(res[0]["badPwdCount"]), "3")
899 self.assertEquals(str(res[0]["nextRid"]), "1001")
901 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
902 self.assertEquals(len(res), 0)
904 # Modify remote data of remote record
912 self.ldb.modify_ldif(ldif)
914 res = self.ldb.search(dn, scope=SCOPE_BASE,
915 attrs=["description", "badPwdCount", "nextRid"])
916 self.assertEquals(len(res), 1)
917 self.assertEquals(str(res[0].dn), dn)
918 self.assertEquals(str(res[0]["description"]), "test")
919 self.assertEquals(str(res[0]["badPwdCount"]), "4")
920 self.assertEquals(str(res[0]["nextRid"]), "1001")
922 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
923 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
924 self.assertEquals(len(res), 1)
925 self.assertEquals(str(res[0].dn), dn2)
926 self.assertEquals(str(res[0]["description"]), "test")
927 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
928 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
930 # Rename remote record
931 dn2 = self.samba4.dn("cn=toast")
932 self.ldb.rename(dn, dn2)
935 res = self.ldb.search(dn, scope=SCOPE_BASE,
936 attrs=["description", "badPwdCount", "nextRid"])
937 self.assertEquals(len(res), 1)
938 self.assertEquals(str(res[0].dn), dn)
939 self.assertEquals(str(res[0]["description"]), "test")
940 self.assertEquals(str(res[0]["badPwdCount"]), "4")
941 self.assertEquals(str(res[0]["nextRid"]), "1001")
943 dn2 = self.samba3.dn("cn=toast")
944 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
945 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
946 self.assertEquals(len(res), 1)
947 self.assertEquals(str(res[0].dn), dn2)
948 self.assertEquals(str(res[0]["description"]), "test")
949 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
950 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
952 # Delete remote record
954 # Check in mapped db that it's removed
955 res = self.ldb.search(dn, scope=SCOPE_BASE)
956 self.assertEquals(len(res), 0)
958 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
959 self.assertEquals(len(res), 0)
961 def test_map_modify_remote_local(self):
962 """Modification of local data of remote records"""
963 # Add remote record (same as before)
964 dn = self.samba4.dn("cn=test")
965 dn2 = self.samba3.dn("cn=test")
966 self.samba3.db.add({"dn": dn2,
968 "description": "foo",
969 "sambaBadPasswordCount": "3",
970 "sambaNextRid": "1001"})
972 # Modify local data of remote record
981 self.ldb.modify_ldif(ldif)
983 attrs = ["revision", "description"]
984 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
985 self.assertEquals(len(res), 1)
986 self.assertEquals(str(res[0].dn), dn)
987 self.assertEquals(str(res[0]["description"]), "test")
988 self.assertEquals(str(res[0]["revision"]), "1")
990 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
991 self.assertEquals(len(res), 1)
992 self.assertEquals(str(res[0].dn), dn2)
993 self.assertEquals(str(res[0]["description"]), "test")
994 self.assertTrue("revision" not in res[0])
996 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
997 self.assertEquals(len(res), 1)
998 self.assertEquals(str(res[0].dn), dn)
999 self.assertTrue("description" not in res[0])
1000 self.assertEquals(str(res[0]["revision"]), "1")
1002 # Delete (newly) split record
1005 def test_map_modify_split(self):
1006 """Testing modification of split records"""
1008 dn = self.samba4.dn("cn=test")
1009 dn2 = self.samba3.dn("cn=test")
1013 "description": "foo",
1018 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1019 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1020 self.assertEquals(len(res), 1)
1021 self.assertEquals(str(res[0].dn), dn)
1022 self.assertEquals(str(res[0]["description"]), "foo")
1023 self.assertEquals(str(res[0]["badPwdCount"]), "3")
1024 self.assertEquals(str(res[0]["nextRid"]), "1001")
1025 self.assertEquals(str(res[0]["revision"]), "1")
1027 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1028 self.assertEquals(len(res), 1)
1029 self.assertEquals(str(res[0].dn), dn)
1030 self.assertTrue("description" not in res[0])
1031 self.assertTrue("badPwdCount" not in res[0])
1032 self.assertTrue("nextRid" not in res[0])
1033 self.assertEquals(str(res[0]["revision"]), "1")
1034 # Check in remote db
1035 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1037 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1038 self.assertEquals(len(res), 1)
1039 self.assertEquals(str(res[0].dn), dn2)
1040 self.assertEquals(str(res[0]["description"]), "foo")
1041 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
1042 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1043 self.assertTrue("revision" not in res[0])
1045 # Modify of split record
1048 replace: description
1050 replace: badPwdCount
1055 self.ldb.modify_ldif(ldif)
1056 # Check in mapped db
1057 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1058 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1059 self.assertEquals(len(res), 1)
1060 self.assertEquals(str(res[0].dn), dn)
1061 self.assertEquals(str(res[0]["description"]), "test")
1062 self.assertEquals(str(res[0]["badPwdCount"]), "4")
1063 self.assertEquals(str(res[0]["nextRid"]), "1001")
1064 self.assertEquals(str(res[0]["revision"]), "2")
1066 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1067 self.assertEquals(len(res), 1)
1068 self.assertEquals(str(res[0].dn), dn)
1069 self.assertTrue("description" not in res[0])
1070 self.assertTrue("badPwdCount" not in res[0])
1071 self.assertTrue("nextRid" not in res[0])
1072 self.assertEquals(str(res[0]["revision"]), "2")
1073 # Check in remote db
1074 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1076 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1077 self.assertEquals(len(res), 1)
1078 self.assertEquals(str(res[0].dn), dn2)
1079 self.assertEquals(str(res[0]["description"]), "test")
1080 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1081 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1082 self.assertTrue("revision" not in res[0])
1084 # Rename split record
1085 dn2 = self.samba4.dn("cn=toast")
1086 self.ldb.rename(dn, dn2)
1087 # Check in mapped db
1089 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1090 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1091 self.assertEquals(len(res), 1)
1092 self.assertEquals(str(res[0].dn), dn)
1093 self.assertEquals(str(res[0]["description"]), "test")
1094 self.assertEquals(str(res[0]["badPwdCount"]), "4")
1095 self.assertEquals(str(res[0]["nextRid"]), "1001")
1096 self.assertEquals(str(res[0]["revision"]), "2")
1098 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1099 self.assertEquals(len(res), 1)
1100 self.assertEquals(str(res[0].dn), dn)
1101 self.assertTrue("description" not in res[0])
1102 self.assertTrue("badPwdCount" not in res[0])
1103 self.assertTrue("nextRid" not in res[0])
1104 self.assertEquals(str(res[0]["revision"]), "2")
1105 # Check in remote db
1106 dn2 = self.samba3.dn("cn=toast")
1107 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
1108 attrs=["description", "sambaBadPasswordCount", "sambaNextRid",
1110 self.assertEquals(len(res), 1)
1111 self.assertEquals(str(res[0].dn), dn2)
1112 self.assertEquals(str(res[0]["description"]), "test")
1113 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1114 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1115 self.assertTrue("revision" not in res[0])
1117 # Delete split record
1119 # Check in mapped db
1120 res = self.ldb.search(dn, scope=SCOPE_BASE)
1121 self.assertEquals(len(res), 0)
1123 res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1124 self.assertEquals(len(res), 0)
1125 # Check in remote db
1126 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1127 self.assertEquals(len(res), 0)