3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
29 from ldb import SCOPE_DEFAULT, SCOPE_BASE, SCOPE_SUBTREE
30 from samba import Ldb, substitute_var
31 from samba.tests import LdbTestCase, TestCaseInTempDir
33 datadir = os.path.join(os.path.dirname(__file__), "../../../../../testdata/samba3")
35 def ldb_debug(l, text):
39 class MapBaseTestCase(TestCaseInTempDir):
40 """Base test case for mapping tests."""
42 def setup_modules(self, ldb, s3, s4):
43 ldb.add({"dn": "@MAP=samba3sam",
45 "@TO": "sambaDomainName=TESTS," + s3.basedn})
47 ldb.add({"dn": "@MODULES",
48 "@LIST": "rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})
50 ldb.add({"dn": "@PARTITION",
51 "partition": [s4.basedn + ":" + s4.url, s3.basedn + ":" + s3.url],
52 "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"]})
56 super(MapBaseTestCase, self).setUp()
def make_dn(basedn, rdn):
    """Build a DN for `rdn` under the TESTS domain container of `basedn`.

    The result is `rdn`, then the fixed ``sambaDomainName=TESTS`` component,
    then `basedn`, separated by commas.
    """
    components = (rdn, "sambaDomainName=TESTS", basedn)
    return ",".join(components)
def make_s4dn(basedn, rdn):
    """Build a DN by placing `rdn` directly in front of `basedn`."""
    components = (rdn, basedn)
    return ",".join(components)
64 self.ldbfile = os.path.join(self.tempdir, "test.ldb")
65 self.ldburl = "tdb://" + self.ldbfile
67 tempdir = self.tempdir
70 """Simple helper class that contains data for a specific SAM connection."""
71 def __init__(self, file, basedn, dn):
72 self.file = os.path.join(tempdir, file)
73 self.url = "tdb://" + self.file
75 self.substvars = {"BASEDN": self.basedn}
80 return self._dn(rdn, self.basedn)
83 return self.db.connect(self.url)
85 def setup_data(self, path):
86 ldif = open(os.path.join(datadir, path), 'r').read()
def subst(self, text):
    """Return `text` after substitution with this target's variables.

    The work is delegated to `substitute_var`, which receives `text`
    together with the `self.substvars` mapping.
    """
    variables = self.substvars
    return substitute_var(text, variables)
def add_ldif(self, ldif):
    """Add `ldif` to this target's database.

    The text is first passed through `self.subst`, and the result is
    handed to `self.db.add_ldif`.
    """
    expanded = self.subst(ldif)
    self.db.add_ldif(expanded)
def modify_ldif(self, ldif):
    """Apply the modifications in `ldif` to this target's database.

    The text is first passed through `self.subst`, and the result is
    handed to `self.db.modify_ldif`.
    """
    expanded = self.subst(ldif)
    self.db.modify_ldif(expanded)
98 self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
99 self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
100 self.templates = Target("templates.ldb", "cn=templates", None)
102 self.samba3.connect()
103 self.templates.connect()
104 self.samba4.connect()
107 os.unlink(self.ldbfile)
108 os.unlink(self.samba3.file)
109 os.unlink(self.templates.file)
110 os.unlink(self.samba4.file)
111 super(MapBaseTestCase, self).tearDown()
114 class Samba3SamTestCase(MapBaseTestCase):
116 super(Samba3SamTestCase, self).setUp()
117 ldb = Ldb(self.ldburl)
118 self.samba3.setup_data("samba3.ldif")
119 self.templates.setup_data("provision_samba3sam_templates.ldif")
120 ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()
121 ldb.add_ldif(self.samba4.subst(ldif))
122 self.setup_modules(ldb, self.samba3, self.samba4)
123 self.ldb = Ldb(self.ldburl)
def test_search_non_mapped(self):
    """Search by a non-mapped attribute (cn) and expect exactly one hit."""
    results = self.ldb.search(expression="(cn=Administrator)")
    # Exactly one record may match, and it must carry the cn searched for.
    self.assertEquals(len(results), 1)
    self.assertEquals(results[0]["cn"], "Administrator")
def test_search_mapped(self):
    """Looking up by mapped attribute.

    Searches on ``name`` and expects exactly one record whose ``name``
    is ``Backup Operators``.
    """
    # This test needs its own method name: reusing test_search_non_mapped
    # would rebind that name in the class body, and the non-mapped lookup
    # test would then never be collected or run.
    msg = self.ldb.search(expression="(name=Backup Operators)")
    self.assertEquals(len(msg), 1)
    self.assertEquals(msg[0]["name"], "Backup Operators")
def test_old_name_of_renamed(self):
    """Search by the old name of a renamed attribute and expect no hits."""
    matches = self.ldb.search(expression="(displayName=Backup Operators)")
    # A search on displayName must come back empty.
    self.assertEquals(len(matches), 0)
def test_mapped_containing_sid(self):
    """Look up a mapped entry that contains a SID.

    Checks the DN of the single result, that it has an objectSid
    attribute, and that its objectClass values are exactly ``group``.
    """
    found = self.ldb.search(expression="(cn=Replicator)")
    self.assertEquals(len(found), 1)

    entry = found[0]
    self.assertEquals(str(entry.dn), "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
    self.assertTrue("objectSid" in entry)
    # FIXME: NDR unpack entry["objectSid"] before comparing:
    # self.assertEquals(entry["objectSid"],
    #     "S-1-5-21-4231626423-2410014848-2360679739-552")

    # Check mapping of objectClass
    object_classes = set(entry["objectClass"])
    self.assertTrue(object_classes is not None)
    self.assertEquals(object_classes, set(["group"]))
def test_search_by_objclass(self):
    """Search with an OR of objectClass and cn; expect exactly two DNs."""
    results = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
    # Compare as sets so the order of the returned records does not matter.
    found_dns = set(str(entry.dn) for entry in results)
    expected_dns = set([
        "unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
        "unixName=nobody,ou=Users,dc=vernstok,dc=nl",
    ])
    self.assertEquals(found_dns, expected_dns)
162 def test_s3sam_modify(self):
# Adding a record that will fall back to the local database
164 self.ldb.add({"dn": "cn=Foo",
168 "showInAdvancedViewOnly": "TRUE"}
171 # Checking for existence of record (local)
172 # TODO: This record must be searched in the local database, which is currently only supported for base searches
# msg = ldb.search(expression="(cn=Foo)", attrs=['foo','blah','cn','showInAdvancedViewOnly'])
174 # TODO: Actually, this version should work as well but doesn't...
177 msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
179 attrs=['foo','blah','cn','showInAdvancedViewOnly'])
180 self.assertEquals(len(msg), 1)
181 self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE")
182 self.assertEquals(msg[0]["foo"], "bar")
183 self.assertEquals(msg[0]["blah"], "Blie")
185 # Adding record that will be mapped
186 self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
187 "objectClass": "user",
189 "sambaUnicodePwd": "geheim",
192 # Checking for existence of record (remote)
193 msg = self.ldb.search(expression="(unixName=bin)",
194 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
195 self.assertEquals(len(msg), 1)
196 self.assertEquals(msg[0]["cn"], "Niemand")
197 self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
199 # Checking for existence of record (local && remote)
200 msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
201 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
202 self.assertEquals(len(msg), 1) # TODO: should check with more records
203 self.assertEquals(msg[0]["cn"], "Niemand")
204 self.assertEquals(msg[0]["unixName"], "bin")
205 self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
207 # Checking for existence of record (local || remote)
208 msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
209 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
210 #print "got %d replies" % len(msg)
211 self.assertEquals(len(msg), 1) # TODO: should check with more records
212 self.assertEquals(msg[0]["cn"], "Niemand")
213 self.assertEquals(msg[0]["unixName"], "bin")
214 self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
216 # Checking for data in destination database
217 msg = self.samba3.db.search(expression="(cn=Niemand)")
218 self.assertTrue(len(msg) >= 1)
219 self.assertEquals(msg[0]["sambaSID"],
220 "S-1-5-21-4231626423-2410014848-2360679739-2001")
221 self.assertEquals(msg[0]["displayName"], "Niemand")
223 # Adding attribute...
224 self.ldb.modify_ldif("""
225 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
231 # Checking whether changes are still there...
232 msg = self.ldb.search(expression="(cn=Niemand)")
233 self.assertTrue(len(msg) >= 1)
234 self.assertEquals(msg[0]["cn"], "Niemand")
235 self.assertEquals(msg[0]["description"], "Blah")
237 # Modifying attribute...
238 self.ldb.modify_ldif("""
239 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
245 # Checking whether changes are still there...
246 msg = self.ldb.search(expression="(cn=Niemand)")
247 self.assertTrue(len(msg) >= 1)
248 self.assertEquals(msg[0]["description"], "Blie")
250 # Deleting attribute...
251 self.ldb.modify_ldif("""
252 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
257 # Checking whether changes are no longer there...
258 msg = self.ldb.search(expression="(cn=Niemand)")
259 self.assertTrue(len(msg) >= 1)
260 self.assertTrue(not "description" in msg[0])
263 self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
264 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
266 # Checking whether DN has changed...
267 msg = self.ldb.search(expression="(cn=Niemand2)")
268 self.assertEquals(len(msg), 1)
269 self.assertEquals(str(msg[0].dn), "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
272 self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
274 # Checking whether record is gone...
275 msg = self.ldb.search(expression="(cn=Niemand2)")
276 self.assertEquals(len(msg), 0)
280 class MapTestCase(MapBaseTestCase):
282 super(MapTestCase, self).setUp()
283 ldb = Ldb(self.ldburl)
284 self.samba3.setup_data("samba3.ldif")
285 self.templates.setup_data("provision_samba3sam_templates.ldif")
286 ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()
287 ldb.add_ldif(self.samba4.subst(ldif))
288 self.setup_modules(ldb, self.samba3, self.samba4)
289 self.ldb = Ldb(self.ldburl)
291 def test_map_search(self):
292 """Running search tests on mapped data."""
294 dn: sambaDomainName=TESTS,""" + self.samba3.basedn + """
295 objectclass: sambaDomain
297 sambaSID: S-1-5-21-4231626423-2410014848-2360679739
299 sambaDomainName: TESTS"""
300 self.samba3.add_ldif(ldif)
302 # Add a set of split records
304 dn: """ + self.samba4.dn("cn=X") + """
313 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
314 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512
317 self.ldb.add_ldif(self.samba4.subst(ldif))
320 dn: """ + self.samba4.dn("cn=Y") + """
330 self.ldb.add_ldif(self.samba4.subst(ldif))
333 dn: """ + self.samba4.dn("cn=Z") + """
344 self.ldb.add_ldif(self.samba4.subst(ldif))
346 # Add a set of remote records
349 dn: """ + self.samba3.dn("cn=A") + """
350 objectClass: posixAccount
353 sambaBadPasswordCount: x
356 sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552
357 sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512
359 dn: """ + self.samba3.dn("cn=B") + """
363 sambaBadPasswordCount: x
367 dn: """ + self.samba3.dn("cn=C") + """
371 sambaBadPasswordCount: y
375 self.samba3.add_ldif(ldif)
377 # Testing search by DN
379 # Search remote record by local DN
380 dn = self.samba4.dn("cn=A")
381 res = self.ldb.search(dn, scope=SCOPE_BASE,
382 attrs=["dnsHostName", "lastLogon"])
383 self.assertEquals(len(res), 1)
384 self.assertEquals(str(str(res[0].dn)), dn)
385 self.assertTrue(not "dnsHostName" in res[0])
386 self.assertEquals(res[0]["lastLogon"], "x")
388 # Search remote record by remote DN
389 dn = self.samba3.dn("cn=A")
390 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
391 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
392 self.assertEquals(len(res), 1)
393 self.assertEquals(str(str(res[0].dn)), dn)
394 self.assertTrue(not "dnsHostName" in res[0])
395 self.assertTrue(not "lastLogon" in res[0])
396 self.assertEquals(res[0]["sambaLogonTime"], "x")
398 # Search split record by local DN
399 dn = self.samba4.dn("cn=X")
400 res = self.ldb.search(dn, scope=SCOPE_BASE,
401 attrs=["dnsHostName", "lastLogon"])
402 self.assertEquals(len(res), 1)
403 self.assertEquals(str(str(res[0].dn)), dn)
404 self.assertEquals(res[0]["dnsHostName"], "x")
405 self.assertEquals(res[0]["lastLogon"], "x")
407 # Search split record by remote DN
408 dn = self.samba3.dn("cn=X")
409 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
410 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
411 self.assertEquals(len(res), 1)
412 self.assertEquals(str(str(res[0].dn)), dn)
413 self.assertTrue(not "dnsHostName" in res[0])
414 self.assertTrue(not "lastLogon" in res[0])
415 self.assertEquals(res[0]["sambaLogonTime"], "x")
417 # Testing search by attribute
419 # Search by ignored attribute
420 res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
421 attrs=["dnsHostName", "lastLogon"])
422 self.assertEquals(len(res), 2)
423 self.assertEquals(str(str(res[0].dn)), self.samba4.dn("cn=Y"))
424 self.assertEquals(res[0]["dnsHostName"], "y")
425 self.assertEquals(res[0]["lastLogon"], "y")
426 self.assertEquals(str(str(res[1].dn)), self.samba4.dn("cn=X"))
427 self.assertEquals(res[1]["dnsHostName"], "x")
428 self.assertEquals(res[1]["lastLogon"], "x")
430 # Search by kept attribute
431 res = self.ldb.search(expression="(description=y)",
432 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
433 self.assertEquals(len(res), 2)
434 self.assertEquals(str(str(res[0].dn)), self.samba4.dn("cn=Z"))
435 self.assertEquals(res[0]["dnsHostName"], "z")
436 self.assertEquals(res[0]["lastLogon"], "z")
437 self.assertEquals(str(str(res[1].dn)), self.samba4.dn("cn=C"))
438 self.assertTrue(not "dnsHostName" in res[1])
439 self.assertEquals(res[1]["lastLogon"], "z")
441 # Search by renamed attribute
442 res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
443 self.assertEquals(len(res), 2)
444 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
445 self.assertTrue(not "dnsHostName" in res[0])
446 self.assertEquals(res[0]["lastLogon"], "y")
447 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
448 self.assertTrue(not "dnsHostName" in res[1])
449 self.assertEquals(res[1]["lastLogon"], "x")
451 # Search by converted attribute
453 # Using the SID directly in the parse tree leads to conversion
454 # errors, letting the search fail with no results.
455 #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
456 res = self.ldb.search(expression="(objectSid=*)", attrs=["dnsHostName", "lastLogon", "objectSid"])
457 self.assertEquals(len(res), 3)
458 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
459 self.assertEquals(res[0]["dnsHostName"], "x")
460 self.assertEquals(res[0]["lastLogon"], "x")
461 self.assertEquals(res[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
462 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
463 self.assertTrue(not "dnsHostName" in res[1])
464 self.assertEquals(res[1]["lastLogon"], "x")
465 self.assertEquals(res[1]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
467 # Search by generated attribute
468 # In most cases, this even works when the mapping is missing
469 # a `convert_operator' by enumerating the remote db.
470 res = self.ldb.search(expression="(primaryGroupID=512)", attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
471 self.assertEquals(len(res), 1)
472 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
473 self.assertTrue(not "dnsHostName" in res[0])
474 self.assertEquals(res[0]["lastLogon"], "x")
475 self.assertEquals(res[0]["primaryGroupID"], "512")
477 # TODO: There should actually be two results, A and X. The
478 # primaryGroupID of X seems to get corrupted somewhere, and the
479 # objectSid isn't available during the generation of remote (!) data,
# which can be observed with the following search. Also note that X's
# objectSid seems to be fine in the previous search for objectSid...
482 #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
483 #print len(res) + " results found"
484 #for i in range(len(res)):
485 # for (obj in res[i]) {
486 # print obj + ": " + res[i][obj]
# Search by remote name of renamed attribute
492 res = self.ldb.search(expression="(sambaBadPasswordCount=*)", attrs=["dnsHostName", "lastLogon"])
493 self.assertEquals(len(res), 0)
495 # Search by objectClass
496 attrs = ["dnsHostName", "lastLogon", "objectClass"]
497 res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
498 self.assertEquals(len(res), 2)
499 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
500 self.assertEquals(res[0]["dnsHostName"], "x")
501 self.assertEquals(res[0]["lastLogon"], "x")
502 self.assertTrue(res[0]["objectClass"] is not None)
503 self.assertEquals(res[0]["objectClass"][0], "user")
504 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
505 self.assertTrue(not "dnsHostName" in res[1])
506 self.assertEquals(res[1]["lastLogon"], "x")
507 self.assertTrue(res[1]["objectClass"] is not None)
508 self.assertEquals(res[1]["objectClass"][0], "user")
510 # Prove that the objectClass is actually used for the search
511 res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs)
512 self.assertEquals(len(res), 3)
513 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
514 self.assertTrue(not "dnsHostName" in res[0])
515 self.assertEquals(res[0]["lastLogon"], "y")
516 self.assertTrue(res[0]["objectClass"] is not None)
517 self.assertEquals(set(res[0]["objectClass"]), set(["user"]))
518 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
519 self.assertEquals(res[1]["dnsHostName"], "x")
520 self.assertEquals(res[1]["lastLogon"], "x")
521 self.assertTrue(res[1]["objectClass"] is not None)
522 self.assertEquals(res[1]["objectClass"][0], "user")
523 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
524 self.assertTrue(not "dnsHostName" in res[2])
525 self.assertEquals(res[2]["lastLogon"], "x")
526 self.assertTrue(res[2]["objectClass"] is not None)
527 self.assertEquals(res[2]["objectClass"][0], "user")
529 # Testing search by parse tree
531 # Search by conjunction of local attributes
532 res = self.ldb.search(expression="(&(codePage=x)(revision=x))", attrs=["dnsHostName", "lastLogon"])
533 self.assertEquals(len(res), 2)
534 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
535 self.assertEquals(res[0]["dnsHostName"], "y")
536 self.assertEquals(res[0]["lastLogon"], "y")
537 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
538 self.assertEquals(res[1]["dnsHostName"], "x")
539 self.assertEquals(res[1]["lastLogon"], "x")
541 # Search by conjunction of remote attributes
542 res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", attrs=["dnsHostName", "lastLogon"])
543 self.assertEquals(len(res), 2)
544 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
545 self.assertEquals(res[0]["dnsHostName"], "x")
546 self.assertEquals(res[0]["lastLogon"], "x")
547 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
548 self.assertTrue(not "dnsHostName" in res[1])
549 self.assertEquals(res[1]["lastLogon"], "x")
551 # Search by conjunction of local and remote attribute
552 res = self.ldb.search(expression="(&(codePage=x)(description=x))", attrs=["dnsHostName", "lastLogon"])
553 self.assertEquals(len(res), 2)
554 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
555 self.assertEquals(res[0]["dnsHostName"], "y")
556 self.assertEquals(res[0]["lastLogon"], "y")
557 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
558 self.assertEquals(res[1]["dnsHostName"], "x")
559 self.assertEquals(res[1]["lastLogon"], "x")
561 # Search by conjunction of local and remote attribute w/o match
562 attrs = ["dnsHostName", "lastLogon"]
563 res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", attrs=attrs)
564 self.assertEquals(len(res), 0)
565 res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", attrs=attrs)
566 self.assertEquals(len(res), 0)
568 # Search by disjunction of local attributes
569 res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", attrs=["dnsHostName", "lastLogon"])
570 self.assertEquals(len(res), 2)
571 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
572 self.assertEquals(res[0]["dnsHostName"], "y")
573 self.assertEquals(res[0]["lastLogon"], "y")
574 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
575 self.assertEquals(res[1]["dnsHostName"], "x")
576 self.assertEquals(res[1]["lastLogon"], "x")
578 # Search by disjunction of remote attributes
579 res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", attrs=["dnsHostName", "lastLogon"])
580 self.assertEquals(len(res), 3)
581 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
582 self.assertTrue("dnsHostName" in res[0])
583 self.assertEquals(res[0]["lastLogon"], "y")
584 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
585 self.assertEquals(res[1]["dnsHostName"], "x")
586 self.assertEquals(res[1]["lastLogon"], "x")
587 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
588 self.assertTrue("dnsHostName" in res[2])
589 self.assertEquals(res[2]["lastLogon"], "x")
591 # Search by disjunction of local and remote attribute
592 res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", attrs=["dnsHostName", "lastLogon"])
593 self.assertEquals(len(res), 3)
594 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
595 self.assertEquals(res[0]["dnsHostName"], "y")
596 self.assertEquals(res[0]["lastLogon"], "y")
597 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
598 self.assertTrue("dnsHostName" in res[1])
599 self.assertEquals(res[1]["lastLogon"], "y")
600 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
601 self.assertEquals(res[2]["dnsHostName"], "x")
602 self.assertEquals(res[2]["lastLogon"], "x")
604 # Search by disjunction of local and remote attribute w/o match
605 res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", attrs=["dnsHostName", "lastLogon"])
606 self.assertEquals(len(res), 0)
608 # Search by negated local attribute
609 res = self.ldb.search(expression="(!(revision=x))", attrs=["dnsHostName", "lastLogon"])
610 self.assertEquals(len(res), 5)
611 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
612 self.assertTrue(not "dnsHostName" in res[0])
613 self.assertEquals(res[0]["lastLogon"], "y")
614 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
615 self.assertTrue(not "dnsHostName" in res[1])
616 self.assertEquals(res[1]["lastLogon"], "x")
617 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
618 self.assertEquals(res[2]["dnsHostName"], "z")
619 self.assertEquals(res[2]["lastLogon"], "z")
620 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
621 self.assertTrue(not "dnsHostName" in res[3])
622 self.assertEquals(res[3]["lastLogon"], "z")
624 # Search by negated remote attribute
625 res = self.ldb.search(expression="(!(description=x))", attrs=["dnsHostName", "lastLogon"])
626 self.assertEquals(len(res), 3)
627 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
628 self.assertEquals(res[0]["dnsHostName"], "z")
629 self.assertEquals(res[0]["lastLogon"], "z")
630 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
631 self.assertTrue(not "dnsHostName" in res[1])
632 self.assertEquals(res[1]["lastLogon"], "z")
634 # Search by negated conjunction of local attributes
635 res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", attrs=["dnsHostName", "lastLogon"])
636 self.assertEquals(len(res), 5)
637 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
638 self.assertTrue(not "dnsHostName" in res[0])
639 self.assertEquals(res[0]["lastLogon"], "y")
640 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
641 self.assertTrue(not "dnsHostName" in res[1])
642 self.assertEquals(res[1]["lastLogon"], "x")
643 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
644 self.assertEquals(res[2]["dnsHostName"], "z")
645 self.assertEquals(res[2]["lastLogon"], "z")
646 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
647 self.assertTrue(not "dnsHostName" in res[3])
648 self.assertEquals(res[3]["lastLogon"], "z")
650 # Search by negated conjunction of remote attributes
651 res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", attrs=["dnsHostName", "lastLogon"])
652 self.assertEquals(len(res), 5)
653 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
654 self.assertEquals(res[0]["dnsHostName"], "y")
655 self.assertEquals(res[0]["lastLogon"], "y")
656 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
657 self.assertTrue(not "dnsHostName" in res[1])
658 self.assertEquals(res[1]["lastLogon"], "y")
659 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
660 self.assertEquals(res[2]["dnsHostName"], "z")
661 self.assertEquals(res[2]["lastLogon"], "z")
662 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
663 self.assertTrue(not "dnsHostName" in res[3])
664 self.assertEquals(res[3]["lastLogon"], "z")
666 # Search by negated conjunction of local and remote attribute
667 res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", attrs=["dnsHostName", "lastLogon"])
668 self.assertEquals(len(res), 5)
669 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
670 self.assertTrue(not "dnsHostName" in res[0])
671 self.assertEquals(res[0]["lastLogon"], "y")
672 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
673 self.assertTrue(not "dnsHostName" in res[1])
674 self.assertEquals(res[1]["lastLogon"], "x")
675 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
676 self.assertEquals(res[2]["dnsHostName"], "z")
677 self.assertEquals(res[2]["lastLogon"], "z")
678 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
679 self.assertTrue(not "dnsHostName" in res[3])
680 self.assertEquals(res[3]["lastLogon"], "z")
682 # Search by negated disjunction of local attributes
683 res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", attrs=["dnsHostName", "lastLogon"])
684 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
685 self.assertTrue(not "dnsHostName" in res[0])
686 self.assertEquals(res[0]["lastLogon"], "y")
687 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
688 self.assertTrue(not "dnsHostName" in res[1])
689 self.assertEquals(res[1]["lastLogon"], "x")
690 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
691 self.assertEquals(res[2]["dnsHostName"], "z")
692 self.assertEquals(res[2]["lastLogon"], "z")
693 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
694 self.assertTrue(not "dnsHostName" in res[3])
695 self.assertEquals(res[3]["lastLogon"], "z")
697 # Search by negated disjunction of remote attributes
698 res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", attrs=["dnsHostName", "lastLogon"])
699 self.assertEquals(len(res), 4)
700 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
701 self.assertEquals(res[0]["dnsHostName"], "y")
702 self.assertEquals(res[0]["lastLogon"], "y")
703 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
704 self.assertEquals(res[1]["dnsHostName"], "z")
705 self.assertEquals(res[1]["lastLogon"], "z")
706 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
707 self.assertTrue(not "dnsHostName" in res[2])
708 self.assertEquals(res[2]["lastLogon"], "z")
710 # Search by negated disjunction of local and remote attribute
711 res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
712 attrs=["dnsHostName", "lastLogon"])
713 self.assertEquals(len(res), 4)
714 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
715 self.assertTrue(not "dnsHostName" in res[0])
716 self.assertEquals(res[0]["lastLogon"], "x")
717 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
718 self.assertEquals(res[1]["dnsHostName"], "z")
719 self.assertEquals(res[1]["lastLogon"], "z")
720 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
721 self.assertTrue(not "dnsHostName" in res[2])
722 self.assertEquals(res[2]["lastLogon"], "z")
724 # Search by complex parse tree
725 res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
726 self.assertEquals(len(res), 6)
727 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
728 self.assertTrue(not "dnsHostName" in res[0])
729 self.assertEquals(res[0]["lastLogon"], "y")
730 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
731 self.assertEquals(res[1]["dnsHostName"], "x")
732 self.assertEquals(res[1]["lastLogon"], "x")
733 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
734 self.assertTrue(not "dnsHostName" in res[2])
735 self.assertEquals(res[2]["lastLogon"], "x")
736 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
737 self.assertEquals(res[3]["dnsHostName"], "z")
738 self.assertEquals(res[3]["lastLogon"], "z")
739 self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
740 self.assertTrue(not "dnsHostName" in res[4])
741 self.assertEquals(res[4]["lastLogon"], "z")
744 dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
748 def test_map_modify_local(self):
749 """Modification of local records."""
751 dn = "cn=test,dc=idealx,dc=org"
752 self.ldb.add({"dn": dn,
756 "description": "test"})
758 attrs = ["foo", "revision", "description"]
759 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
760 self.assertEquals(len(res), 1)
761 self.assertEquals(str(res[0].dn), dn)
762 self.assertEquals(res[0]["foo"], "bar")
763 self.assertEquals(res[0]["revision"], "1")
764 self.assertEquals(res[0]["description"], "test")
765 # Check it's not in the local db
766 res = self.samba4.db.search(expression="(cn=test)",
767 scope=SCOPE_DEFAULT, attrs=attrs)
768 self.assertEquals(len(res), 0)
769 # Check it's not in the remote db
770 res = self.samba3.db.search(expression="(cn=test)",
771 scope=SCOPE_DEFAULT, attrs=attrs)
772 self.assertEquals(len(res), 0)
774 # Modify local record
782 self.ldb.modify_ldif(ldif)
784 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
785 self.assertEquals(len(res), 1)
786 self.assertEquals(str(res[0].dn), dn)
787 self.assertEquals(res[0]["foo"], "baz")
788 self.assertEquals(res[0]["revision"], "1")
789 self.assertEquals(res[0]["description"], "foo")
791 # Rename local record
792 dn2 = "cn=toast,dc=idealx,dc=org"
793 self.ldb.rename(dn, dn2)
795 res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
796 self.assertEquals(len(res), 1)
797 self.assertEquals(str(res[0].dn), dn2)
798 self.assertEquals(res[0]["foo"], "baz")
799 self.assertEquals(res[0]["revision"], "1")
800 self.assertEquals(res[0]["description"], "foo")
802 # Delete local record
805 res = self.ldb.search(dn2, scope=SCOPE_BASE)
806 self.assertEquals(len(res), 0)
808 def test_map_modify_remote_remote(self):
809 """Modification of remote data of remote records"""
# Walks one record through add / modify / rename / removal checks and, at each
# step, compares what self.ldb (the mapped view), self.samba3.db and
# self.samba4.db return for it.
# NOTE(review): assertEquals is a deprecated unittest alias of assertEqual
# (removed in Python 3.12) - relevant if this class derives from
# unittest.TestCase; confirm before porting.
# dn names the entry through self.samba4, dn2 through self.samba3 (same RDN).
811 dn = self.samba4.dn("cn=test")
812 dn2 = self.samba3.dn("cn=test")
# The record is written directly to the samba3 database, not through self.ldb.
813 self.samba3.db.add({"dn": dn2,
815 "description": "foo",
816 "sambaBadPasswordCount": "3",
817 "sambaNextRid": "1001"})
# Read it back from samba3 under the samba3 attribute names.
819 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
820 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
821 self.assertEquals(len(res), 1)
822 self.assertEquals(str(res[0].dn), dn2)
823 self.assertEquals(res[0]["description"], "foo")
824 self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
825 self.assertEquals(res[0]["sambaNextRid"], "1001")
# Through self.ldb the same values are expected under dn, with
# sambaBadPasswordCount / sambaNextRid showing up as badPwdCount / nextRid.
827 attrs = ["description", "badPwdCount", "nextRid"]
828 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
829 self.assertEquals(len(res), 1)
830 self.assertEquals(str(res[0].dn), dn)
831 self.assertEquals(res[0]["description"], "foo")
832 self.assertEquals(res[0]["badPwdCount"], "3")
833 self.assertEquals(res[0]["nextRid"], "1001")
# Nothing is expected in the samba4 database for this entry.
835 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
836 self.assertEquals(len(res), 0)
838 # Modify remote data of remote record
# NOTE(review): ldif is not defined in the visible lines; judging by the
# assertions below it presumably sets description to "test" and badPwdCount
# to "4" - confirm against the full file.
846 self.ldb.modify_ldif(ldif)
# Mapped view after the modify: description and badPwdCount changed,
# nextRid still "1001".
848 res = self.ldb.search(dn, scope=SCOPE_BASE,
849 attrs=["description", "badPwdCount", "nextRid"])
850 self.assertEquals(len(res), 1)
851 self.assertEquals(str(res[0].dn), dn)
852 self.assertEquals(res[0]["description"], "test")
853 self.assertEquals(res[0]["badPwdCount"], "4")
854 self.assertEquals(res[0]["nextRid"], "1001")
# The same change is expected in samba3 under the samba3 attribute names.
856 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
857 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
858 self.assertEquals(len(res), 1)
859 self.assertEquals(str(res[0].dn), dn2)
860 self.assertEquals(res[0]["description"], "test")
861 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
862 self.assertEquals(res[0]["sambaNextRid"], "1001")
864 # Rename remote record
# dn2 is reused here as the rename target ("cn=toast" named through samba4).
865 dn2 = self.samba4.dn("cn=toast")
866 self.ldb.rename(dn, dn2)
# NOTE(review): the lookup below still uses dn, and the visible lines do not
# show dn being updated to the renamed DN - confirm against the full file.
869 res = self.ldb.search(dn, scope=SCOPE_BASE,
870 attrs=["description", "badPwdCount", "nextRid"])
871 self.assertEquals(len(res), 1)
872 self.assertEquals(str(res[0].dn), dn)
873 self.assertEquals(res[0]["description"], "test")
874 self.assertEquals(res[0]["badPwdCount"], "4")
875 self.assertEquals(res[0]["nextRid"], "1001")
# dn2 now becomes the samba3 name of the renamed entry; its attribute values
# are expected to be unchanged by the rename.
877 dn2 = self.samba3.dn("cn=toast")
878 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
879 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
880 self.assertEquals(len(res), 1)
881 self.assertEquals(str(res[0].dn), dn2)
882 self.assertEquals(res[0]["description"], "test")
883 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
884 self.assertEquals(res[0]["sambaNextRid"], "1001")
886 # Delete remote record
# NOTE(review): no delete call appears in the visible lines; the two searches
# below only assert that the entry is gone from self.ldb and self.samba3.db.
888 # Check in mapped db that it's removed
889 res = self.ldb.search(dn, scope=SCOPE_BASE)
890 self.assertEquals(len(res), 0)
892 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
893 self.assertEquals(len(res), 0)
895 def test_map_modify_remote_local(self):
896 """Modification of local data of remote records"""
# Starts from a record that exists only in samba3, modifies it through
# self.ldb, and then checks which attributes each database holds afterwards.
# NOTE(review): assertEquals is a deprecated unittest alias of assertEqual
# (removed in Python 3.12) - relevant if this class derives from
# unittest.TestCase; confirm before porting.
897 # Add remote record (same as before)
# dn names the entry through self.samba4, dn2 through self.samba3 (same RDN).
898 dn = self.samba4.dn("cn=test")
899 dn2 = self.samba3.dn("cn=test")
# Written directly to the samba3 database, bypassing self.ldb.
900 self.samba3.db.add({"dn": dn2,
902 "description": "foo",
903 "sambaBadPasswordCount": "3",
904 "sambaNextRid": "1001"})
906 # Modify local data of remote record
# NOTE(review): ldif is not defined in the visible lines; judging by the
# assertions below it presumably sets description to "test" and adds
# revision "1" - confirm against the full file.
915 self.ldb.modify_ldif(ldif)
# The mapped view is expected to show both attributes on the one entry.
917 attrs = ["revision", "description"]
918 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
919 self.assertEquals(len(res), 1)
920 self.assertEquals(str(res[0].dn), dn)
921 self.assertEquals(res[0]["description"], "test")
922 self.assertEquals(res[0]["revision"], "1")
# samba3 is expected to hold the description but no revision attribute.
924 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
925 self.assertEquals(len(res), 1)
926 self.assertEquals(str(res[0].dn), dn2)
927 self.assertEquals(res[0]["description"], "test")
928 self.assertTrue(not "revision" in res[0])
# samba4 is now expected to hold an entry too: revision only, no description.
# The record has thus become split across the two databases.
930 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
931 self.assertEquals(len(res), 1)
932 self.assertEquals(str(res[0].dn), dn)
933 self.assertTrue(not "description" in res[0])
934 self.assertEquals(res[0]["revision"], "1")
936 # Delete (newly) split record
# NOTE(review): the delete call itself is not among the visible lines -
# confirm against the full file.
939 def test_map_modify_split(self):
940 """Testing modification of split records"""
# Follows a record whose attributes are divided between samba4 (revision) and
# samba3 (description, sambaBadPasswordCount, sambaNextRid) through modify,
# rename and removal checks, inspecting self.ldb, self.samba4.db and
# self.samba3.db at each step.
# NOTE(review): assertEquals is a deprecated unittest alias of assertEqual
# (removed in Python 3.12) - relevant if this class derives from
# unittest.TestCase; confirm before porting.
# dn names the entry through self.samba4, dn2 through self.samba3 (same RDN).
942 dn = self.samba4.dn("cn=test")
943 dn2 = self.samba3.dn("cn=test")
# NOTE(review): the next line is a fragment of a mapping literal whose
# enclosing call is not among the visible lines. The assertions that follow
# expect description "foo", badPwdCount "3", nextRid "1001" and revision "1",
# so the record was presumably added with those values - confirm against the
# full file.
947 "description": "foo",
# Mapped view: all four attributes are expected on the single entry.
952 attrs = ["description", "badPwdCount", "nextRid", "revision"]
953 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
954 self.assertEquals(len(res), 1)
955 self.assertEquals(str(res[0].dn), dn)
956 self.assertEquals(res[0]["description"], "foo")
957 self.assertEquals(res[0]["badPwdCount"], "3")
958 self.assertEquals(res[0]["nextRid"], "1001")
959 self.assertEquals(res[0]["revision"], "1")
# samba4 side: only revision is expected to be stored here.
961 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
962 self.assertEquals(len(res), 1)
963 self.assertEquals(str(res[0].dn), dn)
964 self.assertTrue(not "description" in res[0])
965 self.assertTrue(not "badPwdCount" in res[0])
966 self.assertTrue(not "nextRid" in res[0])
967 self.assertEquals(res[0]["revision"], "1")
# samba3 side: the other three attributes under their samba3 names, and no
# revision.
969 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
970 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
971 self.assertEquals(len(res), 1)
972 self.assertEquals(str(res[0].dn), dn2)
973 self.assertEquals(res[0]["description"], "foo")
974 self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
975 self.assertEquals(res[0]["sambaNextRid"], "1001")
976 self.assertTrue(not "revision" in res[0])
978 # Modify of split record
# NOTE(review): ldif is not defined in the visible lines; judging by the
# assertions below it presumably sets description "test", badPwdCount "4" and
# revision "2" - confirm against the full file.
988 self.ldb.modify_ldif(ldif)
# Mapped view after the modify; nextRid is expected to stay "1001".
990 attrs = ["description", "badPwdCount", "nextRid", "revision"]
991 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
992 self.assertEquals(len(res), 1)
993 self.assertEquals(str(res[0].dn), dn)
994 self.assertEquals(res[0]["description"], "test")
995 self.assertEquals(res[0]["badPwdCount"], "4")
996 self.assertEquals(res[0]["nextRid"], "1001")
997 self.assertEquals(res[0]["revision"], "2")
# samba4 side: still only revision, now "2".
999 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1000 self.assertEquals(len(res), 1)
1001 self.assertEquals(str(res[0].dn), dn)
1002 self.assertTrue(not "description" in res[0])
1003 self.assertTrue(not "badPwdCount" in res[0])
1004 self.assertTrue(not "nextRid" in res[0])
1005 self.assertEquals(res[0]["revision"], "2")
1006 # Check in remote db
1007 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
1008 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1009 self.assertEquals(len(res), 1)
1010 self.assertEquals(str(res[0].dn), dn2)
1011 self.assertEquals(res[0]["description"], "test")
1012 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1013 self.assertEquals(res[0]["sambaNextRid"], "1001")
1014 self.assertTrue(not "revision" in res[0])
1016 # Rename split record
# dn2 is reused here as the rename target ("cn=toast" named through samba4).
1017 dn2 = self.samba4.dn("cn=toast")
1018 self.ldb.rename(dn, dn2)
1019 # Check in mapped db
# NOTE(review): the lookups below still use dn, and the visible lines do not
# show dn being updated to the renamed DN - confirm against the full file.
1021 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1022 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1023 self.assertEquals(len(res), 1)
1024 self.assertEquals(str(res[0].dn), dn)
1025 self.assertEquals(res[0]["description"], "test")
1026 self.assertEquals(res[0]["badPwdCount"], "4")
1027 self.assertEquals(res[0]["nextRid"], "1001")
1028 self.assertEquals(res[0]["revision"], "2")
# samba4 side after the rename: attribute split expected to be unchanged.
1030 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1031 self.assertEquals(len(res), 1)
1032 self.assertEquals(str(res[0].dn), dn)
1033 self.assertTrue(not "description" in res[0])
1034 self.assertTrue(not "badPwdCount" in res[0])
1035 self.assertTrue(not "nextRid" in res[0])
1036 self.assertEquals(res[0]["revision"], "2")
1037 # Check in remote db
# dn2 now becomes the samba3 name of the renamed entry.
1038 dn2 = self.samba3.dn("cn=toast")
1039 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
1040 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1041 self.assertEquals(len(res), 1)
1042 self.assertEquals(str(res[0].dn), dn2)
1043 self.assertEquals(res[0]["description"], "test")
1044 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
1045 self.assertEquals(res[0]["sambaNextRid"], "1001")
1046 self.assertTrue(not "revision" in res[0])
1048 # Delete split record
# NOTE(review): no delete call appears in the visible lines; the searches
# below only assert that the entry is gone from all three databases.
1050 # Check in mapped db
1051 res = self.ldb.search(dn, scope=SCOPE_BASE)
1052 self.assertEquals(len(res), 0)
1054 res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1055 self.assertEquals(len(res), 0)
1056 # Check in remote db
1057 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1058 self.assertEquals(len(res), 0)