r26570: - Trim size of the swig-generated Python bindings by removing a bunch of...
[kai/samba.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
1 #!/usr/bin/python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2007
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6 #
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
8 #   
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #   
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #   
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 import os
24 import sys
25 import samba
26 import ldb
27 from samba import Ldb, substitute_var
28 from samba.tests import LdbTestCase, TestCaseInTempDir
29
# Third command-line word (sys.argv[2]); raises IndexError at import time if
# fewer than two arguments were given.
# NOTE(review): presumably the directory holding the test data -- it is not
# used in the visible part of this file, confirm against the callers.
datadir = sys.argv[2]
31
32 class Samba3SamTestCase(TestCaseInTempDir):
33     def setup_data(self, obj, ldif):
34         self.assertTrue(ldif is not None)
35         obj.db.add_ldif(substitute_var(ldif, obj.substvars))
36
37     def setup_modules(self, ldb, s3, s4, ldif):
38         self.assertTrue(ldif is not None)
39         ldb.add_ldif(substitute_var(ldif, s4.substvars))
40
41         ldif = """
42 dn: @MAP=samba3sam
43 @FROM: """ + s4.substvars["BASEDN"] + """
44 @TO: sambaDomainName=TESTS,""" + s3.substvars["BASEDN"] + """
45
46 dn: @MODULES
47 @LIST: rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition
48
49 dn: @PARTITION
50 partition: """ + s4.substvars["BASEDN"] + ":" + s4.url + """
51 partition: """ + s3.substvars["BASEDN"] + ":" + s3.url + """
52 replicateEntries: @SUBCLASSES
53 replicateEntries: @ATTRIBUTES
54 replicateEntries: @INDEXLIST
55 """
56         ldb.add_ldif(ldif)
57
58     def test_s3sam_search(self, ldb):
59         print "Looking up by non-mapped attribute"
60         msg = ldb.search(expression="(cn=Administrator)")
61         self.assertEquals(len(msg), 1)
62         self.assertEquals(msg[0]["cn"], "Administrator")
63
64         print "Looking up by mapped attribute"
65         msg = ldb.search(expression="(name=Backup Operators)")
66         self.assertEquals(len(msg), 1)
67         self.assertEquals(msg[0]["name"], "Backup Operators")
68
69         print "Looking up by old name of renamed attribute"
70         msg = ldb.search(expression="(displayName=Backup Operators)")
71         self.assertEquals(len(msg), 0)
72
73         print "Looking up mapped entry containing SID"
74         msg = ldb.search(expression="(cn=Replicator)")
75         self.assertEquals(len(msg), 1)
76         print msg[0].dn
77         self.assertEquals(str(msg[0].dn), "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
78         self.assertEquals(msg[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
79
80         print "Checking mapping of objectClass"
81         oc = set(msg[0]["objectClass"])
82         self.assertTrue(oc is not None)
83         for i in oc:
84             self.assertEquals(oc[i] == "posixGroup" or oc[i], "group")
85
86         print "Looking up by objectClass"
87         msg = ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
88         self.assertEquals(len(msg), 2)
89         for i in range(len(msg)):
90             self.assertEquals((str(msg[i].dn), "unixName=Administrator,ou=Users,dc=vernstok,dc=nl") or
91                    (str(msg[i].dn) == "unixName=nobody,ou=Users,dc=vernstok,dc=nl"))
92
93
94     def test_s3sam_modify(ldb, s3):
95         print "Adding a record that will be fallbacked"
96         ldb.add_ldif("""
97 dn: cn=Foo
98 foo: bar
99 blah: Blie
100 cn: Foo
101 showInAdvancedViewOnly: TRUE
102     """)
103
104         print "Checking for existence of record (local)"
105         # TODO: This record must be searched in the local database, which is currently only supported for base searches
106         # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
107         # TODO: Actually, this version should work as well but doesn't...
108         # 
109         #    
110         attrs =  ['foo','blah','cn','showInAdvancedViewOnly']
111         msg = ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=ldb.LDB_SCOPE_BASE, attrs=attrs)
112         self.assertEquals(len(msg), 1)
113         self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE")
114         self.assertEquals(msg[0]["foo"], "bar")
115         self.assertEquals(msg[0]["blah"], "Blie")
116
117         print "Adding record that will be mapped"
118         ldb.add_ldif("""
119 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
120 objectClass: user
121 unixName: bin
122 sambaUnicodePwd: geheim
123 cn: Niemand
124 """)
125
126         print "Checking for existence of record (remote)"
127         msg = ldb.search(expression="(unixName=bin)", attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
128         self.assertEquals(len(msg), 1)
129         self.assertEquals(msg[0]["cn"], "Niemand")
130         self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
131
132         print "Checking for existence of record (local && remote)"
133         msg = ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))", 
134                          attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
135         self.assertEquals(len(msg), 1)           # TODO: should check with more records
136         self.assertEquals(msg[0]["cn"], "Niemand")
137         self.assertEquals(msg[0]["unixName"], "bin")
138         self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
139
140         print "Checking for existence of record (local || remote)"
141         msg = ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))", 
142                          attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
143         print "got " + len(msg) + " replies"
144         self.assertEquals(len(msg), 1)        # TODO: should check with more records
145         self.assertEquals(msg[0]["cn"], "Niemand")
146         self.assertEquals(msg[0]["unixName"] == "bin" or msg[0]["sambaUnicodePwd"], "geheim")
147
148         print "Checking for data in destination database"
149         msg = s3.db.search("(cn=Niemand)")
150         self.assertTrue(len(msg) >= 1)
151         self.assertEquals(msg[0]["sambaSID"], "S-1-5-21-4231626423-2410014848-2360679739-2001")
152         self.assertEquals(msg[0]["displayName"], "Niemand")
153
154         print "Adding attribute..."
155         ldb.modify_ldif("""
156 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
157 changetype: modify
158 add: description
159 description: Blah
160 """)
161
162         print "Checking whether changes are still there..."
163         msg = ldb.search(expression="(cn=Niemand)")
164         self.assertTrue(len(msg) >= 1)
165         self.assertEquals(msg[0]["cn"], "Niemand")
166         self.assertEquals(msg[0]["description"], "Blah")
167
168         print "Modifying attribute..."
169         ldb.modify_ldif("""
170 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
171 changetype: modify
172 replace: description
173 description: Blie
174 """)
175
176         print "Checking whether changes are still there..."
177         msg = ldb.search(expression="(cn=Niemand)")
178         self.assertTrue(len(msg) >= 1)
179         self.assertEquals(msg[0]["description"], "Blie")
180
181         print "Deleting attribute..."
182         ldb.modify_ldif("""
183 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
184 changetype: modify
185 delete: description
186 """)
187
188         print "Checking whether changes are no longer there..."
189         msg = ldb.search(expression="(cn=Niemand)")
190         self.assertTrue(len(msg) >= 1)
191         self.assertEquals(msg[0]["description"], undefined)
192
193         print "Renaming record..."
194         ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
195
196         print "Checking whether DN has changed..."
197         msg = ldb.search(expression="(cn=Niemand2)")
198         self.assertEquals(len(msg), 1)
199         self.assertEquals(str(msg[0].dn), "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
200
201         print "Deleting record..."
202         ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
203
204         print "Checking whether record is gone..."
205         msg = ldb.search(expression="(cn=Niemand2)")
206         self.assertEquals(len(msg), 0)
207
208     def test_map_search(ldb, s3, s4):
209         print "Running search tests on mapped data"
210         ldif = """
211 dn: """ + "sambaDomainName=TESTS,""" + s3.substvars["BASEDN"] + """
212 objectclass: sambaDomain
213 objectclass: top
214 sambaSID: S-1-5-21-4231626423-2410014848-2360679739
215 sambaNextRid: 2000
216 sambaDomainName: TESTS"""
217         self.assertTrue(ldif is not None)
218         s3.db.add_ldif(substitute_var(ldif, s3.substvars))
219
220         print "Add a set of split records"
221         ldif = """
222 dn: """ + s4.dn("cn=X") + """
223 objectClass: user
224 cn: X
225 codePage: x
226 revision: x
227 dnsHostName: x
228 nextRid: y
229 lastLogon: x
230 description: x
231 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
232 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512
233
234 dn: """ + s4.dn("cn=Y") + """
235 objectClass: top
236 cn: Y
237 codePage: x
238 revision: x
239 dnsHostName: y
240 nextRid: y
241 lastLogon: y
242 description: x
243
244 dn: """ + s4.dn("cn=Z") + """
245 objectClass: top
246 cn: Z
247 codePage: x
248 revision: y
249 dnsHostName: z
250 nextRid: y
251 lastLogon: z
252 description: y
253 """
254
255         self.assertTrue(ldif is not None)
256         ldb.add_ldif(substitute_var(ldif, s4.substvars))
257
258         print "Add a set of remote records"
259
260         ldif = """
261 dn: """ + s3.dn("cn=A") + """
262 objectClass: posixAccount
263 cn: A
264 sambaNextRid: x
265 sambaBadPasswordCount: x
266 sambaLogonTime: x
267 description: x
268 sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552
269 sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512
270
271 dn: """ + s3.dn("cn=B") + """
272 objectClass: top
273 cn:B
274 sambaNextRid: x
275 sambaBadPasswordCount: x
276 sambaLogonTime: y
277 description: x
278
279 dn: """ + s3.dn("cn=C") + """
280 objectClass: top
281 cn: C
282 sambaNextRid: x
283 sambaBadPasswordCount: y
284 sambaLogonTime: z
285 description: y
286 """
287         self.assertTrue(ldif is not None)
288         s3.add_ldif(substitute_var(ldif, s3.substvars))
289
290         print "Testing search by DN"
291
292         # Search remote record by local DN
293         dn = s4.dn("cn=A")
294         attrs = ["dnsHostName", "lastLogon"]
295         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
296         self.assertEquals(len(res), 1)
297         self.assertEquals(str(str(res[0].dn)), dn)
298         self.assertEquals(res[0]["dnsHostName"], undefined)
299         self.assertEquals(res[0]["lastLogon"], "x")
300
301         # Search remote record by remote DN
302         dn = s3.dn("cn=A")
303         attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"]
304         res = s3.db.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
305         self.assertEquals(len(res), 1)
306         self.assertEquals(str(str(res[0].dn)), dn)
307         self.assertEquals(res[0]["dnsHostName"], undefined)
308         self.assertEquals(res[0]["lastLogon"], undefined)
309         self.assertEquals(res[0]["sambaLogonTime"], "x")
310
311         # Search split record by local DN
312         dn = s4.dn("cn=X")
313         attrs = ["dnsHostName", "lastLogon"]
314         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
315         self.assertEquals(len(res), 1)
316         self.assertEquals(str(str(res[0].dn)), dn)
317         self.assertEquals(res[0]["dnsHostName"], "x")
318         self.assertEquals(res[0]["lastLogon"], "x")
319
320         # Search split record by remote DN
321         dn = s3.dn("cn=X")
322         attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"]
323         res = s3.db.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
324         self.assertEquals(len(res), 1)
325         self.assertEquals(str(str(res[0].dn)), dn)
326         self.assertEquals(res[0]["dnsHostName"], undefined)
327         self.assertEquals(res[0]["lastLogon"], undefined)
328         self.assertEquals(res[0]["sambaLogonTime"], "x")
329
330         print "Testing search by attribute"
331
332         # Search by ignored attribute
333         attrs = ["dnsHostName", "lastLogon"]
334         res = ldb.search(expression="(revision=x)", scope=ldb.SCOPE_DEFAULT, attrs=attrs)
335         self.assertEquals(len(res), 2)
336         self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Y"))
337         self.assertEquals(res[0]["dnsHostName"], "y")
338         self.assertEquals(res[0]["lastLogon"], "y")
339         self.assertEquals(str(str(res[1].dn)), s4.dn("cn=X"))
340         self.assertEquals(res[1]["dnsHostName"], "x")
341         self.assertEquals(res[1]["lastLogon"], "x")
342
343         # Search by kept attribute
344         attrs = ["dnsHostName", "lastLogon"]
345         res = ldb.search(expression="(description=y)", scope=ldb.SCOPE_DEFAULT, attrs=attrs)
346         self.assertEquals(len(res), 2)
347         self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Z"))
348         self.assertEquals(res[0]["dnsHostName"], "z")
349         self.assertEquals(res[0]["lastLogon"], "z")
350         self.assertEquals(str(str(res[1].dn)), s4.dn("cn=C"))
351         self.assertEquals(res[1]["dnsHostName"], undefined)
352         self.assertEquals(res[1]["lastLogon"], "z")
353
354         # Search by renamed attribute
355         attrs = ["dnsHostName", "lastLogon"]
356         res = ldb.search(expression="(badPwdCount=x)", scope=ldb.SCOPE_DEFAULT, attrs=attrs)
357         self.assertEquals(len(res), 2)
358         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
359         self.assertEquals(res[0]["dnsHostName"], undefined)
360         self.assertEquals(res[0]["lastLogon"], "y")
361         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
362         self.assertEquals(res[1]["dnsHostName"], undefined)
363         self.assertEquals(res[1]["lastLogon"], "x")
364
365         # Search by converted attribute
366         attrs = ["dnsHostName", "lastLogon", "objectSid"]
367         # TODO:
368         #   Using the SID directly in the parse tree leads to conversion
369         #   errors, letting the search fail with no results.
370         #res = ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", NULL, ldb. SCOPE_DEFAULT, attrs)
371         res = ldb.search(expression="(objectSid=*)", attrs=attrs)
372         self.assertEquals(len(res), 3)
373         self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
374         self.assertEquals(res[0]["dnsHostName"], "x")
375         self.assertEquals(res[0]["lastLogon"], "x")
376         self.assertEquals(res[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
377         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
378         self.assertEquals(res[1]["dnsHostName"], undefined)
379         self.assertEquals(res[1]["lastLogon"], "x")
380         self.assertEquals(res[1]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
381
382         # Search by generated attribute 
383         # In most cases, this even works when the mapping is missing
384         # a `convert_operator' by enumerating the remote db.
385         attrs = ["dnsHostName", "lastLogon", "primaryGroupID"]
386         res = ldb.search(expression="(primaryGroupID=512)", attrs=attrs)
387         self.assertEquals(len(res), 1)
388         self.assertEquals(str(res[0].dn), s4.dn("cn=A"))
389         self.assertEquals(res[0]["dnsHostName"], undefined)
390         self.assertEquals(res[0]["lastLogon"], "x")
391         self.assertEquals(res[0]["primaryGroupID"], "512")
392
393         # TODO: There should actually be two results, A and X.  The
394         # primaryGroupID of X seems to get corrupted somewhere, and the
395         # objectSid isn't available during the generation of remote (!) data,
396         # which can be observed with the following search.  Also note that Xs
397         # objectSid seems to be fine in the previous search for objectSid... */
398         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
399         #print len(res) + " results found"
400         #for i in range(len(res)):
401         #    for (obj in res[i]) {
402         #        print obj + ": " + res[i][obj]
403         #    }
404         #    print "---"
405         #    
406
407         # Search by remote name of renamed attribute */
408         attrs = ["dnsHostName", "lastLogon"]
409         res = ldb.search(expression="(sambaBadPasswordCount=*)", attrs=attrs)
410         self.assertEquals(len(res), 0)
411
412         # Search by objectClass
413         attrs = ["dnsHostName", "lastLogon", "objectClass"]
414         res = ldb.search(expression="(objectClass=user)", attrs=attrs)
415         self.assertEquals(len(res), 2)
416         self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
417         self.assertEquals(res[0]["dnsHostName"], "x")
418         self.assertEquals(res[0]["lastLogon"], "x")
419         self.assertTrue(res[0]["objectClass"] is not None)
420         self.assertEquals(res[0]["objectClass"][0], "user")
421         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
422         self.assertEquals(res[1]["dnsHostName"], undefined)
423         self.assertEquals(res[1]["lastLogon"], "x")
424         self.assertTrue(res[1]["objectClass"] is not None)
425         self.assertEquals(res[1]["objectClass"][0], "user")
426
427         # Prove that the objectClass is actually used for the search
428         res = ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs)
429         self.assertEquals(len(res), 3)
430         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
431         self.assertEquals(res[0]["dnsHostName"], undefined)
432         self.assertEquals(res[0]["lastLogon"], "y")
433         self.assertTrue(res[0]["objectClass"] is not None)
434         for oc in set(res[0]["objectClass"]):
435             self.assertEquals(oc, "user")
436         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
437         self.assertEquals(res[1]["dnsHostName"], "x")
438         self.assertEquals(res[1]["lastLogon"], "x")
439         self.assertTrue(res[1]["objectClass"] is not None)
440         self.assertEquals(res[1]["objectClass"][0], "user")
441         self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
442         self.assertEquals(res[2]["dnsHostName"], undefined)
443         self.assertEquals(res[2]["lastLogon"], "x")
444         self.assertTrue(res[2]["objectClass"] is not None)
445         self.assertEquals(res[2]["objectClass"][0], "user")
446
447         print "Testing search by parse tree"
448
449         # Search by conjunction of local attributes
450         attrs = ["dnsHostName", "lastLogon"]
451         res = ldb.search(expression="(&(codePage=x)(revision=x))", attrs=attrs)
452         self.assertEquals(len(res), 2)
453         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
454         self.assertEquals(res[0]["dnsHostName"], "y")
455         self.assertEquals(res[0]["lastLogon"], "y")
456         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
457         self.assertEquals(res[1]["dnsHostName"], "x")
458         self.assertEquals(res[1]["lastLogon"], "x")
459
460         # Search by conjunction of remote attributes
461         attrs = ["dnsHostName", "lastLogon"]
462         res = ldb.search(expression="(&(lastLogon=x)(description=x))", attrs=attrs)
463         self.assertEquals(len(res), 2)
464         self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
465         self.assertEquals(res[0]["dnsHostName"], "x")
466         self.assertEquals(res[0]["lastLogon"], "x")
467         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
468         self.assertEquals(res[1]["dnsHostName"], undefined)
469         self.assertEquals(res[1]["lastLogon"], "x")
470         
471         # Search by conjunction of local and remote attribute 
472         attrs = ["dnsHostName", "lastLogon"]
473         res = ldb.search(expression="(&(codePage=x)(description=x))", attrs=attrs)
474         self.assertEquals(len(res), 2)
475         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
476         self.assertEquals(res[0]["dnsHostName"], "y")
477         self.assertEquals(res[0]["lastLogon"], "y")
478         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
479         self.assertEquals(res[1]["dnsHostName"], "x")
480         self.assertEquals(res[1]["lastLogon"], "x")
481
482         # Search by conjunction of local and remote attribute w/o match
483         attrs = ["dnsHostName", "lastLogon"]
484         res = ldb.search(expression="(&(codePage=x)(nextRid=x))", attrs=attrs)
485         self.assertEquals(len(res), 0)
486         res = ldb.search(expression="(&(revision=x)(lastLogon=z))", attrs=attrs)
487         self.assertEquals(len(res), 0)
488
489         # Search by disjunction of local attributes
490         attrs = ["dnsHostName", "lastLogon"]
491         res = ldb.search(expression="(|(revision=x)(dnsHostName=x))", attrs=attrs)
492         self.assertEquals(len(res), 2)
493         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
494         self.assertEquals(res[0]["dnsHostName"], "y")
495         self.assertEquals(res[0]["lastLogon"], "y")
496         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
497         self.assertEquals(res[1]["dnsHostName"], "x")
498         self.assertEquals(res[1]["lastLogon"], "x")
499
500         # Search by disjunction of remote attributes
501         attrs = ["dnsHostName", "lastLogon"]
502         res = ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", attrs=attrs)
503         self.assertEquals(len(res), 3)
504         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
505         self.assertEquals(res[0]["dnsHostName"], undefined)
506         self.assertEquals(res[0]["lastLogon"], "y")
507         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
508         self.assertEquals(res[1]["dnsHostName"], "x")
509         self.assertEquals(res[1]["lastLogon"], "x")
510         self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
511         self.assertEquals(res[2]["dnsHostName"], undefined)
512         self.assertEquals(res[2]["lastLogon"], "x")
513
514         # Search by disjunction of local and remote attribute
515         attrs = ["dnsHostName", "lastLogon"]
516         res = ldb.search(expression="(|(revision=x)(lastLogon=y))", attrs=attrs)
517         self.assertEquals(len(res), 3)
518         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
519         self.assertEquals(res[0]["dnsHostName"], "y")
520         self.assertEquals(res[0]["lastLogon"], "y")
521         self.assertEquals(str(res[1].dn), s4.dn("cn=B"))
522         self.assertEquals(res[1]["dnsHostName"], undefined)
523         self.assertEquals(res[1]["lastLogon"], "y")
524         self.assertEquals(str(res[2].dn), s4.dn("cn=X"))
525         self.assertEquals(res[2]["dnsHostName"], "x")
526         self.assertEquals(res[2]["lastLogon"], "x")
527
528         # Search by disjunction of local and remote attribute w/o match
529         attrs = ["dnsHostName", "lastLogon"]
530         res = ldb.search(expression="(|(codePage=y)(nextRid=z))", attrs=attrs)
531         self.assertEquals(len(res), 0)
532
533         # Search by negated local attribute
534         attrs = ["dnsHostName", "lastLogon"]
535         res = ldb.search(expression="(!(revision=x))", attrs=attrs)
536         self.assertEquals(len(res), 5)
537         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
538         self.assertEquals(res[0]["dnsHostName"], undefined)
539         self.assertEquals(res[0]["lastLogon"], "y")
540         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
541         self.assertEquals(res[1]["dnsHostName"], undefined)
542         self.assertEquals(res[1]["lastLogon"], "x")
543         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
544         self.assertEquals(res[2]["dnsHostName"], "z")
545         self.assertEquals(res[2]["lastLogon"], "z")
546         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
547         self.assertEquals(res[3]["dnsHostName"], undefined)
548         self.assertEquals(res[3]["lastLogon"], "z")
549
550         # Search by negated remote attribute
551         attrs = ["dnsHostName", "lastLogon"]
552         res = ldb.search(expression="(!(description=x))", attrs=attrs)
553         self.assertEquals(len(res), 3)
554         self.assertEquals(str(res[0].dn), s4.dn("cn=Z"))
555         self.assertEquals(res[0]["dnsHostName"], "z")
556         self.assertEquals(res[0]["lastLogon"], "z")
557         self.assertEquals(str(res[1].dn), s4.dn("cn=C"))
558         self.assertEquals(res[1]["dnsHostName"], undefined)
559         self.assertEquals(res[1]["lastLogon"], "z")
560
561         # Search by negated conjunction of local attributes
562         attrs = ["dnsHostName", "lastLogon"]
563         res = ldb.search(expression="(!(&(codePage=x)(revision=x)))", attrs=attrs)
564         self.assertEquals(len(res), 5)
565         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
566         self.assertEquals(res[0]["dnsHostName"], undefined)
567         self.assertEquals(res[0]["lastLogon"], "y")
568         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
569         self.assertEquals(res[1]["dnsHostName"], undefined)
570         self.assertEquals(res[1]["lastLogon"], "x")
571         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
572         self.assertEquals(res[2]["dnsHostName"], "z")
573         self.assertEquals(res[2]["lastLogon"], "z")
574         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
575         self.assertEquals(res[3]["dnsHostName"], undefined)
576         self.assertEquals(res[3]["lastLogon"], "z")
577
578         # Search by negated conjunction of remote attributes
579         attrs = ["dnsHostName", "lastLogon"]
580         res = ldb.search(expression="(!(&(lastLogon=x)(description=x)))", attrs=attrs)
581         self.assertEquals(len(res), 5)
582         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
583         self.assertEquals(res[0]["dnsHostName"], "y")
584         self.assertEquals(res[0]["lastLogon"], "y")
585         self.assertEquals(str(res[1].dn), s4.dn("cn=B"))
586         self.assertEquals(res[1]["dnsHostName"], undefined)
587         self.assertEquals(res[1]["lastLogon"], "y")
588         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
589         self.assertEquals(res[2]["dnsHostName"], "z")
590         self.assertEquals(res[2]["lastLogon"], "z")
591         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
592         self.assertEquals(res[3]["dnsHostName"], undefined)
593         self.assertEquals(res[3]["lastLogon"], "z")
594
595         # Search by negated conjunction of local and remote attribute
596         attrs = ["dnsHostName", "lastLogon"]
597         res = ldb.search(expression="(!(&(codePage=x)(description=x)))", attrs=attrs)
598         self.assertEquals(len(res), 5)
599         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
600         self.assertEquals(res[0]["dnsHostName"], undefined)
601         self.assertEquals(res[0]["lastLogon"], "y")
602         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
603         self.assertEquals(res[1]["dnsHostName"], undefined)
604         self.assertEquals(res[1]["lastLogon"], "x")
605         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
606         self.assertEquals(res[2]["dnsHostName"], "z")
607         self.assertEquals(res[2]["lastLogon"], "z")
608         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
609         self.assertEquals(res[3]["dnsHostName"], undefined)
610         self.assertEquals(res[3]["lastLogon"], "z")
611
612         # Search by negated disjunction of local attributes
613         attrs = ["dnsHostName", "lastLogon"]
614         res = ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", attrs=attrs)
615         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
616         self.assertEquals(res[0]["dnsHostName"], undefined)
617         self.assertEquals(res[0]["lastLogon"], "y")
618         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
619         self.assertEquals(res[1]["dnsHostName"], undefined)
620         self.assertEquals(res[1]["lastLogon"], "x")
621         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
622         self.assertEquals(res[2]["dnsHostName"], "z")
623         self.assertEquals(res[2]["lastLogon"], "z")
624         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
625         self.assertEquals(res[3]["dnsHostName"], undefined)
626         self.assertEquals(res[3]["lastLogon"], "z")
627
628         # Search by negated disjunction of remote attributes
629         attrs = ["dnsHostName", "lastLogon"]
630         res = ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", attrs=attrs)
631         self.assertEquals(len(res), 4)
632         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
633         self.assertEquals(res[0]["dnsHostName"], "y")
634         self.assertEquals(res[0]["lastLogon"], "y")
635         self.assertEquals(str(res[1].dn), s4.dn("cn=Z"))
636         self.assertEquals(res[1]["dnsHostName"], "z")
637         self.assertEquals(res[1]["lastLogon"], "z")
638         self.assertEquals(str(res[2].dn), s4.dn("cn=C"))
639         self.assertEquals(res[2]["dnsHostName"], undefined)
640         self.assertEquals(res[2]["lastLogon"], "z")
641
642         # Search by negated disjunction of local and remote attribute
643         attrs = ["dnsHostName", "lastLogon"]
644         res = ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", attrs=attrs)
645         self.assertEquals(len(res), 4)
646         self.assertEquals(str(res[0].dn), s4.dn("cn=A"))
647         self.assertEquals(res[0]["dnsHostName"], undefined)
648         self.assertEquals(res[0]["lastLogon"], "x")
649         self.assertEquals(str(res[1].dn), s4.dn("cn=Z"))
650         self.assertEquals(res[1]["dnsHostName"], "z")
651         self.assertEquals(res[1]["lastLogon"], "z")
652         self.assertEquals(str(res[2].dn), s4.dn("cn=C"))
653         self.assertEquals(res[2]["dnsHostName"], undefined)
654         self.assertEquals(res[2]["lastLogon"], "z")
655
656         print "Search by complex parse tree"
657         attrs = ["dnsHostName", "lastLogon"]
658         res = ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=attrs)
659         self.assertEquals(len(res), 6)
660         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
661         self.assertEquals(res[0]["dnsHostName"], undefined)
662         self.assertEquals(res[0]["lastLogon"], "y")
663         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
664         self.assertEquals(res[1]["dnsHostName"], "x")
665         self.assertEquals(res[1]["lastLogon"], "x")
666         self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
667         self.assertEquals(res[2]["dnsHostName"], undefined)
668         self.assertEquals(res[2]["lastLogon"], "x")
669         self.assertEquals(str(res[3].dn), s4.dn("cn=Z"))
670         self.assertEquals(res[3]["dnsHostName"], "z")
671         self.assertEquals(res[3]["lastLogon"], "z")
672         self.assertEquals(str(res[4].dn), s4.dn("cn=C"))
673         self.assertEquals(res[4]["dnsHostName"], undefined)
674         self.assertEquals(res[4]["lastLogon"], "z")
675
676         # Clean up
677         dns = [s4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
678         for dn in dns:
679             ldb.delete(dn)
680
681     def test_map_modify(self, ldb, s3, s4):
682         print "Running modification tests on mapped data"
683
684         print "Testing modification of local records"
685
686         # Add local record
687         dn = "cn=test,dc=idealx,dc=org"
688         ldif = """
689 dn: """ + dn + """
690 cn: test
691 foo: bar
692 revision: 1
693 description: test
694 """
695         ldb.add_ldif(ldif)
696         # Check it's there
697         attrs = ["foo", "revision", "description"]
698         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
699         self.assertEquals(len(res), 1)
700         self.assertEquals(str(res[0].dn), dn)
701         self.assertEquals(res[0]["foo"], "bar")
702         self.assertEquals(res[0]["revision"], "1")
703         self.assertEquals(res[0]["description"], "test")
704         # Check it's not in the local db
705         res = s4.db.search("(cn=test)", NULL, ldb.SCOPE_DEFAULT, attrs)
706         self.assertEquals(len(res), 0)
707         # Check it's not in the remote db
708         res = s3.db.search("(cn=test)", NULL, ldb.SCOPE_DEFAULT, attrs)
709         self.assertEquals(len(res), 0)
710
711         # Modify local record
712         ldif = """
713 dn: """ + dn + """
714 replace: foo
715 foo: baz
716 replace: description
717 description: foo
718 """
719         ldb.modify_ldif(ldif)
720         # Check in local db
721         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
722         self.assertEquals(len(res), 1)
723         self.assertEquals(str(res[0].dn), dn)
724         self.assertEquals(res[0]["foo"], "baz")
725         self.assertEquals(res[0]["revision"], "1")
726         self.assertEquals(res[0]["description"], "foo")
727
728         # Rename local record
729         dn2 = "cn=toast,dc=idealx,dc=org"
730         ldb.rename(dn, dn2)
731         # Check in local db
732         res = ldb.search(dn2, scope=ldb.SCOPE_BASE, attrs=attrs)
733         self.assertEquals(len(res), 1)
734         self.assertEquals(str(res[0].dn), dn2)
735         self.assertEquals(res[0]["foo"], "baz")
736         self.assertEquals(res[0]["revision"], "1")
737         self.assertEquals(res[0]["description"], "foo")
738
739         # Delete local record
740         ldb.delete(dn2)
741         # Check it's gone
742         res = ldb.search(dn2, scope=ldb.SCOPE_BASE)
743         self.assertEquals(len(res), 0)
744
745         print "Testing modification of remote records"
746
747         # Add remote record
748         dn = s4.dn("cn=test")
749         dn2 = s3.dn("cn=test")
750         ldif = """
751 dn: """ + dn2 + """
752 cn: test
753 description: foo
754 sambaBadPasswordCount: 3
755 sambaNextRid: 1001
756 """
757         s3.db.add_ldif(ldif)
758         # Check it's there
759         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
760         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
761         self.assertEquals(len(res), 1)
762         self.assertEquals(str(res[0].dn), dn2)
763         self.assertEquals(res[0]["description"], "foo")
764         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
765         self.assertEquals(res[0]["sambaNextRid"], "1001")
766         # Check in mapped db
767         attrs = ["description", "badPwdCount", "nextRid"]
768         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
769         self.assertEquals(len(res), 1)
770         self.assertEquals(str(res[0].dn), dn)
771         self.assertEquals(res[0]["description"], "foo")
772         self.assertEquals(res[0]["badPwdCount"], "3")
773         self.assertEquals(res[0]["nextRid"], "1001")
774         # Check in local db
775         res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
776         self.assertEquals(len(res), 0)
777
778         # Modify remote data of remote record
779         ldif = """
780 dn: """ + dn + """
781 replace: description
782 description: test
783 replace: badPwdCount
784 badPwdCount: 4
785 """
786         ldb.modify_ldif(ldif)
787         # Check in mapped db
788         attrs = ["description", "badPwdCount", "nextRid"]
789         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
790         self.assertEquals(len(res), 1)
791         self.assertEquals(str(res[0].dn), dn)
792         self.assertEquals(res[0]["description"], "test")
793         self.assertEquals(res[0]["badPwdCount"], "4")
794         self.assertEquals(res[0]["nextRid"], "1001")
795         # Check in remote db
796         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
797         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
798         self.assertEquals(len(res), 1)
799         self.assertEquals(str(res[0].dn), dn2)
800         self.assertEquals(res[0]["description"], "test")
801         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
802         self.assertEquals(res[0]["sambaNextRid"], "1001")
803
804         # Rename remote record
805         dn2 = s4.dn("cn=toast")
806         ldb.rename(dn, dn2)
807         # Check in mapped db
808         dn = dn2
809         attrs = ["description", "badPwdCount", "nextRid"]
810         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
811         self.assertEquals(len(res), 1)
812         self.assertEquals(str(res[0].dn), dn)
813         self.assertEquals(res[0]["description"], "test")
814         self.assertEquals(res[0]["badPwdCount"], "4")
815         self.assertEquals(res[0]["nextRid"], "1001")
816         # Check in remote db 
817         dn2 = s3.dn("cn=toast")
818         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
819         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
820         self.assertEquals(len(res), 1)
821         self.assertEquals(str(res[0].dn), dn2)
822         self.assertEquals(res[0]["description"], "test")
823         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
824         self.assertEquals(res[0]["sambaNextRid"], "1001")
825
826         # Delete remote record
827         ldb.delete(dn)
828         # Check in mapped db
829         res = ldb.search(dn, scope=ldb.SCOPE_BASE)
830         self.assertEquals(len(res), 0)
831         # Check in remote db
832         res = s3.db.search("", dn2, ldb.SCOPE_BASE)
833         self.assertEquals(len(res), 0)
834
835         # Add remote record (same as before)
836         dn = s4.dn("cn=test")
837         dn2 = s3.dn("cn=test")
838         ldif = """
839 dn: """ + dn2 + """
840 cn: test
841 description: foo
842 sambaBadPasswordCount: 3
843 sambaNextRid: 1001
844 """
845         s3.db.add_ldif(ldif)
846
847         # Modify local data of remote record
848         ldif = """
849 dn: """ + dn + """
850 add: revision
851 revision: 1
852 replace: description
853 description: test
854 """
855         ldb.modify_ldif(ldif)
856         # Check in mapped db
857         attrs = ["revision", "description"]
858         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
859         self.assertEquals(len(res), 1)
860         self.assertEquals(str(res[0].dn), dn)
861         self.assertEquals(res[0]["description"], "test")
862         self.assertEquals(res[0]["revision"], "1")
863         # Check in remote db
864         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
865         self.assertEquals(len(res), 1)
866         self.assertEquals(str(res[0].dn), dn2)
867         self.assertEquals(res[0]["description"], "test")
868         self.assertEquals(res[0]["revision"], undefined)
869         # Check in local db
870         res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
871         self.assertEquals(len(res), 1)
872         self.assertEquals(str(res[0].dn), dn)
873         self.assertEquals(res[0]["description"], undefined)
874         self.assertEquals(res[0]["revision"], "1")
875
876         # Delete (newly) split record
877         ldb.delete(dn)
878
879         print "Testing modification of split records"
880
881         # Add split record
882         dn = s4.dn("cn=test")
883         dn2 = s3.dn("cn=test")
884         ldif = """
885 dn: """ + dn + """
886 cn: test
887 description: foo
888 badPwdCount: 3
889 nextRid: 1001
890 revision: 1
891 """
892         ldb.add_ldif(ldif)
893         # Check it's there
894         attrs = ["description", "badPwdCount", "nextRid", "revision"]
895         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
896         self.assertEquals(len(res), 1)
897         self.assertEquals(str(res[0].dn), dn)
898         self.assertEquals(res[0]["description"], "foo")
899         self.assertEquals(res[0]["badPwdCount"], "3")
900         self.assertEquals(res[0]["nextRid"], "1001")
901         self.assertEquals(res[0]["revision"], "1")
902         # Check in local db
903         res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
904         self.assertEquals(len(res), 1)
905         self.assertEquals(str(res[0].dn), dn)
906         self.assertEquals(res[0]["description"], undefined)
907         self.assertEquals(res[0]["badPwdCount"], undefined)
908         self.assertEquals(res[0]["nextRid"], undefined)
909         self.assertEquals(res[0]["revision"], "1")
910         # Check in remote db
911         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
912         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
913         self.assertEquals(len(res), 1)
914         self.assertEquals(str(res[0].dn), dn2)
915         self.assertEquals(res[0]["description"], "foo")
916         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
917         self.assertEquals(res[0]["sambaNextRid"], "1001")
918         self.assertEquals(res[0]["revision"], undefined)
919
920         # Modify of split record
921         ldif = """
922 dn: """ + dn + """
923 replace: description
924 description: test
925 replace: badPwdCount
926 badPwdCount: 4
927 replace: revision
928 revision: 2
929 """
930         ldb.modify_ldif(ldif)
931         # Check in mapped db
932         attrs = ["description", "badPwdCount", "nextRid", "revision"]
933         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
934         self.assertEquals(len(res), 1)
935         self.assertEquals(str(res[0].dn), dn)
936         self.assertEquals(res[0]["description"], "test")
937         self.assertEquals(res[0]["badPwdCount"], "4")
938         self.assertEquals(res[0]["nextRid"], "1001")
939         self.assertEquals(res[0]["revision"], "2")
940         # Check in local db
941         res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
942         self.assertEquals(len(res), 1)
943         self.assertEquals(str(res[0].dn), dn)
944         self.assertEquals(res[0]["description"], undefined)
945         self.assertEquals(res[0]["badPwdCount"], undefined)
946         self.assertEquals(res[0]["nextRid"], undefined)
947         self.assertEquals(res[0]["revision"], "2")
948         # Check in remote db
949         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
950         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
951         self.assertEquals(len(res), 1)
952         self.assertEquals(str(res[0].dn), dn2)
953         self.assertEquals(res[0]["description"], "test")
954         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
955         self.assertEquals(res[0]["sambaNextRid"], "1001")
956         self.assertEquals(res[0]["revision"], undefined)
957
958         # Rename split record
959         dn2 = s4.dn("cn=toast")
960         ldb.rename(dn, dn2)
961         # Check in mapped db
962         dn = dn2
963         attrs = ["description", "badPwdCount", "nextRid", "revision"]
964         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
965         self.assertEquals(len(res), 1)
966         self.assertEquals(str(res[0].dn), dn)
967         self.assertEquals(res[0]["description"], "test")
968         self.assertEquals(res[0]["badPwdCount"], "4")
969         self.assertEquals(res[0]["nextRid"], "1001")
970         self.assertEquals(res[0]["revision"], "2")
971         # Check in local db
972         res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
973         self.assertEquals(len(res), 1)
974         self.assertEquals(str(res[0].dn), dn)
975         self.assertEquals(res[0]["description"], undefined)
976         self.assertEquals(res[0]["badPwdCount"], undefined)
977         self.assertEquals(res[0]["nextRid"], undefined)
978         self.assertEquals(res[0]["revision"], "2")
979         # Check in remote db
980         dn2 = s3.dn("cn=toast")
981         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
982         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
983         self.assertEquals(len(res), 1)
984         self.assertEquals(str(res[0].dn), dn2)
985         self.assertEquals(res[0]["description"], "test")
986         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
987         self.assertEquals(res[0]["sambaNextRid"], "1001")
988         self.assertEquals(res[0]["revision"], undefined)
989
990         # Delete split record
991         ldb.delete(dn)
992         # Check in mapped db
993         res = ldb.search(dn, scope=ldb.SCOPE_BASE)
994         self.assertEquals(len(res), 0)
995         # Check in local db
996         res = s4.db.search("", dn, ldb.SCOPE_BASE)
997         self.assertEquals(len(res), 0)
998         # Check in remote db
999         res = s3.db.search("", dn2, ldb.SCOPE_BASE)
1000         self.assertEquals(len(res), 0)
1001
1002     def setUp(self):
1003         super(Samba3SamTestCase, self).setUp()
1004
1005         def make_dn(rdn):
1006             return rdn + ",sambaDomainName=TESTS," + this.substvars["BASEDN"]
1007
1008         def make_s4dn(rdn):
1009             return rdn + "," + this.substvars["BASEDN"]
1010
1011         ldb = Ldb()
1012
1013         ldbfile = os.path.join(self.tempdir, "test.ldb")
1014         ldburl = "tdb://" + ldbfile
1015
1016         tempdir = self.tempdir
1017
1018         class Target:
1019             def __init__(self, file, basedn, dn):
1020                 self.file = os.path.join(tempdir, file)
1021                 self.url = "tdb://" + self.file
1022                 self.substvars = {"BASEDN": basedn}
1023                 self.db = Ldb()
1024                 self.dn = dn
1025
1026         samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
1027         samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
1028         templates = Target("templates.ldb", "cn=templates", None)
1029
1030         ldb.connect(ldburl)
1031         samba3.db.connect(samba3.url)
1032         templates.db.connect(templates.url)
1033         samba4.db.connect(samba4.url)
1034
1035         self.setup_data(samba3, open(os.path.join(datadir, "samba3.ldif"), 'r').read())
1036         self.setup_data(templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read())
1037         self.setup_modules(ldb, samba3, samba4, open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read())
1038
1039         ldb = Ldb()
1040         ldb.connect(ldburl)
1041
1042         self.test_s3sam_search(ldb)
1043         self.test_s3sam_modify(ldb, samba3)
1044
1045         os.unlink(ldbfile)
1046         os.unlink(samba3.file)
1047         os.unlink(templates.file)
1048         os.unlink(samba4.file)
1049
1050         ldb = Ldb()
1051         ldb.connect(ldburl)
1052         samba3.db = Ldb()
1053         samba3.db.connect(samba3.url)
1054         templates.db = Ldb()
1055         templates.db.connect(templates.url)
1056         samba4.db = Ldb()
1057         samba4.db.connect(samba4.url)
1058
1059         self.setup_data(templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read())
1060         self.setup_modules(ldb, samba3, samba4, open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read())
1061
1062         ldb = Ldb()
1063         ldb.connect(ldburl)
1064
1065         test_map_search(ldb, samba3, samba4)
1066         test_map_modify(ldb, samba3, samba4)
1067