3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
28 from samba import Ldb, substitute_var
29 from samba.tests import LdbTestCase, TestCaseInTempDir, cmdline_loadparm
30 import samba.dcerpc.security
33 datadir = os.path.join(os.path.dirname(__file__),
34 "../../../../../testdata/samba3")
def read_datafile(filename):
    """Return the full text of a fixture file from the test data directory.

    :param filename: Name of the file, relative to the module-level
        ``datadir``.
    :return: The file's contents as a single string.
    :raises IOError: If the file cannot be opened or read.
    """
    path = os.path.join(datadir, filename)
    # Close the handle deterministically rather than leaving it to the
    # garbage collector; the test suite reads several fixtures per test.
    with open(path, 'r') as f:
        return f.read()
39 def ldb_debug(l, text):
43 class MapBaseTestCase(TestCaseInTempDir):
44 """Base test case for mapping tests."""
46 def setup_modules(self, ldb, s3, s4):
47 ldb.add({"dn": "@MAP=samba3sam",
49 "@TO": "sambaDomainName=TESTS," + s3.basedn})
51 ldb.add({"dn": "@MODULES",
52 "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})
54 ldb.add({"dn": "@PARTITION",
55 "partition": ["%s:%s" % (s4.basedn, s4.url),
56 "%s:%s" % (s3.basedn, s3.url)],
57 "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]})
60 super(MapBaseTestCase, self).setUp()
def make_dn(basedn, rdn):
    """Build a DN for ``rdn`` below the TESTS domain entry of ``basedn``.

    The result has the form ``<rdn>,sambaDomainName=TESTS,<basedn>``.
    """
    template = "%s,sambaDomainName=TESTS,%s"
    return template % (rdn, basedn)
def make_s4dn(basedn, rdn):
    """Build a DN by placing ``rdn`` directly in front of ``basedn``."""
    components = (rdn, basedn)
    return "%s,%s" % components
68 self.ldbfile = os.path.join(self.tempdir, "test.ldb")
69 self.ldburl = "tdb://" + self.ldbfile
71 tempdir = self.tempdir
74 """Simple helper class that contains data for a specific SAM
76 def __init__(self, file, basedn, dn):
77 self.file = os.path.join(tempdir, file)
78 self.url = "tdb://" + self.file
80 self.substvars = {"BASEDN": self.basedn}
81 self.db = Ldb(lp=cmdline_loadparm)
85 return self._dn(self.basedn, rdn)
88 return self.db.connect(self.url)
def setup_data(self, path):
    """Load the LDIF fixture file ``path`` and add it via ``add_ldif``."""
    ldif_text = read_datafile(path)
    self.add_ldif(ldif_text)
def subst(self, text):
    """Return ``text`` passed through ``substitute_var`` with this
    object's ``substvars``."""
    variables = self.substvars
    return substitute_var(text, variables)
def add_ldif(self, ldif):
    """Run ``ldif`` through ``subst`` and add the result to ``self.db``."""
    database = self.db
    database.add_ldif(self.subst(ldif))
def modify_ldif(self, ldif):
    """Run ``ldif`` through ``subst`` and apply the result to ``self.db``
    as a modification."""
    database = self.db
    database.modify_ldif(self.subst(ldif))
102 self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
103 self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
104 self.templates = Target("templates.ldb", "cn=templates", None)
106 self.samba3.connect()
107 self.templates.connect()
108 self.samba4.connect()
111 os.unlink(self.ldbfile)
112 os.unlink(self.samba3.file)
113 os.unlink(self.templates.file)
114 os.unlink(self.samba4.file)
115 super(MapBaseTestCase, self).tearDown()
117 def assertSidEquals(self, text, ndr_sid):
118 sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid,
120 sid_obj2 = samba.dcerpc.security.dom_sid(text)
121 self.assertEquals(sid_obj1, sid_obj2)
124 class Samba3SamTestCase(MapBaseTestCase):
127 super(Samba3SamTestCase, self).setUp()
128 ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
129 self.samba3.setup_data("samba3.ldif")
130 self.templates.setup_data("provision_samba3sam_templates.ldif")
131 ldif = read_datafile("provision_samba3sam.ldif")
132 ldb.add_ldif(self.samba4.subst(ldif))
133 self.setup_modules(ldb, self.samba3, self.samba4)
135 self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
def test_search_non_mapped(self):
    """Looking up by non-mapped attribute"""
    msg = self.ldb.search(expression="(cn=Administrator)")
    # assertEqual is the supported spelling; the assertEquals alias is
    # deprecated and absent from newer unittest releases.
    self.assertEqual(len(msg), 1)
    # Compare the string form of the element, matching how the other
    # attribute checks in this file are written.
    self.assertEqual(str(msg[0]["cn"]), "Administrator")
def test_search_mapped(self):
    """Looking up by mapped attribute"""
    # This test used to be named test_search_non_mapped as well; the
    # second definition replaced the first in the class namespace, so the
    # non-mapped lookup was silently never executed.
    msg = self.ldb.search(expression="(name=Backup Operators)")
    self.assertEqual(len(msg), 1)
    self.assertEqual(str(msg[0]["name"]), "Backup Operators")
def test_old_name_of_renamed(self):
    """Looking up by old name of renamed attribute"""
    msg = self.ldb.search(expression="(displayName=Backup Operators)")
    # The search must return no entries. assertEqual replaces the
    # deprecated assertEquals alias.
    self.assertEqual(len(msg), 0)
154 def test_mapped_containing_sid(self):
155 """Looking up mapped entry containing SID"""
156 msg = self.ldb.search(expression="(cn=Replicator)")
157 self.assertEquals(len(msg), 1)
158 self.assertEquals(str(msg[0].dn),
159 "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
160 self.assertTrue("objectSid" in msg[0])
161 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
163 oc = set(msg[0]["objectClass"])
164 self.assertEquals(oc, set(["group"]))
def test_search_by_objclass(self):
    """Looking up by objectClass"""
    msg = self.ldb.search(
        expression="(|(objectClass=user)(cn=Administrator))")
    # Compare as sets so the check does not depend on result ordering.
    found = set(str(m.dn) for m in msg)
    expected = set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
                    "unixName=nobody,ou=Users,dc=vernstok,dc=nl"])
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(found, expected)
173 def test_s3sam_modify(self):
# Adding a record that will fall back to the local database
175 self.ldb.add({"dn": "cn=Foo",
179 "showInAdvancedViewOnly": "TRUE"}
182 # Checking for existence of record (local)
183 # TODO: This record must be searched in the local database, which is
184 # currently only supported for base searches
185 # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
186 # TODO: Actually, this version should work as well but doesn't...
189 msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
191 attrs=['foo','blah','cn','showInAdvancedViewOnly'])
192 self.assertEquals(len(msg), 1)
193 self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
194 self.assertEquals(str(msg[0]["foo"]), "bar")
195 self.assertEquals(str(msg[0]["blah"]), "Blie")
197 # Adding record that will be mapped
198 self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
199 "objectClass": "user",
201 "sambaUnicodePwd": "geheim",
204 # Checking for existence of record (remote)
205 msg = self.ldb.search(expression="(unixName=bin)",
206 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
207 self.assertEquals(len(msg), 1)
208 self.assertEquals(str(msg[0]["cn"]), "Niemand")
209 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
211 # Checking for existence of record (local && remote)
212 msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
213 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
214 self.assertEquals(len(msg), 1) # TODO: should check with more records
215 self.assertEquals(str(msg[0]["cn"]), "Niemand")
216 self.assertEquals(str(msg[0]["unixName"]), "bin")
217 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
219 # Checking for existence of record (local || remote)
220 msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
221 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
222 #print "got %d replies" % len(msg)
223 self.assertEquals(len(msg), 1) # TODO: should check with more records
224 self.assertEquals(str(msg[0]["cn"]), "Niemand")
225 self.assertEquals(str(msg[0]["unixName"]), "bin")
226 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
228 # Checking for data in destination database
229 msg = self.samba3.db.search(expression="(cn=Niemand)")
230 self.assertTrue(len(msg) >= 1)
231 self.assertEquals(str(msg[0]["sambaSID"]),
232 "S-1-5-21-4231626423-2410014848-2360679739-2001")
233 self.assertEquals(str(msg[0]["displayName"]), "Niemand")
235 # Adding attribute...
236 self.ldb.modify_ldif("""
237 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
243 # Checking whether changes are still there...
244 msg = self.ldb.search(expression="(cn=Niemand)")
245 self.assertTrue(len(msg) >= 1)
246 self.assertEquals(str(msg[0]["cn"]), "Niemand")
247 self.assertEquals(str(msg[0]["description"]), "Blah")
249 # Modifying attribute...
250 self.ldb.modify_ldif("""
251 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
257 # Checking whether changes are still there...
258 msg = self.ldb.search(expression="(cn=Niemand)")
259 self.assertTrue(len(msg) >= 1)
260 self.assertEquals(str(msg[0]["description"]), "Blie")
262 # Deleting attribute...
263 self.ldb.modify_ldif("""
264 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
269 # Checking whether changes are no longer there...
270 msg = self.ldb.search(expression="(cn=Niemand)")
271 self.assertTrue(len(msg) >= 1)
272 self.assertTrue(not "description" in msg[0])
275 self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
276 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
278 # Checking whether DN has changed...
279 msg = self.ldb.search(expression="(cn=Niemand2)")
280 self.assertEquals(len(msg), 1)
281 self.assertEquals(str(msg[0].dn),
282 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
285 self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
287 # Checking whether record is gone...
288 msg = self.ldb.search(expression="(cn=Niemand2)")
289 self.assertEquals(len(msg), 0)
292 class MapTestCase(MapBaseTestCase):
295 super(MapTestCase, self).setUp()
296 ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
297 self.templates.setup_data("provision_samba3sam_templates.ldif")
298 ldif = read_datafile("provision_samba3sam.ldif")
299 ldb.add_ldif(self.samba4.subst(ldif))
300 self.setup_modules(ldb, self.samba3, self.samba4)
302 self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
304 def test_map_search(self):
305 """Running search tests on mapped data."""
307 "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
308 "objectclass": ["sambaDomain", "top"],
309 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
310 "sambaNextRid": "2000",
311 "sambaDomainName": "TESTS"
314 # Add a set of split records
315 self.ldb.add_ldif("""
316 dn: """+ self.samba4.dn("cn=Domain Users") + """
319 objectSid: S-1-5-21-4231626423-2410014848-2360679739-513
322 # Add a set of split records
323 self.ldb.add_ldif("""
324 dn: """+ self.samba4.dn("cn=X") + """
333 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
337 "dn": self.samba4.dn("cn=Y"),
338 "objectClass": "top",
348 "dn": self.samba4.dn("cn=Z"),
349 "objectClass": "top",
358 # Add a set of remote records
361 "dn": self.samba3.dn("cn=A"),
362 "objectClass": "posixAccount",
365 "sambaBadPasswordCount": "x",
366 "sambaLogonTime": "x",
368 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
369 "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
372 "dn": self.samba3.dn("cn=B"),
373 "objectClass": "top",
376 "sambaBadPasswordCount": "x",
377 "sambaLogonTime": "y",
381 "dn": self.samba3.dn("cn=C"),
382 "objectClass": "top",
385 "sambaBadPasswordCount": "y",
386 "sambaLogonTime": "z",
389 # Testing search by DN
391 # Search remote record by local DN
392 dn = self.samba4.dn("cn=A")
393 res = self.ldb.search(dn, scope=SCOPE_BASE,
394 attrs=["dnsHostName", "lastLogon"])
395 self.assertEquals(len(res), 1)
396 self.assertEquals(str(res[0].dn), dn)
397 self.assertTrue(not "dnsHostName" in res[0])
398 self.assertEquals(str(res[0]["lastLogon"]), "x")
400 # Search remote record by remote DN
401 dn = self.samba3.dn("cn=A")
402 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
403 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
404 self.assertEquals(len(res), 1)
405 self.assertEquals(str(res[0].dn), dn)
406 self.assertTrue(not "dnsHostName" in res[0])
407 self.assertTrue(not "lastLogon" in res[0])
408 self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
410 # Search split record by local DN
411 dn = self.samba4.dn("cn=X")
412 res = self.ldb.search(dn, scope=SCOPE_BASE,
413 attrs=["dnsHostName", "lastLogon"])
414 self.assertEquals(len(res), 1)
415 self.assertEquals(str(res[0].dn), dn)
416 self.assertEquals(str(res[0]["dnsHostName"]), "x")
417 self.assertEquals(str(res[0]["lastLogon"]), "x")
419 # Search split record by remote DN
420 dn = self.samba3.dn("cn=X")
421 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
422 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
423 self.assertEquals(len(res), 1)
424 self.assertEquals(str(res[0].dn), dn)
425 self.assertTrue(not "dnsHostName" in res[0])
426 self.assertTrue(not "lastLogon" in res[0])
427 self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
429 # Testing search by attribute
431 # Search by ignored attribute
432 res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
433 attrs=["dnsHostName", "lastLogon"])
434 self.assertEquals(len(res), 2)
435 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
436 self.assertEquals(str(res[0]["dnsHostName"]), "y")
437 self.assertEquals(str(res[0]["lastLogon"]), "y")
438 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
439 self.assertEquals(str(res[1]["dnsHostName"]), "x")
440 self.assertEquals(str(res[1]["lastLogon"]), "x")
442 # Search by kept attribute
443 res = self.ldb.search(expression="(description=y)",
444 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
445 self.assertEquals(len(res), 2)
446 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
447 self.assertEquals(str(res[0]["dnsHostName"]), "z")
448 self.assertEquals(str(res[0]["lastLogon"]), "z")
449 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
450 self.assertTrue(not "dnsHostName" in res[1])
451 self.assertEquals(str(res[1]["lastLogon"]), "z")
453 # Search by renamed attribute
454 res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
455 attrs=["dnsHostName", "lastLogon"])
456 self.assertEquals(len(res), 2)
457 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
458 self.assertTrue(not "dnsHostName" in res[0])
459 self.assertEquals(str(res[0]["lastLogon"]), "y")
460 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
461 self.assertTrue(not "dnsHostName" in res[1])
462 self.assertEquals(str(res[1]["lastLogon"]), "x")
464 # Search by converted attribute
466 # Using the SID directly in the parse tree leads to conversion
467 # errors, letting the search fail with no results.
468 #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
469 res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
470 self.assertEquals(len(res), 4)
471 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
472 self.assertEquals(str(res[0]["dnsHostName"]), "x")
473 self.assertEquals(str(res[0]["lastLogon"]), "x")
474 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
476 self.assertTrue("objectSid" in res[0])
477 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
478 self.assertTrue(not "dnsHostName" in res[1])
479 self.assertEquals(str(res[1]["lastLogon"]), "x")
480 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
482 self.assertTrue("objectSid" in res[1])
484 # Search by generated attribute
485 # In most cases, this even works when the mapping is missing
486 # a `convert_operator' by enumerating the remote db.
487 res = self.ldb.search(expression="(primaryGroupID=512)",
488 attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
489 self.assertEquals(len(res), 1)
490 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
491 self.assertTrue(not "dnsHostName" in res[0])
492 self.assertEquals(str(res[0]["lastLogon"]), "x")
493 self.assertEquals(str(res[0]["primaryGroupID"]), "512")
495 # Note that Xs "objectSid" seems to be fine in the previous search for
497 #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
498 #print len(res) + " results found"
499 #for i in range(len(res)):
500 # for (obj in res[i]) {
501 # print obj + ": " + res[i][obj]
# Search by remote name of renamed attribute
507 res = self.ldb.search(expression="(sambaBadPasswordCount=*)",
508 attrs=["dnsHostName", "lastLogon"])
509 self.assertEquals(len(res), 0)
511 # Search by objectClass
512 attrs = ["dnsHostName", "lastLogon", "objectClass"]
513 res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
514 self.assertEquals(len(res), 2)
515 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
516 self.assertEquals(str(res[0]["dnsHostName"]), "x")
517 self.assertEquals(str(res[0]["lastLogon"]), "x")
518 self.assertEquals(str(res[0]["objectClass"][0]), "user")
519 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
520 self.assertTrue(not "dnsHostName" in res[1])
521 self.assertEquals(str(res[1]["lastLogon"]), "x")
522 self.assertEquals(str(res[1]["objectClass"][0]), "user")
524 # Prove that the objectClass is actually used for the search
525 res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
527 self.assertEquals(len(res), 3)
528 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
529 self.assertTrue(not "dnsHostName" in res[0])
530 self.assertEquals(str(res[0]["lastLogon"]), "y")
531 self.assertEquals(set(res[0]["objectClass"]), set(["top"]))
532 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
533 self.assertEquals(str(res[1]["dnsHostName"]), "x")
534 self.assertEquals(str(res[1]["lastLogon"]), "x")
535 self.assertEquals(str(res[1]["objectClass"][0]), "user")
536 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
537 self.assertTrue(not "dnsHostName" in res[2])
538 self.assertEquals(str(res[2]["lastLogon"]), "x")
539 self.assertEquals(res[2]["objectClass"][0], "user")
541 # Testing search by parse tree
543 # Search by conjunction of local attributes
544 res = self.ldb.search(expression="(&(codePage=x)(revision=x))",
545 attrs=["dnsHostName", "lastLogon"])
546 self.assertEquals(len(res), 2)
547 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
548 self.assertEquals(str(res[0]["dnsHostName"]), "y")
549 self.assertEquals(str(res[0]["lastLogon"]), "y")
550 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
551 self.assertEquals(str(res[1]["dnsHostName"]), "x")
552 self.assertEquals(str(res[1]["lastLogon"]), "x")
554 # Search by conjunction of remote attributes
555 res = self.ldb.search(expression="(&(lastLogon=x)(description=x))",
556 attrs=["dnsHostName", "lastLogon"])
557 self.assertEquals(len(res), 2)
558 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
559 self.assertEquals(str(res[0]["dnsHostName"]), "x")
560 self.assertEquals(str(res[0]["lastLogon"]), "x")
561 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
562 self.assertTrue(not "dnsHostName" in res[1])
563 self.assertEquals(str(res[1]["lastLogon"]), "x")
565 # Search by conjunction of local and remote attribute
566 res = self.ldb.search(expression="(&(codePage=x)(description=x))",
567 attrs=["dnsHostName", "lastLogon"])
568 self.assertEquals(len(res), 2)
569 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
570 self.assertEquals(str(res[0]["dnsHostName"]), "y")
571 self.assertEquals(str(res[0]["lastLogon"]), "y")
572 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
573 self.assertEquals(str(res[1]["dnsHostName"]), "x")
574 self.assertEquals(str(res[1]["lastLogon"]), "x")
576 # Search by conjunction of local and remote attribute w/o match
577 attrs = ["dnsHostName", "lastLogon"]
578 res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))",
580 self.assertEquals(len(res), 0)
581 res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))",
583 self.assertEquals(len(res), 0)
585 # Search by disjunction of local attributes
586 res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))",
587 attrs=["dnsHostName", "lastLogon"])
588 self.assertEquals(len(res), 2)
589 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
590 self.assertEquals(str(res[0]["dnsHostName"]), "y")
591 self.assertEquals(str(res[0]["lastLogon"]), "y")
592 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
593 self.assertEquals(str(res[1]["dnsHostName"]), "x")
594 self.assertEquals(str(res[1]["lastLogon"]), "x")
596 # Search by disjunction of remote attributes
597 res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))",
598 attrs=["dnsHostName", "lastLogon"])
599 self.assertEquals(len(res), 3)
600 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
601 self.assertFalse("dnsHostName" in res[0])
602 self.assertEquals(str(res[0]["lastLogon"]), "y")
603 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
604 self.assertEquals(str(res[1]["dnsHostName"]), "x")
605 self.assertEquals(str(res[1]["lastLogon"]), "x")
606 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
607 self.assertFalse("dnsHostName" in res[2])
608 self.assertEquals(str(res[2]["lastLogon"]), "x")
610 # Search by disjunction of local and remote attribute
611 res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))",
612 attrs=["dnsHostName", "lastLogon"])
613 self.assertEquals(len(res), 3)
614 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
615 self.assertEquals(str(res[0]["dnsHostName"]), "y")
616 self.assertEquals(str(res[0]["lastLogon"]), "y")
617 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
618 self.assertFalse("dnsHostName" in res[1])
619 self.assertEquals(str(res[1]["lastLogon"]), "y")
620 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
621 self.assertEquals(str(res[2]["dnsHostName"]), "x")
622 self.assertEquals(str(res[2]["lastLogon"]), "x")
624 # Search by disjunction of local and remote attribute w/o match
625 res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))",
626 attrs=["dnsHostName", "lastLogon"])
627 self.assertEquals(len(res), 0)
629 # Search by negated local attribute
630 res = self.ldb.search(expression="(!(revision=x))",
631 attrs=["dnsHostName", "lastLogon"])
632 self.assertEquals(len(res), 6)
633 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
634 self.assertTrue(not "dnsHostName" in res[0])
635 self.assertEquals(str(res[0]["lastLogon"]), "y")
636 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
637 self.assertTrue(not "dnsHostName" in res[1])
638 self.assertEquals(str(res[1]["lastLogon"]), "x")
639 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
640 self.assertEquals(str(res[2]["dnsHostName"]), "z")
641 self.assertEquals(str(res[2]["lastLogon"]), "z")
642 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
643 self.assertTrue(not "dnsHostName" in res[3])
644 self.assertEquals(str(res[3]["lastLogon"]), "z")
646 # Search by negated remote attribute
647 res = self.ldb.search(expression="(!(description=x))",
648 attrs=["dnsHostName", "lastLogon"])
649 self.assertEquals(len(res), 4)
650 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
651 self.assertEquals(str(res[0]["dnsHostName"]), "z")
652 self.assertEquals(str(res[0]["lastLogon"]), "z")
653 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
654 self.assertTrue(not "dnsHostName" in res[1])
655 self.assertEquals(str(res[1]["lastLogon"]), "z")
657 # Search by negated conjunction of local attributes
658 res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))",
659 attrs=["dnsHostName", "lastLogon"])
660 self.assertEquals(len(res), 6)
661 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
662 self.assertTrue(not "dnsHostName" in res[0])
663 self.assertEquals(str(res[0]["lastLogon"]), "y")
664 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
665 self.assertTrue(not "dnsHostName" in res[1])
666 self.assertEquals(str(res[1]["lastLogon"]), "x")
667 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
668 self.assertEquals(str(res[2]["dnsHostName"]), "z")
669 self.assertEquals(str(res[2]["lastLogon"]), "z")
670 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
671 self.assertTrue(not "dnsHostName" in res[3])
672 self.assertEquals(str(res[3]["lastLogon"]), "z")
674 # Search by negated conjunction of remote attributes
675 res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))",
676 attrs=["dnsHostName", "lastLogon"])
677 self.assertEquals(len(res), 6)
678 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
679 self.assertEquals(str(res[0]["dnsHostName"]), "y")
680 self.assertEquals(str(res[0]["lastLogon"]), "y")
681 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
682 self.assertTrue(not "dnsHostName" in res[1])
683 self.assertEquals(str(res[1]["lastLogon"]), "y")
684 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
685 self.assertEquals(str(res[2]["dnsHostName"]), "z")
686 self.assertEquals(str(res[2]["lastLogon"]), "z")
687 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
688 self.assertTrue(not "dnsHostName" in res[3])
689 self.assertEquals(str(res[3]["lastLogon"]), "z")
691 # Search by negated conjunction of local and remote attribute
692 res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))",
693 attrs=["dnsHostName", "lastLogon"])
694 self.assertEquals(len(res), 6)
695 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
696 self.assertTrue(not "dnsHostName" in res[0])
697 self.assertEquals(str(res[0]["lastLogon"]), "y")
698 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
699 self.assertTrue(not "dnsHostName" in res[1])
700 self.assertEquals(str(res[1]["lastLogon"]), "x")
701 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
702 self.assertEquals(str(res[2]["dnsHostName"]), "z")
703 self.assertEquals(str(res[2]["lastLogon"]), "z")
704 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
705 self.assertTrue(not "dnsHostName" in res[3])
706 self.assertEquals(str(res[3]["lastLogon"]), "z")
708 # Search by negated disjunction of local attributes
709 res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))",
710 attrs=["dnsHostName", "lastLogon"])
711 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
712 self.assertTrue(not "dnsHostName" in res[0])
713 self.assertEquals(str(res[0]["lastLogon"]), "y")
714 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
715 self.assertTrue(not "dnsHostName" in res[1])
716 self.assertEquals(str(res[1]["lastLogon"]), "x")
717 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
718 self.assertEquals(str(res[2]["dnsHostName"]), "z")
719 self.assertEquals(str(res[2]["lastLogon"]), "z")
720 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
721 self.assertTrue(not "dnsHostName" in res[3])
722 self.assertEquals(str(res[3]["lastLogon"]), "z")
724 # Search by negated disjunction of remote attributes
725 res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))",
726 attrs=["dnsHostName", "lastLogon"])
727 self.assertEquals(len(res), 5)
728 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
729 self.assertEquals(str(res[0]["dnsHostName"]), "y")
730 self.assertEquals(str(res[0]["lastLogon"]), "y")
731 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
732 self.assertEquals(str(res[1]["dnsHostName"]), "z")
733 self.assertEquals(str(res[1]["lastLogon"]), "z")
734 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
735 self.assertTrue(not "dnsHostName" in res[2])
736 self.assertEquals(str(res[2]["lastLogon"]), "z")
738 # Search by negated disjunction of local and remote attribute
739 res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
740 attrs=["dnsHostName", "lastLogon"])
741 self.assertEquals(len(res), 5)
742 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
743 self.assertTrue(not "dnsHostName" in res[0])
744 self.assertEquals(str(res[0]["lastLogon"]), "x")
745 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
746 self.assertEquals(str(res[1]["dnsHostName"]), "z")
747 self.assertEquals(str(res[1]["lastLogon"]), "z")
748 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
749 self.assertTrue(not "dnsHostName" in res[2])
750 self.assertEquals(str(res[2]["lastLogon"]), "z")
752 # Search by complex parse tree
753 res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
754 self.assertEquals(len(res), 7)
755 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
756 self.assertTrue(not "dnsHostName" in res[0])
757 self.assertEquals(str(res[0]["lastLogon"]), "y")
758 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
759 self.assertEquals(str(res[1]["dnsHostName"]), "x")
760 self.assertEquals(str(res[1]["lastLogon"]), "x")
761 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
762 self.assertTrue(not "dnsHostName" in res[2])
763 self.assertEquals(str(res[2]["lastLogon"]), "x")
764 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
765 self.assertEquals(str(res[3]["dnsHostName"]), "z")
766 self.assertEquals(str(res[3]["lastLogon"]), "z")
767 self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
768 self.assertTrue(not "dnsHostName" in res[4])
769 self.assertEquals(str(res[4]["lastLogon"]), "z")
772 dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
776 def test_map_modify_local(self):
777 """Modification of local records."""
779 dn = "cn=test,dc=idealx,dc=org"
780 self.ldb.add({"dn": dn,
784 "description": "test"})
786 attrs = ["foo", "revision", "description"]
787 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
788 self.assertEquals(len(res), 1)
789 self.assertEquals(str(res[0].dn), dn)
790 self.assertEquals(str(res[0]["foo"]), "bar")
791 self.assertEquals(str(res[0]["revision"]), "1")
792 self.assertEquals(str(res[0]["description"]), "test")
793 # Check it's not in the local db
794 res = self.samba4.db.search(expression="(cn=test)",
795 scope=SCOPE_DEFAULT, attrs=attrs)
796 self.assertEquals(len(res), 0)
797 # Check it's not in the remote db
798 res = self.samba3.db.search(expression="(cn=test)",
799 scope=SCOPE_DEFAULT, attrs=attrs)
800 self.assertEquals(len(res), 0)
802 # Modify local record
810 self.ldb.modify_ldif(ldif)
812 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
813 self.assertEquals(len(res), 1)
814 self.assertEquals(str(res[0].dn), dn)
815 self.assertEquals(str(res[0]["foo"]), "baz")
816 self.assertEquals(str(res[0]["revision"]), "1")
817 self.assertEquals(str(res[0]["description"]), "foo")
819 # Rename local record
820 dn2 = "cn=toast,dc=idealx,dc=org"
821 self.ldb.rename(dn, dn2)
823 res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
824 self.assertEquals(len(res), 1)
825 self.assertEquals(str(res[0].dn), dn2)
826 self.assertEquals(str(res[0]["foo"]), "baz")
827 self.assertEquals(str(res[0]["revision"]), "1")
828 self.assertEquals(str(res[0]["description"]), "foo")
830 # Delete local record
833 res = self.ldb.search(dn2, scope=SCOPE_BASE)
834 self.assertEquals(len(res), 0)
836 def test_map_modify_remote_remote(self):
837 """Modification of remote data of remote records"""
# Flow: create a record directly in the samba3 database, modify and
# rename it through self.ldb, and check what self.ldb and the
# samba3/samba4 databases return after each step.
#
# dn and dn2 are the names produced by self.samba4.dn() and
# self.samba3.dn() for the same RDN "cn=test".
839 dn = self.samba4.dn("cn=test")
840 dn2 = self.samba3.dn("cn=test")
# Add the record straight to self.samba3.db (not via self.ldb), using
# the samba3 attribute names.
841 self.samba3.db.add({"dn": dn2,
843 "description": "foo",
844 "sambaBadPasswordCount": "3",
845 "sambaNextRid": "1001"})
# Read it back from the samba3 database to confirm the stored values.
847 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
848 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
849 self.assertEquals(len(res), 1)
850 self.assertEquals(str(res[0].dn), dn2)
851 self.assertEquals(str(res[0]["description"]), "foo")
852 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
853 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
# The same record looked up through self.ldb under dn is expected to
# carry the same values under the names badPwdCount / nextRid.
855 attrs = ["description", "badPwdCount", "nextRid"]
856 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
857 self.assertEquals(len(res), 1)
858 self.assertEquals(str(res[0].dn), dn)
859 self.assertEquals(str(res[0]["description"]), "foo")
860 self.assertEquals(str(res[0]["badPwdCount"]), "3")
861 self.assertEquals(str(res[0]["nextRid"]), "1001")
# Nothing is expected under dn in the samba4 database itself.
863 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
864 self.assertEquals(len(res), 0)
866 # Modify remote data of remote record
# NOTE(review): `ldif` is never assigned in the visible text of this
# method; the statement that builds it appears to be missing from this
# copy of the file -- confirm against the upstream source.
874 self.ldb.modify_ldif(ldif)
# After the modify, self.ldb is expected to report description "test"
# and badPwdCount "4"; nextRid is expected to be unchanged.
876 res = self.ldb.search(dn, scope=SCOPE_BASE,
877 attrs=["description", "badPwdCount", "nextRid"])
878 self.assertEquals(len(res), 1)
879 self.assertEquals(str(res[0].dn), dn)
880 self.assertEquals(str(res[0]["description"]), "test")
881 self.assertEquals(str(res[0]["badPwdCount"]), "4")
882 self.assertEquals(str(res[0]["nextRid"]), "1001")
# The samba3 database is expected to hold the same new values under
# its own attribute names.
884 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
885 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
886 self.assertEquals(len(res), 1)
887 self.assertEquals(str(res[0].dn), dn2)
888 self.assertEquals(str(res[0]["description"]), "test")
889 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
890 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
892 # Rename remote record
# dn2 is rebound here to the rename target ("cn=toast" on the samba4 side).
893 dn2 = self.samba4.dn("cn=toast")
894 self.ldb.rename(dn, dn2)
# NOTE(review): in the visible text `dn` still holds the pre-rename
# name, yet the search below expects exactly one result under it.  A
# reassignment of `dn` to the new name may be missing from this copy --
# confirm against the upstream source.
897 res = self.ldb.search(dn, scope=SCOPE_BASE,
898 attrs=["description", "badPwdCount", "nextRid"])
899 self.assertEquals(len(res), 1)
900 self.assertEquals(str(res[0].dn), dn)
901 self.assertEquals(str(res[0]["description"]), "test")
902 self.assertEquals(str(res[0]["badPwdCount"]), "4")
903 self.assertEquals(str(res[0]["nextRid"]), "1001")
# Look the renamed record up in the samba3 database under its
# samba3-side name; the attribute values are expected to be unchanged.
905 dn2 = self.samba3.dn("cn=toast")
906 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
907 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
908 self.assertEquals(len(res), 1)
909 self.assertEquals(str(res[0].dn), dn2)
910 self.assertEquals(str(res[0]["description"]), "test")
911 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
912 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
914 # Delete remote record
# NOTE(review): no delete call is visible between this comment and the
# checks below; the statement appears to be missing from this copy --
# confirm against the upstream source.
916 # Check in mapped db that it's removed
917 res = self.ldb.search(dn, scope=SCOPE_BASE)
918 self.assertEquals(len(res), 0)
# The samba3 database is expected to have no entry under dn2 either.
920 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
921 self.assertEquals(len(res), 0)
923 def test_map_modify_remote_local(self):
924 """Modification of local data of remote records"""
# Flow: create a record directly in the samba3 database, modify it
# through self.ldb, then check how the attributes are distributed
# between self.samba3.db and self.samba4.db.
925 # Add remote record (same as before)
926 dn = self.samba4.dn("cn=test")
927 dn2 = self.samba3.dn("cn=test")
928 self.samba3.db.add({"dn": dn2,
930 "description": "foo",
931 "sambaBadPasswordCount": "3",
932 "sambaNextRid": "1001"})
934 # Modify local data of remote record
# NOTE(review): `ldif` is never assigned in the visible text of this
# method; the statement that builds it appears to be missing from this
# copy of the file -- confirm against the upstream source.
943 self.ldb.modify_ldif(ldif)
# Through self.ldb the record is expected to show both attributes:
# description "test" and revision "1".
945 attrs = ["revision", "description"]
946 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
947 self.assertEquals(len(res), 1)
948 self.assertEquals(str(res[0].dn), dn)
949 self.assertEquals(str(res[0]["description"]), "test")
950 self.assertEquals(str(res[0]["revision"]), "1")
# samba3 database: description is expected to be present, revision absent.
952 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
953 self.assertEquals(len(res), 1)
954 self.assertEquals(str(res[0].dn), dn2)
955 self.assertEquals(str(res[0]["description"]), "test")
956 self.assertTrue(not "revision" in res[0])
# samba4 database: a record now exists under dn, holding revision but
# not description.
958 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
959 self.assertEquals(len(res), 1)
960 self.assertEquals(str(res[0].dn), dn)
961 self.assertTrue(not "description" in res[0])
962 self.assertEquals(str(res[0]["revision"]), "1")
# NOTE(review): no delete call is visible after the comment below; the
# statement appears to be missing from this copy -- confirm against the
# upstream source.
964 # Delete (newly) split record
967 def test_map_modify_split(self):
968 """Testing modification of split records"""
# Flow, as far as the visible text shows: look a record up through
# self.ldb, self.samba4.db and self.samba3.db, modify and rename it
# through self.ldb, and repeat the three-way check after each step.
970 dn = self.samba4.dn("cn=test")
971 dn2 = self.samba3.dn("cn=test")
# NOTE(review): the line below is a lone dictionary entry; the call it
# belongs to (presumably the one that creates the record) is not
# visible in this copy of the file -- confirm against the upstream
# source.
975 "description": "foo",
# Through self.ldb all four attributes are expected to be visible.
980 attrs = ["description", "badPwdCount", "nextRid", "revision"]
981 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
982 self.assertEquals(len(res), 1)
983 self.assertEquals(str(res[0].dn), dn)
984 self.assertEquals(str(res[0]["description"]), "foo")
985 self.assertEquals(str(res[0]["badPwdCount"]), "3")
986 self.assertEquals(str(res[0]["nextRid"]), "1001")
987 self.assertEquals(str(res[0]["revision"]), "1")
# samba4 database: only revision is expected on the record; the other
# three attributes must be absent.
989 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
990 self.assertEquals(len(res), 1)
991 self.assertEquals(str(res[0].dn), dn)
992 self.assertTrue(not "description" in res[0])
993 self.assertTrue(not "badPwdCount" in res[0])
994 self.assertTrue(not "nextRid" in res[0])
995 self.assertEquals(str(res[0]["revision"]), "1")
# samba3 database: description and the samba3-named counters are
# expected, revision must be absent.
# NOTE(review): the attrs list below is not closed in the visible
# text; its final line appears to be missing from this copy.
997 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
999 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1000 self.assertEquals(len(res), 1)
1001 self.assertEquals(str(res[0].dn), dn2)
1002 self.assertEquals(str(res[0]["description"]), "foo")
1003 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
1004 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1005 self.assertTrue(not "revision" in res[0])
# NOTE(review): `ldif` is never assigned in the visible text.  The two
# "replace:" lines after the next comment look like surviving pieces of
# the LDIF text passed to modify_ldif; the rest of that literal appears
# to be missing from this copy -- confirm against the upstream source.
1007 # Modify of split record
1010 replace: description
1012 replace: badPwdCount
1017 self.ldb.modify_ldif(ldif)
1018 # Check in mapped db
# Expected after the modify: description "test", badPwdCount "4",
# revision "2"; nextRid unchanged.
1019 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1020 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1021 self.assertEquals(len(res), 1)
1022 self.assertEquals(str(res[0].dn), dn)
1023 self.assertEquals(str(res[0]["description"]), "test")
1024 self.assertEquals(str(res[0]["badPwdCount"]), "4")
1025 self.assertEquals(str(res[0]["nextRid"]), "1001")
1026 self.assertEquals(str(res[0]["revision"]), "2")
# samba4 database: still only revision, now with the new value.
1028 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1029 self.assertEquals(len(res), 1)
1030 self.assertEquals(str(res[0].dn), dn)
1031 self.assertTrue(not "description" in res[0])
1032 self.assertTrue(not "badPwdCount" in res[0])
1033 self.assertTrue(not "nextRid" in res[0])
1034 self.assertEquals(str(res[0]["revision"]), "2")
1035 # Check in remote db
1036 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1038 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1039 self.assertEquals(len(res), 1)
1040 self.assertEquals(str(res[0].dn), dn2)
1041 self.assertEquals(str(res[0]["description"]), "test")
1042 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1043 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1044 self.assertTrue(not "revision" in res[0])
1046 # Rename split record
# dn2 is rebound here to the rename target ("cn=toast" on the samba4 side).
1047 dn2 = self.samba4.dn("cn=toast")
1048 self.ldb.rename(dn, dn2)
1049 # Check in mapped db
# NOTE(review): in the visible text `dn` still holds the pre-rename
# name, yet the searches below expect one result under it.  A
# reassignment of `dn` to the new name may be missing from this copy --
# confirm against the upstream source.
1051 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1052 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1053 self.assertEquals(len(res), 1)
1054 self.assertEquals(str(res[0].dn), dn)
1055 self.assertEquals(str(res[0]["description"]), "test")
1056 self.assertEquals(str(res[0]["badPwdCount"]), "4")
1057 self.assertEquals(str(res[0]["nextRid"]), "1001")
1058 self.assertEquals(str(res[0]["revision"]), "2")
# samba4 database after the rename: same expectations as before it.
1060 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1061 self.assertEquals(len(res), 1)
1062 self.assertEquals(str(res[0].dn), dn)
1063 self.assertTrue(not "description" in res[0])
1064 self.assertTrue(not "badPwdCount" in res[0])
1065 self.assertTrue(not "nextRid" in res[0])
1066 self.assertEquals(str(res[0]["revision"]), "2")
1067 # Check in remote db
# The record is looked up under its samba3-side "cn=toast" name; the
# attribute values are expected to be unchanged by the rename.
1068 dn2 = self.samba3.dn("cn=toast")
1069 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
1070 attrs=["description", "sambaBadPasswordCount", "sambaNextRid",
1072 self.assertEquals(len(res), 1)
1073 self.assertEquals(str(res[0].dn), dn2)
1074 self.assertEquals(str(res[0]["description"]), "test")
1075 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1076 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1077 self.assertTrue(not "revision" in res[0])
1079 # Delete split record
# NOTE(review): no delete call is visible between this comment and the
# checks below; the statement appears to be missing from this copy --
# confirm against the upstream source.
1081 # Check in mapped db
1082 res = self.ldb.search(dn, scope=SCOPE_BASE)
1083 self.assertEquals(len(res), 0)
# The samba4 database is expected to have no entry under dn either.
1085 res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1086 self.assertEquals(len(res), 0)
1087 # Check in remote db
1088 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1089 self.assertEquals(len(res), 0)