3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
28 from samba import Ldb, substitute_var
29 from samba.tests import LdbTestCase, TestCaseInTempDir, cmdline_loadparm
30 import samba.dcerpc.security
33 datadir = os.path.join(os.path.dirname(__file__),
34 "../../../../../testdata/samba3")
36 def read_datafile(filename):
37 return open(os.path.join(datadir, filename), 'r').read()
39 def ldb_debug(l, text):
43 class MapBaseTestCase(TestCaseInTempDir):
44 """Base test case for mapping tests."""
46 def setup_modules(self, ldb, s3, s4):
47 ldb.add({"dn": "@MAP=samba3sam",
49 "@TO": "sambaDomainName=TESTS," + s3.basedn})
51 ldb.add({"dn": "@MODULES",
52 "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})
54 ldb.add({"dn": "@PARTITION",
55 "partition": ["%s:%s" % (s4.basedn, s4.url),
56 "%s:%s" % (s3.basedn, s3.url)],
57 "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]})
60 super(MapBaseTestCase, self).setUp()
62 def make_dn(basedn, rdn):
63 return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)
65 def make_s4dn(basedn, rdn):
66 return "%s,%s" % (rdn, basedn)
68 self.ldbfile = os.path.join(self.tempdir, "test.ldb")
69 self.ldburl = "tdb://" + self.ldbfile
71 tempdir = self.tempdir
74 """Simple helper class that contains data for a specific SAM
76 def __init__(self, file, basedn, dn):
77 self.file = os.path.join(tempdir, file)
78 self.url = "tdb://" + self.file
80 self.substvars = {"BASEDN": self.basedn}
81 self.db = Ldb(lp=cmdline_loadparm)
85 return self._dn(self.basedn, rdn)
88 return self.db.connect(self.url)
90 def setup_data(self, path):
91 self.add_ldif(read_datafile(path))
93 def subst(self, text):
94 return substitute_var(text, self.substvars)
96 def add_ldif(self, ldif):
97 self.db.add_ldif(self.subst(ldif))
99 def modify_ldif(self, ldif):
100 self.db.modify_ldif(self.subst(ldif))
102 self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
103 self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
104 self.templates = Target("templates.ldb", "cn=templates", None)
106 self.samba3.connect()
107 self.templates.connect()
108 self.samba4.connect()
111 os.unlink(self.ldbfile)
112 os.unlink(self.samba3.file)
113 os.unlink(self.templates.file)
114 os.unlink(self.samba4.file)
115 super(MapBaseTestCase, self).tearDown()
117 def assertSidEquals(self, text, ndr_sid):
118 sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid,
120 sid_obj2 = samba.dcerpc.security.dom_sid(text)
121 self.assertEquals(sid_obj1, sid_obj2)
124 class Samba3SamTestCase(MapBaseTestCase):
127 super(Samba3SamTestCase, self).setUp()
128 ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
129 self.samba3.setup_data("samba3.ldif")
130 self.templates.setup_data("provision_samba3sam_templates.ldif")
131 ldif = read_datafile("provision_samba3sam.ldif")
132 ldb.add_ldif(self.samba4.subst(ldif))
133 self.setup_modules(ldb, self.samba3, self.samba4)
135 self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
137 def test_search_non_mapped(self):
138 """Looking up by non-mapped attribute"""
139 msg = self.ldb.search(expression="(cn=Administrator)")
140 self.assertEquals(len(msg), 1)
141 self.assertEquals(msg[0]["cn"], "Administrator")
143 def test_search_non_mapped(self):
144 """Looking up by mapped attribute"""
145 msg = self.ldb.search(expression="(name=Backup Operators)")
146 self.assertEquals(len(msg), 1)
147 self.assertEquals(str(msg[0]["name"]), "Backup Operators")
149 def test_old_name_of_renamed(self):
150 """Looking up by old name of renamed attribute"""
151 msg = self.ldb.search(expression="(displayName=Backup Operators)")
152 self.assertEquals(len(msg), 0)
154 def test_mapped_containing_sid(self):
155 """Looking up mapped entry containing SID"""
156 msg = self.ldb.search(expression="(cn=Replicator)")
157 self.assertEquals(len(msg), 1)
158 self.assertEquals(str(msg[0].dn),
159 "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
160 self.assertTrue("objectSid" in msg[0])
161 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
163 oc = set(msg[0]["objectClass"])
164 self.assertEquals(oc, set(["group"]))
166 def test_search_by_objclass(self):
167 """Looking up by objectClass"""
168 msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
169 self.assertEquals(set([str(m.dn) for m in msg]),
170 set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
171 "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))
173 def test_s3sam_modify(self):
174 # Adding a record that will be fallbacked
175 self.ldb.add({"dn": "cn=Foo",
179 "showInAdvancedViewOnly": "TRUE"}
182 # Checking for existence of record (local)
183 # TODO: This record must be searched in the local database, which is
184 # currently only supported for base searches
185 # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
186 # TODO: Actually, this version should work as well but doesn't...
189 msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
191 attrs=['foo','blah','cn','showInAdvancedViewOnly'])
192 self.assertEquals(len(msg), 1)
193 self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
194 self.assertEquals(str(msg[0]["foo"]), "bar")
195 self.assertEquals(str(msg[0]["blah"]), "Blie")
197 # Adding record that will be mapped
198 self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
199 "objectClass": "user",
201 "sambaUnicodePwd": "geheim",
204 # Checking for existence of record (remote)
205 msg = self.ldb.search(expression="(unixName=bin)",
206 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
207 self.assertEquals(len(msg), 1)
208 self.assertEquals(str(msg[0]["cn"]), "Niemand")
209 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
211 # Checking for existence of record (local && remote)
212 msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
213 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
214 self.assertEquals(len(msg), 1) # TODO: should check with more records
215 self.assertEquals(str(msg[0]["cn"]), "Niemand")
216 self.assertEquals(str(msg[0]["unixName"]), "bin")
217 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
219 # Checking for existence of record (local || remote)
220 msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
221 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
222 #print "got %d replies" % len(msg)
223 self.assertEquals(len(msg), 1) # TODO: should check with more records
224 self.assertEquals(str(msg[0]["cn"]), "Niemand")
225 self.assertEquals(str(msg[0]["unixName"]), "bin")
226 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
228 # Checking for data in destination database
229 msg = self.samba3.db.search(expression="(cn=Niemand)")
230 self.assertTrue(len(msg) >= 1)
231 self.assertEquals(str(msg[0]["sambaSID"]),
232 "S-1-5-21-4231626423-2410014848-2360679739-2001")
233 self.assertEquals(str(msg[0]["displayName"]), "Niemand")
235 # Adding attribute...
236 self.ldb.modify_ldif("""
237 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
243 # Checking whether changes are still there...
244 msg = self.ldb.search(expression="(cn=Niemand)")
245 self.assertTrue(len(msg) >= 1)
246 self.assertEquals(str(msg[0]["cn"]), "Niemand")
247 self.assertEquals(str(msg[0]["description"]), "Blah")
249 # Modifying attribute...
250 self.ldb.modify_ldif("""
251 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
257 # Checking whether changes are still there...
258 msg = self.ldb.search(expression="(cn=Niemand)")
259 self.assertTrue(len(msg) >= 1)
260 self.assertEquals(str(msg[0]["description"]), "Blie")
262 # Deleting attribute...
263 self.ldb.modify_ldif("""
264 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
269 # Checking whether changes are no longer there...
270 msg = self.ldb.search(expression="(cn=Niemand)")
271 self.assertTrue(len(msg) >= 1)
272 self.assertTrue(not "description" in msg[0])
275 self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
276 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
278 # Checking whether DN has changed...
279 msg = self.ldb.search(expression="(cn=Niemand2)")
280 self.assertEquals(len(msg), 1)
281 self.assertEquals(str(msg[0].dn),
282 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
285 self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
287 # Checking whether record is gone...
288 msg = self.ldb.search(expression="(cn=Niemand2)")
289 self.assertEquals(len(msg), 0)
292 class MapTestCase(MapBaseTestCase):
295 super(MapTestCase, self).setUp()
296 ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
297 self.templates.setup_data("provision_samba3sam_templates.ldif")
298 ldif = read_datafile("provision_samba3sam.ldif")
299 ldb.add_ldif(self.samba4.subst(ldif))
300 self.setup_modules(ldb, self.samba3, self.samba4)
302 self.ldb = Ldb(self.ldburl, lp=cmdline_loadparm)
304 def test_map_search(self):
305 """Running search tests on mapped data."""
307 "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
308 "objectclass": ["sambaDomain", "top"],
309 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
310 "sambaNextRid": "2000",
311 "sambaDomainName": "TESTS"
314 # Add a set of split records
315 self.ldb.add_ldif("""
316 dn: """+ self.samba4.dn("cn=X") + """
325 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
329 "dn": self.samba4.dn("cn=Y"),
330 "objectClass": "top",
340 "dn": self.samba4.dn("cn=Z"),
341 "objectClass": "top",
350 # Add a set of remote records
353 "dn": self.samba3.dn("cn=A"),
354 "objectClass": "posixAccount",
357 "sambaBadPasswordCount": "x",
358 "sambaLogonTime": "x",
360 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
361 "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
364 "dn": self.samba3.dn("cn=B"),
365 "objectClass": "top",
368 "sambaBadPasswordCount": "x",
369 "sambaLogonTime": "y",
373 "dn": self.samba3.dn("cn=C"),
374 "objectClass": "top",
377 "sambaBadPasswordCount": "y",
378 "sambaLogonTime": "z",
381 # Testing search by DN
383 # Search remote record by local DN
384 dn = self.samba4.dn("cn=A")
385 res = self.ldb.search(dn, scope=SCOPE_BASE,
386 attrs=["dnsHostName", "lastLogon"])
387 self.assertEquals(len(res), 1)
388 self.assertEquals(str(res[0].dn), dn)
389 self.assertTrue(not "dnsHostName" in res[0])
390 self.assertEquals(str(res[0]["lastLogon"]), "x")
392 # Search remote record by remote DN
393 dn = self.samba3.dn("cn=A")
394 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
395 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
396 self.assertEquals(len(res), 1)
397 self.assertEquals(str(res[0].dn), dn)
398 self.assertTrue(not "dnsHostName" in res[0])
399 self.assertTrue(not "lastLogon" in res[0])
400 self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
402 # Search split record by local DN
403 dn = self.samba4.dn("cn=X")
404 res = self.ldb.search(dn, scope=SCOPE_BASE,
405 attrs=["dnsHostName", "lastLogon"])
406 self.assertEquals(len(res), 1)
407 self.assertEquals(str(res[0].dn), dn)
408 self.assertEquals(str(res[0]["dnsHostName"]), "x")
409 self.assertEquals(str(res[0]["lastLogon"]), "x")
411 # Search split record by remote DN
412 dn = self.samba3.dn("cn=X")
413 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
414 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
415 self.assertEquals(len(res), 1)
416 self.assertEquals(str(res[0].dn), dn)
417 self.assertTrue(not "dnsHostName" in res[0])
418 self.assertTrue(not "lastLogon" in res[0])
419 self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
421 # Testing search by attribute
423 # Search by ignored attribute
424 res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
425 attrs=["dnsHostName", "lastLogon"])
426 self.assertEquals(len(res), 2)
427 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
428 self.assertEquals(str(res[0]["dnsHostName"]), "y")
429 self.assertEquals(str(res[0]["lastLogon"]), "y")
430 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
431 self.assertEquals(str(res[1]["dnsHostName"]), "x")
432 self.assertEquals(str(res[1]["lastLogon"]), "x")
434 # Search by kept attribute
435 res = self.ldb.search(expression="(description=y)",
436 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
437 self.assertEquals(len(res), 2)
438 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
439 self.assertEquals(str(res[0]["dnsHostName"]), "z")
440 self.assertEquals(str(res[0]["lastLogon"]), "z")
441 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
442 self.assertTrue(not "dnsHostName" in res[1])
443 self.assertEquals(str(res[1]["lastLogon"]), "z")
445 # Search by renamed attribute
446 res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
447 attrs=["dnsHostName", "lastLogon"])
448 self.assertEquals(len(res), 2)
449 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
450 self.assertTrue(not "dnsHostName" in res[0])
451 self.assertEquals(str(res[0]["lastLogon"]), "y")
452 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
453 self.assertTrue(not "dnsHostName" in res[1])
454 self.assertEquals(str(res[1]["lastLogon"]), "x")
456 # Search by converted attribute
458 # Using the SID directly in the parse tree leads to conversion
459 # errors, letting the search fail with no results.
460 #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
461 res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
462 self.assertEquals(len(res), 3)
463 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
464 self.assertEquals(str(res[0]["dnsHostName"]), "x")
465 self.assertEquals(str(res[0]["lastLogon"]), "x")
466 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
468 self.assertTrue("objectSid" in res[0])
469 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
470 self.assertTrue(not "dnsHostName" in res[1])
471 self.assertEquals(str(res[1]["lastLogon"]), "x")
472 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
474 self.assertTrue("objectSid" in res[1])
476 # Search by generated attribute
477 # In most cases, this even works when the mapping is missing
478 # a `convert_operator' by enumerating the remote db.
479 res = self.ldb.search(expression="(primaryGroupID=512)",
480 attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
481 self.assertEquals(len(res), 1)
482 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
483 self.assertTrue(not "dnsHostName" in res[0])
484 self.assertEquals(str(res[0]["lastLogon"]), "x")
485 self.assertEquals(str(res[0]["primaryGroupID"]), "512")
487 # Note that Xs "objectSid" seems to be fine in the previous search for
489 #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
490 #print len(res) + " results found"
491 #for i in range(len(res)):
492 # for (obj in res[i]) {
493 # print obj + ": " + res[i][obj]
498 # Search by remote name of renamed attribute */
499 res = self.ldb.search(expression="(sambaBadPasswordCount=*)",
500 attrs=["dnsHostName", "lastLogon"])
501 self.assertEquals(len(res), 0)
503 # Search by objectClass
504 attrs = ["dnsHostName", "lastLogon", "objectClass"]
505 res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
506 self.assertEquals(len(res), 2)
507 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
508 self.assertEquals(str(res[0]["dnsHostName"]), "x")
509 self.assertEquals(str(res[0]["lastLogon"]), "x")
510 self.assertEquals(str(res[0]["objectClass"][0]), "user")
511 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
512 self.assertTrue(not "dnsHostName" in res[1])
513 self.assertEquals(str(res[1]["lastLogon"]), "x")
514 self.assertEquals(str(res[1]["objectClass"][0]), "user")
516 # Prove that the objectClass is actually used for the search
517 res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
519 self.assertEquals(len(res), 3)
520 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
521 self.assertTrue(not "dnsHostName" in res[0])
522 self.assertEquals(str(res[0]["lastLogon"]), "y")
523 self.assertEquals(set(res[0]["objectClass"]), set(["top"]))
524 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
525 self.assertEquals(str(res[1]["dnsHostName"]), "x")
526 self.assertEquals(str(res[1]["lastLogon"]), "x")
527 self.assertEquals(str(res[1]["objectClass"][0]), "user")
528 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
529 self.assertTrue(not "dnsHostName" in res[2])
530 self.assertEquals(str(res[2]["lastLogon"]), "x")
531 self.assertEquals(res[2]["objectClass"][0], "user")
533 # Testing search by parse tree
535 # Search by conjunction of local attributes
536 res = self.ldb.search(expression="(&(codePage=x)(revision=x))",
537 attrs=["dnsHostName", "lastLogon"])
538 self.assertEquals(len(res), 2)
539 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
540 self.assertEquals(str(res[0]["dnsHostName"]), "y")
541 self.assertEquals(str(res[0]["lastLogon"]), "y")
542 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
543 self.assertEquals(str(res[1]["dnsHostName"]), "x")
544 self.assertEquals(str(res[1]["lastLogon"]), "x")
546 # Search by conjunction of remote attributes
547 res = self.ldb.search(expression="(&(lastLogon=x)(description=x))",
548 attrs=["dnsHostName", "lastLogon"])
549 self.assertEquals(len(res), 2)
550 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
551 self.assertEquals(str(res[0]["dnsHostName"]), "x")
552 self.assertEquals(str(res[0]["lastLogon"]), "x")
553 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
554 self.assertTrue(not "dnsHostName" in res[1])
555 self.assertEquals(str(res[1]["lastLogon"]), "x")
557 # Search by conjunction of local and remote attribute
558 res = self.ldb.search(expression="(&(codePage=x)(description=x))",
559 attrs=["dnsHostName", "lastLogon"])
560 self.assertEquals(len(res), 2)
561 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
562 self.assertEquals(str(res[0]["dnsHostName"]), "y")
563 self.assertEquals(str(res[0]["lastLogon"]), "y")
564 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
565 self.assertEquals(str(res[1]["dnsHostName"]), "x")
566 self.assertEquals(str(res[1]["lastLogon"]), "x")
568 # Search by conjunction of local and remote attribute w/o match
569 attrs = ["dnsHostName", "lastLogon"]
570 res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))",
572 self.assertEquals(len(res), 0)
573 res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))",
575 self.assertEquals(len(res), 0)
577 # Search by disjunction of local attributes
578 res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))",
579 attrs=["dnsHostName", "lastLogon"])
580 self.assertEquals(len(res), 2)
581 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
582 self.assertEquals(str(res[0]["dnsHostName"]), "y")
583 self.assertEquals(str(res[0]["lastLogon"]), "y")
584 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
585 self.assertEquals(str(res[1]["dnsHostName"]), "x")
586 self.assertEquals(str(res[1]["lastLogon"]), "x")
588 # Search by disjunction of remote attributes
589 res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))",
590 attrs=["dnsHostName", "lastLogon"])
591 self.assertEquals(len(res), 3)
592 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
593 self.assertFalse("dnsHostName" in res[0])
594 self.assertEquals(str(res[0]["lastLogon"]), "y")
595 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
596 self.assertEquals(str(res[1]["dnsHostName"]), "x")
597 self.assertEquals(str(res[1]["lastLogon"]), "x")
598 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
599 self.assertFalse("dnsHostName" in res[2])
600 self.assertEquals(str(res[2]["lastLogon"]), "x")
602 # Search by disjunction of local and remote attribute
603 res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))",
604 attrs=["dnsHostName", "lastLogon"])
605 self.assertEquals(len(res), 3)
606 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
607 self.assertEquals(str(res[0]["dnsHostName"]), "y")
608 self.assertEquals(str(res[0]["lastLogon"]), "y")
609 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
610 self.assertFalse("dnsHostName" in res[1])
611 self.assertEquals(str(res[1]["lastLogon"]), "y")
612 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
613 self.assertEquals(str(res[2]["dnsHostName"]), "x")
614 self.assertEquals(str(res[2]["lastLogon"]), "x")
616 # Search by disjunction of local and remote attribute w/o match
617 res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))",
618 attrs=["dnsHostName", "lastLogon"])
619 self.assertEquals(len(res), 0)
621 # Search by negated local attribute
622 res = self.ldb.search(expression="(!(revision=x))",
623 attrs=["dnsHostName", "lastLogon"])
624 self.assertEquals(len(res), 5)
625 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
626 self.assertTrue(not "dnsHostName" in res[0])
627 self.assertEquals(str(res[0]["lastLogon"]), "y")
628 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
629 self.assertTrue(not "dnsHostName" in res[1])
630 self.assertEquals(str(res[1]["lastLogon"]), "x")
631 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
632 self.assertEquals(str(res[2]["dnsHostName"]), "z")
633 self.assertEquals(str(res[2]["lastLogon"]), "z")
634 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
635 self.assertTrue(not "dnsHostName" in res[3])
636 self.assertEquals(str(res[3]["lastLogon"]), "z")
638 # Search by negated remote attribute
639 res = self.ldb.search(expression="(!(description=x))",
640 attrs=["dnsHostName", "lastLogon"])
641 self.assertEquals(len(res), 3)
642 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
643 self.assertEquals(str(res[0]["dnsHostName"]), "z")
644 self.assertEquals(str(res[0]["lastLogon"]), "z")
645 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
646 self.assertTrue(not "dnsHostName" in res[1])
647 self.assertEquals(str(res[1]["lastLogon"]), "z")
649 # Search by negated conjunction of local attributes
650 res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))",
651 attrs=["dnsHostName", "lastLogon"])
652 self.assertEquals(len(res), 5)
653 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
654 self.assertTrue(not "dnsHostName" in res[0])
655 self.assertEquals(str(res[0]["lastLogon"]), "y")
656 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
657 self.assertTrue(not "dnsHostName" in res[1])
658 self.assertEquals(str(res[1]["lastLogon"]), "x")
659 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
660 self.assertEquals(str(res[2]["dnsHostName"]), "z")
661 self.assertEquals(str(res[2]["lastLogon"]), "z")
662 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
663 self.assertTrue(not "dnsHostName" in res[3])
664 self.assertEquals(str(res[3]["lastLogon"]), "z")
666 # Search by negated conjunction of remote attributes
667 res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))",
668 attrs=["dnsHostName", "lastLogon"])
669 self.assertEquals(len(res), 5)
670 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
671 self.assertEquals(str(res[0]["dnsHostName"]), "y")
672 self.assertEquals(str(res[0]["lastLogon"]), "y")
673 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
674 self.assertTrue(not "dnsHostName" in res[1])
675 self.assertEquals(str(res[1]["lastLogon"]), "y")
676 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
677 self.assertEquals(str(res[2]["dnsHostName"]), "z")
678 self.assertEquals(str(res[2]["lastLogon"]), "z")
679 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
680 self.assertTrue(not "dnsHostName" in res[3])
681 self.assertEquals(str(res[3]["lastLogon"]), "z")
683 # Search by negated conjunction of local and remote attribute
684 res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))",
685 attrs=["dnsHostName", "lastLogon"])
686 self.assertEquals(len(res), 5)
687 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
688 self.assertTrue(not "dnsHostName" in res[0])
689 self.assertEquals(str(res[0]["lastLogon"]), "y")
690 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
691 self.assertTrue(not "dnsHostName" in res[1])
692 self.assertEquals(str(res[1]["lastLogon"]), "x")
693 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
694 self.assertEquals(str(res[2]["dnsHostName"]), "z")
695 self.assertEquals(str(res[2]["lastLogon"]), "z")
696 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
697 self.assertTrue(not "dnsHostName" in res[3])
698 self.assertEquals(str(res[3]["lastLogon"]), "z")
700 # Search by negated disjunction of local attributes
701 res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))",
702 attrs=["dnsHostName", "lastLogon"])
703 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
704 self.assertTrue(not "dnsHostName" in res[0])
705 self.assertEquals(str(res[0]["lastLogon"]), "y")
706 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
707 self.assertTrue(not "dnsHostName" in res[1])
708 self.assertEquals(str(res[1]["lastLogon"]), "x")
709 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
710 self.assertEquals(str(res[2]["dnsHostName"]), "z")
711 self.assertEquals(str(res[2]["lastLogon"]), "z")
712 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
713 self.assertTrue(not "dnsHostName" in res[3])
714 self.assertEquals(str(res[3]["lastLogon"]), "z")
716 # Search by negated disjunction of remote attributes
717 res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))",
718 attrs=["dnsHostName", "lastLogon"])
719 self.assertEquals(len(res), 4)
720 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
721 self.assertEquals(str(res[0]["dnsHostName"]), "y")
722 self.assertEquals(str(res[0]["lastLogon"]), "y")
723 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
724 self.assertEquals(str(res[1]["dnsHostName"]), "z")
725 self.assertEquals(str(res[1]["lastLogon"]), "z")
726 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
727 self.assertTrue(not "dnsHostName" in res[2])
728 self.assertEquals(str(res[2]["lastLogon"]), "z")
730 # Search by negated disjunction of local and remote attribute
731 res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
732 attrs=["dnsHostName", "lastLogon"])
733 self.assertEquals(len(res), 4)
734 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
735 self.assertTrue(not "dnsHostName" in res[0])
736 self.assertEquals(str(res[0]["lastLogon"]), "x")
737 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
738 self.assertEquals(str(res[1]["dnsHostName"]), "z")
739 self.assertEquals(str(res[1]["lastLogon"]), "z")
740 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
741 self.assertTrue(not "dnsHostName" in res[2])
742 self.assertEquals(str(res[2]["lastLogon"]), "z")
744 # Search by complex parse tree
745 res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
746 self.assertEquals(len(res), 6)
747 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
748 self.assertTrue(not "dnsHostName" in res[0])
749 self.assertEquals(str(res[0]["lastLogon"]), "y")
750 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
751 self.assertEquals(str(res[1]["dnsHostName"]), "x")
752 self.assertEquals(str(res[1]["lastLogon"]), "x")
753 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
754 self.assertTrue(not "dnsHostName" in res[2])
755 self.assertEquals(str(res[2]["lastLogon"]), "x")
756 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
757 self.assertEquals(str(res[3]["dnsHostName"]), "z")
758 self.assertEquals(str(res[3]["lastLogon"]), "z")
759 self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
760 self.assertTrue(not "dnsHostName" in res[4])
761 self.assertEquals(str(res[4]["lastLogon"]), "z")
764 dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
768 def test_map_modify_local(self):
769 """Modification of local records."""
771 dn = "cn=test,dc=idealx,dc=org"
772 self.ldb.add({"dn": dn,
776 "description": "test"})
778 attrs = ["foo", "revision", "description"]
779 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
780 self.assertEquals(len(res), 1)
781 self.assertEquals(str(res[0].dn), dn)
782 self.assertEquals(str(res[0]["foo"]), "bar")
783 self.assertEquals(str(res[0]["revision"]), "1")
784 self.assertEquals(str(res[0]["description"]), "test")
785 # Check it's not in the local db
786 res = self.samba4.db.search(expression="(cn=test)",
787 scope=SCOPE_DEFAULT, attrs=attrs)
788 self.assertEquals(len(res), 0)
789 # Check it's not in the remote db
790 res = self.samba3.db.search(expression="(cn=test)",
791 scope=SCOPE_DEFAULT, attrs=attrs)
792 self.assertEquals(len(res), 0)
794 # Modify local record
802 self.ldb.modify_ldif(ldif)
804 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
805 self.assertEquals(len(res), 1)
806 self.assertEquals(str(res[0].dn), dn)
807 self.assertEquals(str(res[0]["foo"]), "baz")
808 self.assertEquals(str(res[0]["revision"]), "1")
809 self.assertEquals(str(res[0]["description"]), "foo")
811 # Rename local record
812 dn2 = "cn=toast,dc=idealx,dc=org"
813 self.ldb.rename(dn, dn2)
815 res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
816 self.assertEquals(len(res), 1)
817 self.assertEquals(str(res[0].dn), dn2)
818 self.assertEquals(str(res[0]["foo"]), "baz")
819 self.assertEquals(str(res[0]["revision"]), "1")
820 self.assertEquals(str(res[0]["description"]), "foo")
822 # Delete local record
825 res = self.ldb.search(dn2, scope=SCOPE_BASE)
826 self.assertEquals(len(res), 0)
828 def test_map_modify_remote_remote(self):
829 """Modification of remote data of remote records"""
831 dn = self.samba4.dn("cn=test")
832 dn2 = self.samba3.dn("cn=test")
833 self.samba3.db.add({"dn": dn2,
835 "description": "foo",
836 "sambaBadPasswordCount": "3",
837 "sambaNextRid": "1001"})
839 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
840 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
841 self.assertEquals(len(res), 1)
842 self.assertEquals(str(res[0].dn), dn2)
843 self.assertEquals(str(res[0]["description"]), "foo")
844 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
845 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
847 attrs = ["description", "badPwdCount", "nextRid"]
848 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
849 self.assertEquals(len(res), 1)
850 self.assertEquals(str(res[0].dn), dn)
851 self.assertEquals(str(res[0]["description"]), "foo")
852 self.assertEquals(str(res[0]["badPwdCount"]), "3")
853 self.assertEquals(str(res[0]["nextRid"]), "1001")
855 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
856 self.assertEquals(len(res), 0)
858 # Modify remote data of remote record
866 self.ldb.modify_ldif(ldif)
868 res = self.ldb.search(dn, scope=SCOPE_BASE,
869 attrs=["description", "badPwdCount", "nextRid"])
870 self.assertEquals(len(res), 1)
871 self.assertEquals(str(res[0].dn), dn)
872 self.assertEquals(str(res[0]["description"]), "test")
873 self.assertEquals(str(res[0]["badPwdCount"]), "4")
874 self.assertEquals(str(res[0]["nextRid"]), "1001")
876 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
877 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
878 self.assertEquals(len(res), 1)
879 self.assertEquals(str(res[0].dn), dn2)
880 self.assertEquals(str(res[0]["description"]), "test")
881 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
882 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
884 # Rename remote record
885 dn2 = self.samba4.dn("cn=toast")
886 self.ldb.rename(dn, dn2)
889 res = self.ldb.search(dn, scope=SCOPE_BASE,
890 attrs=["description", "badPwdCount", "nextRid"])
891 self.assertEquals(len(res), 1)
892 self.assertEquals(str(res[0].dn), dn)
893 self.assertEquals(str(res[0]["description"]), "test")
894 self.assertEquals(str(res[0]["badPwdCount"]), "4")
895 self.assertEquals(str(res[0]["nextRid"]), "1001")
897 dn2 = self.samba3.dn("cn=toast")
898 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
899 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
900 self.assertEquals(len(res), 1)
901 self.assertEquals(str(res[0].dn), dn2)
902 self.assertEquals(str(res[0]["description"]), "test")
903 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
904 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
906 # Delete remote record
908 # Check in mapped db that it's removed
909 res = self.ldb.search(dn, scope=SCOPE_BASE)
910 self.assertEquals(len(res), 0)
912 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
913 self.assertEquals(len(res), 0)
915 def test_map_modify_remote_local(self):
916 """Modification of local data of remote records"""
917 # Add remote record (same as before)
918 dn = self.samba4.dn("cn=test")
919 dn2 = self.samba3.dn("cn=test")
920 self.samba3.db.add({"dn": dn2,
922 "description": "foo",
923 "sambaBadPasswordCount": "3",
924 "sambaNextRid": "1001"})
926 # Modify local data of remote record
935 self.ldb.modify_ldif(ldif)
937 attrs = ["revision", "description"]
938 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
939 self.assertEquals(len(res), 1)
940 self.assertEquals(str(res[0].dn), dn)
941 self.assertEquals(str(res[0]["description"]), "test")
942 self.assertEquals(str(res[0]["revision"]), "1")
944 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
945 self.assertEquals(len(res), 1)
946 self.assertEquals(str(res[0].dn), dn2)
947 self.assertEquals(str(res[0]["description"]), "test")
948 self.assertTrue(not "revision" in res[0])
950 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
951 self.assertEquals(len(res), 1)
952 self.assertEquals(str(res[0].dn), dn)
953 self.assertTrue(not "description" in res[0])
954 self.assertEquals(str(res[0]["revision"]), "1")
956 # Delete (newly) split record
959 def test_map_modify_split(self):
960 """Testing modification of split records"""
962 dn = self.samba4.dn("cn=test")
963 dn2 = self.samba3.dn("cn=test")
967 "description": "foo",
972 attrs = ["description", "badPwdCount", "nextRid", "revision"]
973 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
974 self.assertEquals(len(res), 1)
975 self.assertEquals(str(res[0].dn), dn)
976 self.assertEquals(str(res[0]["description"]), "foo")
977 self.assertEquals(str(res[0]["badPwdCount"]), "3")
978 self.assertEquals(str(res[0]["nextRid"]), "1001")
979 self.assertEquals(str(res[0]["revision"]), "1")
981 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
982 self.assertEquals(len(res), 1)
983 self.assertEquals(str(res[0].dn), dn)
984 self.assertTrue(not "description" in res[0])
985 self.assertTrue(not "badPwdCount" in res[0])
986 self.assertTrue(not "nextRid" in res[0])
987 self.assertEquals(str(res[0]["revision"]), "1")
989 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
991 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
992 self.assertEquals(len(res), 1)
993 self.assertEquals(str(res[0].dn), dn2)
994 self.assertEquals(str(res[0]["description"]), "foo")
995 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
996 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
997 self.assertTrue(not "revision" in res[0])
999 # Modify of split record
1002 replace: description
1004 replace: badPwdCount
1009 self.ldb.modify_ldif(ldif)
1010 # Check in mapped db
1011 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1012 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1013 self.assertEquals(len(res), 1)
1014 self.assertEquals(str(res[0].dn), dn)
1015 self.assertEquals(str(res[0]["description"]), "test")
1016 self.assertEquals(str(res[0]["badPwdCount"]), "4")
1017 self.assertEquals(str(res[0]["nextRid"]), "1001")
1018 self.assertEquals(str(res[0]["revision"]), "2")
1020 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1021 self.assertEquals(len(res), 1)
1022 self.assertEquals(str(res[0].dn), dn)
1023 self.assertTrue(not "description" in res[0])
1024 self.assertTrue(not "badPwdCount" in res[0])
1025 self.assertTrue(not "nextRid" in res[0])
1026 self.assertEquals(str(res[0]["revision"]), "2")
1027 # Check in remote db
1028 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1030 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1031 self.assertEquals(len(res), 1)
1032 self.assertEquals(str(res[0].dn), dn2)
1033 self.assertEquals(str(res[0]["description"]), "test")
1034 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1035 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1036 self.assertTrue(not "revision" in res[0])
1038 # Rename split record
1039 dn2 = self.samba4.dn("cn=toast")
1040 self.ldb.rename(dn, dn2)
1041 # Check in mapped db
1043 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1044 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1045 self.assertEquals(len(res), 1)
1046 self.assertEquals(str(res[0].dn), dn)
1047 self.assertEquals(str(res[0]["description"]), "test")
1048 self.assertEquals(str(res[0]["badPwdCount"]), "4")
1049 self.assertEquals(str(res[0]["nextRid"]), "1001")
1050 self.assertEquals(str(res[0]["revision"]), "2")
1052 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1053 self.assertEquals(len(res), 1)
1054 self.assertEquals(str(res[0].dn), dn)
1055 self.assertTrue(not "description" in res[0])
1056 self.assertTrue(not "badPwdCount" in res[0])
1057 self.assertTrue(not "nextRid" in res[0])
1058 self.assertEquals(str(res[0]["revision"]), "2")
1059 # Check in remote db
1060 dn2 = self.samba3.dn("cn=toast")
1061 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
1062 attrs=["description", "sambaBadPasswordCount", "sambaNextRid",
1064 self.assertEquals(len(res), 1)
1065 self.assertEquals(str(res[0].dn), dn2)
1066 self.assertEquals(str(res[0]["description"]), "test")
1067 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1068 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1069 self.assertTrue(not "revision" in res[0])
1071 # Delete split record
1073 # Check in mapped db
1074 res = self.ldb.search(dn, scope=SCOPE_BASE)
1075 self.assertEquals(len(res), 0)
1077 res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1078 self.assertEquals(len(res), 0)
1079 # Check in remote db
1080 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1081 self.assertEquals(len(res), 0)