3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
28 from samba import Ldb, substitute_var
29 from samba.tests import LdbTestCase, TestCaseInTempDir, cmdline_loadparm
30 import samba.dcerpc.dom_sid
34 datadir = os.path.join(os.path.dirname(__file__),
35 "../../../../../testdata/samba3")
37 def read_datafile(filename):
38 return open(os.path.join(datadir, filename), 'r').read()
40 def ldb_debug(l, text):
44 class MapBaseTestCase(TestCaseInTempDir):
45 """Base test case for mapping tests."""
47 def setup_modules(self, ldb, s3, s4):
48 ldb.add({"dn": "@MAP=samba3sam",
50 "@TO": "sambaDomainName=TESTS," + s3.basedn})
52 ldb.add({"dn": "@MODULES",
53 "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})
55 ldb.add({"dn": "@PARTITION",
56 "partition": ["%s:%s" % (s4.basedn, s4.url),
57 "%s:%s" % (s3.basedn, s3.url)],
58 "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]})
61 super(MapBaseTestCase, self).setUp()
63 def make_dn(basedn, rdn):
64 return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)
66 def make_s4dn(basedn, rdn):
67 return "%s,%s" % (rdn, basedn)
69 self.ldbfile = os.path.join(self.tempdir, "test.ldb")
70 self.ldburl = "tdb://" + self.ldbfile
72 tempdir = self.tempdir
75 """Simple helper class that contains data for a specific SAM
77 def __init__(self, file, basedn, dn):
78 self.file = os.path.join(tempdir, file)
79 self.url = "tdb://" + self.file
81 self.substvars = {"BASEDN": self.basedn}
82 self.db = Ldb(lp=cmdline_loadparm)
86 return self._dn(self.basedn, rdn)
89 return self.db.connect(self.url)
91 def setup_data(self, path):
92 self.add_ldif(read_datafile(path))
94 def subst(self, text):
95 return substitute_var(text, self.substvars)
97 def add_ldif(self, ldif):
98 self.db.add_ldif(self.subst(ldif))
100 def modify_ldif(self, ldif):
101 self.db.modify_ldif(self.subst(ldif))
103 self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
104 self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
105 self.templates = Target("templates.ldb", "cn=templates", None)
107 self.samba3.connect()
108 self.templates.connect()
109 self.samba4.connect()
112 os.unlink(self.ldbfile)
113 os.unlink(self.samba3.file)
114 os.unlink(self.templates.file)
115 os.unlink(self.samba4.file)
116 super(MapBaseTestCase, self).tearDown()
118 def assertSidEquals(self, text, ndr_sid):
119 sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.dom_sid.dom_sid,
121 sid_obj2 = samba.security.Sid(text)
122 # For now, this is the only way we can compare these since the
123 # classes are in different places. Should reconcile that at some point.
124 self.assertEquals(sid_obj1.sid_rev_num, sid_obj2.sid_rev_num)
125 self.assertEquals(sid_obj1.num_auths, sid_obj2.num_auths)
126 # FIXME: self.assertEquals(sid_obj1.id_auth, sid_obj2.id_auth)
127 # FIXME: self.assertEquals(sid_obj1.sub_auths[:sid_obj1.num_auths],
128 # sid_obj2.sub_auths[:sid_obj2.num_auths])
131 class Samba3SamTestCase(MapBaseTestCase):
134 super(Samba3SamTestCase, self).setUp()
135 ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
136 self.samba3.setup_data("samba3.ldif")
137 self.templates.setup_data("provision_samba3sam_templates.ldif")
138 ldif = read_datafile("provision_samba3sam.ldif")
139 ldb.add_ldif(self.samba4.subst(ldif))
140 self.setup_modules(ldb, self.samba3, self.samba4)
142 self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
144 def test_search_non_mapped(self):
145 """Looking up by non-mapped attribute"""
146 msg = self.ldb.search(expression="(cn=Administrator)")
147 self.assertEquals(len(msg), 1)
148 self.assertEquals(msg[0]["cn"], "Administrator")
150 def test_search_non_mapped(self):
151 """Looking up by mapped attribute"""
152 msg = self.ldb.search(expression="(name=Backup Operators)")
153 self.assertEquals(len(msg), 1)
154 self.assertEquals(msg[0]["name"], "Backup Operators")
156 def test_old_name_of_renamed(self):
157 """Looking up by old name of renamed attribute"""
158 msg = self.ldb.search(expression="(displayName=Backup Operators)")
159 self.assertEquals(len(msg), 0)
161 def test_mapped_containing_sid(self):
162 """Looking up mapped entry containing SID"""
163 msg = self.ldb.search(expression="(cn=Replicator)")
164 self.assertEquals(len(msg), 1)
165 self.assertEquals(str(msg[0].dn),
166 "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
167 self.assertTrue("objectSid" in msg[0])
168 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
170 oc = set(msg[0]["objectClass"])
171 self.assertEquals(oc, set(["group"]))
173 def test_search_by_objclass(self):
174 """Looking up by objectClass"""
175 msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
176 self.assertEquals(set([str(m.dn) for m in msg]),
177 set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
178 "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))
180 def test_s3sam_modify(self):
181 # Adding a record that will be fallbacked
182 self.ldb.add({"dn": "cn=Foo",
186 "showInAdvancedViewOnly": "TRUE"}
189 # Checking for existence of record (local)
190 # TODO: This record must be searched in the local database, which is
191 # currently only supported for base searches
192 # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
193 # TODO: Actually, this version should work as well but doesn't...
196 msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
198 attrs=['foo','blah','cn','showInAdvancedViewOnly'])
199 self.assertEquals(len(msg), 1)
200 self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE")
201 self.assertEquals(msg[0]["foo"], "bar")
202 self.assertEquals(msg[0]["blah"], "Blie")
204 # Adding record that will be mapped
205 self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
206 "objectClass": "user",
208 "sambaUnicodePwd": "geheim",
211 # Checking for existence of record (remote)
212 msg = self.ldb.search(expression="(unixName=bin)",
213 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
214 self.assertEquals(len(msg), 1)
215 self.assertEquals(msg[0]["cn"], "Niemand")
216 self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
218 # Checking for existence of record (local && remote)
219 msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
220 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
221 self.assertEquals(len(msg), 1) # TODO: should check with more records
222 self.assertEquals(msg[0]["cn"], "Niemand")
223 self.assertEquals(msg[0]["unixName"], "bin")
224 self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
226 # Checking for existence of record (local || remote)
227 msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
228 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
229 #print "got %d replies" % len(msg)
230 self.assertEquals(len(msg), 1) # TODO: should check with more records
231 self.assertEquals(msg[0]["cn"], "Niemand")
232 self.assertEquals(msg[0]["unixName"], "bin")
233 self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
235 # Checking for data in destination database
236 msg = self.samba3.db.search(expression="(cn=Niemand)")
237 self.assertTrue(len(msg) >= 1)
238 self.assertEquals(msg[0]["sambaSID"],
239 "S-1-5-21-4231626423-2410014848-2360679739-2001")
240 self.assertEquals(msg[0]["displayName"], "Niemand")
242 # Adding attribute...
243 self.ldb.modify_ldif("""
244 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
250 # Checking whether changes are still there...
251 msg = self.ldb.search(expression="(cn=Niemand)")
252 self.assertTrue(len(msg) >= 1)
253 self.assertEquals(msg[0]["cn"], "Niemand")
254 self.assertEquals(msg[0]["description"], "Blah")
256 # Modifying attribute...
257 self.ldb.modify_ldif("""
258 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
264 # Checking whether changes are still there...
265 msg = self.ldb.search(expression="(cn=Niemand)")
266 self.assertTrue(len(msg) >= 1)
267 self.assertEquals(msg[0]["description"], "Blie")
269 # Deleting attribute...
270 self.ldb.modify_ldif("""
271 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
276 # Checking whether changes are no longer there...
277 msg = self.ldb.search(expression="(cn=Niemand)")
278 self.assertTrue(len(msg) >= 1)
279 self.assertTrue(not "description" in msg[0])
282 self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
283 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
285 # Checking whether DN has changed...
286 msg = self.ldb.search(expression="(cn=Niemand2)")
287 self.assertEquals(len(msg), 1)
288 self.assertEquals(str(msg[0].dn),
289 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
292 self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
294 # Checking whether record is gone...
295 msg = self.ldb.search(expression="(cn=Niemand2)")
296 self.assertEquals(len(msg), 0)
299 class MapTestCase(MapBaseTestCase):
302 super(MapTestCase, self).setUp()
303 ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
304 self.templates.setup_data("provision_samba3sam_templates.ldif")
305 ldif = read_datafile("provision_samba3sam.ldif")
306 ldb.add_ldif(self.samba4.subst(ldif))
307 self.setup_modules(ldb, self.samba3, self.samba4)
309 self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
311 def test_map_search(self):
312 """Running search tests on mapped data."""
314 "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
315 "objectclass": ["sambaDomain", "top"],
316 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
317 "sambaNextRid": "2000",
318 "sambaDomainName": "TESTS"
321 # Add a set of split records
322 self.ldb.add_ldif("""
323 dn: """+ self.samba4.dn("cn=X") + """
332 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
333 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512
338 "dn": self.samba4.dn("cn=Y"),
339 "objectClass": "top",
349 "dn": self.samba4.dn("cn=Z"),
350 "objectClass": "top",
359 # Add a set of remote records
362 "dn": self.samba3.dn("cn=A"),
363 "objectClass": "posixAccount",
366 "sambaBadPasswordCount": "x",
367 "sambaLogonTime": "x",
369 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
370 "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
373 "dn": self.samba3.dn("cn=B"),
374 "objectClass": "top",
377 "sambaBadPasswordCount": "x",
378 "sambaLogonTime": "y",
382 "dn": self.samba3.dn("cn=C"),
383 "objectClass": "top",
386 "sambaBadPasswordCount": "y",
387 "sambaLogonTime": "z",
390 # Testing search by DN
392 # Search remote record by local DN
393 dn = self.samba4.dn("cn=A")
394 res = self.ldb.search(dn, scope=SCOPE_BASE,
395 attrs=["dnsHostName", "lastLogon"])
396 self.assertEquals(len(res), 1)
397 self.assertEquals(str(res[0].dn), dn)
398 self.assertTrue(not "dnsHostName" in res[0])
399 self.assertEquals(res[0]["lastLogon"], "x")
401 # Search remote record by remote DN
402 dn = self.samba3.dn("cn=A")
403 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
404 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
405 self.assertEquals(len(res), 1)
406 self.assertEquals(str(res[0].dn), dn)
407 self.assertTrue(not "dnsHostName" in res[0])
408 self.assertTrue(not "lastLogon" in res[0])
409 self.assertEquals(res[0]["sambaLogonTime"], "x")
411 # Search split record by local DN
412 dn = self.samba4.dn("cn=X")
413 res = self.ldb.search(dn, scope=SCOPE_BASE,
414 attrs=["dnsHostName", "lastLogon"])
415 self.assertEquals(len(res), 1)
416 self.assertEquals(str(res[0].dn), dn)
417 self.assertEquals(res[0]["dnsHostName"], "x")
418 self.assertEquals(res[0]["lastLogon"], "x")
420 # Search split record by remote DN
421 dn = self.samba3.dn("cn=X")
422 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
423 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
424 self.assertEquals(len(res), 1)
425 self.assertEquals(str(res[0].dn), dn)
426 self.assertTrue(not "dnsHostName" in res[0])
427 self.assertTrue(not "lastLogon" in res[0])
428 self.assertEquals(res[0]["sambaLogonTime"], "x")
430 # Testing search by attribute
432 # Search by ignored attribute
433 res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
434 attrs=["dnsHostName", "lastLogon"])
435 self.assertEquals(len(res), 2)
436 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
437 self.assertEquals(res[0]["dnsHostName"], "y")
438 self.assertEquals(res[0]["lastLogon"], "y")
439 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
440 self.assertEquals(res[1]["dnsHostName"], "x")
441 self.assertEquals(res[1]["lastLogon"], "x")
443 # Search by kept attribute
444 res = self.ldb.search(expression="(description=y)",
445 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
446 self.assertEquals(len(res), 2)
447 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
448 self.assertEquals(res[0]["dnsHostName"], "z")
449 self.assertEquals(res[0]["lastLogon"], "z")
450 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
451 self.assertTrue(not "dnsHostName" in res[1])
452 self.assertEquals(res[1]["lastLogon"], "z")
454 # Search by renamed attribute
455 res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
456 attrs=["dnsHostName", "lastLogon"])
457 self.assertEquals(len(res), 2)
458 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
459 self.assertTrue(not "dnsHostName" in res[0])
460 self.assertEquals(res[0]["lastLogon"], "y")
461 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
462 self.assertTrue(not "dnsHostName" in res[1])
463 self.assertEquals(res[1]["lastLogon"], "x")
465 # Search by converted attribute
467 # Using the SID directly in the parse tree leads to conversion
468 # errors, letting the search fail with no results.
469 #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
470 res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
471 self.assertEquals(len(res), 3)
472 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
473 self.assertEquals(res[0]["dnsHostName"], "x")
474 self.assertEquals(res[0]["lastLogon"], "x")
475 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
477 self.assertTrue("objectSid" in res[0])
478 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
479 self.assertTrue(not "dnsHostName" in res[1])
480 self.assertEquals(res[1]["lastLogon"], "x")
481 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
483 self.assertTrue("objectSid" in res[1])
485 # Search by generated attribute
486 # In most cases, this even works when the mapping is missing
487 # a `convert_operator' by enumerating the remote db.
488 res = self.ldb.search(expression="(primaryGroupID=512)",
489 attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
490 self.assertEquals(len(res), 1)
491 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
492 self.assertTrue(not "dnsHostName" in res[0])
493 self.assertEquals(res[0]["lastLogon"], "x")
494 self.assertEquals(res[0]["primaryGroupID"], "512")
496 # TODO: There should actually be two results, A and X. The
497 # primaryGroupID of X seems to get corrupted somewhere, and the
498 # objectSid isn't available during the generation of remote (!) data,
499 # which can be observed with the following search. Also note that Xs
500 # objectSid seems to be fine in the previous search for objectSid... */
501 #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
502 #print len(res) + " results found"
503 #for i in range(len(res)):
504 # for (obj in res[i]) {
505 # print obj + ": " + res[i][obj]
510 # Search by remote name of renamed attribute */
511 res = self.ldb.search(expression="(sambaBadPasswordCount=*)",
512 attrs=["dnsHostName", "lastLogon"])
513 self.assertEquals(len(res), 0)
515 # Search by objectClass
516 attrs = ["dnsHostName", "lastLogon", "objectClass"]
517 res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
518 self.assertEquals(len(res), 2)
519 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
520 self.assertEquals(res[0]["dnsHostName"], "x")
521 self.assertEquals(res[0]["lastLogon"], "x")
522 self.assertEquals(res[0]["objectClass"][0], "user")
523 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
524 self.assertTrue(not "dnsHostName" in res[1])
525 self.assertEquals(res[1]["lastLogon"], "x")
526 self.assertEquals(res[1]["objectClass"][0], "user")
528 # Prove that the objectClass is actually used for the search
529 res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
531 self.assertEquals(len(res), 3)
532 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
533 self.assertTrue(not "dnsHostName" in res[0])
534 self.assertEquals(res[0]["lastLogon"], "y")
535 self.assertEquals(set(res[0]["objectClass"]), set(["top"]))
536 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
537 self.assertEquals(res[1]["dnsHostName"], "x")
538 self.assertEquals(res[1]["lastLogon"], "x")
539 self.assertEquals(res[1]["objectClass"][0], "user")
540 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
541 self.assertTrue(not "dnsHostName" in res[2])
542 self.assertEquals(res[2]["lastLogon"], "x")
543 self.assertEquals(res[2]["objectClass"][0], "user")
545 # Testing search by parse tree
547 # Search by conjunction of local attributes
548 res = self.ldb.search(expression="(&(codePage=x)(revision=x))",
549 attrs=["dnsHostName", "lastLogon"])
550 self.assertEquals(len(res), 2)
551 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
552 self.assertEquals(res[0]["dnsHostName"], "y")
553 self.assertEquals(res[0]["lastLogon"], "y")
554 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
555 self.assertEquals(res[1]["dnsHostName"], "x")
556 self.assertEquals(res[1]["lastLogon"], "x")
558 # Search by conjunction of remote attributes
559 res = self.ldb.search(expression="(&(lastLogon=x)(description=x))",
560 attrs=["dnsHostName", "lastLogon"])
561 self.assertEquals(len(res), 2)
562 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
563 self.assertEquals(res[0]["dnsHostName"], "x")
564 self.assertEquals(res[0]["lastLogon"], "x")
565 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
566 self.assertTrue(not "dnsHostName" in res[1])
567 self.assertEquals(res[1]["lastLogon"], "x")
569 # Search by conjunction of local and remote attribute
570 res = self.ldb.search(expression="(&(codePage=x)(description=x))",
571 attrs=["dnsHostName", "lastLogon"])
572 self.assertEquals(len(res), 2)
573 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
574 self.assertEquals(res[0]["dnsHostName"], "y")
575 self.assertEquals(res[0]["lastLogon"], "y")
576 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
577 self.assertEquals(res[1]["dnsHostName"], "x")
578 self.assertEquals(res[1]["lastLogon"], "x")
580 # Search by conjunction of local and remote attribute w/o match
581 attrs = ["dnsHostName", "lastLogon"]
582 res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))",
584 self.assertEquals(len(res), 0)
585 res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))",
587 self.assertEquals(len(res), 0)
589 # Search by disjunction of local attributes
590 res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))",
591 attrs=["dnsHostName", "lastLogon"])
592 self.assertEquals(len(res), 2)
593 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
594 self.assertEquals(res[0]["dnsHostName"], "y")
595 self.assertEquals(res[0]["lastLogon"], "y")
596 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
597 self.assertEquals(res[1]["dnsHostName"], "x")
598 self.assertEquals(res[1]["lastLogon"], "x")
600 # Search by disjunction of remote attributes
601 res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))",
602 attrs=["dnsHostName", "lastLogon"])
603 self.assertEquals(len(res), 3)
604 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
605 self.assertFalse("dnsHostName" in res[0])
606 self.assertEquals(res[0]["lastLogon"], "y")
607 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
608 self.assertEquals(res[1]["dnsHostName"], "x")
609 self.assertEquals(res[1]["lastLogon"], "x")
610 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
611 self.assertFalse("dnsHostName" in res[2])
612 self.assertEquals(res[2]["lastLogon"], "x")
614 # Search by disjunction of local and remote attribute
615 res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))",
616 attrs=["dnsHostName", "lastLogon"])
617 self.assertEquals(len(res), 3)
618 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
619 self.assertEquals(res[0]["dnsHostName"], "y")
620 self.assertEquals(res[0]["lastLogon"], "y")
621 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
622 self.assertFalse("dnsHostName" in res[1])
623 self.assertEquals(res[1]["lastLogon"], "y")
624 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
625 self.assertEquals(res[2]["dnsHostName"], "x")
626 self.assertEquals(res[2]["lastLogon"], "x")
628 # Search by disjunction of local and remote attribute w/o match
629 res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))",
630 attrs=["dnsHostName", "lastLogon"])
631 self.assertEquals(len(res), 0)
633 # Search by negated local attribute
634 res = self.ldb.search(expression="(!(revision=x))",
635 attrs=["dnsHostName", "lastLogon"])
636 self.assertEquals(len(res), 5)
637 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
638 self.assertTrue(not "dnsHostName" in res[0])
639 self.assertEquals(res[0]["lastLogon"], "y")
640 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
641 self.assertTrue(not "dnsHostName" in res[1])
642 self.assertEquals(res[1]["lastLogon"], "x")
643 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
644 self.assertEquals(res[2]["dnsHostName"], "z")
645 self.assertEquals(res[2]["lastLogon"], "z")
646 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
647 self.assertTrue(not "dnsHostName" in res[3])
648 self.assertEquals(res[3]["lastLogon"], "z")
650 # Search by negated remote attribute
651 res = self.ldb.search(expression="(!(description=x))",
652 attrs=["dnsHostName", "lastLogon"])
653 self.assertEquals(len(res), 3)
654 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
655 self.assertEquals(res[0]["dnsHostName"], "z")
656 self.assertEquals(res[0]["lastLogon"], "z")
657 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
658 self.assertTrue(not "dnsHostName" in res[1])
659 self.assertEquals(res[1]["lastLogon"], "z")
661 # Search by negated conjunction of local attributes
662 res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))",
663 attrs=["dnsHostName", "lastLogon"])
664 self.assertEquals(len(res), 5)
665 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
666 self.assertTrue(not "dnsHostName" in res[0])
667 self.assertEquals(res[0]["lastLogon"], "y")
668 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
669 self.assertTrue(not "dnsHostName" in res[1])
670 self.assertEquals(res[1]["lastLogon"], "x")
671 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
672 self.assertEquals(res[2]["dnsHostName"], "z")
673 self.assertEquals(res[2]["lastLogon"], "z")
674 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
675 self.assertTrue(not "dnsHostName" in res[3])
676 self.assertEquals(res[3]["lastLogon"], "z")
678 # Search by negated conjunction of remote attributes
679 res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))",
680 attrs=["dnsHostName", "lastLogon"])
681 self.assertEquals(len(res), 5)
682 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
683 self.assertEquals(res[0]["dnsHostName"], "y")
684 self.assertEquals(res[0]["lastLogon"], "y")
685 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
686 self.assertTrue(not "dnsHostName" in res[1])
687 self.assertEquals(res[1]["lastLogon"], "y")
688 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
689 self.assertEquals(res[2]["dnsHostName"], "z")
690 self.assertEquals(res[2]["lastLogon"], "z")
691 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
692 self.assertTrue(not "dnsHostName" in res[3])
693 self.assertEquals(res[3]["lastLogon"], "z")
695 # Search by negated conjunction of local and remote attribute
696 res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))",
697 attrs=["dnsHostName", "lastLogon"])
698 self.assertEquals(len(res), 5)
699 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
700 self.assertTrue(not "dnsHostName" in res[0])
701 self.assertEquals(res[0]["lastLogon"], "y")
702 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
703 self.assertTrue(not "dnsHostName" in res[1])
704 self.assertEquals(res[1]["lastLogon"], "x")
705 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
706 self.assertEquals(res[2]["dnsHostName"], "z")
707 self.assertEquals(res[2]["lastLogon"], "z")
708 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
709 self.assertTrue(not "dnsHostName" in res[3])
710 self.assertEquals(res[3]["lastLogon"], "z")
712 # Search by negated disjunction of local attributes
713 res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))",
714 attrs=["dnsHostName", "lastLogon"])
715 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
716 self.assertTrue(not "dnsHostName" in res[0])
717 self.assertEquals(res[0]["lastLogon"], "y")
718 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
719 self.assertTrue(not "dnsHostName" in res[1])
720 self.assertEquals(res[1]["lastLogon"], "x")
721 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
722 self.assertEquals(res[2]["dnsHostName"], "z")
723 self.assertEquals(res[2]["lastLogon"], "z")
724 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
725 self.assertTrue(not "dnsHostName" in res[3])
726 self.assertEquals(res[3]["lastLogon"], "z")
728 # Search by negated disjunction of remote attributes
729 res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))",
730 attrs=["dnsHostName", "lastLogon"])
731 self.assertEquals(len(res), 4)
732 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
733 self.assertEquals(res[0]["dnsHostName"], "y")
734 self.assertEquals(res[0]["lastLogon"], "y")
735 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
736 self.assertEquals(res[1]["dnsHostName"], "z")
737 self.assertEquals(res[1]["lastLogon"], "z")
738 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
739 self.assertTrue(not "dnsHostName" in res[2])
740 self.assertEquals(res[2]["lastLogon"], "z")
742 # Search by negated disjunction of local and remote attribute
743 res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
744 attrs=["dnsHostName", "lastLogon"])
745 self.assertEquals(len(res), 4)
746 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
747 self.assertTrue(not "dnsHostName" in res[0])
748 self.assertEquals(res[0]["lastLogon"], "x")
749 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
750 self.assertEquals(res[1]["dnsHostName"], "z")
751 self.assertEquals(res[1]["lastLogon"], "z")
752 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
753 self.assertTrue(not "dnsHostName" in res[2])
754 self.assertEquals(res[2]["lastLogon"], "z")
756 # Search by complex parse tree
757 res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
758 self.assertEquals(len(res), 6)
759 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
760 self.assertTrue(not "dnsHostName" in res[0])
761 self.assertEquals(res[0]["lastLogon"], "y")
762 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
763 self.assertEquals(res[1]["dnsHostName"], "x")
764 self.assertEquals(res[1]["lastLogon"], "x")
765 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
766 self.assertTrue(not "dnsHostName" in res[2])
767 self.assertEquals(res[2]["lastLogon"], "x")
768 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
769 self.assertEquals(res[3]["dnsHostName"], "z")
770 self.assertEquals(res[3]["lastLogon"], "z")
771 self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
772 self.assertTrue(not "dnsHostName" in res[4])
773 self.assertEquals(res[4]["lastLogon"], "z")
776 dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
780 def test_map_modify_local(self):
781 """Modification of local records."""
783 dn = "cn=test,dc=idealx,dc=org"
784 self.ldb.add({"dn": dn,
788 "description": "test"})
790 attrs = ["foo", "revision", "description"]
791 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
792 self.assertEquals(len(res), 1)
793 self.assertEquals(str(res[0].dn), dn)
794 self.assertEquals(res[0]["foo"], "bar")
795 self.assertEquals(res[0]["revision"], "1")
796 self.assertEquals(res[0]["description"], "test")
797 # Check it's not in the local db
798 res = self.samba4.db.search(expression="(cn=test)",
799 scope=SCOPE_DEFAULT, attrs=attrs)
800 self.assertEquals(len(res), 0)
801 # Check it's not in the remote db
802 res = self.samba3.db.search(expression="(cn=test)",
803 scope=SCOPE_DEFAULT, attrs=attrs)
804 self.assertEquals(len(res), 0)
806 # Modify local record
814 self.ldb.modify_ldif(ldif)
816 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
817 self.assertEquals(len(res), 1)
818 self.assertEquals(str(res[0].dn), dn)
819 self.assertEquals(res[0]["foo"], "baz")
820 self.assertEquals(res[0]["revision"], "1")
821 self.assertEquals(res[0]["description"], "foo")
823 # Rename local record
824 dn2 = "cn=toast,dc=idealx,dc=org"
825 self.ldb.rename(dn, dn2)
827 res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
828 self.assertEquals(len(res), 1)
829 self.assertEquals(str(res[0].dn), dn2)
830 self.assertEquals(res[0]["foo"], "baz")
831 self.assertEquals(res[0]["revision"], "1")
832 self.assertEquals(res[0]["description"], "foo")
834 # Delete local record
837 res = self.ldb.search(dn2, scope=SCOPE_BASE)
838 self.assertEquals(len(res), 0)
840 def test_map_modify_remote_remote(self):
841 """Modification of remote data of remote records"""
843 dn = self.samba4.dn("cn=test")
844 dn2 = self.samba3.dn("cn=test")
845 self.samba3.db.add({"dn": dn2,
847 "description": "foo",
848 "sambaBadPasswordCount": "3",
849 "sambaNextRid": "1001"})
851 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
852 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
853 self.assertEquals(len(res), 1)
854 self.assertEquals(str(res[0].dn), dn2)
855 self.assertEquals(res[0]["description"], "foo")
856 self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
857 self.assertEquals(res[0]["sambaNextRid"], "1001")
859 attrs = ["description", "badPwdCount", "nextRid"]
860 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
861 self.assertEquals(len(res), 1)
862 self.assertEquals(str(res[0].dn), dn)
863 self.assertEquals(res[0]["description"], "foo")
864 self.assertEquals(res[0]["badPwdCount"], "3")
865 self.assertEquals(res[0]["nextRid"], "1001")
867 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
868 self.assertEquals(len(res), 0)
870 # Modify remote data of remote record
878 self.ldb.modify_ldif(ldif)
880 res = self.ldb.search(dn, scope=SCOPE_BASE,
881 attrs=["description", "badPwdCount", "nextRid"])
882 self.assertEquals(len(res), 1)
883 self.assertEquals(str(res[0].dn), dn)
884 self.assertEquals(res[0]["description"], "test")
885 self.assertEquals(res[0]["badPwdCount"], "4")
886 self.assertEquals(res[0]["nextRid"], "1001")
888 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
889 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
890 self.assertEquals(len(res), 1)
891 self.assertEquals(str(res[0].dn), dn2)
892 self.assertEquals(res[0]["description"], "test")
893 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
894 self.assertEquals(res[0]["sambaNextRid"], "1001")
896 # Rename remote record
897 dn2 = self.samba4.dn("cn=toast")
898 self.ldb.rename(dn, dn2)
901 res = self.ldb.search(dn, scope=SCOPE_BASE,
902 attrs=["description", "badPwdCount", "nextRid"])
903 self.assertEquals(len(res), 1)
904 self.assertEquals(str(res[0].dn), dn)
905 self.assertEquals(res[0]["description"], "test")
906 self.assertEquals(res[0]["badPwdCount"], "4")
907 self.assertEquals(res[0]["nextRid"], "1001")
909 dn2 = self.samba3.dn("cn=toast")
910 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
911 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
912 self.assertEquals(len(res), 1)
913 self.assertEquals(str(res[0].dn), dn2)
914 self.assertEquals(res[0]["description"], "test")
915 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
916 self.assertEquals(res[0]["sambaNextRid"], "1001")
918 # Delete remote record
920 # Check in mapped db that it's removed
921 res = self.ldb.search(dn, scope=SCOPE_BASE)
922 self.assertEquals(len(res), 0)
924 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
925 self.assertEquals(len(res), 0)
927 def test_map_modify_remote_local(self):
928 """Modification of local data of remote records"""
929 # Add remote record (same as before)
930 dn = self.samba4.dn("cn=test")
931 dn2 = self.samba3.dn("cn=test")
932 self.samba3.db.add({"dn": dn2,
934 "description": "foo",
935 "sambaBadPasswordCount": "3",
936 "sambaNextRid": "1001"})
938 # Modify local data of remote record
947 self.ldb.modify_ldif(ldif)
949 attrs = ["revision", "description"]
950 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
951 self.assertEquals(len(res), 1)
952 self.assertEquals(str(res[0].dn), dn)
953 self.assertEquals(res[0]["description"], "test")
954 self.assertEquals(res[0]["revision"], "1")
956 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
957 self.assertEquals(len(res), 1)
958 self.assertEquals(str(res[0].dn), dn2)
959 self.assertEquals(res[0]["description"], "test")
960 self.assertTrue(not "revision" in res[0])
962 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
963 self.assertEquals(len(res), 1)
964 self.assertEquals(str(res[0].dn), dn)
965 self.assertTrue(not "description" in res[0])
966 self.assertEquals(res[0]["revision"], "1")
968 # Delete (newly) split record
971 def test_map_modify_split(self):
972 """Testing modification of split records"""
974 dn = self.samba4.dn("cn=test")
975 dn2 = self.samba3.dn("cn=test")
979 "description": "foo",
984 attrs = ["description", "badPwdCount", "nextRid", "revision"]
985 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
986 self.assertEquals(len(res), 1)
987 self.assertEquals(str(res[0].dn), dn)
988 self.assertEquals(res[0]["description"], "foo")
989 self.assertEquals(res[0]["badPwdCount"], "3")
990 self.assertEquals(res[0]["nextRid"], "1001")
991 self.assertEquals(res[0]["revision"], "1")
993 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
994 self.assertEquals(len(res), 1)
995 self.assertEquals(str(res[0].dn), dn)
996 self.assertTrue(not "description" in res[0])
997 self.assertTrue(not "badPwdCount" in res[0])
998 self.assertTrue(not "nextRid" in res[0])
999 self.assertEquals(res[0]["revision"], "1")
1000 # Check in remote db
1001 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1003 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1004 self.assertEquals(len(res), 1)
1005 self.assertEquals(str(res[0].dn), dn2)
1006 self.assertEquals(res[0]["description"], "foo")
1007 self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
1008 self.assertEquals(res[0]["sambaNextRid"], "1001")
1009 self.assertTrue(not "revision" in res[0])
1011 # Modify of split record
1014 replace: description
1016 replace: badPwdCount
1021 self.ldb.modify_ldif(ldif)
1022 # Check in mapped db
1023 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1024 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1025 self.assertEquals(len(res), 1)
1026 self.assertEquals(str(res[0].dn), dn)
1027 self.assertEquals(res[0]["description"], "test")
1028 self.assertEquals(res[0]["badPwdCount"], "4")
1029 self.assertEquals(res[0]["nextRid"], "1001")
1030 self.assertEquals(res[0]["revision"], "2")
1032 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1033 self.assertEquals(len(res), 1)
1034 self.assertEquals(str(res[0].dn), dn)
1035 self.assertTrue(not "description" in res[0])
1036 self.assertTrue(not "badPwdCount" in res[0])
1037 self.assertTrue(not "nextRid" in res[0])
1038 self.assertEquals(res[0]["revision"], "2")
1039 # Check in remote db
1040 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1042 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1043 self.assertEquals(len(res), 1)
1044 self.assertEquals(str(res[0].dn), dn2)
1045 self.assertEquals(res[0]["description"], "test")
1046 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1047 self.assertEquals(res[0]["sambaNextRid"], "1001")
1048 self.assertTrue(not "revision" in res[0])
1050 # Rename split record
1051 dn2 = self.samba4.dn("cn=toast")
1052 self.ldb.rename(dn, dn2)
1053 # Check in mapped db
1055 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1056 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1057 self.assertEquals(len(res), 1)
1058 self.assertEquals(str(res[0].dn), dn)
1059 self.assertEquals(res[0]["description"], "test")
1060 self.assertEquals(res[0]["badPwdCount"], "4")
1061 self.assertEquals(res[0]["nextRid"], "1001")
1062 self.assertEquals(res[0]["revision"], "2")
1064 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1065 self.assertEquals(len(res), 1)
1066 self.assertEquals(str(res[0].dn), dn)
1067 self.assertTrue(not "description" in res[0])
1068 self.assertTrue(not "badPwdCount" in res[0])
1069 self.assertTrue(not "nextRid" in res[0])
1070 self.assertEquals(res[0]["revision"], "2")
1071 # Check in remote db
1072 dn2 = self.samba3.dn("cn=toast")
1073 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
1074 attrs=["description", "sambaBadPasswordCount", "sambaNextRid",
1076 self.assertEquals(len(res), 1)
1077 self.assertEquals(str(res[0].dn), dn2)
1078 self.assertEquals(res[0]["description"], "test")
1079 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1080 self.assertEquals(res[0]["sambaNextRid"], "1001")
1081 self.assertTrue(not "revision" in res[0])
1083 # Delete split record
1085 # Check in mapped db
1086 res = self.ldb.search(dn, scope=SCOPE_BASE)
1087 self.assertEquals(len(res), 0)
1089 res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1090 self.assertEquals(len(res), 0)
1091 # Check in remote db
1092 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1093 self.assertEquals(len(res), 0)