3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE
28 from samba import Ldb, substitute_var
29 from samba.tests import TestCaseInTempDir, env_loadparm
30 import samba.dcerpc.security
32 from samba.auth import system_session
33 from operator import attrgetter
def read_datafile(filename):
    """Return the text content of *filename* from the samba3 test-data directory.

    The candidate directories are resolved relative to this module's own
    location and tried in order; the first one that exists is used.  If none
    exists, the open() below is attempted against the last candidate and
    raises the usual I/O error.

    NOTE(review): the loop header and the body of the existence check were not
    visible in the reviewed excerpt; the for/break shape is reconstructed from
    the way `paths` and `p` are used -- confirm against the full file.
    """
    paths = ["../../../../../testdata/samba3",
             "../../../../testdata/samba3"]
    here = os.path.dirname(__file__)
    for p in paths:
        datadir = os.path.join(here, p)
        if os.path.exists(datadir):
            break
    # Use a context manager so the handle is closed as soon as the content has
    # been read, instead of whenever the file object happens to be collected.
    with open(os.path.join(datadir, filename), 'r') as f:
        return f.read()
45 def ldb_debug(l, text):
49 class MapBaseTestCase(TestCaseInTempDir):
50 """Base test case for mapping tests."""
# Shared fixture for the mapping tests: it creates a main test database plus
# one backend database per "Target" (samba3 and samba4) inside the per-test
# temp dir, and offers helpers to install the module configuration and to
# compare SIDs.
# NOTE(review): several short lines of this class (some def/class headers and
# continuation lines) are not visible in the reviewed excerpt.

