3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2007
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
27 from samba import Ldb, substitute_var
28 from samba.tests import LdbTestCase, TestCaseInTempDir
32 class Samba3SamTestCase(TestCaseInTempDir):
def setup_data(self, obj, ldif):
    """Expand *ldif* with ``obj.substvars`` and add it to ``obj.db``.

    :param obj: object exposing a ``db`` attribute (with ``add_ldif``) and
        a ``substvars`` attribute that is handed to ``substitute_var``.
    :param ldif: LDIF text to load; the test fails if it is ``None``.
    """
    ldif_given = ldif is not None
    self.assertTrue(ldif_given)
    database = obj.db
    database.add_ldif(substitute_var(ldif, obj.substvars))
37 def setup_modules(self, ldb, s3, s4, ldif):
# Load `ldif` into `ldb` and assemble the module/partition configuration.
#
# ldb:  database that receives the LDIF (via ldb.add_ldif).
# s3:   object whose substvars["BASEDN"] and url are spliced into the
#       "@TO" mapping line and into a "partition:" line below.
# s4:   object whose substvars are used to expand `ldif`, and whose
#       substvars["BASEDN"] and url feed the "@FROM" and "partition:" lines.
# ldif: LDIF text; the test fails if it is None.
#
# NOTE(review): the lines after the add_ldif call are fragments of a string
# built by concatenation (@FROM/@TO, @LIST, partition, replicateEntries).
# The statement that opens that string and the call that consumes it are
# not visible in this view - confirm against the full file before editing.
38 self.assertTrue(ldif is not None)
39 ldb.add_ldif(substitute_var(ldif, s4.substvars))
43 @FROM: """ + s4.substvars["BASEDN"] + """
44 @TO: sambaDomainName=TESTS,""" + s3.substvars["BASEDN"] + """
47 @LIST: rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition
50 partition: """ + s4.substvars["BASEDN"] + ":" + s4.url + """
51 partition: """ + s3.substvars["BASEDN"] + ":" + s3.url + """
52 replicateEntries: @SUBCLASSES
53 replicateEntries: @ATTRIBUTES
54 replicateEntries: @INDEXLIST
def test_s3sam_search(self, ldb):
    """Run a series of lookups on *ldb* and verify the entries returned.

    :param ldb: database handle to query; only ``ldb.search`` is used,
        always with an ``expression`` keyword argument.

    Each step prints a progress line, performs one search and asserts on
    the number of results and on selected attributes of those results.
    """
    print("Looking up by non-mapped attribute")
    msg = ldb.search(expression="(cn=Administrator)")
    self.assertEquals(len(msg), 1)
    self.assertEquals(msg[0]["cn"], "Administrator")

    print("Looking up by mapped attribute")
    msg = ldb.search(expression="(name=Backup Operators)")
    self.assertEquals(len(msg), 1)
    self.assertEquals(msg[0]["name"], "Backup Operators")

    print("Looking up by old name of renamed attribute")
    msg = ldb.search(expression="(displayName=Backup Operators)")
    self.assertEquals(len(msg), 0)

    print("Looking up mapped entry containing SID")
    msg = ldb.search(expression="(cn=Replicator)")
    self.assertEquals(len(msg), 1)
    self.assertEquals(str(msg[0].dn),
                      "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
    self.assertEquals(msg[0]["objectSid"],
                      "S-1-5-21-4231626423-2410014848-2360679739-552")

    print("Checking mapping of objectClass")
    object_classes = set(msg[0]["objectClass"])
    self.assertTrue(object_classes is not None)
    # A set cannot be indexed, so iterate over its members directly.
    # Each member must be one of the two accepted class names; a plain
    # membership test avoids comparing a boolean against "group".
    for object_class in object_classes:
        self.assertTrue(object_class in ("posixGroup", "group"))

    print("Looking up by objectClass")
    msg = ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
    self.assertEquals(len(msg), 2)
    # Either of the two user entries may appear at any position, so each
    # result DN only has to be one of the expected pair.
    expected_dns = (
        "unixName=Administrator,ou=Users,dc=vernstok,dc=nl",
        "unixName=nobody,ou=Users,dc=vernstok,dc=nl",
    )
    for index in range(len(msg)):
        self.assertTrue(str(msg[index].dn) in expected_dns)
94 def test_s3sam_modify(ldb, s3):
# Walk one record through add, attribute add/modify/delete, rename and
# delete on `ldb`, checking each step with a search on `ldb` (and once on
# `s3.db`).
#
# ldb: database handle that is searched, renamed on and deleted from.
# s3:  object whose `db` attribute is searched for the stored record.
#
# NOTE(review): the signature has no `self` parameter, yet the body calls
# self.assertEquals / self.assertTrue - confirm how this is invoked.
# NOTE(review): several bare lines below ("dn: ...", "sambaUnicodePwd: ...",
# "showInAdvancedViewOnly: TRUE") are LDIF text; the string literals around
# them and the calls that apply them are not visible in this view.
# NOTE(review): `undefined` is compared against further down but is not
# defined anywhere in the visible code.
95 print "Adding a record that will be fallbacked"
101 showInAdvancedViewOnly: TRUE
104 print "Checking for existence of record (local)"
105 # TODO: This record must be searched in the local database, which is currently only supported for base searches
106 # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
107 # TODO: Actually, this version should work as well but doesn't...
110 attrs = ['foo','blah','cn','showInAdvancedViewOnly']
# NOTE(review): this is the only search using ldb.LDB_SCOPE_BASE; other
# methods in this file pass ldb.SCOPE_BASE - confirm the constant name.
111 msg = ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=ldb.LDB_SCOPE_BASE, attrs=attrs)
112 self.assertEquals(len(msg), 1)
113 self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE")
114 self.assertEquals(msg[0]["foo"], "bar")
115 self.assertEquals(msg[0]["blah"], "Blie")
117 print "Adding record that will be mapped"
119 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
122 sambaUnicodePwd: geheim
126 print "Checking for existence of record (remote)"
127 msg = ldb.search(expression="(unixName=bin)", attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
128 self.assertEquals(len(msg), 1)
129 self.assertEquals(msg[0]["cn"], "Niemand")
130 self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
132 print "Checking for existence of record (local && remote)"
133 msg = ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))",
134 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
135 self.assertEquals(len(msg), 1) # TODO: should check with more records
136 self.assertEquals(msg[0]["cn"], "Niemand")
137 self.assertEquals(msg[0]["unixName"], "bin")
138 self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
140 print "Checking for existence of record (local || remote)"
141 msg = ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))",
142 attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
# NOTE(review): len(msg) is an int, so concatenating it to a str raises
# TypeError; it needs converting before being joined into the message.
143 print "got " + len(msg) + " replies"
144 self.assertEquals(len(msg), 1) # TODO: should check with more records
145 self.assertEquals(msg[0]["cn"], "Niemand")
# NOTE(review): when unixName equals "bin" the first argument evaluates to
# True, which is then compared with "geheim" - looks like an either/or
# check was intended; confirm.
146 self.assertEquals(msg[0]["unixName"] == "bin" or msg[0]["sambaUnicodePwd"], "geheim")
148 print "Checking for data in destination database"
149 msg = s3.db.search("(cn=Niemand)")
150 self.assertTrue(len(msg) >= 1)
151 self.assertEquals(msg[0]["sambaSID"], "S-1-5-21-4231626423-2410014848-2360679739-2001")
152 self.assertEquals(msg[0]["displayName"], "Niemand")
154 print "Adding attribute..."
156 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
162 print "Checking whether changes are still there..."
163 msg = ldb.search(expression="(cn=Niemand)")
164 self.assertTrue(len(msg) >= 1)
165 self.assertEquals(msg[0]["cn"], "Niemand")
166 self.assertEquals(msg[0]["description"], "Blah")
168 print "Modifying attribute..."
170 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
176 print "Checking whether changes are still there..."
177 msg = ldb.search(expression="(cn=Niemand)")
178 self.assertTrue(len(msg) >= 1)
179 self.assertEquals(msg[0]["description"], "Blie")
181 print "Deleting attribute..."
183 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
188 print "Checking whether changes are no longer there..."
189 msg = ldb.search(expression="(cn=Niemand)")
190 self.assertTrue(len(msg) >= 1)
191 self.assertEquals(msg[0]["description"], undefined)
193 print "Renaming record..."
194 ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
196 print "Checking whether DN has changed..."
197 msg = ldb.search(expression="(cn=Niemand2)")
198 self.assertEquals(len(msg), 1)
199 self.assertEquals(str(msg[0].dn), "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
201 print "Deleting record..."
202 ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
204 print "Checking whether record is gone..."
205 msg = ldb.search(expression="(cn=Niemand2)")
206 self.assertEquals(len(msg), 0)
208 def test_map_search(ldb, s3, s4):
# Populate the databases with records and then check how `ldb.search`
# (and `s3.db.search`) resolve them: first by DN, then by single
# attributes, then by composite parse trees (&, |, ! and combinations).
#
# ldb: database handle that receives the "split" records and is searched.
# s3:  object providing substvars, dn() and a `db` handle; receives the
#      sambaDomain record and is searched by remote DN.
# s4:  object providing substvars and dn(); s4.dn(...) builds the DNs the
#      search results are compared with.
#
# NOTE(review): the signature has no `self` parameter, yet the body calls
# self.assertEquals / self.assertTrue - confirm how this is invoked.
# NOTE(review): `undefined` is used as the expected value for absent
# attributes but is not defined anywhere in the visible code.
# NOTE(review): the LDIF string literals, the `dn = ...` assignments used by
# the "search by DN" steps, and whatever consumes the final `dns` list are
# not visible in this view.
# NOTE(review): several searches assert a result count one larger than the
# number of entries subsequently inspected (e.g. a count of 5 followed by
# checks on res[0]..res[3]) - confirm whether the last entry is
# intentionally left unchecked.
209 print "Running search tests on mapped data"
211 dn: """ + "sambaDomainName=TESTS,""" + s3.substvars["BASEDN"] + """
212 objectclass: sambaDomain
214 sambaSID: S-1-5-21-4231626423-2410014848-2360679739
216 sambaDomainName: TESTS"""
217 self.assertTrue(ldif is not None)
218 s3.db.add_ldif(substitute_var(ldif, s3.substvars))
220 print "Add a set of split records"
222 dn: """ + s4.dn("cn=X") + """
231 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
232 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512
234 dn: """ + s4.dn("cn=Y") + """
244 dn: """ + s4.dn("cn=Z") + """
255 self.assertTrue(ldif is not None)
256 ldb.add_ldif(substitute_var(ldif, s4.substvars))
258 print "Add a set of remote records"
261 dn: """ + s3.dn("cn=A") + """
262 objectClass: posixAccount
265 sambaBadPasswordCount: x
268 sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552
269 sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512
271 dn: """ + s3.dn("cn=B") + """
275 sambaBadPasswordCount: x
279 dn: """ + s3.dn("cn=C") + """
283 sambaBadPasswordCount: y
287 self.assertTrue(ldif is not None)
# NOTE(review): this calls s3.add_ldif, whereas the sambaDomain record
# above is added through s3.db.add_ldif - confirm which is intended.
288 s3.add_ldif(substitute_var(ldif, s3.substvars))
290 print "Testing search by DN"
292 # Search remote record by local DN
294 attrs = ["dnsHostName", "lastLogon"]
295 res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
296 self.assertEquals(len(res), 1)
297 self.assertEquals(str(str(res[0].dn)), dn)
298 self.assertEquals(res[0]["dnsHostName"], undefined)
299 self.assertEquals(res[0]["lastLogon"], "x")
301 # Search remote record by remote DN
303 attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"]
304 res = s3.db.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
305 self.assertEquals(len(res), 1)
306 self.assertEquals(str(str(res[0].dn)), dn)
307 self.assertEquals(res[0]["dnsHostName"], undefined)
308 self.assertEquals(res[0]["lastLogon"], undefined)
309 self.assertEquals(res[0]["sambaLogonTime"], "x")
311 # Search split record by local DN
313 attrs = ["dnsHostName", "lastLogon"]
314 res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
315 self.assertEquals(len(res), 1)
316 self.assertEquals(str(str(res[0].dn)), dn)
317 self.assertEquals(res[0]["dnsHostName"], "x")
318 self.assertEquals(res[0]["lastLogon"], "x")
320 # Search split record by remote DN
322 attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"]
323 res = s3.db.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
324 self.assertEquals(len(res), 1)
325 self.assertEquals(str(str(res[0].dn)), dn)
326 self.assertEquals(res[0]["dnsHostName"], undefined)
327 self.assertEquals(res[0]["lastLogon"], undefined)
328 self.assertEquals(res[0]["sambaLogonTime"], "x")
330 print "Testing search by attribute"
332 # Search by ignored attribute
333 attrs = ["dnsHostName", "lastLogon"]
334 res = ldb.search(expression="(revision=x)", scope=ldb.SCOPE_DEFAULT, attrs=attrs)
335 self.assertEquals(len(res), 2)
336 self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Y"))
337 self.assertEquals(res[0]["dnsHostName"], "y")
338 self.assertEquals(res[0]["lastLogon"], "y")
339 self.assertEquals(str(str(res[1].dn)), s4.dn("cn=X"))
340 self.assertEquals(res[1]["dnsHostName"], "x")
341 self.assertEquals(res[1]["lastLogon"], "x")
343 # Search by kept attribute
344 attrs = ["dnsHostName", "lastLogon"]
345 res = ldb.search(expression="(description=y)", scope=ldb.SCOPE_DEFAULT, attrs=attrs)
346 self.assertEquals(len(res), 2)
347 self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Z"))
348 self.assertEquals(res[0]["dnsHostName"], "z")
349 self.assertEquals(res[0]["lastLogon"], "z")
350 self.assertEquals(str(str(res[1].dn)), s4.dn("cn=C"))
351 self.assertEquals(res[1]["dnsHostName"], undefined)
352 self.assertEquals(res[1]["lastLogon"], "z")
354 # Search by renamed attribute
355 attrs = ["dnsHostName", "lastLogon"]
356 res = ldb.search(expression="(badPwdCount=x)", scope=ldb.SCOPE_DEFAULT, attrs=attrs)
357 self.assertEquals(len(res), 2)
358 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
359 self.assertEquals(res[0]["dnsHostName"], undefined)
360 self.assertEquals(res[0]["lastLogon"], "y")
361 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
362 self.assertEquals(res[1]["dnsHostName"], undefined)
363 self.assertEquals(res[1]["lastLogon"], "x")
365 # Search by converted attribute
366 attrs = ["dnsHostName", "lastLogon", "objectSid"]
368 # Using the SID directly in the parse tree leads to conversion
369 # errors, letting the search fail with no results.
370 #res = ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", NULL, ldb. SCOPE_DEFAULT, attrs)
371 res = ldb.search(expression="(objectSid=*)", attrs=attrs)
372 self.assertEquals(len(res), 3)
373 self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
374 self.assertEquals(res[0]["dnsHostName"], "x")
375 self.assertEquals(res[0]["lastLogon"], "x")
376 self.assertEquals(res[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
377 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
378 self.assertEquals(res[1]["dnsHostName"], undefined)
379 self.assertEquals(res[1]["lastLogon"], "x")
380 self.assertEquals(res[1]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
382 # Search by generated attribute
383 # In most cases, this even works when the mapping is missing
384 # a `convert_operator' by enumerating the remote db.
385 attrs = ["dnsHostName", "lastLogon", "primaryGroupID"]
386 res = ldb.search(expression="(primaryGroupID=512)", attrs=attrs)
387 self.assertEquals(len(res), 1)
388 self.assertEquals(str(res[0].dn), s4.dn("cn=A"))
389 self.assertEquals(res[0]["dnsHostName"], undefined)
390 self.assertEquals(res[0]["lastLogon"], "x")
391 self.assertEquals(res[0]["primaryGroupID"], "512")
393 # TODO: There should actually be two results, A and X. The
394 # primaryGroupID of X seems to get corrupted somewhere, and the
395 # objectSid isn't available during the generation of remote (!) data,
396 # which can be observed with the following search. Also note that Xs
397 # objectSid seems to be fine in the previous search for objectSid... */
398 #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
399 #print len(res) + " results found"
400 #for i in range(len(res)):
401 # for (obj in res[i]) {
402 # print obj + ": " + res[i][obj]
407 # Search by remote name of renamed attribute */
408 attrs = ["dnsHostName", "lastLogon"]
409 res = ldb.search(expression="(sambaBadPasswordCount=*)", attrs=attrs)
410 self.assertEquals(len(res), 0)
412 # Search by objectClass
413 attrs = ["dnsHostName", "lastLogon", "objectClass"]
414 res = ldb.search(expression="(objectClass=user)", attrs=attrs)
415 self.assertEquals(len(res), 2)
416 self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
417 self.assertEquals(res[0]["dnsHostName"], "x")
418 self.assertEquals(res[0]["lastLogon"], "x")
419 self.assertTrue(res[0]["objectClass"] is not None)
420 self.assertEquals(res[0]["objectClass"][0], "user")
421 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
422 self.assertEquals(res[1]["dnsHostName"], undefined)
423 self.assertEquals(res[1]["lastLogon"], "x")
424 self.assertTrue(res[1]["objectClass"] is not None)
425 self.assertEquals(res[1]["objectClass"][0], "user")
427 # Prove that the objectClass is actually used for the search
428 res = ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs)
429 self.assertEquals(len(res), 3)
430 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
431 self.assertEquals(res[0]["dnsHostName"], undefined)
432 self.assertEquals(res[0]["lastLogon"], "y")
433 self.assertTrue(res[0]["objectClass"] is not None)
434 for oc in set(res[0]["objectClass"]):
435 self.assertEquals(oc, "user")
436 self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
437 self.assertEquals(res[1]["dnsHostName"], "x")
438 self.assertEquals(res[1]["lastLogon"], "x")
439 self.assertTrue(res[1]["objectClass"] is not None)
440 self.assertEquals(res[1]["objectClass"][0], "user")
441 self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
442 self.assertEquals(res[2]["dnsHostName"], undefined)
443 self.assertEquals(res[2]["lastLogon"], "x")
444 self.assertTrue(res[2]["objectClass"] is not None)
445 self.assertEquals(res[2]["objectClass"][0], "user")
447 print "Testing search by parse tree"
449 # Search by conjunction of local attributes
450 attrs = ["dnsHostName", "lastLogon"]
451 res = ldb.search(expression="(&(codePage=x)(revision=x))", attrs=attrs)
452 self.assertEquals(len(res), 2)
453 self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
454 self.assertEquals(res[0]["dnsHostName"], "y")
455 self.assertEquals(res[0]["lastLogon"], "y")
456 self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
457 self.assertEquals(res[1]["dnsHostName"], "x")
458 self.assertEquals(res[1]["lastLogon"], "x")
460 # Search by conjunction of remote attributes
461 attrs = ["dnsHostName", "lastLogon"]
462 res = ldb.search(expression="(&(lastLogon=x)(description=x))", attrs=attrs)
463 self.assertEquals(len(res), 2)
464 self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
465 self.assertEquals(res[0]["dnsHostName"], "x")
466 self.assertEquals(res[0]["lastLogon"], "x")
467 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
468 self.assertEquals(res[1]["dnsHostName"], undefined)
469 self.assertEquals(res[1]["lastLogon"], "x")
471 # Search by conjunction of local and remote attribute
472 attrs = ["dnsHostName", "lastLogon"]
473 res = ldb.search(expression="(&(codePage=x)(description=x))", attrs=attrs)
474 self.assertEquals(len(res), 2)
475 self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
476 self.assertEquals(res[0]["dnsHostName"], "y")
477 self.assertEquals(res[0]["lastLogon"], "y")
478 self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
479 self.assertEquals(res[1]["dnsHostName"], "x")
480 self.assertEquals(res[1]["lastLogon"], "x")
482 # Search by conjunction of local and remote attribute w/o match
483 attrs = ["dnsHostName", "lastLogon"]
484 res = ldb.search(expression="(&(codePage=x)(nextRid=x))", attrs=attrs)
485 self.assertEquals(len(res), 0)
486 res = ldb.search(expression="(&(revision=x)(lastLogon=z))", attrs=attrs)
487 self.assertEquals(len(res), 0)
489 # Search by disjunction of local attributes
490 attrs = ["dnsHostName", "lastLogon"]
491 res = ldb.search(expression="(|(revision=x)(dnsHostName=x))", attrs=attrs)
492 self.assertEquals(len(res), 2)
493 self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
494 self.assertEquals(res[0]["dnsHostName"], "y")
495 self.assertEquals(res[0]["lastLogon"], "y")
496 self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
497 self.assertEquals(res[1]["dnsHostName"], "x")
498 self.assertEquals(res[1]["lastLogon"], "x")
500 # Search by disjunction of remote attributes
501 attrs = ["dnsHostName", "lastLogon"]
502 res = ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", attrs=attrs)
503 self.assertEquals(len(res), 3)
504 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
505 self.assertEquals(res[0]["dnsHostName"], undefined)
506 self.assertEquals(res[0]["lastLogon"], "y")
507 self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
508 self.assertEquals(res[1]["dnsHostName"], "x")
509 self.assertEquals(res[1]["lastLogon"], "x")
510 self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
511 self.assertEquals(res[2]["dnsHostName"], undefined)
512 self.assertEquals(res[2]["lastLogon"], "x")
514 # Search by disjunction of local and remote attribute
515 attrs = ["dnsHostName", "lastLogon"]
516 res = ldb.search(expression="(|(revision=x)(lastLogon=y))", attrs=attrs)
517 self.assertEquals(len(res), 3)
518 self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
519 self.assertEquals(res[0]["dnsHostName"], "y")
520 self.assertEquals(res[0]["lastLogon"], "y")
521 self.assertEquals(str(res[1].dn), s4.dn("cn=B"))
522 self.assertEquals(res[1]["dnsHostName"], undefined)
523 self.assertEquals(res[1]["lastLogon"], "y")
524 self.assertEquals(str(res[2].dn), s4.dn("cn=X"))
525 self.assertEquals(res[2]["dnsHostName"], "x")
526 self.assertEquals(res[2]["lastLogon"], "x")
528 # Search by disjunction of local and remote attribute w/o match
529 attrs = ["dnsHostName", "lastLogon"]
530 res = ldb.search(expression="(|(codePage=y)(nextRid=z))", attrs=attrs)
531 self.assertEquals(len(res), 0)
533 # Search by negated local attribute
534 attrs = ["dnsHostName", "lastLogon"]
535 res = ldb.search(expression="(!(revision=x))", attrs=attrs)
536 self.assertEquals(len(res), 5)
537 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
538 self.assertEquals(res[0]["dnsHostName"], undefined)
539 self.assertEquals(res[0]["lastLogon"], "y")
540 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
541 self.assertEquals(res[1]["dnsHostName"], undefined)
542 self.assertEquals(res[1]["lastLogon"], "x")
543 self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
544 self.assertEquals(res[2]["dnsHostName"], "z")
545 self.assertEquals(res[2]["lastLogon"], "z")
546 self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
547 self.assertEquals(res[3]["dnsHostName"], undefined)
548 self.assertEquals(res[3]["lastLogon"], "z")
550 # Search by negated remote attribute
551 attrs = ["dnsHostName", "lastLogon"]
552 res = ldb.search(expression="(!(description=x))", attrs=attrs)
553 self.assertEquals(len(res), 3)
554 self.assertEquals(str(res[0].dn), s4.dn("cn=Z"))
555 self.assertEquals(res[0]["dnsHostName"], "z")
556 self.assertEquals(res[0]["lastLogon"], "z")
557 self.assertEquals(str(res[1].dn), s4.dn("cn=C"))
558 self.assertEquals(res[1]["dnsHostName"], undefined)
559 self.assertEquals(res[1]["lastLogon"], "z")
561 # Search by negated conjunction of local attributes
562 attrs = ["dnsHostName", "lastLogon"]
563 res = ldb.search(expression="(!(&(codePage=x)(revision=x)))", attrs=attrs)
564 self.assertEquals(len(res), 5)
565 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
566 self.assertEquals(res[0]["dnsHostName"], undefined)
567 self.assertEquals(res[0]["lastLogon"], "y")
568 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
569 self.assertEquals(res[1]["dnsHostName"], undefined)
570 self.assertEquals(res[1]["lastLogon"], "x")
571 self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
572 self.assertEquals(res[2]["dnsHostName"], "z")
573 self.assertEquals(res[2]["lastLogon"], "z")
574 self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
575 self.assertEquals(res[3]["dnsHostName"], undefined)
576 self.assertEquals(res[3]["lastLogon"], "z")
578 # Search by negated conjunction of remote attributes
579 attrs = ["dnsHostName", "lastLogon"]
580 res = ldb.search(expression="(!(&(lastLogon=x)(description=x)))", attrs=attrs)
581 self.assertEquals(len(res), 5)
582 self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
583 self.assertEquals(res[0]["dnsHostName"], "y")
584 self.assertEquals(res[0]["lastLogon"], "y")
585 self.assertEquals(str(res[1].dn), s4.dn("cn=B"))
586 self.assertEquals(res[1]["dnsHostName"], undefined)
587 self.assertEquals(res[1]["lastLogon"], "y")
588 self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
589 self.assertEquals(res[2]["dnsHostName"], "z")
590 self.assertEquals(res[2]["lastLogon"], "z")
591 self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
592 self.assertEquals(res[3]["dnsHostName"], undefined)
593 self.assertEquals(res[3]["lastLogon"], "z")
595 # Search by negated conjunction of local and remote attribute
596 attrs = ["dnsHostName", "lastLogon"]
597 res = ldb.search(expression="(!(&(codePage=x)(description=x)))", attrs=attrs)
598 self.assertEquals(len(res), 5)
599 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
600 self.assertEquals(res[0]["dnsHostName"], undefined)
601 self.assertEquals(res[0]["lastLogon"], "y")
602 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
603 self.assertEquals(res[1]["dnsHostName"], undefined)
604 self.assertEquals(res[1]["lastLogon"], "x")
605 self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
606 self.assertEquals(res[2]["dnsHostName"], "z")
607 self.assertEquals(res[2]["lastLogon"], "z")
608 self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
609 self.assertEquals(res[3]["dnsHostName"], undefined)
610 self.assertEquals(res[3]["lastLogon"], "z")
612 # Search by negated disjunction of local attributes
613 attrs = ["dnsHostName", "lastLogon"]
614 res = ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", attrs=attrs)
615 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
616 self.assertEquals(res[0]["dnsHostName"], undefined)
617 self.assertEquals(res[0]["lastLogon"], "y")
618 self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
619 self.assertEquals(res[1]["dnsHostName"], undefined)
620 self.assertEquals(res[1]["lastLogon"], "x")
621 self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
622 self.assertEquals(res[2]["dnsHostName"], "z")
623 self.assertEquals(res[2]["lastLogon"], "z")
624 self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
625 self.assertEquals(res[3]["dnsHostName"], undefined)
626 self.assertEquals(res[3]["lastLogon"], "z")
628 # Search by negated disjunction of remote attributes
629 attrs = ["dnsHostName", "lastLogon"]
630 res = ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", attrs=attrs)
631 self.assertEquals(len(res), 4)
632 self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
633 self.assertEquals(res[0]["dnsHostName"], "y")
634 self.assertEquals(res[0]["lastLogon"], "y")
635 self.assertEquals(str(res[1].dn), s4.dn("cn=Z"))
636 self.assertEquals(res[1]["dnsHostName"], "z")
637 self.assertEquals(res[1]["lastLogon"], "z")
638 self.assertEquals(str(res[2].dn), s4.dn("cn=C"))
639 self.assertEquals(res[2]["dnsHostName"], undefined)
640 self.assertEquals(res[2]["lastLogon"], "z")
642 # Search by negated disjunction of local and remote attribute
643 attrs = ["dnsHostName", "lastLogon"]
644 res = ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", attrs=attrs)
645 self.assertEquals(len(res), 4)
646 self.assertEquals(str(res[0].dn), s4.dn("cn=A"))
647 self.assertEquals(res[0]["dnsHostName"], undefined)
648 self.assertEquals(res[0]["lastLogon"], "x")
649 self.assertEquals(str(res[1].dn), s4.dn("cn=Z"))
650 self.assertEquals(res[1]["dnsHostName"], "z")
651 self.assertEquals(res[1]["lastLogon"], "z")
652 self.assertEquals(str(res[2].dn), s4.dn("cn=C"))
653 self.assertEquals(res[2]["dnsHostName"], undefined)
654 self.assertEquals(res[2]["lastLogon"], "z")
656 print "Search by complex parse tree"
657 attrs = ["dnsHostName", "lastLogon"]
658 res = ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=attrs)
659 self.assertEquals(len(res), 6)
660 self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
661 self.assertEquals(res[0]["dnsHostName"], undefined)
662 self.assertEquals(res[0]["lastLogon"], "y")
663 self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
664 self.assertEquals(res[1]["dnsHostName"], "x")
665 self.assertEquals(res[1]["lastLogon"], "x")
666 self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
667 self.assertEquals(res[2]["dnsHostName"], undefined)
668 self.assertEquals(res[2]["lastLogon"], "x")
669 self.assertEquals(str(res[3].dn), s4.dn("cn=Z"))
670 self.assertEquals(res[3]["dnsHostName"], "z")
671 self.assertEquals(res[3]["lastLogon"], "z")
672 self.assertEquals(str(res[4].dn), s4.dn("cn=C"))
673 self.assertEquals(res[4]["dnsHostName"], undefined)
674 self.assertEquals(res[4]["lastLogon"], "z")
# Collect the DNs of all six test records (A, B, C, X, Y, Z).
677 dns = [s4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
681 def test_map_modify(self, ldb, s3, s4):
682 print "Running modification tests on mapped data"
684 print "Testing modification of local records"
687 dn = "cn=test,dc=idealx,dc=org"
697 attrs = ["foo", "revision", "description"]
698 res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
699 self.assertEquals(len(res), 1)
700 self.assertEquals(str(res[0].dn), dn)
701 self.assertEquals(res[0]["foo"], "bar")
702 self.assertEquals(res[0]["revision"], "1")
703 self.assertEquals(res[0]["description"], "test")
704 # Check it's not in the local db
705 res = s4.db.search("(cn=test)", NULL, ldb.SCOPE_DEFAULT, attrs)
706 self.assertEquals(len(res), 0)
707 # Check it's not in the remote db
708 res = s3.db.search("(cn=test)", NULL, ldb.SCOPE_DEFAULT, attrs)
709 self.assertEquals(len(res), 0)
711 # Modify local record
719 ldb.modify_ldif(ldif)
721 res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
722 self.assertEquals(len(res), 1)
723 self.assertEquals(str(res[0].dn), dn)
724 self.assertEquals(res[0]["foo"], "baz")
725 self.assertEquals(res[0]["revision"], "1")
726 self.assertEquals(res[0]["description"], "foo")
728 # Rename local record
729 dn2 = "cn=toast,dc=idealx,dc=org"
732 res = ldb.search(dn2, scope=ldb.SCOPE_BASE, attrs=attrs)
733 self.assertEquals(len(res), 1)
734 self.assertEquals(str(res[0].dn), dn2)
735 self.assertEquals(res[0]["foo"], "baz")
736 self.assertEquals(res[0]["revision"], "1")
737 self.assertEquals(res[0]["description"], "foo")
739 # Delete local record
742 res = ldb.search(dn2, scope=ldb.SCOPE_BASE)
743 self.assertEquals(len(res), 0)
745 print "Testing modification of remote records"
748 dn = s4.dn("cn=test")
749 dn2 = s3.dn("cn=test")
754 sambaBadPasswordCount: 3
759 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
760 res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
761 self.assertEquals(len(res), 1)
762 self.assertEquals(str(res[0].dn), dn2)
763 self.assertEquals(res[0]["description"], "foo")
764 self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
765 self.assertEquals(res[0]["sambaNextRid"], "1001")
767 attrs = ["description", "badPwdCount", "nextRid"]
768 res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
769 self.assertEquals(len(res), 1)
770 self.assertEquals(str(res[0].dn), dn)
771 self.assertEquals(res[0]["description"], "foo")
772 self.assertEquals(res[0]["badPwdCount"], "3")
773 self.assertEquals(res[0]["nextRid"], "1001")
775 res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
776 self.assertEquals(len(res), 0)
778 # Modify remote data of remote record
786 ldb.modify_ldif(ldif)
788 attrs = ["description", "badPwdCount", "nextRid"]
789 res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
790 self.assertEquals(len(res), 1)
791 self.assertEquals(str(res[0].dn), dn)
792 self.assertEquals(res[0]["description"], "test")
793 self.assertEquals(res[0]["badPwdCount"], "4")
794 self.assertEquals(res[0]["nextRid"], "1001")
796 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
797 res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
798 self.assertEquals(len(res), 1)
799 self.assertEquals(str(res[0].dn), dn2)
800 self.assertEquals(res[0]["description"], "test")
801 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
802 self.assertEquals(res[0]["sambaNextRid"], "1001")
804 # Rename remote record
805 dn2 = s4.dn("cn=toast")
809 attrs = ["description", "badPwdCount", "nextRid"]
810 res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
811 self.assertEquals(len(res), 1)
812 self.assertEquals(str(res[0].dn), dn)
813 self.assertEquals(res[0]["description"], "test")
814 self.assertEquals(res[0]["badPwdCount"], "4")
815 self.assertEquals(res[0]["nextRid"], "1001")
817 dn2 = s3.dn("cn=toast")
818 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
819 res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
820 self.assertEquals(len(res), 1)
821 self.assertEquals(str(res[0].dn), dn2)
822 self.assertEquals(res[0]["description"], "test")
823 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
824 self.assertEquals(res[0]["sambaNextRid"], "1001")
826 # Delete remote record
829 res = ldb.search(dn, scope=ldb.SCOPE_BASE)
830 self.assertEquals(len(res), 0)
832 res = s3.db.search("", dn2, ldb.SCOPE_BASE)
833 self.assertEquals(len(res), 0)
835 # Add remote record (same as before)
836 dn = s4.dn("cn=test")
837 dn2 = s3.dn("cn=test")
842 sambaBadPasswordCount: 3
847 # Modify local data of remote record
855 ldb.modify_ldif(ldif)
857 attrs = ["revision", "description"]
858 res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
859 self.assertEquals(len(res), 1)
860 self.assertEquals(str(res[0].dn), dn)
861 self.assertEquals(res[0]["description"], "test")
862 self.assertEquals(res[0]["revision"], "1")
864 res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
865 self.assertEquals(len(res), 1)
866 self.assertEquals(str(res[0].dn), dn2)
867 self.assertEquals(res[0]["description"], "test")
868 self.assertEquals(res[0]["revision"], undefined)
870 res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
871 self.assertEquals(len(res), 1)
872 self.assertEquals(str(res[0].dn), dn)
873 self.assertEquals(res[0]["description"], undefined)
874 self.assertEquals(res[0]["revision"], "1")
876 # Delete (newly) split record
879 print "Testing modification of split records"
882 dn = s4.dn("cn=test")
883 dn2 = s3.dn("cn=test")
894 attrs = ["description", "badPwdCount", "nextRid", "revision"]
895 res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
896 self.assertEquals(len(res), 1)
897 self.assertEquals(str(res[0].dn), dn)
898 self.assertEquals(res[0]["description"], "foo")
899 self.assertEquals(res[0]["badPwdCount"], "3")
900 self.assertEquals(res[0]["nextRid"], "1001")
901 self.assertEquals(res[0]["revision"], "1")
903 res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
904 self.assertEquals(len(res), 1)
905 self.assertEquals(str(res[0].dn), dn)
906 self.assertEquals(res[0]["description"], undefined)
907 self.assertEquals(res[0]["badPwdCount"], undefined)
908 self.assertEquals(res[0]["nextRid"], undefined)
909 self.assertEquals(res[0]["revision"], "1")
911 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
912 res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
913 self.assertEquals(len(res), 1)
914 self.assertEquals(str(res[0].dn), dn2)
915 self.assertEquals(res[0]["description"], "foo")
916 self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
917 self.assertEquals(res[0]["sambaNextRid"], "1001")
918 self.assertEquals(res[0]["revision"], undefined)
920 # Modify of split record
930 ldb.modify_ldif(ldif)
932 attrs = ["description", "badPwdCount", "nextRid", "revision"]
933 res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
934 self.assertEquals(len(res), 1)
935 self.assertEquals(str(res[0].dn), dn)
936 self.assertEquals(res[0]["description"], "test")
937 self.assertEquals(res[0]["badPwdCount"], "4")
938 self.assertEquals(res[0]["nextRid"], "1001")
939 self.assertEquals(res[0]["revision"], "2")
941 res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
942 self.assertEquals(len(res), 1)
943 self.assertEquals(str(res[0].dn), dn)
944 self.assertEquals(res[0]["description"], undefined)
945 self.assertEquals(res[0]["badPwdCount"], undefined)
946 self.assertEquals(res[0]["nextRid"], undefined)
947 self.assertEquals(res[0]["revision"], "2")
949 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
950 res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
951 self.assertEquals(len(res), 1)
952 self.assertEquals(str(res[0].dn), dn2)
953 self.assertEquals(res[0]["description"], "test")
954 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
955 self.assertEquals(res[0]["sambaNextRid"], "1001")
956 self.assertEquals(res[0]["revision"], undefined)
958 # Rename split record
959 dn2 = s4.dn("cn=toast")
963 attrs = ["description", "badPwdCount", "nextRid", "revision"]
964 res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
965 self.assertEquals(len(res), 1)
966 self.assertEquals(str(res[0].dn), dn)
967 self.assertEquals(res[0]["description"], "test")
968 self.assertEquals(res[0]["badPwdCount"], "4")
969 self.assertEquals(res[0]["nextRid"], "1001")
970 self.assertEquals(res[0]["revision"], "2")
972 res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
973 self.assertEquals(len(res), 1)
974 self.assertEquals(str(res[0].dn), dn)
975 self.assertEquals(res[0]["description"], undefined)
976 self.assertEquals(res[0]["badPwdCount"], undefined)
977 self.assertEquals(res[0]["nextRid"], undefined)
978 self.assertEquals(res[0]["revision"], "2")
980 dn2 = s3.dn("cn=toast")
981 attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
982 res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
983 self.assertEquals(len(res), 1)
984 self.assertEquals(str(res[0].dn), dn2)
985 self.assertEquals(res[0]["description"], "test")
986 self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
987 self.assertEquals(res[0]["sambaNextRid"], "1001")
988 self.assertEquals(res[0]["revision"], undefined)
990 # Delete split record
993 res = ldb.search(dn, scope=ldb.SCOPE_BASE)
994 self.assertEquals(len(res), 0)
996 res = s4.db.search("", dn, ldb.SCOPE_BASE)
997 self.assertEquals(len(res), 0)
999 res = s3.db.search("", dn2, ldb.SCOPE_BASE)
1000 self.assertEquals(len(res), 0)
# Presumably the start of this test case's setUp() (its `def` line is not
# visible here): chain to the parent class's setUp first.
1003 super(Samba3SamTestCase, self).setUp()
# DN-builder result: the RDN followed by the fixed "sambaDomainName=TESTS"
# component and the target's BASEDN (presumably the helper later passed as
# make_dn -- its `def` line is not visible here).
# NOTE(review): `this` is not bound anywhere in the visible code; it looks
# like a leftover from the JavaScript original this file was ported from.
# Confirm how the owning target's substvars are meant to be reached.
1006 return rdn + ",sambaDomainName=TESTS," + this.substvars["BASEDN"]
# DN-builder result: the RDN followed directly by the target's BASEDN
# (presumably the helper later passed as make_s4dn).
# NOTE(review): same unbound `this` as above.
1009 return rdn + "," + this.substvars["BASEDN"]
# Path and tdb:// URL of the main test database inside the temp directory.
1013 ldbfile = os.path.join(self.tempdir, "test.ldb")
1014 ldburl = "tdb://" + ldbfile
# Plain local alias of the temp directory; __init__ below reads the bare
# name `tempdir` rather than an attribute of its own `self`.
1016 tempdir = self.tempdir
# Describe one backing database: where its file lives, the tdb:// URL used
# to open it, and the variables substituted into its LDIF.
#   file   -- database file name; joined onto the enclosing `tempdir`
#   basedn -- base DN, exposed as substvars["BASEDN"]
#   dn     -- accepted but not used in the lines visible here
# NOTE(review): other code in this file calls `.db.connect(...)` and
# `.dn("cn=test")` on objects like these, yet neither `self.db` nor
# `self.dn` is assigned in the visible body -- confirm they are set.
1019 def __init__(self, file, basedn, dn):
1020 self.file = os.path.join(tempdir, file)
1021 self.url = "tdb://" + self.file
1022 self.substvars = {"BASEDN": basedn}
# One Target per backing database: file name, base DN, and (presumably) the
# DN-builder helper for that database -- None for the templates database.
1026 samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
1027 samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
1028 templates = Target("templates.ldb", "cn=templates", None)
# Open each backing database at its tdb:// URL.
1031 samba3.db.connect(samba3.url)
1032 templates.db.connect(templates.url)
1033 samba4.db.connect(samba4.url)
# Load the LDIF fixtures: data into the samba3 and templates databases via
# setup_data, then the module/partition records via setup_modules.
# NOTE(review): neither `ldb` nor `datadir` is assigned in the lines visible
# here, and the files opened below are never explicitly closed.
1035 self.setup_data(samba3, open(os.path.join(datadir, "samba3.ldif"), 'r').read())
1036 self.setup_data(templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read())
1037 self.setup_modules(ldb, samba3, samba4, open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read())
# First pass: the samba3sam search and modify checks.
1042 self.test_s3sam_search(ldb)
1043 self.test_s3sam_modify(ldb, samba3)
# Delete the three database files, presumably so the second pass starts
# from empty databases.
1046 os.unlink(samba3.file)
1047 os.unlink(templates.file)
1048 os.unlink(samba4.file)
# Reconnect the targets for the second pass.
# NOTE(review): only templates.db is replaced with a fresh Ldb() in the
# visible lines; samba3.db and samba4.db are reconnected on their existing
# handles -- confirm this is intended.
1053 samba3.db.connect(samba3.url)
1054 templates.db = Ldb()
1055 templates.db.connect(templates.url)
1057 samba4.db.connect(samba4.url)
# Reload the templates and module fixtures (samba3.ldif is not reloaded in
# the visible lines).
1059 self.setup_data(templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read())
1060 self.setup_modules(ldb, samba3, samba4, open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read())
# Second pass: the attribute-mapping search and modify checks.
# NOTE(review): unlike the test_s3sam_* calls above, these two calls have no
# `self.` prefix, so as written they resolve only if module-level functions
# of these names exist -- confirm.
1065 test_map_search(ldb, samba3, samba4)
1066 test_map_modify(ldb, samba3, samba4)