3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2007
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
29 from ldb import SCOPE_DEFAULT, SCOPE_BASE
30 from samba import Ldb, substitute_var
31 from samba.tests import LdbTestCase, TestCaseInTempDir
33 datadir = os.path.join(os.path.dirname(__file__), "../../../../../testdata/samba3")
35 class MapBaseTestCase(TestCaseInTempDir):
def setup_data(self, obj, ldif):
    """Load LDIF text into the database of a target object.

    ``ldif`` must not be None.  It is passed through ``substitute_var``
    together with ``obj.substvars`` and the result is handed to
    ``obj.db.add_ldif``.
    """
    self.assertTrue(ldif is not None)
    target_db = obj.db
    target_db.add_ldif(substitute_var(ldif, obj.substvars))
40 def setup_modules(self, ldb, s3, s4):
# Adds three control records to `ldb`: the @MAP=samba3sam mapping record,
# the @MODULES list and the @PARTITION record.  Of `s3` and `s4` only the
# attributes `basedn` and `url` are read here.
# NOTE(review): the embedded numbering jumps from 41 to 43 inside the first
# record, so one line of that dict literal appears to be missing from this
# copy -- check against the upstream file before relying on its contents.
41 ldb.add({"dn": "@MAP=samba3sam",
43 "@TO": "sambaDomainName=TESTS," + s3.basedn})
# Module list for this database; samba3sam and partition are its last entries.
45 ldb.add({"dn": "@MODULES",
46 "@LIST": "rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition"})
# One "basedn:url" partition entry per backend, s4 first, then s3.
48 ldb.add({"dn": "@PARTITION",
49 "partition": [s4.basedn + ":" + s4.url, s3.basedn + ":" + s3.url],
50 "replicateEntries": ["@SUBCLASSES", "@ATTRIBUTES", "@INDEXLIST"]})
53 super(MapBaseTestCase, self).setUp()
def make_dn(basedn, rdn):
    """Return ``rdn`` placed under the sambaDomainName=TESTS entry of ``basedn``."""
    return ",".join((rdn, "sambaDomainName=TESTS", basedn))
def make_s4dn(basedn, rdn):
    """Return ``rdn`` joined directly onto ``basedn`` with a comma."""
    return ",".join((rdn, basedn))
