3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 """Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
27 from ldb import SCOPE_DEFAULT, SCOPE_BASE
28 from samba import Ldb, substitute_var
29 from samba.tests import TestCaseInTempDir, env_loadparm
30 import samba.dcerpc.security
32 from samba.auth import system_session
34 datadir = os.path.join(os.path.dirname(__file__),
35 "../../../../../testdata/samba3")
37 def read_datafile(filename):
38 return open(os.path.join(datadir, filename), 'r').read()
40 def ldb_debug(l, text):
44 class MapBaseTestCase(TestCaseInTempDir):
45 """Base test case for mapping tests."""
47 def setup_modules(self, ldb, s3, s4):
48 ldb.add({"dn": "@MAP=samba3sam",
50 "@TO": "sambaDomainName=TESTS," + s3.basedn})
52 ldb.add({"dn": "@MODULES",
53 "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,samba3sid,partition"})
55 ldb.add({"dn": "@PARTITION",
56 "partition": ["%s" % (s4.basedn_casefold),
57 "%s" % (s3.basedn_casefold)],
58 "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"],
62 self.lp = env_loadparm()
63 self.lp.set("sid generator", "backend")
64 self.lp.set("workgroup", "TESTS")
65 self.lp.set("netbios name", "TESTS")
66 super(MapBaseTestCase, self).setUp()
68 def make_dn(basedn, rdn):
69 return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn)
71 def make_s4dn(basedn, rdn):
72 return "%s,%s" % (rdn, basedn)
74 self.ldbfile = os.path.join(self.tempdir, "test.ldb")
75 self.ldburl = "tdb://" + self.ldbfile
77 tempdir = self.tempdir
80 """Simple helper class that contains data for a specific SAM
83 def __init__(self, basedn, dn, lp):
84 self.db = Ldb(lp=lp, session_info=system_session())
86 self.basedn_casefold = ldb.Dn(self.db, basedn).get_casefold()
87 self.substvars = {"BASEDN": self.basedn}
88 self.file = os.path.join(tempdir, "%s.ldb" % self.basedn_casefold)
89 self.url = "tdb://" + self.file
93 return self._dn(self.basedn, rdn)
96 return self.db.connect(self.url)
98 def setup_data(self, path):
99 self.add_ldif(read_datafile(path))
101 def subst(self, text):
102 return substitute_var(text, self.substvars)
104 def add_ldif(self, ldif):
105 self.db.add_ldif(self.subst(ldif))
107 def modify_ldif(self, ldif):
108 self.db.modify_ldif(self.subst(ldif))
110 self.samba4 = Target("dc=vernstok,dc=nl", make_s4dn, self.lp)
111 self.samba3 = Target("cn=Samba3Sam", make_dn, self.lp)
113 self.samba3.connect()
114 self.samba4.connect()
117 os.unlink(self.ldbfile)
118 os.unlink(self.samba3.file)
119 os.unlink(self.samba4.file)
120 super(MapBaseTestCase, self).tearDown()
122 def assertSidEquals(self, text, ndr_sid):
123 sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid,
125 sid_obj2 = samba.dcerpc.security.dom_sid(text)
126 self.assertEquals(sid_obj1, sid_obj2)
129 class Samba3SamTestCase(MapBaseTestCase):
132 super(Samba3SamTestCase, self).setUp()
133 ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
134 self.samba3.setup_data("samba3.ldif")
135 ldif = read_datafile("provision_samba3sam.ldif")
136 ldb.add_ldif(self.samba4.subst(ldif))
137 self.setup_modules(ldb, self.samba3, self.samba4)
139 self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
141 def test_search_non_mapped(self):
142 """Looking up by non-mapped attribute"""
143 msg = self.ldb.search(expression="(cn=Administrator)")
144 self.assertEquals(len(msg), 1)
145 self.assertEquals(msg[0]["cn"], "Administrator")
147 def test_search_non_mapped(self):
148 """Looking up by mapped attribute"""
149 msg = self.ldb.search(expression="(name=Backup Operators)")
150 self.assertEquals(len(msg), 1)
151 self.assertEquals(str(msg[0]["name"]), "Backup Operators")
153 def test_old_name_of_renamed(self):
154 """Looking up by old name of renamed attribute"""
155 msg = self.ldb.search(expression="(displayName=Backup Operators)")
156 self.assertEquals(len(msg), 0)
158 def test_mapped_containing_sid(self):
159 """Looking up mapped entry containing SID"""
160 msg = self.ldb.search(expression="(cn=Replicator)")
161 self.assertEquals(len(msg), 1)
162 self.assertEquals(str(msg[0].dn),
163 "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
164 self.assertTrue("objectSid" in msg[0])
165 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
167 oc = set(msg[0]["objectClass"])
168 self.assertEquals(oc, set(["group"]))
170 def test_search_by_objclass(self):
171 """Looking up by objectClass"""
172 msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
173 self.assertEquals(set([str(m.dn) for m in msg]),
174 set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
175 "unixName=nobody,ou=Users,dc=vernstok,dc=nl"]))
177 def test_s3sam_modify(self):
178 # Adding a record that will be fallbacked
179 self.ldb.add({"dn": "cn=Foo",
183 "showInAdvancedViewOnly": "TRUE"}
186 # Checking for existence of record (local)
187 # TODO: This record must be searched in the local database, which is
188 # currently only supported for base searches
189 # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
190 # TODO: Actually, this version should work as well but doesn't...
193 msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo",
195 attrs=['foo','blah','cn','showInAdvancedViewOnly'])
196 self.assertEquals(len(msg), 1)
197 self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE")
198 self.assertEquals(str(msg[0]["foo"]), "bar")
199 self.assertEquals(str(msg[0]["blah"]), "Blie")
201 # Adding record that will be mapped
202 self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl",
203 "objectClass": "user",
205 "sambaUnicodePwd": "geheim",
208 # Checking for existence of record (remote)
209 msg = self.ldb.search(expression="(unixName=bin)",
210 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
211 self.assertEquals(len(msg), 1)
212 self.assertEquals(str(msg[0]["cn"]), "Niemand")
213 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
215 # Checking for existence of record (local && remote)
216 msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
217 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
218 self.assertEquals(len(msg), 1) # TODO: should check with more records
219 self.assertEquals(str(msg[0]["cn"]), "Niemand")
220 self.assertEquals(str(msg[0]["unixName"]), "bin")
221 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
223 # Checking for existence of record (local || remote)
224 msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
225 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
226 #print "got %d replies" % len(msg)
227 self.assertEquals(len(msg), 1) # TODO: should check with more records
228 self.assertEquals(str(msg[0]["cn"]), "Niemand")
229 self.assertEquals(str(msg[0]["unixName"]), "bin")
230 self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim")
232 # Checking for data in destination database
233 msg = self.samba3.db.search(expression="(cn=Niemand)")
234 self.assertTrue(len(msg) >= 1)
235 self.assertEquals(str(msg[0]["sambaSID"]),
236 "S-1-5-21-4231626423-2410014848-2360679739-2001")
237 self.assertEquals(str(msg[0]["displayName"]), "Niemand")
239 # Adding attribute...
240 self.ldb.modify_ldif("""
241 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
247 # Checking whether changes are still there...
248 msg = self.ldb.search(expression="(cn=Niemand)")
249 self.assertTrue(len(msg) >= 1)
250 self.assertEquals(str(msg[0]["cn"]), "Niemand")
251 self.assertEquals(str(msg[0]["description"]), "Blah")
253 # Modifying attribute...
254 self.ldb.modify_ldif("""
255 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
261 # Checking whether changes are still there...
262 msg = self.ldb.search(expression="(cn=Niemand)")
263 self.assertTrue(len(msg) >= 1)
264 self.assertEquals(str(msg[0]["description"]), "Blie")
266 # Deleting attribute...
267 self.ldb.modify_ldif("""
268 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
273 # Checking whether changes are no longer there...
274 msg = self.ldb.search(expression="(cn=Niemand)")
275 self.assertTrue(len(msg) >= 1)
276 self.assertTrue(not "description" in msg[0])
279 self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl",
280 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
282 # Checking whether DN has changed...
283 msg = self.ldb.search(expression="(cn=Niemand2)")
284 self.assertEquals(len(msg), 1)
285 self.assertEquals(str(msg[0].dn),
286 "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
289 self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
291 # Checking whether record is gone...
292 msg = self.ldb.search(expression="(cn=Niemand2)")
293 self.assertEquals(len(msg), 0)
296 class MapTestCase(MapBaseTestCase):
299 super(MapTestCase, self).setUp()
300 ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
301 ldif = read_datafile("provision_samba3sam.ldif")
302 ldb.add_ldif(self.samba4.subst(ldif))
303 self.setup_modules(ldb, self.samba3, self.samba4)
305 self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session())
307 def test_map_search(self):
308 """Running search tests on mapped data."""
310 "dn": "sambaDomainName=TESTS," + self.samba3.basedn,
311 "objectclass": ["sambaDomain", "top"],
312 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739",
313 "sambaNextRid": "2000",
314 "sambaDomainName": "TESTS"
317 # Add a set of split records
318 self.ldb.add_ldif("""
319 dn: """+ self.samba4.dn("cn=Domain Users") + """
322 objectSid: S-1-5-21-4231626423-2410014848-2360679739-513
325 # Add a set of split records
326 self.ldb.add_ldif("""
327 dn: """+ self.samba4.dn("cn=X") + """
336 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
340 "dn": self.samba4.dn("cn=Y"),
341 "objectClass": "top",
351 "dn": self.samba4.dn("cn=Z"),
352 "objectClass": "top",
361 # Add a set of remote records
364 "dn": self.samba3.dn("cn=A"),
365 "objectClass": "posixAccount",
368 "sambaBadPasswordCount": "x",
369 "sambaLogonTime": "x",
371 "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552",
372 "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"})
375 "dn": self.samba3.dn("cn=B"),
376 "objectClass": "top",
379 "sambaBadPasswordCount": "x",
380 "sambaLogonTime": "y",
384 "dn": self.samba3.dn("cn=C"),
385 "objectClass": "top",
388 "sambaBadPasswordCount": "y",
389 "sambaLogonTime": "z",
392 # Testing search by DN
394 # Search remote record by local DN
395 dn = self.samba4.dn("cn=A")
396 res = self.ldb.search(dn, scope=SCOPE_BASE,
397 attrs=["dnsHostName", "lastLogon"])
398 self.assertEquals(len(res), 1)
399 self.assertEquals(str(res[0].dn), dn)
400 self.assertTrue(not "dnsHostName" in res[0])
401 self.assertEquals(str(res[0]["lastLogon"]), "x")
403 # Search remote record by remote DN
404 dn = self.samba3.dn("cn=A")
405 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
406 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
407 self.assertEquals(len(res), 1)
408 self.assertEquals(str(res[0].dn), dn)
409 self.assertTrue(not "dnsHostName" in res[0])
410 self.assertTrue(not "lastLogon" in res[0])
411 self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
413 # Search split record by local DN
414 dn = self.samba4.dn("cn=X")
415 res = self.ldb.search(dn, scope=SCOPE_BASE,
416 attrs=["dnsHostName", "lastLogon"])
417 self.assertEquals(len(res), 1)
418 self.assertEquals(str(res[0].dn), dn)
419 self.assertEquals(str(res[0]["dnsHostName"]), "x")
420 self.assertEquals(str(res[0]["lastLogon"]), "x")
422 # Search split record by remote DN
423 dn = self.samba3.dn("cn=X")
424 res = self.samba3.db.search(dn, scope=SCOPE_BASE,
425 attrs=["dnsHostName", "lastLogon", "sambaLogonTime"])
426 self.assertEquals(len(res), 1)
427 self.assertEquals(str(res[0].dn), dn)
428 self.assertTrue(not "dnsHostName" in res[0])
429 self.assertTrue(not "lastLogon" in res[0])
430 self.assertEquals(str(res[0]["sambaLogonTime"]), "x")
432 # Testing search by attribute
434 # Search by ignored attribute
435 res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT,
436 attrs=["dnsHostName", "lastLogon"])
437 self.assertEquals(len(res), 2)
438 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
439 self.assertEquals(str(res[0]["dnsHostName"]), "y")
440 self.assertEquals(str(res[0]["lastLogon"]), "y")
441 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
442 self.assertEquals(str(res[1]["dnsHostName"]), "x")
443 self.assertEquals(str(res[1]["lastLogon"]), "x")
445 # Search by kept attribute
446 res = self.ldb.search(expression="(description=y)",
447 scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"])
448 self.assertEquals(len(res), 2)
449 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
450 self.assertEquals(str(res[0]["dnsHostName"]), "z")
451 self.assertEquals(str(res[0]["lastLogon"]), "z")
452 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
453 self.assertTrue(not "dnsHostName" in res[1])
454 self.assertEquals(str(res[1]["lastLogon"]), "z")
456 # Search by renamed attribute
457 res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT,
458 attrs=["dnsHostName", "lastLogon"])
459 self.assertEquals(len(res), 2)
460 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
461 self.assertTrue(not "dnsHostName" in res[0])
462 self.assertEquals(str(res[0]["lastLogon"]), "y")
463 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
464 self.assertTrue(not "dnsHostName" in res[1])
465 self.assertEquals(str(res[1]["lastLogon"]), "x")
467 # Search by converted attribute
469 # Using the SID directly in the parse tree leads to conversion
470 # errors, letting the search fail with no results.
471 #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs)
472 res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"])
473 self.assertEquals(len(res), 4)
474 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
475 self.assertEquals(str(res[0]["dnsHostName"]), "x")
476 self.assertEquals(str(res[0]["lastLogon"]), "x")
477 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
479 self.assertTrue("objectSid" in res[0])
480 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
481 self.assertTrue(not "dnsHostName" in res[1])
482 self.assertEquals(str(res[1]["lastLogon"]), "x")
483 self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552",
485 self.assertTrue("objectSid" in res[1])
487 # Search by generated attribute
488 # In most cases, this even works when the mapping is missing
489 # a `convert_operator' by enumerating the remote db.
490 res = self.ldb.search(expression="(primaryGroupID=512)",
491 attrs=["dnsHostName", "lastLogon", "primaryGroupID"])
492 self.assertEquals(len(res), 1)
493 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
494 self.assertTrue(not "dnsHostName" in res[0])
495 self.assertEquals(str(res[0]["lastLogon"]), "x")
496 self.assertEquals(str(res[0]["primaryGroupID"]), "512")
498 # Note that Xs "objectSid" seems to be fine in the previous search for
500 #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
501 #print len(res) + " results found"
502 #for i in range(len(res)):
503 # for (obj in res[i]) {
504 # print obj + ": " + res[i][obj]
509 # Search by remote name of renamed attribute */
510 res = self.ldb.search(expression="(sambaBadPasswordCount=*)",
511 attrs=["dnsHostName", "lastLogon"])
512 self.assertEquals(len(res), 0)
514 # Search by objectClass
515 attrs = ["dnsHostName", "lastLogon", "objectClass"]
516 res = self.ldb.search(expression="(objectClass=user)", attrs=attrs)
517 self.assertEquals(len(res), 2)
518 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
519 self.assertEquals(str(res[0]["dnsHostName"]), "x")
520 self.assertEquals(str(res[0]["lastLogon"]), "x")
521 self.assertEquals(str(res[0]["objectClass"][0]), "user")
522 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
523 self.assertTrue(not "dnsHostName" in res[1])
524 self.assertEquals(str(res[1]["lastLogon"]), "x")
525 self.assertEquals(str(res[1]["objectClass"][0]), "user")
527 # Prove that the objectClass is actually used for the search
528 res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))",
530 self.assertEquals(len(res), 3)
531 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
532 self.assertTrue(not "dnsHostName" in res[0])
533 self.assertEquals(str(res[0]["lastLogon"]), "y")
534 self.assertEquals(set(res[0]["objectClass"]), set(["top"]))
535 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
536 self.assertEquals(str(res[1]["dnsHostName"]), "x")
537 self.assertEquals(str(res[1]["lastLogon"]), "x")
538 self.assertEquals(str(res[1]["objectClass"][0]), "user")
539 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
540 self.assertTrue(not "dnsHostName" in res[2])
541 self.assertEquals(str(res[2]["lastLogon"]), "x")
542 self.assertEquals(res[2]["objectClass"][0], "user")
544 # Testing search by parse tree
546 # Search by conjunction of local attributes
547 res = self.ldb.search(expression="(&(codePage=x)(revision=x))",
548 attrs=["dnsHostName", "lastLogon"])
549 self.assertEquals(len(res), 2)
550 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
551 self.assertEquals(str(res[0]["dnsHostName"]), "y")
552 self.assertEquals(str(res[0]["lastLogon"]), "y")
553 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
554 self.assertEquals(str(res[1]["dnsHostName"]), "x")
555 self.assertEquals(str(res[1]["lastLogon"]), "x")
557 # Search by conjunction of remote attributes
558 res = self.ldb.search(expression="(&(lastLogon=x)(description=x))",
559 attrs=["dnsHostName", "lastLogon"])
560 self.assertEquals(len(res), 2)
561 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X"))
562 self.assertEquals(str(res[0]["dnsHostName"]), "x")
563 self.assertEquals(str(res[0]["lastLogon"]), "x")
564 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
565 self.assertTrue(not "dnsHostName" in res[1])
566 self.assertEquals(str(res[1]["lastLogon"]), "x")
568 # Search by conjunction of local and remote attribute
569 res = self.ldb.search(expression="(&(codePage=x)(description=x))",
570 attrs=["dnsHostName", "lastLogon"])
571 self.assertEquals(len(res), 2)
572 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
573 self.assertEquals(str(res[0]["dnsHostName"]), "y")
574 self.assertEquals(str(res[0]["lastLogon"]), "y")
575 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
576 self.assertEquals(str(res[1]["dnsHostName"]), "x")
577 self.assertEquals(str(res[1]["lastLogon"]), "x")
579 # Search by conjunction of local and remote attribute w/o match
580 attrs = ["dnsHostName", "lastLogon"]
581 res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))",
583 self.assertEquals(len(res), 0)
584 res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))",
586 self.assertEquals(len(res), 0)
588 # Search by disjunction of local attributes
589 res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))",
590 attrs=["dnsHostName", "lastLogon"])
591 self.assertEquals(len(res), 2)
592 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
593 self.assertEquals(str(res[0]["dnsHostName"]), "y")
594 self.assertEquals(str(res[0]["lastLogon"]), "y")
595 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
596 self.assertEquals(str(res[1]["dnsHostName"]), "x")
597 self.assertEquals(str(res[1]["lastLogon"]), "x")
599 # Search by disjunction of remote attributes
600 res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))",
601 attrs=["dnsHostName", "lastLogon"])
602 self.assertEquals(len(res), 3)
603 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
604 self.assertFalse("dnsHostName" in res[0])
605 self.assertEquals(str(res[0]["lastLogon"]), "y")
606 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
607 self.assertEquals(str(res[1]["dnsHostName"]), "x")
608 self.assertEquals(str(res[1]["lastLogon"]), "x")
609 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
610 self.assertFalse("dnsHostName" in res[2])
611 self.assertEquals(str(res[2]["lastLogon"]), "x")
613 # Search by disjunction of local and remote attribute
614 res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))",
615 attrs=["dnsHostName", "lastLogon"])
616 self.assertEquals(len(res), 3)
617 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
618 self.assertEquals(str(res[0]["dnsHostName"]), "y")
619 self.assertEquals(str(res[0]["lastLogon"]), "y")
620 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
621 self.assertFalse("dnsHostName" in res[1])
622 self.assertEquals(str(res[1]["lastLogon"]), "y")
623 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X"))
624 self.assertEquals(str(res[2]["dnsHostName"]), "x")
625 self.assertEquals(str(res[2]["lastLogon"]), "x")
627 # Search by disjunction of local and remote attribute w/o match
628 res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))",
629 attrs=["dnsHostName", "lastLogon"])
630 self.assertEquals(len(res), 0)
632 # Search by negated local attribute
633 res = self.ldb.search(expression="(!(revision=x))",
634 attrs=["dnsHostName", "lastLogon"])
635 self.assertEquals(len(res), 6)
636 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
637 self.assertTrue(not "dnsHostName" in res[0])
638 self.assertEquals(str(res[0]["lastLogon"]), "y")
639 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
640 self.assertTrue(not "dnsHostName" in res[1])
641 self.assertEquals(str(res[1]["lastLogon"]), "x")
642 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
643 self.assertEquals(str(res[2]["dnsHostName"]), "z")
644 self.assertEquals(str(res[2]["lastLogon"]), "z")
645 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
646 self.assertTrue(not "dnsHostName" in res[3])
647 self.assertEquals(str(res[3]["lastLogon"]), "z")
649 # Search by negated remote attribute
650 res = self.ldb.search(expression="(!(description=x))",
651 attrs=["dnsHostName", "lastLogon"])
652 self.assertEquals(len(res), 4)
653 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Z"))
654 self.assertEquals(str(res[0]["dnsHostName"]), "z")
655 self.assertEquals(str(res[0]["lastLogon"]), "z")
656 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C"))
657 self.assertTrue(not "dnsHostName" in res[1])
658 self.assertEquals(str(res[1]["lastLogon"]), "z")
660 # Search by negated conjunction of local attributes
661 res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))",
662 attrs=["dnsHostName", "lastLogon"])
663 self.assertEquals(len(res), 6)
664 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
665 self.assertTrue(not "dnsHostName" in res[0])
666 self.assertEquals(str(res[0]["lastLogon"]), "y")
667 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
668 self.assertTrue(not "dnsHostName" in res[1])
669 self.assertEquals(str(res[1]["lastLogon"]), "x")
670 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
671 self.assertEquals(str(res[2]["dnsHostName"]), "z")
672 self.assertEquals(str(res[2]["lastLogon"]), "z")
673 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
674 self.assertTrue(not "dnsHostName" in res[3])
675 self.assertEquals(str(res[3]["lastLogon"]), "z")
677 # Search by negated conjunction of remote attributes
678 res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))",
679 attrs=["dnsHostName", "lastLogon"])
680 self.assertEquals(len(res), 6)
681 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
682 self.assertEquals(str(res[0]["dnsHostName"]), "y")
683 self.assertEquals(str(res[0]["lastLogon"]), "y")
684 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B"))
685 self.assertTrue(not "dnsHostName" in res[1])
686 self.assertEquals(str(res[1]["lastLogon"]), "y")
687 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
688 self.assertEquals(str(res[2]["dnsHostName"]), "z")
689 self.assertEquals(str(res[2]["lastLogon"]), "z")
690 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
691 self.assertTrue(not "dnsHostName" in res[3])
692 self.assertEquals(str(res[3]["lastLogon"]), "z")
694 # Search by negated conjunction of local and remote attribute
695 res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))",
696 attrs=["dnsHostName", "lastLogon"])
697 self.assertEquals(len(res), 6)
698 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
699 self.assertTrue(not "dnsHostName" in res[0])
700 self.assertEquals(str(res[0]["lastLogon"]), "y")
701 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
702 self.assertTrue(not "dnsHostName" in res[1])
703 self.assertEquals(str(res[1]["lastLogon"]), "x")
704 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
705 self.assertEquals(str(res[2]["dnsHostName"]), "z")
706 self.assertEquals(str(res[2]["lastLogon"]), "z")
707 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
708 self.assertTrue(not "dnsHostName" in res[3])
709 self.assertEquals(str(res[3]["lastLogon"]), "z")
711 # Search by negated disjunction of local attributes
712 res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))",
713 attrs=["dnsHostName", "lastLogon"])
714 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
715 self.assertTrue(not "dnsHostName" in res[0])
716 self.assertEquals(str(res[0]["lastLogon"]), "y")
717 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=A"))
718 self.assertTrue(not "dnsHostName" in res[1])
719 self.assertEquals(str(res[1]["lastLogon"]), "x")
720 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z"))
721 self.assertEquals(str(res[2]["dnsHostName"]), "z")
722 self.assertEquals(str(res[2]["lastLogon"]), "z")
723 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=C"))
724 self.assertTrue(not "dnsHostName" in res[3])
725 self.assertEquals(str(res[3]["lastLogon"]), "z")
727 # Search by negated disjunction of remote attributes
728 res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))",
729 attrs=["dnsHostName", "lastLogon"])
730 self.assertEquals(len(res), 5)
731 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=Y"))
732 self.assertEquals(str(res[0]["dnsHostName"]), "y")
733 self.assertEquals(str(res[0]["lastLogon"]), "y")
734 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
735 self.assertEquals(str(res[1]["dnsHostName"]), "z")
736 self.assertEquals(str(res[1]["lastLogon"]), "z")
737 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
738 self.assertTrue(not "dnsHostName" in res[2])
739 self.assertEquals(str(res[2]["lastLogon"]), "z")
741 # Search by negated disjunction of local and remote attribute
742 res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))",
743 attrs=["dnsHostName", "lastLogon"])
744 self.assertEquals(len(res), 5)
745 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A"))
746 self.assertTrue(not "dnsHostName" in res[0])
747 self.assertEquals(str(res[0]["lastLogon"]), "x")
748 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z"))
749 self.assertEquals(str(res[1]["dnsHostName"]), "z")
750 self.assertEquals(str(res[1]["lastLogon"]), "z")
751 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C"))
752 self.assertTrue(not "dnsHostName" in res[2])
753 self.assertEquals(str(res[2]["lastLogon"]), "z")
755 # Search by complex parse tree
756 res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"])
757 self.assertEquals(len(res), 7)
758 self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B"))
759 self.assertTrue(not "dnsHostName" in res[0])
760 self.assertEquals(str(res[0]["lastLogon"]), "y")
761 self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X"))
762 self.assertEquals(str(res[1]["dnsHostName"]), "x")
763 self.assertEquals(str(res[1]["lastLogon"]), "x")
764 self.assertEquals(str(res[2].dn), self.samba4.dn("cn=A"))
765 self.assertTrue(not "dnsHostName" in res[2])
766 self.assertEquals(str(res[2]["lastLogon"]), "x")
767 self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z"))
768 self.assertEquals(str(res[3]["dnsHostName"]), "z")
769 self.assertEquals(str(res[3]["lastLogon"]), "z")
770 self.assertEquals(str(res[4].dn), self.samba4.dn("cn=C"))
771 self.assertTrue(not "dnsHostName" in res[4])
772 self.assertEquals(str(res[4]["lastLogon"]), "z")
775 dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
779 def test_map_modify_local(self):
780 """Modification of local records."""
782 dn = "cn=test,dc=idealx,dc=org"
783 self.ldb.add({"dn": dn,
787 "description": "test"})
789 attrs = ["foo", "revision", "description"]
790 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
791 self.assertEquals(len(res), 1)
792 self.assertEquals(str(res[0].dn), dn)
793 self.assertEquals(str(res[0]["foo"]), "bar")
794 self.assertEquals(str(res[0]["revision"]), "1")
795 self.assertEquals(str(res[0]["description"]), "test")
796 # Check it's not in the local db
797 res = self.samba4.db.search(expression="(cn=test)",
798 scope=SCOPE_DEFAULT, attrs=attrs)
799 self.assertEquals(len(res), 0)
800 # Check it's not in the remote db
801 res = self.samba3.db.search(expression="(cn=test)",
802 scope=SCOPE_DEFAULT, attrs=attrs)
803 self.assertEquals(len(res), 0)
805 # Modify local record
813 self.ldb.modify_ldif(ldif)
815 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
816 self.assertEquals(len(res), 1)
817 self.assertEquals(str(res[0].dn), dn)
818 self.assertEquals(str(res[0]["foo"]), "baz")
819 self.assertEquals(str(res[0]["revision"]), "1")
820 self.assertEquals(str(res[0]["description"]), "foo")
822 # Rename local record
823 dn2 = "cn=toast,dc=idealx,dc=org"
824 self.ldb.rename(dn, dn2)
826 res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs)
827 self.assertEquals(len(res), 1)
828 self.assertEquals(str(res[0].dn), dn2)
829 self.assertEquals(str(res[0]["foo"]), "baz")
830 self.assertEquals(str(res[0]["revision"]), "1")
831 self.assertEquals(str(res[0]["description"]), "foo")
833 # Delete local record
836 res = self.ldb.search(dn2, scope=SCOPE_BASE)
837 self.assertEquals(len(res), 0)
839 def test_map_modify_remote_remote(self):
840 """Modification of remote data of remote records"""
842 dn = self.samba4.dn("cn=test")
843 dn2 = self.samba3.dn("cn=test")
844 self.samba3.db.add({"dn": dn2,
846 "description": "foo",
847 "sambaBadPasswordCount": "3",
848 "sambaNextRid": "1001"})
850 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
851 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
852 self.assertEquals(len(res), 1)
853 self.assertEquals(str(res[0].dn), dn2)
854 self.assertEquals(str(res[0]["description"]), "foo")
855 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
856 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
858 attrs = ["description", "badPwdCount", "nextRid"]
859 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="")
860 self.assertEquals(len(res), 1)
861 self.assertEquals(str(res[0].dn), dn)
862 self.assertEquals(str(res[0]["description"]), "foo")
863 self.assertEquals(str(res[0]["badPwdCount"]), "3")
864 self.assertEquals(str(res[0]["nextRid"]), "1001")
866 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
867 self.assertEquals(len(res), 0)
869 # Modify remote data of remote record
877 self.ldb.modify_ldif(ldif)
879 res = self.ldb.search(dn, scope=SCOPE_BASE,
880 attrs=["description", "badPwdCount", "nextRid"])
881 self.assertEquals(len(res), 1)
882 self.assertEquals(str(res[0].dn), dn)
883 self.assertEquals(str(res[0]["description"]), "test")
884 self.assertEquals(str(res[0]["badPwdCount"]), "4")
885 self.assertEquals(str(res[0]["nextRid"]), "1001")
887 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
888 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
889 self.assertEquals(len(res), 1)
890 self.assertEquals(str(res[0].dn), dn2)
891 self.assertEquals(str(res[0]["description"]), "test")
892 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
893 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
895 # Rename remote record
896 dn2 = self.samba4.dn("cn=toast")
897 self.ldb.rename(dn, dn2)
900 res = self.ldb.search(dn, scope=SCOPE_BASE,
901 attrs=["description", "badPwdCount", "nextRid"])
902 self.assertEquals(len(res), 1)
903 self.assertEquals(str(res[0].dn), dn)
904 self.assertEquals(str(res[0]["description"]), "test")
905 self.assertEquals(str(res[0]["badPwdCount"]), "4")
906 self.assertEquals(str(res[0]["nextRid"]), "1001")
908 dn2 = self.samba3.dn("cn=toast")
909 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
910 attrs=["description", "sambaBadPasswordCount", "sambaNextRid"])
911 self.assertEquals(len(res), 1)
912 self.assertEquals(str(res[0].dn), dn2)
913 self.assertEquals(str(res[0]["description"]), "test")
914 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
915 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
917 # Delete remote record
919 # Check in mapped db that it's removed
920 res = self.ldb.search(dn, scope=SCOPE_BASE)
921 self.assertEquals(len(res), 0)
923 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
924 self.assertEquals(len(res), 0)
926 def test_map_modify_remote_local(self):
927 """Modification of local data of remote records"""
928 # Add remote record (same as before)
929 dn = self.samba4.dn("cn=test")
930 dn2 = self.samba3.dn("cn=test")
931 self.samba3.db.add({"dn": dn2,
933 "description": "foo",
934 "sambaBadPasswordCount": "3",
935 "sambaNextRid": "1001"})
937 # Modify local data of remote record
946 self.ldb.modify_ldif(ldif)
948 attrs = ["revision", "description"]
949 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
950 self.assertEquals(len(res), 1)
951 self.assertEquals(str(res[0].dn), dn)
952 self.assertEquals(str(res[0]["description"]), "test")
953 self.assertEquals(str(res[0]["revision"]), "1")
955 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
956 self.assertEquals(len(res), 1)
957 self.assertEquals(str(res[0].dn), dn2)
958 self.assertEquals(str(res[0]["description"]), "test")
959 self.assertTrue(not "revision" in res[0])
961 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
962 self.assertEquals(len(res), 1)
963 self.assertEquals(str(res[0].dn), dn)
964 self.assertTrue(not "description" in res[0])
965 self.assertEquals(str(res[0]["revision"]), "1")
967 # Delete (newly) split record
970 def test_map_modify_split(self):
971 """Testing modification of split records"""
973 dn = self.samba4.dn("cn=test")
974 dn2 = self.samba3.dn("cn=test")
978 "description": "foo",
983 attrs = ["description", "badPwdCount", "nextRid", "revision"]
984 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
985 self.assertEquals(len(res), 1)
986 self.assertEquals(str(res[0].dn), dn)
987 self.assertEquals(str(res[0]["description"]), "foo")
988 self.assertEquals(str(res[0]["badPwdCount"]), "3")
989 self.assertEquals(str(res[0]["nextRid"]), "1001")
990 self.assertEquals(str(res[0]["revision"]), "1")
992 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
993 self.assertEquals(len(res), 1)
994 self.assertEquals(str(res[0].dn), dn)
995 self.assertTrue(not "description" in res[0])
996 self.assertTrue(not "badPwdCount" in res[0])
997 self.assertTrue(not "nextRid" in res[0])
998 self.assertEquals(str(res[0]["revision"]), "1")
1000 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1002 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1003 self.assertEquals(len(res), 1)
1004 self.assertEquals(str(res[0].dn), dn2)
1005 self.assertEquals(str(res[0]["description"]), "foo")
1006 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3")
1007 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1008 self.assertTrue(not "revision" in res[0])
1010 # Modify of split record
1013 replace: description
1015 replace: badPwdCount
1020 self.ldb.modify_ldif(ldif)
1021 # Check in mapped db
1022 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1023 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1024 self.assertEquals(len(res), 1)
1025 self.assertEquals(str(res[0].dn), dn)
1026 self.assertEquals(str(res[0]["description"]), "test")
1027 self.assertEquals(str(res[0]["badPwdCount"]), "4")
1028 self.assertEquals(str(res[0]["nextRid"]), "1001")
1029 self.assertEquals(str(res[0]["revision"]), "2")
1031 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1032 self.assertEquals(len(res), 1)
1033 self.assertEquals(str(res[0].dn), dn)
1034 self.assertTrue(not "description" in res[0])
1035 self.assertTrue(not "badPwdCount" in res[0])
1036 self.assertTrue(not "nextRid" in res[0])
1037 self.assertEquals(str(res[0]["revision"]), "2")
1038 # Check in remote db
1039 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid",
1041 res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs)
1042 self.assertEquals(len(res), 1)
1043 self.assertEquals(str(res[0].dn), dn2)
1044 self.assertEquals(str(res[0]["description"]), "test")
1045 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1046 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1047 self.assertTrue(not "revision" in res[0])
1049 # Rename split record
1050 dn2 = self.samba4.dn("cn=toast")
1051 self.ldb.rename(dn, dn2)
1052 # Check in mapped db
1054 attrs = ["description", "badPwdCount", "nextRid", "revision"]
1055 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
1056 self.assertEquals(len(res), 1)
1057 self.assertEquals(str(res[0].dn), dn)
1058 self.assertEquals(str(res[0]["description"]), "test")
1059 self.assertEquals(str(res[0]["badPwdCount"]), "4")
1060 self.assertEquals(str(res[0]["nextRid"]), "1001")
1061 self.assertEquals(str(res[0]["revision"]), "2")
1063 res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs)
1064 self.assertEquals(len(res), 1)
1065 self.assertEquals(str(res[0].dn), dn)
1066 self.assertTrue(not "description" in res[0])
1067 self.assertTrue(not "badPwdCount" in res[0])
1068 self.assertTrue(not "nextRid" in res[0])
1069 self.assertEquals(str(res[0]["revision"]), "2")
1070 # Check in remote db
1071 dn2 = self.samba3.dn("cn=toast")
1072 res = self.samba3.db.search(dn2, scope=SCOPE_BASE,
1073 attrs=["description", "sambaBadPasswordCount", "sambaNextRid",
1075 self.assertEquals(len(res), 1)
1076 self.assertEquals(str(res[0].dn), dn2)
1077 self.assertEquals(str(res[0]["description"]), "test")
1078 self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4")
1079 self.assertEquals(str(res[0]["sambaNextRid"]), "1001")
1080 self.assertTrue(not "revision" in res[0])
1082 # Delete split record
1084 # Check in mapped db
1085 res = self.ldb.search(dn, scope=SCOPE_BASE)
1086 self.assertEquals(len(res), 0)
1088 res = self.samba4.db.search(dn, scope=SCOPE_BASE)
1089 self.assertEquals(len(res), 0)
1090 # Check in remote db
1091 res = self.samba3.db.search(dn2, scope=SCOPE_BASE)
1092 self.assertEquals(len(res), 0)