# Write three control records into `ldb`:
#   @MAP=samba3sam - its "@TO" value is "sambaDomainName=TESTS," + s3.basedn,
#   @MODULES       - the comma-separated module list in "@LIST",
#   @PARTITION     - the case-folded base DNs of s4 and s3, plus the entries
#                    listed under "replicateEntries".
52 def setup_modules(self, ldb, s3, s4):
53 ldb.add({"dn": "@MAP=samba3sam",
55 "@TO": "sambaDomainName=TESTS," + s3.basedn})
57 ldb.add({"dn": "@MODULES",
58 "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,samba3sid,partition"})
60 ldb.add({"dn": "@PARTITION",
61 "partition": ["%s" % (s4.basedn_casefold),
62 "%s" % (s3.basedn_casefold)],
63 "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"],
# Fixture setup (presumably the body of setUp -- its def line is not visible
# here): load the test loadparm, force workgroup and netbios name to "TESTS",
# then chain to the parent class.
67 self.lp = env_loadparm()
68 self.lp.set("workgroup", "TESTS")
69 self.lp.set("netbios name", "TESTS")
70 super(MapBaseTestCase, self).setUp()
# DN builder for the samba3 side: places `rdn` below the
# sambaDomainName=TESTS container of `basedn`.
72 def make_dn(basedn, rdn):
73 return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)
# DN builder for the samba4 side: places `rdn` directly below `basedn`.
75 def make_s4dn(basedn, rdn):
76 return "%s,%s" % (rdn, basedn)
# File path and tdb:// URL of the main test database in the temp dir.
78 self.ldbfile = os.path.join(self.tempdir, "test.ldb")
79 self.ldburl = "tdb://" + self.ldbfile
# Local alias of the temp dir; it is read when building the per-target file
# path further down.
81 tempdir = self.tempdir
# NOTE(review): the lines from here down to modify_ldif appear to belong to a
# nested helper class (instantiated below as Target(basedn, dn_builder, lp));
# its class statement is not visible in this excerpt.
84 """Simple helper class that contains data for a specific SAM
87 def __init__(self, basedn, dn, lp):
88 self.db = Ldb(lp=lp, session_info=system_session())
89 self.db.set_opaque("skip_allocate_sids", "true");
91 self.basedn_casefold = ldb.Dn(self.db, basedn).get_casefold()
92 self.substvars = {"BASEDN": self.basedn}
93 self.file = os.path.join(tempdir, "%s.ldb" % self.basedn_casefold)
94 self.url = "tdb://" + self.file
98 return self._dn(self.basedn, rdn)
101 return self.db.connect(self.url)
# Load the named data file and add its LDIF to this target's database.
103 def setup_data(self, path):
104 self.add_ldif(read_datafile(path))
# Expand this target's substitution variables (BASEDN) in `text`.
106 def subst(self, text):
107 return substitute_var(text, self.substvars)
# Add LDIF to this target's database after variable substitution.
109 def add_ldif(self, ldif):
110 self.db.add_ldif(self.subst(ldif))
# Apply an LDIF modification to this target's database after substitution.
112 def modify_ldif(self, ldif):
113 self.db.modify_ldif(self.subst(ldif))
# One Target per backend, each with its matching DN builder, then connect both.
115 self.samba4 = Target("dc=vernstok,dc=nl", make_s4dn, self.lp)
116 self.samba3 = Target("cn=Samba3Sam", make_dn, self.lp)
118 self.samba3.connect()
119 self.samba4.connect()
# Cleanup (presumably the body of tearDown -- its def line is not visible
# here): remove the three database files, then chain to the parent class.
122 os.unlink(self.ldbfile)
123 os.unlink(self.samba3.file)
124 os.unlink(self.samba4.file)
125 super(MapBaseTestCase, self).tearDown()
# Assert that the SID written as `text` equals a SID obtained through
# samba.ndr.ndr_unpack(dom_sid, ...).  The second ndr_unpack argument is on a
# line not visible in this excerpt -- presumably `ndr_sid`; confirm.
127 def assertSidEquals(self, text, ndr_sid):
128 sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid,
130 sid_obj2 = samba.dcerpc.security.dom_sid(text)
131 self.assertEquals(sid_obj1, sid_obj2)
134 class Samba3SamTestCase(MapBaseTestCase):
# Fixture setup for the Samba3 SAM tests (the enclosing def line is not visible
# in this excerpt; the super().setUp() call suggests it is setUp):
#   1. run the base-class setup,
#   2. open the main test database and set "skip_allocate_sids",
#   3. load samba3.ldif into the samba3 target,
#   4. add provision_samba3sam.ldif to the main database, with the samba4
#      target's variables substituted,
#   5. install the mapping/module/partition records via setup_modules(),
#   6. open the database a second time and keep that handle as self.ldb --
#      presumably so the records written in step 5 take effect; confirm.
137 super(Samba3SamTestCase, self).setUp()
138 ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
139 ldb.set_opaque("skip_allocate_sids", "true");
140 self.samba3.setup_data("samba3.ldif")
141 ldif = read_datafile("provision_samba3sam.ldif")
142 ldb.add_ldif(self.samba4.subst(ldif))
143 self.setup_modules(ldb, self.samba3, self.samba4)
145 self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
146 self.ldb.set_opaque("skip_allocate_sids", "true");
def test_search_non_mapped(self):
    """Looking up by non-mapped attribute.

    Searches the mapped database for the Administrator entry by ``cn`` and
    expects exactly one message carrying that value.
    """
    msg = self.ldb.search(expression="(cn=Administrator)")
    self.assertEquals(len(msg), 1)
    # Compare the string form of the attribute, as the other single-valued
    # attribute checks in this file do, instead of comparing the raw message
    # element object with a plain string.
    self.assertEquals(str(msg[0]["cn"]), "Administrator")
def test_search_mapped(self):
    """Looking up by mapped attribute.

    Searches the mapped database by ``name`` for "Backup Operators" and
    expects exactly one message carrying that value.
    """
    # This method used to be named test_search_non_mapped as well.  A second
    # def with the same name in one class body replaces the first, so only one
    # of the two tests could ever be collected; the name now matches the
    # docstring.
    msg = self.ldb.search(expression="(name=Backup Operators)")
    self.assertEquals(len(msg), 1)
    self.assertEquals(str(msg[0]["name"]), "Backup Operators")
def test_old_name_of_renamed(self):
    """A search on the old name of a renamed attribute must match nothing."""
    expression = "(displayName=Backup Operators)"
    hits = self.ldb.search(expression=expression)
    self.assertEquals(len(hits), 0)
165 def test_mapped_containing_sid(self):
166 """Looking up mapped entry containing SID"""
# Search for cn=Replicator through the mapped database and check, in order:
# exactly one hit, its DN, the presence and value of objectSid, and its
# objectClass set.
167 msg = self.ldb.search(expression="(cn=Replicator)")
168 self.assertEquals(len(msg), 1)
169 self.assertEquals(str(msg[0].dn),
170 "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
# The entry must expose objectSid, and that SID must match the expected text.
# NOTE(review): the second argument of the assertSidEquals call below is on a
# line not visible in this excerpt.
171 self.assertTrue("objectSid" in msg[0])
172 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
# The objectClass values, taken as a set, must be exactly {"group"}.
174 oc = set(msg[0]["objectClass"])
175 self.assertEquals(oc, set(["group"]))
def test_search_by_objclass(self):
    """Search with a filter that has an objectClass clause; compare the DNs."""
    expected_dns = set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                        "unixName=nobody,ou=Users,dc=vernstok,dc=nl"])
    msg = self.ldb.search(
        expression="(|(objectClass=user)(cn=Administrator))")
    # Order of the results is irrelevant, so compare as sets of DN strings.
    found_dns = set(str(entry.dn) for entry in msg)
    self.assertEquals(found_dns, expected_dns)
184 def test_s3sam_modify(self):
# End-to-end exercise of writes through the mapped database (self.ldb): add
# records, read them back both through self.ldb and directly from the samba3
# backend (self.samba3.db), then add, modify and delete an attribute, rename
# the entry and finally delete it, checking the visible result after each step.
# NOTE(review): several continuation lines of the dict and LDIF literals below
# are not visible in the reviewed excerpt.
185 # Adding a record that will fall back to the local database
186 self.ldb.add({"dn": "cn=Foo",
190 "showInAdvancedViewOnly": "TRUE"}
193 # Checking for existence of record (local)
194 # TODO: This record must be searched in the local database, which is
195 # currently only supported for base searches
196 # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
197 # TODO: Actually, this version should work as well but doesn't...
200 msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
202 attrs=['foo','blah','cn','showInAdvancedViewOnly'])
203 self.assertEquals(len(msg), 1)
204 self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
205 self.assertEquals(str(msg[0]["foo"]), "bar")
206 self.assertEquals(str(msg[0]["blah"]), "Blie")
208 # Adding record that will be mapped
209 self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
210 "objectClass": "user",
212 "sambaUnicodePwd": "geheim",
215 # Checking for existence of record (remote)
216 msg = self.ldb.search(expression="(unixName=bin)",
217 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
218 self.assertEquals(len(msg), 1)
219 self.assertEquals(str(msg[0]["cn"]), "Niemand")
220 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
222 # Checking for existence of record (local && remote)
223 msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
224 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
225 self.assertEquals(len(msg), 1) # TODO: should check with more records
226 self.assertEquals(str(msg[0]["cn"]), "Niemand")
227 self.assertEquals(str(msg[0]["unixName"]), "bin")
228 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
230 # Checking for existence of record (local || remote)
231 msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
232 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
233 #print "got %d replies" % len(msg)
234 self.assertEquals(len(msg), 1) # TODO: should check with more records
235 self.assertEquals(str(msg[0]["cn"]), "Niemand")
236 self.assertEquals(str(msg[0]["unixName"]), "bin")
237 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
239 # Checking for data in destination database
240 msg = self.samba3.db.search(expression="(cn=Niemand)")
241 self.assertTrue(len(msg) >= 1)
242 self.assertEquals(str(msg[0]["sambaSID"]),
243 "S-1-5-21-4231626423-2410014848-2360679739-2001")
244 self.assertEquals(str(msg[0]["displayName"]), "Niemand")
246 # Adding attribute...
247 self.ldb.modify_ldif("""
248 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
254 # Checking whether changes are still there...
255 msg = self.ldb.search(expression="(cn=Niemand)")
256 self.assertTrue(len(msg) >= 1)
257 self.assertEquals(str(msg[0]["cn"]), "Niemand")
258 self.assertEquals(str(msg[0]["description"]), "Blah")
260 # Modifying attribute...
261 self.ldb.modify_ldif("""
262 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
268 # Checking whether changes are still there...
269 msg = self.ldb.search(expression="(cn=Niemand)")
270 self.assertTrue(len(msg) >= 1)
271 self.assertEquals(str(msg[0]["description"]), "Blie")
273 # Deleting attribute...
274 self.ldb.modify_ldif("""
275 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
280 # Checking whether changes are no longer there...
281 msg = self.ldb.search(expression="(cn=Niemand)")
282 self.assertTrue(len(msg) >= 1)
283 self.assertTrue(not "description" in msg[0])
# Rename the entry from cn=Niemand to cn=Niemand2 within the same container.
286 self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
287 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
289 # Checking whether DN has changed...
290 msg = self.ldb.search(expression="(cn=Niemand2)")
291 self.assertEquals(len(msg), 1)
292 self.assertEquals(str(msg[0].dn),
293 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
# Delete the renamed entry.
296 self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
298 # Checking whether record is gone...
299 msg = self.ldb.search(expression="(cn=Niemand2)")
300 self.assertEquals(len(msg), 0)
303 class MapTestCase(MapBaseTestCase):
# Fixture setup for the generic mapping tests (the enclosing def line is not
# visible in this excerpt; the super().setUp() call suggests it is setUp):
# run the base-class setup, open the main test database, add
# provision_samba3sam.ldif with the samba4 target's variables substituted,
# install the mapping/module/partition records, then open the database a
# second time and keep that handle as self.ldb.  No samba3 data file is
# loaded in the lines visible here.
306 super(MapTestCase, self).setUp()
307 ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
308 ldb.set_opaque("skip_allocate_sids", "true");
309 ldif = read_datafile("provision_samba3sam.ldif")
310 ldb.add_ldif(self.samba4.subst(ldif))
311 self.setup_modules(ldb, self.samba3, self.samba4)
313 self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
314 self.ldb.set_opaque("skip_allocate_sids", "true");
316 def test_map_search(self):
317 """Running search tests on mapped data."""
319 "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
320 "objectclass": ["sambaDomain", "top"],
321 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
322 "sambaNextRid": "2000",
323 "sambaDomainName": "TESTS"
326 # Add a set of split records
327 self.ldb.add_ldif("""
328 dn: """+ self.samba4.dn("cn=Domain Users") + """
331 objectSid: S-1-5-21-4231626423-2410014848-2360679739-513
334 # Add a set of split records
335 self.ldb.add_ldif("""
336 dn: """+ self.samba4.dn("cn=X") + """
345 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
349 "dn": self.samba4.dn("cn=Y"),
350 "objectClass": "top",
360 "dn": self.samba4.dn("cn=Z"),
361 "objectClass": "top",
370 # Add a set of remote records
373 "dn": self.samba3.dn("cn=A"),
374 "objectClass": "posixAccount",
377 "sambaBadPasswordCount": "x",
378 "sambaLogonTime": "x",
380 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
381 "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
384 "dn": self.samba3.dn("cn=B"),
385 "objectClass": "top",
388 "sambaBadPasswordCount": "x",
389 "sambaLogonTime": "y",
393 "dn": self.samba3.dn("cn=C"),
394 "objectClass": "top",
397 "sambaBadPasswordCount": "y",
398 "sambaLogonTime": "z",
401 # Testing search by DN
403 # Search remote record by local DN
404 dn = self.samba4.dn("cn=A")
405 res = self.ldb.search(dn, scope=SCOPE_BASE,
406 attrs=["dnsHostName", "lastLogon"])
407 self.assertEquals(len(res), 1)
408 self.assertEquals(str(res[0].dn), dn)
409 self.assertTrue(not "dnsHostName" in res[0])
410 self.assertEquals(str(res[0]["lastLogon"]), "x")
412 # Search remote record by remote DN
413 dn = self.samba3.dn("cn=A")
414 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
415 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
416 self.assertEquals(len(res), 1)
417 self.assertEquals(str(res[0].dn), dn)
418 self.assertTrue(not "dnsHostName" in res[0])
419 self.assertTrue(not "lastLogon" in res[0])
420 self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
422 # Search split record by local DN
423 dn = self.samba4.dn("cn=X")
424 res = self.ldb.search(dn, scope=SCOPE_BASE,
425 attrs=["dnsHostName", "lastLogon"])
426 self.assertEquals(len(res), 1)
427 self.assertEquals(str(res[0].dn), dn)
428 self.assertEquals(str(res[0]["dnsHostName"]), "x")
429 self.assertEquals(str(res[0]["lastLogon"]), "x")
431 # Search split record by remote DN
432 dn = self.samba3.dn("cn=X")
433 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
434 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
435 self.assertEquals(len(res), 1)
436 self.assertEquals(str(res[0].dn), dn)
437 self.assertTrue(not "dnsHostName" in res[0])
438 self.assertTrue(not "lastLogon" in res[0])
439 self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
441 # Testing search by attribute
443 # Search by ignored attribute
444 res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
445 attrs=["dnsHostName", "lastLogon"])
446 self.assertEquals(len(res), 2)
447 res = sorted(res, key=attrgetter('dn'))
448 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
449 self.assertEquals(str(res[0]["dnsHostName"]), "x")
450 self.assertEquals(str(res[0]["lastLogon"]), "x")
451 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
452 self.assertEquals(str(res[1]["dnsHostName"]), "y")
453 self.assertEquals(str(res[1]["lastLogon"]), "y")
455 # Search by kept attribute
456 res = self.ldb.search(expression="(description=y)",
457 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
458 self.assertEquals(len(res), 2)
459 res = sorted(res, key=attrgetter('dn'))
460 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
461 self.assertTrue(not "dnsHostName" in res[0])
462 self.assertEquals(str(res[0]["lastLogon"]), "z")
463 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
464 self.assertEquals(str(res[1]["dnsHostName"]), "z")
465 self.assertEquals(str(res[1]["lastLogon"]), "z")
467 # Search by renamed attribute
468 res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
469 attrs=["dnsHostName", "lastLogon"])
470 self.assertEquals(len(res), 2)
471 res = sorted(res, key=attrgetter('dn'))
472 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
473 self.assertTrue(not "dnsHostName" in res[0])
474 self.assertEquals(str(res[0]["lastLogon"]), "x")
475 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
476 self.assertTrue(not "dnsHostName" in res[1])
477 self.assertEquals(str(res[1]["lastLogon"]), "y")
479 # Search by converted attribute
481 # Using the SID directly in the parse tree leads to conversion
482 # errors, letting the search fail with no results.
483 #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
484 res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
485 self.assertEquals(len(res), 4)
486 res = sorted(res, key=attrgetter('dn'))
487 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
488 self.assertEquals(str(res[1]["dnsHostName"]), "x")
489 self.assertEquals(str(res[1]["lastLogon"]), "x")
490 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
492 self.assertTrue("objectSid" in res[1])
493 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
494 self.assertTrue(not "dnsHostName" in res[0])
495 self.assertEquals(str(res[0]["lastLogon"]), "x")
496 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
498 self.assertTrue("objectSid" in res[0])
500 # Search by generated attribute
501 # In most cases, this even works when the mapping is missing
502 # a `convert_operator' by enumerating the remote db.
503 res = self.ldb.search(expression="(primaryGroupID=512)",
504 attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
505 self.assertEquals(len(res), 1)
506 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
507 self.assertTrue(not "dnsHostName" in res[0])
508 self.assertEquals(str(res[0]["lastLogon"]), "x")
509 self.assertEquals(str(res[0]["primaryGroupID"]), "512")
511 # Note that Xs "objectSid" seems to be fine in the previous search for
513 #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
514 #print len(res) + " results found"
515 #for i in range(len(res)):
516 # for (obj in res[i]) {
517 # print obj + ": " + res[i][obj]
522 # Search by remote name of renamed attribute
523 res = self.ldb.search(expression="(sambaBadPasswordCount=*)",
524 attrs=["dnsHostName", "lastLogon"])
525 self.assertEquals(len(res), 0)
527 # Search by objectClass
528 attrs = ["dnsHostName", "lastLogon", "objectClass"]
529 res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
530 self.assertEquals(len(res), 2)
531 res = sorted(res, key=attrgetter('dn'))
532 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
533 self.assertTrue(not "dnsHostName" in res[0])
534 self.assertEquals(str(res[0]["lastLogon"]), "x")
535 self.assertEquals(str(res[0]["objectClass"][0]), "user")
536 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
537 self.assertEquals(str(res[1]["dnsHostName"]), "x")
538 self.assertEquals(str(res[1]["lastLogon"]), "x")
539 self.assertEquals(str(res[1]["objectClass"][0]), "user")
541 # Prove that the objectClass is actually used for the search
542 res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
544 self.assertEquals(len(res), 3)
545 res = sorted(res, key=attrgetter('dn'))
546 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
547 self.assertTrue(not "dnsHostName" in res[0])
548 self.assertEquals(str(res[0]["lastLogon"]), "x")
549 self.assertEquals(res[0]["objectClass"][0], "user")
550 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
551 self.assertTrue(not "dnsHostName" in res[1])
552 self.assertEquals(str(res[1]["lastLogon"]), "y")
553 self.assertEquals(set(res[1]["objectClass"]), set(["top"]))
554 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
555 self.assertEquals(str(res[2]["dnsHostName"]), "x")
556 self.assertEquals(str(res[2]["lastLogon"]), "x")
557 self.assertEquals(str(res[2]["objectClass"][0]), "user")
559 # Testing search by parse tree
561 # Search by conjunction of local attributes
562 res = self.ldb.search(expression="(&(codePage=x)(revision=x))",
563 attrs=["dnsHostName", "lastLogon"])
564 self.assertEquals(len(res), 2)
565 res = sorted(res, key=attrgetter('dn'))
566 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
567 self.assertEquals(str(res[0]["dnsHostName"]), "x")
568 self.assertEquals(str(res[0]["lastLogon"]), "x")
569 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
570 self.assertEquals(str(res[1]["dnsHostName"]), "y")
571 self.assertEquals(str(res[1]["lastLogon"]), "y")
573 # Search by conjunction of remote attributes
574 res = self.ldb.search(expression="(&(lastLogon=x)(description=x))",
575 attrs=["dnsHostName", "lastLogon"])
576 self.assertEquals(len(res), 2)
577 res = sorted(res, key=attrgetter('dn'))
578 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
579 self.assertTrue(not "dnsHostName" in res[0])
580 self.assertEquals(str(res[0]["lastLogon"]), "x")
581 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
582 self.assertEquals(str(res[1]["dnsHostName"]), "x")
583 self.assertEquals(str(res[1]["lastLogon"]), "x")
585 # Search by conjunction of local and remote attribute
586 res = self.ldb.search(expression="(&(codePage=x)(description=x))",
587 attrs=["dnsHostName", "lastLogon"])
588 self.assertEquals(len(res), 2)
589 res = sorted(res, key=attrgetter('dn'))
590 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
591 self.assertEquals(str(res[0]["dnsHostName"]), "x")
592 self.assertEquals(str(res[0]["lastLogon"]), "x")
593 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
594 self.assertEquals(str(res[1]["dnsHostName"]), "y")
595 self.assertEquals(str(res[1]["lastLogon"]), "y")
597 # Search by conjunction of local and remote attribute w/o match
598 attrs = ["dnsHostName", "lastLogon"]
599 res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))",
601 self.assertEquals(len(res), 0)
602 res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))",
604 self.assertEquals(len(res), 0)
606 # Search by disjunction of local attributes
607 res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))",
608 attrs=["dnsHostName", "lastLogon"])
609 self.assertEquals(len(res), 2)
610 res = sorted(res, key=attrgetter('dn'))
611 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
612 self.assertEquals(str(res[0]["dnsHostName"]), "x")
613 self.assertEquals(str(res[0]["lastLogon"]), "x")
614 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
615 self.assertEquals(str(res[1]["dnsHostName"]), "y")
616 self.assertEquals(str(res[1]["lastLogon"]), "y")
618 # Search by disjunction of remote attributes
619 res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))",
620 attrs=["dnsHostName", "lastLogon"])
621 self.assertEquals(len(res), 3)
622 res = sorted(res, key=attrgetter('dn'))
623 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
624 self.assertFalse("dnsHostName" in res[0])
625 self.assertEquals(str(res[0]["lastLogon"]), "x")
626 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
627 self.assertFalse("dnsHostName" in res[1])
628 self.assertEquals(str(res[1]["lastLogon"]), "y")
629 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
630 self.assertEquals(str(res[2]["dnsHostName"]), "x")
631 self.assertEquals(str(res[2]["lastLogon"]), "x")
633 # Search by disjunction of local and remote attribute
634 res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))",
635 attrs=["dnsHostName", "lastLogon"])
636 self.assertEquals(len(res), 3)
637 res = sorted(res, key=attrgetter('dn'))
638 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
639 self.assertFalse("dnsHostName" in res[0])
640 self.assertEquals(str(res[0]["lastLogon"]), "y")
641 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
642 self.assertEquals(str(res[1]["dnsHostName"]), "x")
643 self.assertEquals(str(res[1]["lastLogon"]), "x")
644 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Y"))
645 self.assertEquals(str(res[2]["dnsHostName"]), "y")
646 self.assertEquals(str(res[2]["lastLogon"]), "y")
648 # Search by disjunction of local and remote attribute w/o match
649 res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))",
650 attrs=["dnsHostName", "lastLogon"])
651 self.assertEquals(len(res), 0)
653 # Search by negated local attribute
654 res = self.ldb.search(expression="(!(revision=x))",
655 attrs=["dnsHostName", "lastLogon"])
656 self.assertEquals(len(res), 6)
657 res = sorted(res, key=attrgetter('dn'))
658 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
659 self.assertTrue(not "dnsHostName" in res[0])
660 self.assertEquals(str(res[0]["lastLogon"]), "x")
661 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
662 self.assertTrue(not "dnsHostName" in res[1])
663 self.assertEquals(str(res[1]["lastLogon"]), "y")
664 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
665 self.assertTrue(not "dnsHostName" in res[2])
666 self.assertEquals(str(res[2]["lastLogon"]), "z")
667 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
668 self.assertEquals(str(res[3]["dnsHostName"]), "z")
669 self.assertEquals(str(res[3]["lastLogon"]), "z")
671 # Search by negated remote attribute
672 res = self.ldb.search(expression="(!(description=x))",
673 attrs=["dnsHostName", "lastLogon"])
674 self.assertEquals(len(res), 4)
675 res = sorted(res, key=attrgetter('dn'))
676 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
677 self.assertTrue(not "dnsHostName" in res[0])
678 self.assertEquals(str(res[0]["lastLogon"]), "z")
679 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
680 self.assertEquals(str(res[1]["dnsHostName"]), "z")
681 self.assertEquals(str(res[1]["lastLogon"]), "z")
683 # Search by negated conjunction of local attributes
684 res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))",
685 attrs=["dnsHostName", "lastLogon"])
686 self.assertEquals(len(res), 6)
687 res = sorted(res, key=attrgetter('dn'))
688 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
689 self.assertTrue(not "dnsHostName" in res[0])
690 self.assertEquals(str(res[0]["lastLogon"]), "x")
691 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
692 self.assertTrue(not "dnsHostName" in res[1])
693 self.assertEquals(str(res[1]["lastLogon"]), "y")
694 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
695 self.assertTrue(not "dnsHostName" in res[2])
696 self.assertEquals(str(res[2]["lastLogon"]), "z")
697 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
698 self.assertEquals(str(res[3]["dnsHostName"]), "z")
699 self.assertEquals(str(res[3]["lastLogon"]), "z")
701 # Search by negated conjunction of remote attributes
702 res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))",
703 attrs=["dnsHostName", "lastLogon"])
704 self.assertEquals(len(res), 6)
705 res = sorted(res, key=attrgetter('dn'))
706 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
707 self.assertTrue(not "dnsHostName" in res[0])
708 self.assertEquals(str(res[0]["lastLogon"]), "y")
709 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
710 self.assertTrue(not "dnsHostName" in res[1])
711 self.assertEquals(str(res[1]["lastLogon"]), "z")
712 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Y"))
713 self.assertEquals(str(res[2]["dnsHostName"]), "y")
714 self.assertEquals(str(res[2]["lastLogon"]), "y")
715 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
716 self.assertEquals(str(res[3]["dnsHostName"]), "z")
717 self.assertEquals(str(res[3]["lastLogon"]), "z")
719 # Search by negated conjunction of local and remote attribute
720 res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))",
721 attrs=["dnsHostName", "lastLogon"])
722 self.assertEquals(len(res), 6)
723 res = sorted(res, key=attrgetter('dn'))
724 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
725 self.assertTrue(not "dnsHostName" in res[0])
726 self.assertEquals(str(res[0]["lastLogon"]), "x")
727 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
728 self.assertTrue(not "dnsHostName" in res[1])
729 self.assertEquals(str(res[1]["lastLogon"]), "y")
730 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
731 self.assertTrue(not "dnsHostName" in res[2])
732 self.assertEquals(str(res[2]["lastLogon"]), "z")
733 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
734 self.assertEquals(str(res[3]["dnsHostName"]), "z")
735 self.assertEquals(str(res[3]["lastLogon"]), "z")
737 # Search by negated disjunction of local attributes
738 res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))",
739 attrs=["dnsHostName", "lastLogon"])
740 res = sorted(res, key=attrgetter('dn'))
741 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
742 self.assertTrue(not "dnsHostName" in res[0])
743 self.assertEquals(str(res[0]["lastLogon"]), "x")
744 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
745 self.assertTrue(not "dnsHostName" in res[1])
746 self.assertEquals(str(res[1]["lastLogon"]), "y")
747 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
748 self.assertTrue(not "dnsHostName" in res[2])
749 self.assertEquals(str(res[2]["lastLogon"]), "z")
750 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
751 self.assertEquals(str(res[3]["dnsHostName"]), "z")
752 self.assertEquals(str(res[3]["lastLogon"]), "z")
754 # Search by negated disjunction of remote attributes
755 res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))",
756 attrs=["dnsHostName", "lastLogon"])
757 self.assertEquals(len(res), 5)
758 res = sorted(res, key=attrgetter('dn'))
759 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C"))
760 self.assertTrue(not "dnsHostName" in res[0])
761 self.assertEquals(str(res[0]["lastLogon"]), "z")
762 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y"))
763 self.assertEquals(str(res[1]["dnsHostName"]), "y")
764 self.assertEquals(str(res[1]["lastLogon"]), "y")
765 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
766 self.assertEquals(str(res[2]["dnsHostName"]), "z")
767 self.assertEquals(str(res[2]["lastLogon"]), "z")
769 # Search by negated disjunction of local and remote attribute
770 res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
771 attrs=["dnsHostName", "lastLogon"])
772 self.assertEquals(len(res), 5)
773 res = sorted(res, key=attrgetter('dn'))
774 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
775 self.assertTrue(not "dnsHostName" in res[0])
776 self.assertEquals(str(res[0]["lastLogon"]), "x")
777 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
778 self.assertTrue(not "dnsHostName" in res[1])
779 self.assertEquals(str(res[1]["lastLogon"]), "z")
780 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
781 self.assertEquals(str(res[2]["dnsHostName"]), "z")
782 self.assertEquals(str(res[2]["lastLogon"]), "z")
784 # Search by complex parse tree
785 res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
786 self.assertEquals(len(res), 7)
787 res = sorted(res, key=attrgetter('dn'))
788 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
789 self.assertTrue(not "dnsHostName" in res[0])
790 self.assertEquals(str(res[0]["lastLogon"]), "x")
791 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
792 self.assertTrue(not "dnsHostName" in res[1])
793 self.assertEquals(str(res[1]["lastLogon"]), "y")
794 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
795 self.assertTrue(not "dnsHostName" in res[2])
796 self.assertEquals(str(res[2]["lastLogon"]), "z")
797 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=X"))
798 self.assertEquals(str(res[3]["dnsHostName"]), "x")
799 self.assertEquals(str(res[3]["lastLogon"]), "x")
800 self.assertEquals(str(res[4].dn), self.samba4.dn("cn=Z"))
801 self.assertEquals(str(res[4]["dnsHostName"]), "z")
802 self.assertEquals(str(res[4]["lastLogon"]), "z")
805 dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
809 def test_map_modify_local(self):
810 """Modification of local records."""
# Adds a record under dc=idealx,dc=org through the mapped database, checks
# that it is readable through self.ldb but found in neither backend database
# (self.samba4.db, self.samba3.db), then modifies it, renames it and checks
# that it is gone at the end.
# NOTE(review): some continuation lines and short statements of this method
# are not visible in the reviewed excerpt.
812 dn = "cn=test,dc=idealx,dc=org"
813 self.ldb.add({"dn": dn,
817 "description": "test"})
# Check
819 attrs = ["foo", "revision", "description"]
820 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
821 self.assertEquals(len(res), 1)
822 self.assertEquals(str(res[0].dn), dn)
823 self.assertEquals(str(res[0]["foo"]), "bar")
824 self.assertEquals(str(res[0]["revision"]), "1")
825 self.assertEquals(str(res[0]["description"]), "test")
826 # Check it's not in the local db
827 res = self.samba4.db.search(expression="(cn=test)",
828 scope=SCOPE_DEFAULT, attrs=attrs)
829 self.assertEquals(len(res), 0)
830 # Check it's not in the remote db
831 res = self.samba3.db.search(expression="(cn=test)",
832 scope=SCOPE_DEFAULT, attrs=attrs)
833 self.assertEquals(len(res), 0)
835 # Modify local record
# NOTE(review): `ldif` is assigned on lines not visible in this excerpt.
843 self.ldb.modify_ldif(ldif)
# After the modification: foo and description changed, revision unchanged.
845 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
846 self.assertEquals(len(res), 1)
847 self.assertEquals(str(res[0].dn), dn)
848 self.assertEquals(str(res[0]["foo"]), "baz")
849 self.assertEquals(str(res[0]["revision"]), "1")
850 self.assertEquals(str(res[0]["description"]), "foo")
852 # Rename local record
853 dn2 = "cn=toast,dc=idealx,dc=org"
854 self.ldb.rename(dn, dn2)
# The record must be readable under its new DN with unchanged attributes.
856 res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
857 self.assertEquals(len(res), 1)
858 self.assertEquals(str(res[0].dn), dn2)
859 self.assertEquals(str(res[0]["foo"]), "baz")
860 self.assertEquals(str(res[0]["revision"]), "1")
861 self.assertEquals(str(res[0]["description"]), "foo")
863 # Delete local record
# NOTE(review): the delete call itself is not visible in this excerpt; only
# the follow-up check that the record is gone is.
866 res = self.ldb.search(dn2, scope=SCOPE_BASE)
867 self.assertEquals(len(res), 0)
869 def test_map_modify_remote_remote(self):
870 """Modification of remote data of remote records"""
# The record is added directly to self.samba3.db under dn2 and then read and
# changed through self.ldb under dn. The assertions pair the attribute names
# sambaBadPasswordCount/badPwdCount and sambaNextRid/nextRid: the same values
# are expected under either name, depending on which database is queried.
# NOTE(review): the construction of `ldif` and the delete call are not
# visible in the lines shown here.
872 dn = self.samba4.dn("cn=test")
873 dn2 = self.samba3.dn("cn=test")
874 self.samba3.db.add({"dn": dn2,
876 "description": "foo",
877 "sambaBadPasswordCount": "3",
878 "sambaNextRid": "1001"})
# Confirm the record is stored in the samba3 database as written.
880 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
881 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
882 self.assertEquals(len(res), 1)
883 self.assertEquals(str(res[0].dn), dn2)
884 self.assertEquals(str(res[0]["description"]), "foo")
885 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
886 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
# Read the same record through self.ldb, using the badPwdCount/nextRid names.
888 attrs = ["description", "badPwdCount", "nextRid"]
889 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
890 self.assertEquals(len(res), 1)
891 self.assertEquals(str(res[0].dn), dn)
892 self.assertEquals(str(res[0]["description"]), "foo")
893 self.assertEquals(str(res[0]["badPwdCount"]), "3")
894 self.assertEquals(str(res[0]["nextRid"]), "1001")
# self.samba4.db queried directly is expected to hold nothing for this dn.
896 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
897 self.assertEquals(len(res), 0)
# After the modification the assertions expect description="test" and
# badPwdCount="4", with nextRid still "1001".
899 # Modify remote data of remote record
907 self.ldb.modify_ldif(ldif)
909 res = self.ldb.search(dn, scope=SCOPE_BASE,
910 attrs=["description", "badPwdCount", "nextRid"])
911 self.assertEquals(len(res), 1)
912 self.assertEquals(str(res[0].dn), dn)
913 self.assertEquals(str(res[0]["description"]), "test")
914 self.assertEquals(str(res[0]["badPwdCount"]), "4")
915 self.assertEquals(str(res[0]["nextRid"]), "1001")
# The change must also be visible when reading samba3.db directly.
917 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
918 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
919 self.assertEquals(len(res), 1)
920 self.assertEquals(str(res[0].dn), dn2)
921 self.assertEquals(str(res[0]["description"]), "test")
922 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
923 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
925 # Rename remote record
926 dn2 = self.samba4.dn("cn=toast")
927 self.ldb.rename(dn, dn2)
# NOTE(review): in the lines shown, `dn` is not reassigned after the rename,
# yet this search expects to find the record under `dn` -- confirm whether a
# `dn = dn2` assignment belongs before it.
930 res = self.ldb.search(dn, scope=SCOPE_BASE,
931 attrs=["description", "badPwdCount", "nextRid"])
932 self.assertEquals(len(res), 1)
933 self.assertEquals(str(res[0].dn), dn)
934 self.assertEquals(str(res[0]["description"]), "test")
935 self.assertEquals(str(res[0]["badPwdCount"]), "4")
936 self.assertEquals(str(res[0]["nextRid"]), "1001")
# dn2 is rebound to the samba3-side name of the renamed record.
938 dn2 = self.samba3.dn("cn=toast")
939 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
940 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
941 self.assertEquals(len(res), 1)
942 self.assertEquals(str(res[0].dn), dn2)
943 self.assertEquals(str(res[0]["description"]), "test")
944 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
945 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
947 # Delete remote record
949 # Check in mapped db that it's removed
950 res = self.ldb.search(dn, scope=SCOPE_BASE)
951 self.assertEquals(len(res), 0)
# The samba3 database must no longer return the record either.
953 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
954 self.assertEquals(len(res), 0)
956 def test_map_modify_remote_local(self):
957 """Modification of local data of remote records"""
# Starts from a record added only to self.samba3.db, applies `ldif` through
# self.ldb, and then checks where each attribute ended up: description is
# expected in samba3.db, revision in samba4.db, and both via self.ldb.
# NOTE(review): the construction of `ldif` and the final delete call are
# not visible in the lines shown here.
958 # Add remote record (same as before)
959 dn = self.samba4.dn("cn=test")
960 dn2 = self.samba3.dn("cn=test")
961 self.samba3.db.add({"dn": dn2,
963 "description": "foo",
964 "sambaBadPasswordCount": "3",
965 "sambaNextRid": "1001"})
# The assertions after the modification expect description="test" and a
# revision of "1".
967 # Modify local data of remote record
976 self.ldb.modify_ldif(ldif)
# Through self.ldb both attributes are returned on a single record.
978 attrs = ["revision", "description"]
979 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
980 self.assertEquals(len(res), 1)
981 self.assertEquals(str(res[0].dn), dn)
982 self.assertEquals(str(res[0]["description"]), "test")
983 self.assertEquals(str(res[0]["revision"]), "1")
# samba3.db holds the description but must not carry revision.
985 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
986 self.assertEquals(len(res), 1)
987 self.assertEquals(str(res[0].dn), dn2)
988 self.assertEquals(str(res[0]["description"]), "test")
989 self.assertTrue(not "revision" in res[0])
# samba4.db now has a record for dn carrying revision but no description.
991 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
992 self.assertEquals(len(res), 1)
993 self.assertEquals(str(res[0].dn), dn)
994 self.assertTrue(not "description" in res[0])
995 self.assertEquals(str(res[0]["revision"]), "1")
997 # Delete (newly) split record
1000 def test_map_modify_split(self):
1001 """Testing modification of split records"""
# Follows a record whose attributes are divided between the two databases:
# the assertions expect revision only in self.samba4.db, and description,
# sambaBadPasswordCount and sambaNextRid only in self.samba3.db, while
# self.ldb returns all four (as description, badPwdCount, nextRid, revision).
# The same three-way check is repeated after the modify and the rename, and
# all three searches must come back empty at the end.
# NOTE(review): the add call, most of the `ldif` literal, the tails of some
# attrs lists and the delete call are not visible in the lines shown here.
1003 dn = self.samba4.dn("cn=test")
1004 dn2 = self.samba3.dn("cn=test")
1008 "description": "foo",
1013 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1014 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1015 self.assertEquals(len(res), 1)
1016 self.assertEquals(str(res[0].dn), dn)
1017 self.assertEquals(str(res[0]["description"]), "foo")
1018 self.assertEquals(str(res[0]["badPwdCount"]), "3")
1019 self.assertEquals(str(res[0]["nextRid"]), "1001")
1020 self.assertEquals(str(res[0]["revision"]), "1")
# Query self.samba4.db directly: only revision is expected on the record.
1022 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1023 self.assertEquals(len(res), 1)
1024 self.assertEquals(str(res[0].dn), dn)
1025 self.assertTrue(not "description" in res[0])
1026 self.assertTrue(not "badPwdCount" in res[0])
1027 self.assertTrue(not "nextRid" in res[0])
1028 self.assertEquals(str(res[0]["revision"]), "1")
1029 # Check in remote db
1030 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1032 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1033 self.assertEquals(len(res), 1)
1034 self.assertEquals(str(res[0].dn), dn2)
1035 self.assertEquals(str(res[0]["description"]), "foo")
1036 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
1037 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1038 self.assertTrue(not "revision" in res[0])
# The assertions after the modification expect description="test",
# badPwdCount="4" and revision="2", with nextRid still "1001".
1040 # Modify of split record
1043 replace: description
1045 replace: badPwdCount
1050 self.ldb.modify_ldif(ldif)
1051 # Check in mapped db
1052 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1053 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1054 self.assertEquals(len(res), 1)
1055 self.assertEquals(str(res[0].dn), dn)
1056 self.assertEquals(str(res[0]["description"]), "test")
1057 self.assertEquals(str(res[0]["badPwdCount"]), "4")
1058 self.assertEquals(str(res[0]["nextRid"]), "1001")
1059 self.assertEquals(str(res[0]["revision"]), "2")
# Query self.samba4.db directly: still only revision, now "2".
1061 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1062 self.assertEquals(len(res), 1)
1063 self.assertEquals(str(res[0].dn), dn)
1064 self.assertTrue(not "description" in res[0])
1065 self.assertTrue(not "badPwdCount" in res[0])
1066 self.assertTrue(not "nextRid" in res[0])
1067 self.assertEquals(str(res[0]["revision"]), "2")
1068 # Check in remote db
1069 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1071 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1072 self.assertEquals(len(res), 1)
1073 self.assertEquals(str(res[0].dn), dn2)
1074 self.assertEquals(str(res[0]["description"]), "test")
1075 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1076 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1077 self.assertTrue(not "revision" in res[0])
1079 # Rename split record
1080 dn2 = self.samba4.dn("cn=toast")
1081 self.ldb.rename(dn, dn2)
# NOTE(review): in the lines shown, `dn` is not reassigned after the rename,
# yet the searches below expect the record under `dn` -- confirm whether a
# `dn = dn2` assignment belongs here.
1082 # Check in mapped db
1084 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1085 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1086 self.assertEquals(len(res), 1)
1087 self.assertEquals(str(res[0].dn), dn)
1088 self.assertEquals(str(res[0]["description"]), "test")
1089 self.assertEquals(str(res[0]["badPwdCount"]), "4")
1090 self.assertEquals(str(res[0]["nextRid"]), "1001")
1091 self.assertEquals(str(res[0]["revision"]), "2")
# Query self.samba4.db directly after the rename.
1093 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1094 self.assertEquals(len(res), 1)
1095 self.assertEquals(str(res[0].dn), dn)
1096 self.assertTrue(not "description" in res[0])
1097 self.assertTrue(not "badPwdCount" in res[0])
1098 self.assertTrue(not "nextRid" in res[0])
1099 self.assertEquals(str(res[0]["revision"]), "2")
1100 # Check in remote db
1101 dn2 = self.samba3.dn("cn=toast")
1102 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
1103 attrs=["description", "sambaBadPasswordCount", "sambaNextRid",
1105 self.assertEquals(len(res), 1)
1106 self.assertEquals(str(res[0].dn), dn2)
1107 self.assertEquals(str(res[0]["description"]), "test")
1108 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1109 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1110 self.assertTrue(not "revision" in res[0])
# After the delete, none of the three databases may return the record.
1112 # Delete split record
1114 # Check in mapped db
1115 res = self.ldb.search(dn, scope=SCOPE_BASE)
1116 self.assertEquals(len(res), 0)
1118 res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1119 self.assertEquals(len(res), 0)
1120 # Check in remote db
1121 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1122 self.assertEquals(len(res), 0)