61 self.ldbfile = os.path.join(self.tempdir, "test.ldb")
62 self.ldburl = "tdb://" + self.ldbfile
64 tempdir = self.tempdir
68 """Simple helper class that contains data for a specific SAM connection."""
69 def __init__(self, file, basedn, dn):
70 self.file = os.path.join(tempdir, file)
71 self.url = "tdb://" + self.file
73 self.substvars = {"BASEDN": self.basedn}
78 return self._dn(rdn, self.basedn)
81 return self.db.connect(self.url)
83 self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
84 self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
85 self.templates = Target("templates.ldb", "cn=templates", None)
88 self.templates.connect()
92 os.unlink(self.ldbfile)
93 os.unlink(self.samba3.file)
94 os.unlink(self.templates.file)
95 os.unlink(self.samba4.file)
96 super(MapBaseTestCase, self).tearDown()
99 class Samba3SamTestCase(MapBaseTestCase):
101 super(Samba3SamTestCase, self).setUp()
102 ldb = Ldb(self.ldburl)
103 self.setup_data(self.samba3, open(os.path.join(datadir, "samba3.ldif"), 'r').read())
104 self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read())
105 ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()
106 ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))
107 self.setup_modules(ldb, self.samba3, self.samba4)
109 self.ldb = Ldb(self.ldburl)
111 def test_s3sam_search(self):
# Searches through `ldb` by a non-mapped attribute, a mapped attribute, the
# old name of a renamed attribute, an entry carrying a SID, and by
# objectClass, checking result counts and selected values.
# NOTE(review): `ldb` is not bound in the lines visible here (embedded line
# 112 is absent); presumably `ldb = self.ldb` -- confirm.
113 print "Looking up by non-mapped attribute"
114 msg = ldb.search(expression="(cn=Administrator)")
115 self.assertEquals(len(msg), 1)
116 self.assertEquals(msg[0]["cn"], "Administrator")
118 print "Looking up by mapped attribute"
119 msg = ldb.search(expression="(name=Backup Operators)")
120 self.assertEquals(len(msg), 1)
121 self.assertEquals(msg[0]["name"], "Backup Operators")
123 print "Looking up by old name of renamed attribute"
124 msg = ldb.search(expression="(displayName=Backup Operators)")
125 self.assertEquals(len(msg), 0)
127 print "Looking up mapped entry containing SID"
128 msg = ldb.search(expression="(cn=Replicator)")
129 self.assertEquals(len(msg), 1)
131 self.assertEquals(str(msg[0].dn), "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
132 self.assertEquals(msg[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
134 print "Checking mapping of objectClass"
135 oc = set(msg[0]["objectClass"])
# `set(...)` never yields None, so the next assertion cannot fail.
136 self.assertTrue(oc is not None)
# NOTE(review): `oc` is a set, and sets do not support indexing, so `oc[i]`
# raises TypeError.  The expression also compares `(bool or value)` against
# "group" instead of testing membership.  Likely intent: every objectClass
# is either "posixGroup" or "group" -- confirm.  The loop header that binds
# `i` (embedded line 137) is absent from this copy.
138 self.assertEquals(oc[i] == "posixGroup" or oc[i], "group")
140 print "Looking up by objectClass"
141 msg = ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
142 self.assertEquals(len(msg), 2)
143 for i in range(len(msg)):
# NOTE(review): assertEquals receives a single argument here: the expression
# is `tuple or bool`, and a non-empty tuple is always truthy.  The call
# therefore fails for a missing second argument instead of checking the DN.
# Likely intent: the DN is one of the two listed values -- confirm.
144 self.assertEquals((str(msg[i].dn), "unixName=Administrator,ou=Users,dc=vernstok,dc=nl") or
145 (str(msg[i].dn) == "unixName=nobody,ou=Users,dc=vernstok,dc=nl"))
148 def test_s3sam_modify(self):
# Adds one record outside the mapped tree and one mapped record, then checks
# them through `ldb` and the destination database, and finally exercises
# attribute add/modify/delete, rename and delete of the mapped record.
# NOTE(review): `ldb` and `s3` are not bound in the lines visible here, and
# the LDIF bodies and modify calls for the attribute steps are mostly absent
# from this copy (only their `dn:` lines remain).
151 print "Adding a record that will be fallbacked"
152 ldb.add({"dn": "cn=Foo",
156 "showInAdvancedViewOnly": "TRUE"}
159 print "Checking for existence of record (local)"
160 # TODO: This record must be searched in the local database, which is currently only supported for base searches
161 # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
162 # TODO: Actually, this version should work as well but doesn't...
# NOTE(review): `LDB_SCOPE_BASE` is not among the names imported at the top
# of the visible source (`SCOPE_DEFAULT` and `SCOPE_BASE` are); unless it is
# defined in a missing line this raises NameError.  Probably should be
# `SCOPE_BASE` -- confirm.
165 msg = ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=LDB_SCOPE_BASE, attrs=['foo','blah','cn','showInAdvancedViewOnly'])
166 self.assertEquals(len(msg), 1)
167 self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE")
168 self.assertEquals(msg[0]["foo"], "bar")
169 self.assertEquals(msg[0]["blah"], "Blie")
171 print "Adding record that will be mapped"
172 ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
173 "objectClass": "user",
175 "sambaUnicodePwd": "geheim",
178 print "Checking for existence of record (remote)"
179 msg = ldb.search(expression="(unixName=bin)", attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
180 self.assertEquals(len(msg), 1)
181 self.assertEquals(msg[0]["cn"], "Niemand")
182 self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
184 print "Checking for existence of record (local && remote)"
185 msg = ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
186 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
187 self.assertEquals(len(msg), 1) # TODO: should check with more records
188 self.assertEquals(msg[0]["cn"], "Niemand")
189 self.assertEquals(msg[0]["unixName"], "bin")
190 self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
192 print "Checking for existence of record (local || remote)"
193 msg = ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
194 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
# NOTE(review): `len(msg)` is an int; concatenating it to a str raises
# TypeError.  It needs str(len(msg)) or %-formatting.
195 print "got " + len(msg) + " replies"
196 self.assertEquals(len(msg), 1) # TODO: should check with more records
197 self.assertEquals(msg[0]["cn"], "Niemand")
# NOTE(review): when unixName equals "bin" the first operand is True, so this
# compares True with "geheim" and fails.  Likely intent: unixName is "bin"
# or sambaUnicodePwd is "geheim" -- confirm.
198 self.assertEquals(msg[0]["unixName"] == "bin" or msg[0]["sambaUnicodePwd"], "geheim")
200 print "Checking for data in destination database"
# NOTE(review): the filter is passed positionally here, while every `ldb.search`
# call above passes it as `expression=`; verify which parameter the first
# positional argument binds to.
201 msg = s3.db.search("(cn=Niemand)")
202 self.assertTrue(len(msg) >= 1)
203 self.assertEquals(msg[0]["sambaSID"], "S-1-5-21-4231626423-2410014848-2360679739-2001")
204 self.assertEquals(msg[0]["displayName"], "Niemand")
206 print "Adding attribute..."
208 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
214 print "Checking whether changes are still there..."
215 msg = ldb.search(expression="(cn=Niemand)")
216 self.assertTrue(len(msg) >= 1)
217 self.assertEquals(msg[0]["cn"], "Niemand")
218 self.assertEquals(msg[0]["description"], "Blah")
220 print "Modifying attribute..."
222 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
228 print "Checking whether changes are still there..."
229 msg = ldb.search(expression="(cn=Niemand)")
230 self.assertTrue(len(msg) >= 1)
231 self.assertEquals(msg[0]["description"], "Blie")
233 print "Deleting attribute..."
235 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
240 print "Checking whether changes are no longer there..."
241 msg = ldb.search(expression="(cn=Niemand)")
242 self.assertTrue(len(msg) >= 1)
# NOTE(review): `undefined` is not assigned or imported anywhere in the
# visible source; it looks like a leftover from the JavaScript original named
# in the file header.  If it is truly unbound this raises NameError.  Likely
# intent: assert that "description" is absent from the message -- confirm.
243 self.assertEquals(msg[0]["description"], undefined)
245 print "Renaming record..."
246 ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
248 print "Checking whether DN has changed..."
249 msg = ldb.search(expression="(cn=Niemand2)")
250 self.assertEquals(len(msg), 1)
251 self.assertEquals(str(msg[0].dn), "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
253 print "Deleting record..."
254 ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
256 print "Checking whether record is gone..."
257 msg = ldb.search(expression="(cn=Niemand2)")
258 self.assertEquals(len(msg), 0)
262 class MapTestCase(MapBaseTestCase):
264 super(MapTestCase, self).setUp()
265 ldb = Ldb(self.ldburl)
266 self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read())
267 ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()
268 ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))
269 self.setup_modules(ldb, self.samba3, self.samba4)
270 self.ldb = Ldb(self.ldburl)
272 def test_map_search(self):
# Loads fixture records into the s3 and s4 backends, then runs searches
# through `ldb` by DN, by attribute and by parse tree, checking the DN and
# the dnsHostName/lastLogon values of the results in the order returned.
# NOTE(review): `ldb`, `s3` and `s4` are not bound in the lines visible here
# (embedded lines 273-275 are absent).  The fixture LDIF literals are also
# only partially present in this copy -- the assignments that open them and
# most attribute lines are missing -- so the values asserted below cannot be
# cross-checked from here.
276 print "Running search tests on mapped data"
278 dn: """ + "sambaDomainName=TESTS,""" + s3.basedn + """
279 objectclass: sambaDomain
281 sambaSID: S-1-5-21-4231626423-2410014848-2360679739
283 sambaDomainName: TESTS"""
284 s3.db.add_ldif(substitute_var(ldif, s3.substvars))
# NOTE(review): in the next fixture the visible `primaryGroupID:` value lacks
# the leading "S-" that the neighbouring SID values carry.  The TODO further
# down reports the primaryGroupID of X getting corrupted -- possibly related,
# verify.
286 print "Add a set of split records"
288 dn: """ + s4.dn("cn=X") + """
297 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
298 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512
300 dn: """ + s4.dn("cn=Y") + """
310 dn: """ + s4.dn("cn=Z") + """
321 ldb.add_ldif(substitute_var(ldif, s4.substvars))
323 print "Add a set of remote records"
326 dn: """ + s3.dn("cn=A") + """
327 objectClass: posixAccount
330 sambaBadPasswordCount: x
333 sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552
334 sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512
336 dn: """ + s3.dn("cn=B") + """
340 sambaBadPasswordCount: x
344 dn: """ + s3.dn("cn=C") + """
348 sambaBadPasswordCount: y
352 s3.add_ldif(substitute_var(ldif, s3.substvars))
# NOTE(review): the call just above uses `s3.add_ldif`, whereas the first
# fixture in this method is loaded with `s3.db.add_ldif`.  No `add_ldif`
# method is visible on the target helper class; probably should be
# `s3.db.add_ldif` -- confirm.
354 print "Testing search by DN"
# NOTE(review): `dn` is assigned in lines absent from this copy before each
# of the four DN searches below.  `undefined`, used throughout this method
# as the expected value for a missing attribute, is not assigned or imported
# anywhere in the visible source (leftover from the JavaScript original);
# if unbound it raises NameError.  `str(str(...))` is a redundant double
# conversion.
356 # Search remote record by local DN
358 attrs = ["dnsHostName", "lastLogon"]
359 res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
360 self.assertEquals(len(res), 1)
361 self.assertEquals(str(str(res[0].dn)), dn)
362 self.assertEquals(res[0]["dnsHostName"], undefined)
363 self.assertEquals(res[0]["lastLogon"], "x")
365 # Search remote record by remote DN
367 attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"]
368 res = s3.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
369 self.assertEquals(len(res), 1)
370 self.assertEquals(str(str(res[0].dn)), dn)
371 self.assertEquals(res[0]["dnsHostName"], undefined)
372 self.assertEquals(res[0]["lastLogon"], undefined)
373 self.assertEquals(res[0]["sambaLogonTime"], "x")
375 # Search split record by local DN
377 attrs = ["dnsHostName", "lastLogon"]
378 res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
379 self.assertEquals(len(res), 1)
380 self.assertEquals(str(str(res[0].dn)), dn)
381 self.assertEquals(res[0]["dnsHostName"], "x")
382 self.assertEquals(res[0]["lastLogon"], "x")
384 # Search split record by remote DN
386 attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"]
387 res = s3.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
388 self.assertEquals(len(res), 1)
389 self.assertEquals(str(str(res[0].dn)), dn)
390 self.assertEquals(res[0]["dnsHostName"], undefined)
391 self.assertEquals(res[0]["lastLogon"], undefined)
392 self.assertEquals(res[0]["sambaLogonTime"], "x")
394 print "Testing search by attribute"
396 # Search by ignored attribute
397 attrs = ["dnsHostName", "lastLogon"]
398 res = ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, attrs=attrs)
399 self.assertEquals(len(res), 2)
400 self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Y"))
401 self.assertEquals(res[0]["dnsHostName"], "y")
402 self.assertEquals(res[0]["lastLogon"], "y")
403 self.assertEquals(str(str(res[1].dn)), s4.dn("cn=X"))
404 self.assertEquals(res[1]["dnsHostName"], "x")
405 self.assertEquals(res[1]["lastLogon"], "x")
407 # Search by kept attribute
408 attrs = ["dnsHostName", "lastLogon"]
409 res = ldb.search(expression="(description=y)", scope=SCOPE_DEFAULT, attrs=attrs)
410 self.assertEquals(len(res), 2)
411 self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Z"))
412 self.assertEquals(res[0]["dnsHostName"], "z")
413 self.assertEquals(res[0]["lastLogon"], "z")
414 self.assertEquals(str(str(res[1].dn)), s4.dn("cn=C"))
415 self.assertEquals(res[1]["dnsHostName"], undefined)
416 self.assertEquals(res[1]["lastLogon"], "z")
418 # Search by renamed attribute
419 attrs = ["dnsHostName", "lastLogon"]
420 res = ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT, attrs=attrs)
421 self.assertEquals(len(res), 2)
422 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
423 self.assertEquals(res[0]["dnsHostName"], undefined)
424 self.assertEquals(res[0]["lastLogon"], "y")
425 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
426 self.assertEquals(res[1]["dnsHostName"], undefined)
427 self.assertEquals(res[1]["lastLogon"], "x")
429 # Search by converted attribute
430 attrs = ["dnsHostName", "lastLogon", "objectSid"]
432 # Using the SID directly in the parse tree leads to conversion
433 # errors, letting the search fail with no results.
434 #res = ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", NULL, ldb. SCOPE_DEFAULT, attrs)
435 res = ldb.search(expression="(objectSid=*)", attrs=attrs)
# Three results are asserted but only the first two are inspected.
436 self.assertEquals(len(res), 3)
437 self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
438 self.assertEquals(res[0]["dnsHostName"], "x")
439 self.assertEquals(res[0]["lastLogon"], "x")
440 self.assertEquals(res[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
441 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
442 self.assertEquals(res[1]["dnsHostName"], undefined)
443 self.assertEquals(res[1]["lastLogon"], "x")
444 self.assertEquals(res[1]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
446 # Search by generated attribute
447 # In most cases, this even works when the mapping is missing
448 # a `convert_operator' by enumerating the remote db.
449 attrs = ["dnsHostName", "lastLogon", "primaryGroupID"]
450 res = ldb.search(expression="(primaryGroupID=512)", attrs=attrs)
451 self.assertEquals(len(res), 1)
452 self.assertEquals(str(res[0].dn), s4.dn("cn=A"))
453 self.assertEquals(res[0]["dnsHostName"], undefined)
454 self.assertEquals(res[0]["lastLogon"], "x")
455 self.assertEquals(res[0]["primaryGroupID"], "512")
457 # TODO: There should actually be two results, A and X. The
458 # primaryGroupID of X seems to get corrupted somewhere, and the
459 # objectSid isn't available during the generation of remote (!) data,
460 # which can be observed with the following search. Also note that Xs
461 # objectSid seems to be fine in the previous search for objectSid... */
462 #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
463 #print len(res) + " results found"
464 #for i in range(len(res)):
465 # for (obj in res[i]) {
466 # print obj + ": " + res[i][obj]
471 # Search by remote name of renamed attribute */
472 attrs = ["dnsHostName", "lastLogon"]
473 res = ldb.search(expression="(sambaBadPasswordCount=*)", attrs=attrs)
474 self.assertEquals(len(res), 0)
476 # Search by objectClass
477 attrs = ["dnsHostName", "lastLogon", "objectClass"]
478 res = ldb.search(expression="(objectClass=user)", attrs=attrs)
479 self.assertEquals(len(res), 2)
480 self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
481 self.assertEquals(res[0]["dnsHostName"], "x")
482 self.assertEquals(res[0]["lastLogon"], "x")
483 self.assertTrue(res[0]["objectClass"] is not None)
484 self.assertEquals(res[0]["objectClass"][0], "user")
485 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
486 self.assertEquals(res[1]["dnsHostName"], undefined)
487 self.assertEquals(res[1]["lastLogon"], "x")
488 self.assertTrue(res[1]["objectClass"] is not None)
489 self.assertEquals(res[1]["objectClass"][0], "user")
491 # Prove that the objectClass is actually used for the search
492 res = ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs)
493 self.assertEquals(len(res), 3)
494 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
495 self.assertEquals(res[0]["dnsHostName"], undefined)
496 self.assertEquals(res[0]["lastLogon"], "y")
497 self.assertTrue(res[0]["objectClass"] is not None)
498 for oc in set(res[0]["objectClass"]):
499 self.assertEquals(oc, "user")
500 self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
501 self.assertEquals(res[1]["dnsHostName"], "x")
502 self.assertEquals(res[1]["lastLogon"], "x")
503 self.assertTrue(res[1]["objectClass"] is not None)
504 self.assertEquals(res[1]["objectClass"][0], "user")
505 self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
506 self.assertEquals(res[2]["dnsHostName"], undefined)
507 self.assertEquals(res[2]["lastLogon"], "x")
508 self.assertTrue(res[2]["objectClass"] is not None)
509 self.assertEquals(res[2]["objectClass"][0], "user")
511 print "Testing search by parse tree"
513 # Search by conjunction of local attributes
514 attrs = ["dnsHostName", "lastLogon"]
515 res = ldb.search(expression="(&(codePage=x)(revision=x))", attrs=attrs)
516 self.assertEquals(len(res), 2)
517 self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
518 self.assertEquals(res[0]["dnsHostName"], "y")
519 self.assertEquals(res[0]["lastLogon"], "y")
520 self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
521 self.assertEquals(res[1]["dnsHostName"], "x")
522 self.assertEquals(res[1]["lastLogon"], "x")
524 # Search by conjunction of remote attributes
525 attrs = ["dnsHostName", "lastLogon"]
526 res = ldb.search(expression="(&(lastLogon=x)(description=x))", attrs=attrs)
527 self.assertEquals(len(res), 2)
528 self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
529 self.assertEquals(res[0]["dnsHostName"], "x")
530 self.assertEquals(res[0]["lastLogon"], "x")
531 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
532 self.assertEquals(res[1]["dnsHostName"], undefined)
533 self.assertEquals(res[1]["lastLogon"], "x")
535 # Search by conjunction of local and remote attribute
536 attrs = ["dnsHostName", "lastLogon"]
537 res = ldb.search(expression="(&(codePage=x)(description=x))", attrs=attrs)
538 self.assertEquals(len(res), 2)
539 self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
540 self.assertEquals(res[0]["dnsHostName"], "y")
541 self.assertEquals(res[0]["lastLogon"], "y")
542 self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
543 self.assertEquals(res[1]["dnsHostName"], "x")
544 self.assertEquals(res[1]["lastLogon"], "x")
546 # Search by conjunction of local and remote attribute w/o match
547 attrs = ["dnsHostName", "lastLogon"]
548 res = ldb.search(expression="(&(codePage=x)(nextRid=x))", attrs=attrs)
549 self.assertEquals(len(res), 0)
550 res = ldb.search(expression="(&(revision=x)(lastLogon=z))", attrs=attrs)
551 self.assertEquals(len(res), 0)
553 # Search by disjunction of local attributes
554 attrs = ["dnsHostName", "lastLogon"]
555 res = ldb.search(expression="(|(revision=x)(dnsHostName=x))", attrs=attrs)
556 self.assertEquals(len(res), 2)
557 self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
558 self.assertEquals(res[0]["dnsHostName"], "y")
559 self.assertEquals(res[0]["lastLogon"], "y")
560 self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
561 self.assertEquals(res[1]["dnsHostName"], "x")
562 self.assertEquals(res[1]["lastLogon"], "x")
564 # Search by disjunction of remote attributes
565 attrs = ["dnsHostName", "lastLogon"]
566 res = ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", attrs=attrs)
567 self.assertEquals(len(res), 3)
568 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
569 self.assertEquals(res[0]["dnsHostName"], undefined)
570 self.assertEquals(res[0]["lastLogon"], "y")
571 self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
572 self.assertEquals(res[1]["dnsHostName"], "x")
573 self.assertEquals(res[1]["lastLogon"], "x")
574 self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
575 self.assertEquals(res[2]["dnsHostName"], undefined)
576 self.assertEquals(res[2]["lastLogon"], "x")
578 # Search by disjunction of local and remote attribute
579 attrs = ["dnsHostName", "lastLogon"]
580 res = ldb.search(expression="(|(revision=x)(lastLogon=y))", attrs=attrs)
581 self.assertEquals(len(res), 3)
582 self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
583 self.assertEquals(res[0]["dnsHostName"], "y")
584 self.assertEquals(res[0]["lastLogon"], "y")
585 self.assertEquals(str(res[1].dn), s4.dn("cn=B"))
586 self.assertEquals(res[1]["dnsHostName"], undefined)
587 self.assertEquals(res[1]["lastLogon"], "y")
588 self.assertEquals(str(res[2].dn), s4.dn("cn=X"))
589 self.assertEquals(res[2]["dnsHostName"], "x")
590 self.assertEquals(res[2]["lastLogon"], "x")
592 # Search by disjunction of local and remote attribute w/o match
593 attrs = ["dnsHostName", "lastLogon"]
594 res = ldb.search(expression="(|(codePage=y)(nextRid=z))", attrs=attrs)
595 self.assertEquals(len(res), 0)
# NOTE(review): each negated search from here on, and the complex parse tree
# at the end, asserts a result count one larger than the number of entries it
# then inspects; the identity of the last result is never checked.  Confirm
# whether that is deliberate.
597 # Search by negated local attribute
598 attrs = ["dnsHostName", "lastLogon"]
599 res = ldb.search(expression="(!(revision=x))", attrs=attrs)
600 self.assertEquals(len(res), 5)
601 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
602 self.assertEquals(res[0]["dnsHostName"], undefined)
603 self.assertEquals(res[0]["lastLogon"], "y")
604 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
605 self.assertEquals(res[1]["dnsHostName"], undefined)
606 self.assertEquals(res[1]["lastLogon"], "x")
607 self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
608 self.assertEquals(res[2]["dnsHostName"], "z")
609 self.assertEquals(res[2]["lastLogon"], "z")
610 self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
611 self.assertEquals(res[3]["dnsHostName"], undefined)
612 self.assertEquals(res[3]["lastLogon"], "z")
614 # Search by negated remote attribute
615 attrs = ["dnsHostName", "lastLogon"]
616 res = ldb.search(expression="(!(description=x))", attrs=attrs)
617 self.assertEquals(len(res), 3)
618 self.assertEquals(str(res[0].dn), s4.dn("cn=Z"))
619 self.assertEquals(res[0]["dnsHostName"], "z")
620 self.assertEquals(res[0]["lastLogon"], "z")
621 self.assertEquals(str(res[1].dn), s4.dn("cn=C"))
622 self.assertEquals(res[1]["dnsHostName"], undefined)
623 self.assertEquals(res[1]["lastLogon"], "z")
625 # Search by negated conjunction of local attributes
626 attrs = ["dnsHostName", "lastLogon"]
627 res = ldb.search(expression="(!(&(codePage=x)(revision=x)))", attrs=attrs)
628 self.assertEquals(len(res), 5)
629 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
630 self.assertEquals(res[0]["dnsHostName"], undefined)
631 self.assertEquals(res[0]["lastLogon"], "y")
632 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
633 self.assertEquals(res[1]["dnsHostName"], undefined)
634 self.assertEquals(res[1]["lastLogon"], "x")
635 self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
636 self.assertEquals(res[2]["dnsHostName"], "z")
637 self.assertEquals(res[2]["lastLogon"], "z")
638 self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
639 self.assertEquals(res[3]["dnsHostName"], undefined)
640 self.assertEquals(res[3]["lastLogon"], "z")
642 # Search by negated conjunction of remote attributes
643 attrs = ["dnsHostName", "lastLogon"]
644 res = ldb.search(expression="(!(&(lastLogon=x)(description=x)))", attrs=attrs)
645 self.assertEquals(len(res), 5)
646 self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
647 self.assertEquals(res[0]["dnsHostName"], "y")
648 self.assertEquals(res[0]["lastLogon"], "y")
649 self.assertEquals(str(res[1].dn), s4.dn("cn=B"))
650 self.assertEquals(res[1]["dnsHostName"], undefined)
651 self.assertEquals(res[1]["lastLogon"], "y")
652 self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
653 self.assertEquals(res[2]["dnsHostName"], "z")
654 self.assertEquals(res[2]["lastLogon"], "z")
655 self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
656 self.assertEquals(res[3]["dnsHostName"], undefined)
657 self.assertEquals(res[3]["lastLogon"], "z")
659 # Search by negated conjunction of local and remote attribute
660 attrs = ["dnsHostName", "lastLogon"]
661 res = ldb.search(expression="(!(&(codePage=x)(description=x)))", attrs=attrs)
662 self.assertEquals(len(res), 5)
663 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
664 self.assertEquals(res[0]["dnsHostName"], undefined)
665 self.assertEquals(res[0]["lastLogon"], "y")
666 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
667 self.assertEquals(res[1]["dnsHostName"], undefined)
668 self.assertEquals(res[1]["lastLogon"], "x")
669 self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
670 self.assertEquals(res[2]["dnsHostName"], "z")
671 self.assertEquals(res[2]["lastLogon"], "z")
672 self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
673 self.assertEquals(res[3]["dnsHostName"], undefined)
674 self.assertEquals(res[3]["lastLogon"], "z")
# NOTE(review): unlike every sibling search, the next one has no assertion on
# len(res) before indexing into the results.
676 # Search by negated disjunction of local attributes
677 attrs = ["dnsHostName", "lastLogon"]
678 res = ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", attrs=attrs)
679 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
680 self.assertEquals(res[0]["dnsHostName"], undefined)
681 self.assertEquals(res[0]["lastLogon"], "y")
682 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
683 self.assertEquals(res[1]["dnsHostName"], undefined)
684 self.assertEquals(res[1]["lastLogon"], "x")
685 self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
686 self.assertEquals(res[2]["dnsHostName"], "z")
687 self.assertEquals(res[2]["lastLogon"], "z")
688 self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
689 self.assertEquals(res[3]["dnsHostName"], undefined)
690 self.assertEquals(res[3]["lastLogon"], "z")
692 # Search by negated disjunction of remote attributes
693 attrs = ["dnsHostName", "lastLogon"]
694 res = ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", attrs=attrs)
695 self.assertEquals(len(res), 4)
696 self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
697 self.assertEquals(res[0]["dnsHostName"], "y")
698 self.assertEquals(res[0]["lastLogon"], "y")
699 self.assertEquals(str(res[1].dn), s4.dn("cn=Z"))
700 self.assertEquals(res[1]["dnsHostName"], "z")
701 self.assertEquals(res[1]["lastLogon"], "z")
702 self.assertEquals(str(res[2].dn), s4.dn("cn=C"))
703 self.assertEquals(res[2]["dnsHostName"], undefined)
704 self.assertEquals(res[2]["lastLogon"], "z")
706 # Search by negated disjunction of local and remote attribute
707 attrs = ["dnsHostName", "lastLogon"]
708 res = ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", attrs=attrs)
709 self.assertEquals(len(res), 4)
710 self.assertEquals(str(res[0].dn), s4.dn("cn=A"))
711 self.assertEquals(res[0]["dnsHostName"], undefined)
712 self.assertEquals(res[0]["lastLogon"], "x")
713 self.assertEquals(str(res[1].dn), s4.dn("cn=Z"))
714 self.assertEquals(res[1]["dnsHostName"], "z")
715 self.assertEquals(res[1]["lastLogon"], "z")
716 self.assertEquals(str(res[2].dn), s4.dn("cn=C"))
717 self.assertEquals(res[2]["dnsHostName"], undefined)
718 self.assertEquals(res[2]["lastLogon"], "z")
720 print "Search by complex parse tree"
721 attrs = ["dnsHostName", "lastLogon"]
722 res = ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=attrs)
723 self.assertEquals(len(res), 6)
724 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
725 self.assertEquals(res[0]["dnsHostName"], undefined)
726 self.assertEquals(res[0]["lastLogon"], "y")
727 self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
728 self.assertEquals(res[1]["dnsHostName"], "x")
729 self.assertEquals(res[1]["lastLogon"], "x")
730 self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
731 self.assertEquals(res[2]["dnsHostName"], undefined)
732 self.assertEquals(res[2]["lastLogon"], "x")
733 self.assertEquals(str(res[3].dn), s4.dn("cn=Z"))
734 self.assertEquals(res[3]["dnsHostName"], "z")
735 self.assertEquals(res[3]["lastLogon"], "z")
736 self.assertEquals(str(res[4].dn), s4.dn("cn=C"))
737 self.assertEquals(res[4]["dnsHostName"], undefined)
738 self.assertEquals(res[4]["lastLogon"], "z")
# Builds the DNs of the six fixture records A, B, C, X, Y, Z.
# NOTE(review): no use of `dns` is visible in this copy; the lines that
# follow (presumably cleanup of these records) are absent -- confirm.
741 dns = [s4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
745 def test_map_modify_local(self):
746 """Modification of local records."""
# Works on "cn=test,dc=idealx,dc=org": after the add, the record must be
# readable through `ldb` but found in neither `s4.db` nor `s3.db`; it is then
# modified, renamed to "cn=toast,dc=idealx,dc=org" and deleted, with a base
# search after each step.
# NOTE(review): `ldb`, `s3` and `s4` are not bound in the visible lines, and
# this copy lacks the start of the add call, the LDIF for the modify step,
# and the rename and delete calls themselves (the embedded numbering skips
# 753-756, 774-780, 792-793 and 802-803).
752 dn = "cn=test,dc=idealx,dc=org"
757 "description": "test"})
759 attrs = ["foo", "revision", "description"]
760 res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
761 self.assertEquals(len(res), 1)
762 self.assertEquals(str(res[0].dn), dn)
763 self.assertEquals(res[0]["foo"], "bar")
764 self.assertEquals(res[0]["revision"], "1")
765 self.assertEquals(res[0]["description"], "test")
766 # Check it's not in the local db
767 res = s4.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs)
768 self.assertEquals(len(res), 0)
769 # Check it's not in the remote db
770 res = s3.db.search(expression="(cn=test)", scope=SCOPE_DEFAULT, attrs=attrs)
771 self.assertEquals(len(res), 0)
773 # Modify local record
781 ldb.modify_ldif(ldif)
# After the modify, "foo" and "description" carry new values while
# "revision" is expected to be unchanged.
783 res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
784 self.assertEquals(len(res), 1)
785 self.assertEquals(str(res[0].dn), dn)
786 self.assertEquals(res[0]["foo"], "baz")
787 self.assertEquals(res[0]["revision"], "1")
788 self.assertEquals(res[0]["description"], "foo")
790 # Rename local record
791 dn2 = "cn=toast,dc=idealx,dc=org"
794 res = ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
795 self.assertEquals(len(res), 1)
796 self.assertEquals(str(res[0].dn), dn2)
797 self.assertEquals(res[0]["foo"], "baz")
798 self.assertEquals(res[0]["revision"], "1")
799 self.assertEquals(res[0]["description"], "foo")
801 # Delete local record
804 res = ldb.search(dn2, scope=SCOPE_BASE)
805 self.assertEquals(len(res), 0)
807 def test_map_modify_remote_remote(self):
808 """Modification of remote data of remote records"""
# Adds a record directly to `s3.db`, reads it back both there (remote
# attribute names) and through `ldb` (mapped names badPwdCount/nextRid),
# then checks it again after a modify, a rename and a delete.
# NOTE(review): `ldb`, `s3` and `s4` are not bound in the visible lines; the
# LDIF for the modify step and the rename/delete calls are absent from this
# copy.
814 dn = s4.dn("cn=test")
815 dn2 = s3.dn("cn=test")
816 s3.db.add({"dn": dn2,
818 "description": "foo",
819 "sambaBadPasswordCount": "3",
820 "sambaNextRid": "1001"})
822 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
# NOTE(review): here and in the other `s3.db.search`/`s4.db.search` calls in
# this method the DN is the second positional argument after "", whereas the
# `ldb.search` calls pass the DN first.  Verify the positional order against
# the search signature of the ldb bindings.
823 res = s3.db.search("", dn2, SCOPE_BASE, attrs)
824 self.assertEquals(len(res), 1)
825 self.assertEquals(str(res[0].dn), dn2)
826 self.assertEquals(res[0]["description"], "foo")
827 self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
828 self.assertEquals(res[0]["sambaNextRid"], "1001")
830 attrs = ["description", "badPwdCount", "nextRid"]
831 res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
832 self.assertEquals(len(res), 1)
833 self.assertEquals(str(res[0].dn), dn)
834 self.assertEquals(res[0]["description"], "foo")
835 self.assertEquals(res[0]["badPwdCount"], "3")
836 self.assertEquals(res[0]["nextRid"], "1001")
# The record must not exist in the s4 backend itself.
838 res = s4.db.search("", dn, SCOPE_BASE, attrs)
839 self.assertEquals(len(res), 0)
841 # Modify remote data of remote record
849 ldb.modify_ldif(ldif)
851 attrs = ["description", "badPwdCount", "nextRid"]
852 res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
853 self.assertEquals(len(res), 1)
854 self.assertEquals(str(res[0].dn), dn)
855 self.assertEquals(res[0]["description"], "test")
856 self.assertEquals(res[0]["badPwdCount"], "4")
857 self.assertEquals(res[0]["nextRid"], "1001")
859 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
860 res = s3.db.search("", dn2, SCOPE_BASE, attrs)
861 self.assertEquals(len(res), 1)
862 self.assertEquals(str(res[0].dn), dn2)
863 self.assertEquals(res[0]["description"], "test")
864 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
865 self.assertEquals(res[0]["sambaNextRid"], "1001")
867 # Rename remote record
868 dn2 = s4.dn("cn=toast")
# NOTE(review): the searches after the rename still use `dn`; presumably the
# absent lines 869-871 perform the rename and rebind `dn` to the new name --
# confirm.
872 attrs = ["description", "badPwdCount", "nextRid"]
873 res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
874 self.assertEquals(len(res), 1)
875 self.assertEquals(str(res[0].dn), dn)
876 self.assertEquals(res[0]["description"], "test")
877 self.assertEquals(res[0]["badPwdCount"], "4")
878 self.assertEquals(res[0]["nextRid"], "1001")
880 dn2 = s3.dn("cn=toast")
881 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
882 res = s3.db.search("", dn2, SCOPE_BASE, attrs)
883 self.assertEquals(len(res), 1)
884 self.assertEquals(str(res[0].dn), dn2)
885 self.assertEquals(res[0]["description"], "test")
886 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
887 self.assertEquals(res[0]["sambaNextRid"], "1001")
889 # Delete remote record
892 res = ldb.search(dn, scope=SCOPE_BASE)
893 self.assertEquals(len(res), 0)
895 res = s3.db.search("", dn2, SCOPE_BASE)
896 self.assertEquals(len(res), 0)
def test_map_modify_remote_local(self):
    """Modification of local data of remote records.

    A record is added directly through ``s3.db``, then modified through
    the mapped ``ldb`` so that it gains a ``revision`` attribute and a new
    ``description``.  The checks verify that the mapped view exposes both
    attributes, that ``s3.db`` holds only ``description`` and that ``s4.db``
    holds only ``revision``.
    """
    # NOTE(review): s3, s4, ldb and ldif are bound by statements that are
    # not part of the visible code, and the final delete call is likewise
    # not shown -- confirm they are present around these lines.

    # Add remote record (same as before)
    dn = s4.dn("cn=test")
    dn2 = s3.dn("cn=test")
    s3.db.add({"dn": dn2,
               "description": "foo",
               "sambaBadPasswordCount": "3",
               "sambaNextRid": "1001"})

    # Modify local data of remote record
    ldb.modify_ldif(ldif)

    # Check in mapped db: both attributes are visible through the mapping.
    attrs = ["revision", "description"]
    res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
    self.assertEquals(len(res), 1)
    self.assertEquals(str(res[0].dn), dn)
    self.assertEquals(res[0]["description"], "test")
    self.assertEquals(res[0]["revision"], "1")

    # Check in remote db
    res = s3.db.search("", dn2, SCOPE_BASE, attrs)
    self.assertEquals(len(res), 1)
    self.assertEquals(str(res[0].dn), dn2)
    self.assertEquals(res[0]["description"], "test")
    # "revision" must not have been written to the remote record; test for
    # absence explicitly, since indexing a missing attribute would raise
    # instead of yielding a comparable value.
    self.assertTrue("revision" not in res[0])

    # Check in local db
    res = s4.db.search("", dn, SCOPE_BASE, attrs)
    self.assertEquals(len(res), 1)
    self.assertEquals(str(res[0].dn), dn)
    # "description" must not have been written to the local record.
    self.assertTrue("description" not in res[0])
    self.assertEquals(res[0]["revision"], "1")

    # Delete (newly) split record
def test_map_modify_split(self):
    """Testing modification of split records.

    The checks expect a split record to expose ``description``,
    ``badPwdCount``, ``nextRid`` and ``revision`` through the mapped
    ``ldb``, while ``s4.db`` holds only ``revision`` and ``s3.db`` holds
    ``description``, ``sambaBadPasswordCount`` and ``sambaNextRid``.  The
    same three views are verified after the record is added, modified,
    renamed and finally deleted.
    """
    # NOTE(review): s3, s4, ldb and ldif are bound by statements that are
    # not part of the visible code, and the rename and delete calls are
    # likewise not shown -- confirm they are present around these lines.

    dn = s4.dn("cn=test")
    dn2 = s3.dn("cn=test")
    # NOTE(review): the statement that adds the split record is only partly
    # visible (a lone `"description": "foo",` entry); restore the complete
    # call here.  The checks below expect description=foo, badPwdCount=3,
    # nextRid=1001 and revision=1.

    # Check in mapped db
    attrs = ["description", "badPwdCount", "nextRid", "revision"]
    res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
    self.assertEquals(len(res), 1)
    self.assertEquals(str(res[0].dn), dn)
    self.assertEquals(res[0]["description"], "foo")
    self.assertEquals(res[0]["badPwdCount"], "3")
    self.assertEquals(res[0]["nextRid"], "1001")
    self.assertEquals(res[0]["revision"], "1")

    # Check in local db: only "revision" may be stored here.  Absence is
    # tested with `not in`, since indexing a missing attribute would raise
    # instead of yielding a comparable value.
    res = s4.db.search("", dn, SCOPE_BASE, attrs)
    self.assertEquals(len(res), 1)
    self.assertEquals(str(res[0].dn), dn)
    self.assertTrue("description" not in res[0])
    self.assertTrue("badPwdCount" not in res[0])
    self.assertTrue("nextRid" not in res[0])
    self.assertEquals(res[0]["revision"], "1")

    # Check in remote db: "revision" must not be stored here.
    attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
    res = s3.db.search("", dn2, SCOPE_BASE, attrs)
    self.assertEquals(len(res), 1)
    self.assertEquals(str(res[0].dn), dn2)
    self.assertEquals(res[0]["description"], "foo")
    self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
    self.assertEquals(res[0]["sambaNextRid"], "1001")
    self.assertTrue("revision" not in res[0])

    # Modify of split record
    ldb.modify_ldif(ldif)

    # Check in mapped db
    attrs = ["description", "badPwdCount", "nextRid", "revision"]
    res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
    self.assertEquals(len(res), 1)
    self.assertEquals(str(res[0].dn), dn)
    self.assertEquals(res[0]["description"], "test")
    self.assertEquals(res[0]["badPwdCount"], "4")
    self.assertEquals(res[0]["nextRid"], "1001")
    self.assertEquals(res[0]["revision"], "2")

    # Check in local db
    res = s4.db.search("", dn, SCOPE_BASE, attrs)
    self.assertEquals(len(res), 1)
    self.assertEquals(str(res[0].dn), dn)
    self.assertTrue("description" not in res[0])
    self.assertTrue("badPwdCount" not in res[0])
    self.assertTrue("nextRid" not in res[0])
    self.assertEquals(res[0]["revision"], "2")
    # Check in remote db
    attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
    res = s3.db.search("", dn2, SCOPE_BASE, attrs)
    self.assertEquals(len(res), 1)
    self.assertEquals(str(res[0].dn), dn2)
    self.assertEquals(res[0]["description"], "test")
    self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
    self.assertEquals(res[0]["sambaNextRid"], "1001")
    self.assertTrue("revision" not in res[0])

    # Rename split record
    dn2 = s4.dn("cn=toast")

    # Check in mapped db
    attrs = ["description", "badPwdCount", "nextRid", "revision"]
    res = ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
    self.assertEquals(len(res), 1)
    self.assertEquals(str(res[0].dn), dn)
    self.assertEquals(res[0]["description"], "test")
    self.assertEquals(res[0]["badPwdCount"], "4")
    self.assertEquals(res[0]["nextRid"], "1001")
    self.assertEquals(res[0]["revision"], "2")

    # Check in local db
    res = s4.db.search("", dn, SCOPE_BASE, attrs)
    self.assertEquals(len(res), 1)
    self.assertEquals(str(res[0].dn), dn)
    self.assertTrue("description" not in res[0])
    self.assertTrue("badPwdCount" not in res[0])
    self.assertTrue("nextRid" not in res[0])
    self.assertEquals(res[0]["revision"], "2")
    # Check in remote db
    dn2 = s3.dn("cn=toast")
    attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
    res = s3.db.search("", dn2, SCOPE_BASE, attrs)
    self.assertEquals(len(res), 1)
    self.assertEquals(str(res[0].dn), dn2)
    self.assertEquals(res[0]["description"], "test")
    self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
    self.assertEquals(res[0]["sambaNextRid"], "1001")
    self.assertTrue("revision" not in res[0])

    # Delete split record

    # Check in mapped db
    res = ldb.search(dn, scope=SCOPE_BASE)
    self.assertEquals(len(res), 0)

    # Check in local db
    res = s4.db.search("", dn, SCOPE_BASE)
    self.assertEquals(len(res), 0)
    # Check in remote db
    res = s3.db.search("", dn2, SCOPE_BASE)
    self.assertEquals(len(res), 0)