r26598: Simplify the way Python tests are run.
[samba.git] / source4 / dsdb / samdb / ldb_modules / tests / samba3sam.py
1 #!/usr/bin/python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2007
5 # Copyright (C) Martin Kuehl <mkhl@samba.org> 2006
6 #
7 # This is a Python port of the original in testprogs/ejs/samba3sam.js
8 #   
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #   
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #   
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 import os
24 import sys
25 import samba
26 import ldb
27 from samba import Ldb, substitute_var
28 from samba.tests import LdbTestCase, TestCaseInTempDir
29
30 datadir = os.path.join(os.path.dirname(__file__), "../../../../../testdata/samba3")
31
32 class Samba3SamTestCase(TestCaseInTempDir):
33     def setup_data(self, obj, ldif):
34         self.assertTrue(ldif is not None)
35         obj.db.add_ldif(substitute_var(ldif, obj.substvars))
36
37     def setup_modules(self, ldb, s3, s4):
38
39         ldif = """
40 dn: @MAP=samba3sam
41 @FROM: """ + s4.basedn + """
42 @TO: sambaDomainName=TESTS,""" + s3.basedn + """
43
44 dn: @MODULES
45 @LIST: rootdse,paged_results,server_sort,extended_dn,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,partition
46
47 dn: @PARTITION
48 partition: """ + s4.basedn + ":" + s4.url + """
49 partition: """ + s3.basedn + ":" + s3.url + """
50 replicateEntries: @SUBCLASSES
51 replicateEntries: @ATTRIBUTES
52 replicateEntries: @INDEXLIST
53 """
54         ldb.add_ldif(ldif)
55
56     def _test_s3sam_search(self, ldb):
57         print "Looking up by non-mapped attribute"
58         msg = ldb.search(expression="(cn=Administrator)")
59         self.assertEquals(len(msg), 1)
60         self.assertEquals(msg[0]["cn"], "Administrator")
61
62         print "Looking up by mapped attribute"
63         msg = ldb.search(expression="(name=Backup Operators)")
64         self.assertEquals(len(msg), 1)
65         self.assertEquals(msg[0]["name"], "Backup Operators")
66
67         print "Looking up by old name of renamed attribute"
68         msg = ldb.search(expression="(displayName=Backup Operators)")
69         self.assertEquals(len(msg), 0)
70
71         print "Looking up mapped entry containing SID"
72         msg = ldb.search(expression="(cn=Replicator)")
73         self.assertEquals(len(msg), 1)
74         print msg[0].dn
75         self.assertEquals(str(msg[0].dn), "cn=Replicator,ou=Groups,dc=vernstok,dc=nl")
76         self.assertEquals(msg[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
77
78         print "Checking mapping of objectClass"
79         oc = set(msg[0]["objectClass"])
80         self.assertTrue(oc is not None)
81         for i in oc:
82             self.assertEquals(oc[i] == "posixGroup" or oc[i], "group")
83
84         print "Looking up by objectClass"
85         msg = ldb.search(expression="(|(objectClass=user)(cn=Administrator))")
86         self.assertEquals(len(msg), 2)
87         for i in range(len(msg)):
88             self.assertEquals((str(msg[i].dn), "unixName=Administrator,ou=Users,dc=vernstok,dc=nl") or
89                    (str(msg[i].dn) == "unixName=nobody,ou=Users,dc=vernstok,dc=nl"))
90
91
92     def _test_s3sam_modify(ldb, s3):
93         print "Adding a record that will be fallbacked"
94         ldb.add_ldif("""
95 dn: cn=Foo
96 foo: bar
97 blah: Blie
98 cn: Foo
99 showInAdvancedViewOnly: TRUE
100     """)
101
102         print "Checking for existence of record (local)"
103         # TODO: This record must be searched in the local database, which is currently only supported for base searches
104         # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')]
105         # TODO: Actually, this version should work as well but doesn't...
106         # 
107         #    
108         attrs =  ['foo','blah','cn','showInAdvancedViewOnly']
109         msg = ldb.search(expression="(cn=Foo)", base="cn=Foo", scope=ldb.LDB_SCOPE_BASE, attrs=attrs)
110         self.assertEquals(len(msg), 1)
111         self.assertEquals(msg[0]["showInAdvancedViewOnly"], "TRUE")
112         self.assertEquals(msg[0]["foo"], "bar")
113         self.assertEquals(msg[0]["blah"], "Blie")
114
115         print "Adding record that will be mapped"
116         ldb.add_ldif("""
117 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
118 objectClass: user
119 unixName: bin
120 sambaUnicodePwd: geheim
121 cn: Niemand
122 """)
123
124         print "Checking for existence of record (remote)"
125         msg = ldb.search(expression="(unixName=bin)", attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
126         self.assertEquals(len(msg), 1)
127         self.assertEquals(msg[0]["cn"], "Niemand")
128         self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
129
130         print "Checking for existence of record (local && remote)"
131         msg = ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))", 
132                          attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
133         self.assertEquals(len(msg), 1)           # TODO: should check with more records
134         self.assertEquals(msg[0]["cn"], "Niemand")
135         self.assertEquals(msg[0]["unixName"], "bin")
136         self.assertEquals(msg[0]["sambaUnicodePwd"], "geheim")
137
138         print "Checking for existence of record (local || remote)"
139         msg = ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))", 
140                          attrs=['unixName','cn','dn', 'sambaUnicodePwd'])
141         print "got " + len(msg) + " replies"
142         self.assertEquals(len(msg), 1)        # TODO: should check with more records
143         self.assertEquals(msg[0]["cn"], "Niemand")
144         self.assertEquals(msg[0]["unixName"] == "bin" or msg[0]["sambaUnicodePwd"], "geheim")
145
146         print "Checking for data in destination database"
147         msg = s3.db.search("(cn=Niemand)")
148         self.assertTrue(len(msg) >= 1)
149         self.assertEquals(msg[0]["sambaSID"], "S-1-5-21-4231626423-2410014848-2360679739-2001")
150         self.assertEquals(msg[0]["displayName"], "Niemand")
151
152         print "Adding attribute..."
153         ldb.modify_ldif("""
154 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
155 changetype: modify
156 add: description
157 description: Blah
158 """)
159
160         print "Checking whether changes are still there..."
161         msg = ldb.search(expression="(cn=Niemand)")
162         self.assertTrue(len(msg) >= 1)
163         self.assertEquals(msg[0]["cn"], "Niemand")
164         self.assertEquals(msg[0]["description"], "Blah")
165
166         print "Modifying attribute..."
167         ldb.modify_ldif("""
168 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
169 changetype: modify
170 replace: description
171 description: Blie
172 """)
173
174         print "Checking whether changes are still there..."
175         msg = ldb.search(expression="(cn=Niemand)")
176         self.assertTrue(len(msg) >= 1)
177         self.assertEquals(msg[0]["description"], "Blie")
178
179         print "Deleting attribute..."
180         ldb.modify_ldif("""
181 dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl
182 changetype: modify
183 delete: description
184 """)
185
186         print "Checking whether changes are no longer there..."
187         msg = ldb.search(expression="(cn=Niemand)")
188         self.assertTrue(len(msg) >= 1)
189         self.assertEquals(msg[0]["description"], undefined)
190
191         print "Renaming record..."
192         ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
193
194         print "Checking whether DN has changed..."
195         msg = ldb.search(expression="(cn=Niemand2)")
196         self.assertEquals(len(msg), 1)
197         self.assertEquals(str(msg[0].dn), "cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
198
199         print "Deleting record..."
200         ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl")
201
202         print "Checking whether record is gone..."
203         msg = ldb.search(expression="(cn=Niemand2)")
204         self.assertEquals(len(msg), 0)
205
206     def _test_map_search(self, ldb, s3, s4):
207         print "Running search tests on mapped data"
208         ldif = """
209 dn: """ + "sambaDomainName=TESTS,""" + s3.basedn + """
210 objectclass: sambaDomain
211 objectclass: top
212 sambaSID: S-1-5-21-4231626423-2410014848-2360679739
213 sambaNextRid: 2000
214 sambaDomainName: TESTS"""
215         s3.db.add_ldif(substitute_var(ldif, s3.substvars))
216
217         print "Add a set of split records"
218         ldif = """
219 dn: """ + s4.dn("cn=X") + """
220 objectClass: user
221 cn: X
222 codePage: x
223 revision: x
224 dnsHostName: x
225 nextRid: y
226 lastLogon: x
227 description: x
228 objectSid: S-1-5-21-4231626423-2410014848-2360679739-552
229 primaryGroupID: 1-5-21-4231626423-2410014848-2360679739-512
230
231 dn: """ + s4.dn("cn=Y") + """
232 objectClass: top
233 cn: Y
234 codePage: x
235 revision: x
236 dnsHostName: y
237 nextRid: y
238 lastLogon: y
239 description: x
240
241 dn: """ + s4.dn("cn=Z") + """
242 objectClass: top
243 cn: Z
244 codePage: x
245 revision: y
246 dnsHostName: z
247 nextRid: y
248 lastLogon: z
249 description: y
250 """
251
252         ldb.add_ldif(substitute_var(ldif, s4.substvars))
253
254         print "Add a set of remote records"
255
256         ldif = """
257 dn: """ + s3.dn("cn=A") + """
258 objectClass: posixAccount
259 cn: A
260 sambaNextRid: x
261 sambaBadPasswordCount: x
262 sambaLogonTime: x
263 description: x
264 sambaSID: S-1-5-21-4231626423-2410014848-2360679739-552
265 sambaPrimaryGroupSID: S-1-5-21-4231626423-2410014848-2360679739-512
266
267 dn: """ + s3.dn("cn=B") + """
268 objectClass: top
269 cn:B
270 sambaNextRid: x
271 sambaBadPasswordCount: x
272 sambaLogonTime: y
273 description: x
274
275 dn: """ + s3.dn("cn=C") + """
276 objectClass: top
277 cn: C
278 sambaNextRid: x
279 sambaBadPasswordCount: y
280 sambaLogonTime: z
281 description: y
282 """
283         s3.add_ldif(substitute_var(ldif, s3.substvars))
284
285         print "Testing search by DN"
286
287         # Search remote record by local DN
288         dn = s4.dn("cn=A")
289         attrs = ["dnsHostName", "lastLogon"]
290         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
291         self.assertEquals(len(res), 1)
292         self.assertEquals(str(str(res[0].dn)), dn)
293         self.assertEquals(res[0]["dnsHostName"], undefined)
294         self.assertEquals(res[0]["lastLogon"], "x")
295
296         # Search remote record by remote DN
297         dn = s3.dn("cn=A")
298         attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"]
299         res = s3.db.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
300         self.assertEquals(len(res), 1)
301         self.assertEquals(str(str(res[0].dn)), dn)
302         self.assertEquals(res[0]["dnsHostName"], undefined)
303         self.assertEquals(res[0]["lastLogon"], undefined)
304         self.assertEquals(res[0]["sambaLogonTime"], "x")
305
306         # Search split record by local DN
307         dn = s4.dn("cn=X")
308         attrs = ["dnsHostName", "lastLogon"]
309         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
310         self.assertEquals(len(res), 1)
311         self.assertEquals(str(str(res[0].dn)), dn)
312         self.assertEquals(res[0]["dnsHostName"], "x")
313         self.assertEquals(res[0]["lastLogon"], "x")
314
315         # Search split record by remote DN
316         dn = s3.dn("cn=X")
317         attrs = ["dnsHostName", "lastLogon", "sambaLogonTime"]
318         res = s3.db.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
319         self.assertEquals(len(res), 1)
320         self.assertEquals(str(str(res[0].dn)), dn)
321         self.assertEquals(res[0]["dnsHostName"], undefined)
322         self.assertEquals(res[0]["lastLogon"], undefined)
323         self.assertEquals(res[0]["sambaLogonTime"], "x")
324
325         print "Testing search by attribute"
326
327         # Search by ignored attribute
328         attrs = ["dnsHostName", "lastLogon"]
329         res = ldb.search(expression="(revision=x)", scope=ldb.SCOPE_DEFAULT, attrs=attrs)
330         self.assertEquals(len(res), 2)
331         self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Y"))
332         self.assertEquals(res[0]["dnsHostName"], "y")
333         self.assertEquals(res[0]["lastLogon"], "y")
334         self.assertEquals(str(str(res[1].dn)), s4.dn("cn=X"))
335         self.assertEquals(res[1]["dnsHostName"], "x")
336         self.assertEquals(res[1]["lastLogon"], "x")
337
338         # Search by kept attribute
339         attrs = ["dnsHostName", "lastLogon"]
340         res = ldb.search(expression="(description=y)", scope=ldb.SCOPE_DEFAULT, attrs=attrs)
341         self.assertEquals(len(res), 2)
342         self.assertEquals(str(str(res[0].dn)), s4.dn("cn=Z"))
343         self.assertEquals(res[0]["dnsHostName"], "z")
344         self.assertEquals(res[0]["lastLogon"], "z")
345         self.assertEquals(str(str(res[1].dn)), s4.dn("cn=C"))
346         self.assertEquals(res[1]["dnsHostName"], undefined)
347         self.assertEquals(res[1]["lastLogon"], "z")
348
349         # Search by renamed attribute
350         attrs = ["dnsHostName", "lastLogon"]
351         res = ldb.search(expression="(badPwdCount=x)", scope=ldb.SCOPE_DEFAULT, attrs=attrs)
352         self.assertEquals(len(res), 2)
353         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
354         self.assertEquals(res[0]["dnsHostName"], undefined)
355         self.assertEquals(res[0]["lastLogon"], "y")
356         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
357         self.assertEquals(res[1]["dnsHostName"], undefined)
358         self.assertEquals(res[1]["lastLogon"], "x")
359
360         # Search by converted attribute
361         attrs = ["dnsHostName", "lastLogon", "objectSid"]
362         # TODO:
363         #   Using the SID directly in the parse tree leads to conversion
364         #   errors, letting the search fail with no results.
365         #res = ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", NULL, ldb. SCOPE_DEFAULT, attrs)
366         res = ldb.search(expression="(objectSid=*)", attrs=attrs)
367         self.assertEquals(len(res), 3)
368         self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
369         self.assertEquals(res[0]["dnsHostName"], "x")
370         self.assertEquals(res[0]["lastLogon"], "x")
371         self.assertEquals(res[0]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
372         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
373         self.assertEquals(res[1]["dnsHostName"], undefined)
374         self.assertEquals(res[1]["lastLogon"], "x")
375         self.assertEquals(res[1]["objectSid"], "S-1-5-21-4231626423-2410014848-2360679739-552")
376
377         # Search by generated attribute 
378         # In most cases, this even works when the mapping is missing
379         # a `convert_operator' by enumerating the remote db.
380         attrs = ["dnsHostName", "lastLogon", "primaryGroupID"]
381         res = ldb.search(expression="(primaryGroupID=512)", attrs=attrs)
382         self.assertEquals(len(res), 1)
383         self.assertEquals(str(res[0].dn), s4.dn("cn=A"))
384         self.assertEquals(res[0]["dnsHostName"], undefined)
385         self.assertEquals(res[0]["lastLogon"], "x")
386         self.assertEquals(res[0]["primaryGroupID"], "512")
387
388         # TODO: There should actually be two results, A and X.  The
389         # primaryGroupID of X seems to get corrupted somewhere, and the
390         # objectSid isn't available during the generation of remote (!) data,
391         # which can be observed with the following search.  Also note that Xs
392         # objectSid seems to be fine in the previous search for objectSid... */
393         #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs)
394         #print len(res) + " results found"
395         #for i in range(len(res)):
396         #    for (obj in res[i]) {
397         #        print obj + ": " + res[i][obj]
398         #    }
399         #    print "---"
400         #    
401
402         # Search by remote name of renamed attribute */
403         attrs = ["dnsHostName", "lastLogon"]
404         res = ldb.search(expression="(sambaBadPasswordCount=*)", attrs=attrs)
405         self.assertEquals(len(res), 0)
406
407         # Search by objectClass
408         attrs = ["dnsHostName", "lastLogon", "objectClass"]
409         res = ldb.search(expression="(objectClass=user)", attrs=attrs)
410         self.assertEquals(len(res), 2)
411         self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
412         self.assertEquals(res[0]["dnsHostName"], "x")
413         self.assertEquals(res[0]["lastLogon"], "x")
414         self.assertTrue(res[0]["objectClass"] is not None)
415         self.assertEquals(res[0]["objectClass"][0], "user")
416         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
417         self.assertEquals(res[1]["dnsHostName"], undefined)
418         self.assertEquals(res[1]["lastLogon"], "x")
419         self.assertTrue(res[1]["objectClass"] is not None)
420         self.assertEquals(res[1]["objectClass"][0], "user")
421
422         # Prove that the objectClass is actually used for the search
423         res = ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", attrs=attrs)
424         self.assertEquals(len(res), 3)
425         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
426         self.assertEquals(res[0]["dnsHostName"], undefined)
427         self.assertEquals(res[0]["lastLogon"], "y")
428         self.assertTrue(res[0]["objectClass"] is not None)
429         for oc in set(res[0]["objectClass"]):
430             self.assertEquals(oc, "user")
431         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
432         self.assertEquals(res[1]["dnsHostName"], "x")
433         self.assertEquals(res[1]["lastLogon"], "x")
434         self.assertTrue(res[1]["objectClass"] is not None)
435         self.assertEquals(res[1]["objectClass"][0], "user")
436         self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
437         self.assertEquals(res[2]["dnsHostName"], undefined)
438         self.assertEquals(res[2]["lastLogon"], "x")
439         self.assertTrue(res[2]["objectClass"] is not None)
440         self.assertEquals(res[2]["objectClass"][0], "user")
441
442         print "Testing search by parse tree"
443
444         # Search by conjunction of local attributes
445         attrs = ["dnsHostName", "lastLogon"]
446         res = ldb.search(expression="(&(codePage=x)(revision=x))", attrs=attrs)
447         self.assertEquals(len(res), 2)
448         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
449         self.assertEquals(res[0]["dnsHostName"], "y")
450         self.assertEquals(res[0]["lastLogon"], "y")
451         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
452         self.assertEquals(res[1]["dnsHostName"], "x")
453         self.assertEquals(res[1]["lastLogon"], "x")
454
455         # Search by conjunction of remote attributes
456         attrs = ["dnsHostName", "lastLogon"]
457         res = ldb.search(expression="(&(lastLogon=x)(description=x))", attrs=attrs)
458         self.assertEquals(len(res), 2)
459         self.assertEquals(str(res[0].dn), s4.dn("cn=X"))
460         self.assertEquals(res[0]["dnsHostName"], "x")
461         self.assertEquals(res[0]["lastLogon"], "x")
462         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
463         self.assertEquals(res[1]["dnsHostName"], undefined)
464         self.assertEquals(res[1]["lastLogon"], "x")
465         
466         # Search by conjunction of local and remote attribute 
467         attrs = ["dnsHostName", "lastLogon"]
468         res = ldb.search(expression="(&(codePage=x)(description=x))", attrs=attrs)
469         self.assertEquals(len(res), 2)
470         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
471         self.assertEquals(res[0]["dnsHostName"], "y")
472         self.assertEquals(res[0]["lastLogon"], "y")
473         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
474         self.assertEquals(res[1]["dnsHostName"], "x")
475         self.assertEquals(res[1]["lastLogon"], "x")
476
477         # Search by conjunction of local and remote attribute w/o match
478         attrs = ["dnsHostName", "lastLogon"]
479         res = ldb.search(expression="(&(codePage=x)(nextRid=x))", attrs=attrs)
480         self.assertEquals(len(res), 0)
481         res = ldb.search(expression="(&(revision=x)(lastLogon=z))", attrs=attrs)
482         self.assertEquals(len(res), 0)
483
484         # Search by disjunction of local attributes
485         attrs = ["dnsHostName", "lastLogon"]
486         res = ldb.search(expression="(|(revision=x)(dnsHostName=x))", attrs=attrs)
487         self.assertEquals(len(res), 2)
488         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
489         self.assertEquals(res[0]["dnsHostName"], "y")
490         self.assertEquals(res[0]["lastLogon"], "y")
491         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
492         self.assertEquals(res[1]["dnsHostName"], "x")
493         self.assertEquals(res[1]["lastLogon"], "x")
494
495         # Search by disjunction of remote attributes
496         attrs = ["dnsHostName", "lastLogon"]
497         res = ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", attrs=attrs)
498         self.assertEquals(len(res), 3)
499         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
500         self.assertEquals(res[0]["dnsHostName"], undefined)
501         self.assertEquals(res[0]["lastLogon"], "y")
502         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
503         self.assertEquals(res[1]["dnsHostName"], "x")
504         self.assertEquals(res[1]["lastLogon"], "x")
505         self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
506         self.assertEquals(res[2]["dnsHostName"], undefined)
507         self.assertEquals(res[2]["lastLogon"], "x")
508
509         # Search by disjunction of local and remote attribute
510         attrs = ["dnsHostName", "lastLogon"]
511         res = ldb.search(expression="(|(revision=x)(lastLogon=y))", attrs=attrs)
512         self.assertEquals(len(res), 3)
513         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
514         self.assertEquals(res[0]["dnsHostName"], "y")
515         self.assertEquals(res[0]["lastLogon"], "y")
516         self.assertEquals(str(res[1].dn), s4.dn("cn=B"))
517         self.assertEquals(res[1]["dnsHostName"], undefined)
518         self.assertEquals(res[1]["lastLogon"], "y")
519         self.assertEquals(str(res[2].dn), s4.dn("cn=X"))
520         self.assertEquals(res[2]["dnsHostName"], "x")
521         self.assertEquals(res[2]["lastLogon"], "x")
522
523         # Search by disjunction of local and remote attribute w/o match
524         attrs = ["dnsHostName", "lastLogon"]
525         res = ldb.search(expression="(|(codePage=y)(nextRid=z))", attrs=attrs)
526         self.assertEquals(len(res), 0)
527
528         # Search by negated local attribute
529         attrs = ["dnsHostName", "lastLogon"]
530         res = ldb.search(expression="(!(revision=x))", attrs=attrs)
531         self.assertEquals(len(res), 5)
532         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
533         self.assertEquals(res[0]["dnsHostName"], undefined)
534         self.assertEquals(res[0]["lastLogon"], "y")
535         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
536         self.assertEquals(res[1]["dnsHostName"], undefined)
537         self.assertEquals(res[1]["lastLogon"], "x")
538         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
539         self.assertEquals(res[2]["dnsHostName"], "z")
540         self.assertEquals(res[2]["lastLogon"], "z")
541         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
542         self.assertEquals(res[3]["dnsHostName"], undefined)
543         self.assertEquals(res[3]["lastLogon"], "z")
544
545         # Search by negated remote attribute
546         attrs = ["dnsHostName", "lastLogon"]
547         res = ldb.search(expression="(!(description=x))", attrs=attrs)
548         self.assertEquals(len(res), 3)
549         self.assertEquals(str(res[0].dn), s4.dn("cn=Z"))
550         self.assertEquals(res[0]["dnsHostName"], "z")
551         self.assertEquals(res[0]["lastLogon"], "z")
552         self.assertEquals(str(res[1].dn), s4.dn("cn=C"))
553         self.assertEquals(res[1]["dnsHostName"], undefined)
554         self.assertEquals(res[1]["lastLogon"], "z")
555
556         # Search by negated conjunction of local attributes
557         attrs = ["dnsHostName", "lastLogon"]
558         res = ldb.search(expression="(!(&(codePage=x)(revision=x)))", attrs=attrs)
559         self.assertEquals(len(res), 5)
560         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
561         self.assertEquals(res[0]["dnsHostName"], undefined)
562         self.assertEquals(res[0]["lastLogon"], "y")
563         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
564         self.assertEquals(res[1]["dnsHostName"], undefined)
565         self.assertEquals(res[1]["lastLogon"], "x")
566         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
567         self.assertEquals(res[2]["dnsHostName"], "z")
568         self.assertEquals(res[2]["lastLogon"], "z")
569         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
570         self.assertEquals(res[3]["dnsHostName"], undefined)
571         self.assertEquals(res[3]["lastLogon"], "z")
572
573         # Search by negated conjunction of remote attributes
574         attrs = ["dnsHostName", "lastLogon"]
575         res = ldb.search(expression="(!(&(lastLogon=x)(description=x)))", attrs=attrs)
576         self.assertEquals(len(res), 5)
577         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
578         self.assertEquals(res[0]["dnsHostName"], "y")
579         self.assertEquals(res[0]["lastLogon"], "y")
580         self.assertEquals(str(res[1].dn), s4.dn("cn=B"))
581         self.assertEquals(res[1]["dnsHostName"], undefined)
582         self.assertEquals(res[1]["lastLogon"], "y")
583         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
584         self.assertEquals(res[2]["dnsHostName"], "z")
585         self.assertEquals(res[2]["lastLogon"], "z")
586         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
587         self.assertEquals(res[3]["dnsHostName"], undefined)
588         self.assertEquals(res[3]["lastLogon"], "z")
589
590         # Search by negated conjunction of local and remote attribute
591         attrs = ["dnsHostName", "lastLogon"]
592         res = ldb.search(expression="(!(&(codePage=x)(description=x)))", attrs=attrs)
593         self.assertEquals(len(res), 5)
594         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
595         self.assertEquals(res[0]["dnsHostName"], undefined)
596         self.assertEquals(res[0]["lastLogon"], "y")
597         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
598         self.assertEquals(res[1]["dnsHostName"], undefined)
599         self.assertEquals(res[1]["lastLogon"], "x")
600         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
601         self.assertEquals(res[2]["dnsHostName"], "z")
602         self.assertEquals(res[2]["lastLogon"], "z")
603         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
604         self.assertEquals(res[3]["dnsHostName"], undefined)
605         self.assertEquals(res[3]["lastLogon"], "z")
606
607         # Search by negated disjunction of local attributes
608         attrs = ["dnsHostName", "lastLogon"]
609         res = ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", attrs=attrs)
610         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
611         self.assertEquals(res[0]["dnsHostName"], undefined)
612         self.assertEquals(res[0]["lastLogon"], "y")
613         self.assertEquals(str(res[1].dn), s4.dn("cn=A"))
614         self.assertEquals(res[1]["dnsHostName"], undefined)
615         self.assertEquals(res[1]["lastLogon"], "x")
616         self.assertEquals(str(res[2].dn), s4.dn("cn=Z"))
617         self.assertEquals(res[2]["dnsHostName"], "z")
618         self.assertEquals(res[2]["lastLogon"], "z")
619         self.assertEquals(str(res[3].dn), s4.dn("cn=C"))
620         self.assertEquals(res[3]["dnsHostName"], undefined)
621         self.assertEquals(res[3]["lastLogon"], "z")
622
623         # Search by negated disjunction of remote attributes
624         attrs = ["dnsHostName", "lastLogon"]
625         res = ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", attrs=attrs)
626         self.assertEquals(len(res), 4)
627         self.assertEquals(str(res[0].dn), s4.dn("cn=Y"))
628         self.assertEquals(res[0]["dnsHostName"], "y")
629         self.assertEquals(res[0]["lastLogon"], "y")
630         self.assertEquals(str(res[1].dn), s4.dn("cn=Z"))
631         self.assertEquals(res[1]["dnsHostName"], "z")
632         self.assertEquals(res[1]["lastLogon"], "z")
633         self.assertEquals(str(res[2].dn), s4.dn("cn=C"))
634         self.assertEquals(res[2]["dnsHostName"], undefined)
635         self.assertEquals(res[2]["lastLogon"], "z")
636
637         # Search by negated disjunction of local and remote attribute
638         attrs = ["dnsHostName", "lastLogon"]
639         res = ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", attrs=attrs)
640         self.assertEquals(len(res), 4)
641         self.assertEquals(str(res[0].dn), s4.dn("cn=A"))
642         self.assertEquals(res[0]["dnsHostName"], undefined)
643         self.assertEquals(res[0]["lastLogon"], "x")
644         self.assertEquals(str(res[1].dn), s4.dn("cn=Z"))
645         self.assertEquals(res[1]["dnsHostName"], "z")
646         self.assertEquals(res[1]["lastLogon"], "z")
647         self.assertEquals(str(res[2].dn), s4.dn("cn=C"))
648         self.assertEquals(res[2]["dnsHostName"], undefined)
649         self.assertEquals(res[2]["lastLogon"], "z")
650
651         print "Search by complex parse tree"
652         attrs = ["dnsHostName", "lastLogon"]
653         res = ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=attrs)
654         self.assertEquals(len(res), 6)
655         self.assertEquals(str(res[0].dn), s4.dn("cn=B"))
656         self.assertEquals(res[0]["dnsHostName"], undefined)
657         self.assertEquals(res[0]["lastLogon"], "y")
658         self.assertEquals(str(res[1].dn), s4.dn("cn=X"))
659         self.assertEquals(res[1]["dnsHostName"], "x")
660         self.assertEquals(res[1]["lastLogon"], "x")
661         self.assertEquals(str(res[2].dn), s4.dn("cn=A"))
662         self.assertEquals(res[2]["dnsHostName"], undefined)
663         self.assertEquals(res[2]["lastLogon"], "x")
664         self.assertEquals(str(res[3].dn), s4.dn("cn=Z"))
665         self.assertEquals(res[3]["dnsHostName"], "z")
666         self.assertEquals(res[3]["lastLogon"], "z")
667         self.assertEquals(str(res[4].dn), s4.dn("cn=C"))
668         self.assertEquals(res[4]["dnsHostName"], undefined)
669         self.assertEquals(res[4]["lastLogon"], "z")
670
671         # Clean up
672         dns = [s4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]]
673         for dn in dns:
674             ldb.delete(dn)
675
676     def _test_map_modify(self, ldb, s3, s4):
677         print "Running modification tests on mapped data"
678
679         print "Testing modification of local records"
680
681         # Add local record
682         dn = "cn=test,dc=idealx,dc=org"
683         ldif = """
684 dn: """ + dn + """
685 cn: test
686 foo: bar
687 revision: 1
688 description: test
689 """
690         ldb.add_ldif(ldif)
691         # Check it's there
692         attrs = ["foo", "revision", "description"]
693         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
694         self.assertEquals(len(res), 1)
695         self.assertEquals(str(res[0].dn), dn)
696         self.assertEquals(res[0]["foo"], "bar")
697         self.assertEquals(res[0]["revision"], "1")
698         self.assertEquals(res[0]["description"], "test")
699         # Check it's not in the local db
700         res = s4.db.search("(cn=test)", NULL, ldb.SCOPE_DEFAULT, attrs)
701         self.assertEquals(len(res), 0)
702         # Check it's not in the remote db
703         res = s3.db.search("(cn=test)", NULL, ldb.SCOPE_DEFAULT, attrs)
704         self.assertEquals(len(res), 0)
705
706         # Modify local record
707         ldif = """
708 dn: """ + dn + """
709 replace: foo
710 foo: baz
711 replace: description
712 description: foo
713 """
714         ldb.modify_ldif(ldif)
715         # Check in local db
716         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
717         self.assertEquals(len(res), 1)
718         self.assertEquals(str(res[0].dn), dn)
719         self.assertEquals(res[0]["foo"], "baz")
720         self.assertEquals(res[0]["revision"], "1")
721         self.assertEquals(res[0]["description"], "foo")
722
723         # Rename local record
724         dn2 = "cn=toast,dc=idealx,dc=org"
725         ldb.rename(dn, dn2)
726         # Check in local db
727         res = ldb.search(dn2, scope=ldb.SCOPE_BASE, attrs=attrs)
728         self.assertEquals(len(res), 1)
729         self.assertEquals(str(res[0].dn), dn2)
730         self.assertEquals(res[0]["foo"], "baz")
731         self.assertEquals(res[0]["revision"], "1")
732         self.assertEquals(res[0]["description"], "foo")
733
734         # Delete local record
735         ldb.delete(dn2)
736         # Check it's gone
737         res = ldb.search(dn2, scope=ldb.SCOPE_BASE)
738         self.assertEquals(len(res), 0)
739
740         print "Testing modification of remote records"
741
742         # Add remote record
743         dn = s4.dn("cn=test")
744         dn2 = s3.dn("cn=test")
745         ldif = """
746 dn: """ + dn2 + """
747 cn: test
748 description: foo
749 sambaBadPasswordCount: 3
750 sambaNextRid: 1001
751 """
752         s3.db.add_ldif(ldif)
753         # Check it's there
754         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
755         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
756         self.assertEquals(len(res), 1)
757         self.assertEquals(str(res[0].dn), dn2)
758         self.assertEquals(res[0]["description"], "foo")
759         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
760         self.assertEquals(res[0]["sambaNextRid"], "1001")
761         # Check in mapped db
762         attrs = ["description", "badPwdCount", "nextRid"]
763         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
764         self.assertEquals(len(res), 1)
765         self.assertEquals(str(res[0].dn), dn)
766         self.assertEquals(res[0]["description"], "foo")
767         self.assertEquals(res[0]["badPwdCount"], "3")
768         self.assertEquals(res[0]["nextRid"], "1001")
769         # Check in local db
770         res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
771         self.assertEquals(len(res), 0)
772
773         # Modify remote data of remote record
774         ldif = """
775 dn: """ + dn + """
776 replace: description
777 description: test
778 replace: badPwdCount
779 badPwdCount: 4
780 """
781         ldb.modify_ldif(ldif)
782         # Check in mapped db
783         attrs = ["description", "badPwdCount", "nextRid"]
784         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
785         self.assertEquals(len(res), 1)
786         self.assertEquals(str(res[0].dn), dn)
787         self.assertEquals(res[0]["description"], "test")
788         self.assertEquals(res[0]["badPwdCount"], "4")
789         self.assertEquals(res[0]["nextRid"], "1001")
790         # Check in remote db
791         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
792         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
793         self.assertEquals(len(res), 1)
794         self.assertEquals(str(res[0].dn), dn2)
795         self.assertEquals(res[0]["description"], "test")
796         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
797         self.assertEquals(res[0]["sambaNextRid"], "1001")
798
799         # Rename remote record
800         dn2 = s4.dn("cn=toast")
801         ldb.rename(dn, dn2)
802         # Check in mapped db
803         dn = dn2
804         attrs = ["description", "badPwdCount", "nextRid"]
805         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
806         self.assertEquals(len(res), 1)
807         self.assertEquals(str(res[0].dn), dn)
808         self.assertEquals(res[0]["description"], "test")
809         self.assertEquals(res[0]["badPwdCount"], "4")
810         self.assertEquals(res[0]["nextRid"], "1001")
811         # Check in remote db 
812         dn2 = s3.dn("cn=toast")
813         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid"]
814         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
815         self.assertEquals(len(res), 1)
816         self.assertEquals(str(res[0].dn), dn2)
817         self.assertEquals(res[0]["description"], "test")
818         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
819         self.assertEquals(res[0]["sambaNextRid"], "1001")
820
821         # Delete remote record
822         ldb.delete(dn)
823         # Check in mapped db
824         res = ldb.search(dn, scope=ldb.SCOPE_BASE)
825         self.assertEquals(len(res), 0)
826         # Check in remote db
827         res = s3.db.search("", dn2, ldb.SCOPE_BASE)
828         self.assertEquals(len(res), 0)
829
830         # Add remote record (same as before)
831         dn = s4.dn("cn=test")
832         dn2 = s3.dn("cn=test")
833         ldif = """
834 dn: """ + dn2 + """
835 cn: test
836 description: foo
837 sambaBadPasswordCount: 3
838 sambaNextRid: 1001
839 """
840         s3.db.add_ldif(ldif)
841
842         # Modify local data of remote record
843         ldif = """
844 dn: """ + dn + """
845 add: revision
846 revision: 1
847 replace: description
848 description: test
849 """
850         ldb.modify_ldif(ldif)
851         # Check in mapped db
852         attrs = ["revision", "description"]
853         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
854         self.assertEquals(len(res), 1)
855         self.assertEquals(str(res[0].dn), dn)
856         self.assertEquals(res[0]["description"], "test")
857         self.assertEquals(res[0]["revision"], "1")
858         # Check in remote db
859         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
860         self.assertEquals(len(res), 1)
861         self.assertEquals(str(res[0].dn), dn2)
862         self.assertEquals(res[0]["description"], "test")
863         self.assertEquals(res[0]["revision"], undefined)
864         # Check in local db
865         res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
866         self.assertEquals(len(res), 1)
867         self.assertEquals(str(res[0].dn), dn)
868         self.assertEquals(res[0]["description"], undefined)
869         self.assertEquals(res[0]["revision"], "1")
870
871         # Delete (newly) split record
872         ldb.delete(dn)
873
874         print "Testing modification of split records"
875
876         # Add split record
877         dn = s4.dn("cn=test")
878         dn2 = s3.dn("cn=test")
879         ldif = """
880 dn: """ + dn + """
881 cn: test
882 description: foo
883 badPwdCount: 3
884 nextRid: 1001
885 revision: 1
886 """
887         ldb.add_ldif(ldif)
888         # Check it's there
889         attrs = ["description", "badPwdCount", "nextRid", "revision"]
890         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
891         self.assertEquals(len(res), 1)
892         self.assertEquals(str(res[0].dn), dn)
893         self.assertEquals(res[0]["description"], "foo")
894         self.assertEquals(res[0]["badPwdCount"], "3")
895         self.assertEquals(res[0]["nextRid"], "1001")
896         self.assertEquals(res[0]["revision"], "1")
897         # Check in local db
898         res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
899         self.assertEquals(len(res), 1)
900         self.assertEquals(str(res[0].dn), dn)
901         self.assertEquals(res[0]["description"], undefined)
902         self.assertEquals(res[0]["badPwdCount"], undefined)
903         self.assertEquals(res[0]["nextRid"], undefined)
904         self.assertEquals(res[0]["revision"], "1")
905         # Check in remote db
906         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
907         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
908         self.assertEquals(len(res), 1)
909         self.assertEquals(str(res[0].dn), dn2)
910         self.assertEquals(res[0]["description"], "foo")
911         self.assertEquals(res[0]["sambaBadPasswordCount"], "3")
912         self.assertEquals(res[0]["sambaNextRid"], "1001")
913         self.assertEquals(res[0]["revision"], undefined)
914
915         # Modify of split record
916         ldif = """
917 dn: """ + dn + """
918 replace: description
919 description: test
920 replace: badPwdCount
921 badPwdCount: 4
922 replace: revision
923 revision: 2
924 """
925         ldb.modify_ldif(ldif)
926         # Check in mapped db
927         attrs = ["description", "badPwdCount", "nextRid", "revision"]
928         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
929         self.assertEquals(len(res), 1)
930         self.assertEquals(str(res[0].dn), dn)
931         self.assertEquals(res[0]["description"], "test")
932         self.assertEquals(res[0]["badPwdCount"], "4")
933         self.assertEquals(res[0]["nextRid"], "1001")
934         self.assertEquals(res[0]["revision"], "2")
935         # Check in local db
936         res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
937         self.assertEquals(len(res), 1)
938         self.assertEquals(str(res[0].dn), dn)
939         self.assertEquals(res[0]["description"], undefined)
940         self.assertEquals(res[0]["badPwdCount"], undefined)
941         self.assertEquals(res[0]["nextRid"], undefined)
942         self.assertEquals(res[0]["revision"], "2")
943         # Check in remote db
944         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
945         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
946         self.assertEquals(len(res), 1)
947         self.assertEquals(str(res[0].dn), dn2)
948         self.assertEquals(res[0]["description"], "test")
949         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
950         self.assertEquals(res[0]["sambaNextRid"], "1001")
951         self.assertEquals(res[0]["revision"], undefined)
952
953         # Rename split record
954         dn2 = s4.dn("cn=toast")
955         ldb.rename(dn, dn2)
956         # Check in mapped db
957         dn = dn2
958         attrs = ["description", "badPwdCount", "nextRid", "revision"]
959         res = ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=attrs)
960         self.assertEquals(len(res), 1)
961         self.assertEquals(str(res[0].dn), dn)
962         self.assertEquals(res[0]["description"], "test")
963         self.assertEquals(res[0]["badPwdCount"], "4")
964         self.assertEquals(res[0]["nextRid"], "1001")
965         self.assertEquals(res[0]["revision"], "2")
966         # Check in local db
967         res = s4.db.search("", dn, ldb.SCOPE_BASE, attrs)
968         self.assertEquals(len(res), 1)
969         self.assertEquals(str(res[0].dn), dn)
970         self.assertEquals(res[0]["description"], undefined)
971         self.assertEquals(res[0]["badPwdCount"], undefined)
972         self.assertEquals(res[0]["nextRid"], undefined)
973         self.assertEquals(res[0]["revision"], "2")
974         # Check in remote db
975         dn2 = s3.dn("cn=toast")
976         attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", "revision"]
977         res = s3.db.search("", dn2, ldb.SCOPE_BASE, attrs)
978         self.assertEquals(len(res), 1)
979         self.assertEquals(str(res[0].dn), dn2)
980         self.assertEquals(res[0]["description"], "test")
981         self.assertEquals(res[0]["sambaBadPasswordCount"], "4")
982         self.assertEquals(res[0]["sambaNextRid"], "1001")
983         self.assertEquals(res[0]["revision"], undefined)
984
985         # Delete split record
986         ldb.delete(dn)
987         # Check in mapped db
988         res = ldb.search(dn, scope=ldb.SCOPE_BASE)
989         self.assertEquals(len(res), 0)
990         # Check in local db
991         res = s4.db.search("", dn, ldb.SCOPE_BASE)
992         self.assertEquals(len(res), 0)
993         # Check in remote db
994         res = s3.db.search("", dn2, ldb.SCOPE_BASE)
995         self.assertEquals(len(res), 0)
996
997     def setUp(self):
998         super(Samba3SamTestCase, self).setUp()
999
1000         def make_dn(basedn, rdn):
1001             return rdn + ",sambaDomainName=TESTS," + basedn
1002
1003         def make_s4dn(basedn, rdn):
1004             return rdn + "," + basedn
1005
1006         self.ldbfile = os.path.join(self.tempdir, "test.ldb")
1007         self.ldburl = "tdb://" + self.ldbfile
1008
1009         tempdir = self.tempdir
1010         print tempdir
1011
1012         class Target:
1013             """Simple helper class that contains data for a specific SAM connection."""
1014             def __init__(self, file, basedn, dn):
1015                 self.file = os.path.join(tempdir, file)
1016                 self.url = "tdb://" + self.file
1017                 self.basedn = basedn
1018                 self.substvars = {"BASEDN": self.basedn}
1019                 self.db = Ldb()
1020                 self._dn = dn
1021
1022             def dn(self, rdn):
1023                 return self._dn(rdn, self.basedn)
1024
1025             def connect(self):
1026                 return self.db.connect(self.url)
1027
1028         self.samba4 = Target("samba4.ldb", "dc=vernstok,dc=nl", make_s4dn)
1029         self.samba3 = Target("samba3.ldb", "cn=Samba3Sam", make_dn)
1030         self.templates = Target("templates.ldb", "cn=templates", None)
1031
1032         self.samba3.connect()
1033         self.templates.connect()
1034         self.samba4.connect()
1035
1036     def tearDown(self):
1037         os.unlink(self.ldbfile)
1038         os.unlink(self.samba3.file)
1039         os.unlink(self.templates.file)
1040         os.unlink(self.samba4.file)
1041         super(Samba3SamTestCase, self).tearDown()
1042
1043     def test_s3sam(self):
1044         ldb = Ldb(self.ldburl)
1045         self.setup_data(self.samba3, open(os.path.join(datadir, "samba3.ldif"), 'r').read())
1046         self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read())
1047         ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()
1048         ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))
1049         self.setup_modules(ldb, self.samba3, self.samba4)
1050
1051         ldb = Ldb(self.ldburl)
1052
1053         self._test_s3sam_search(ldb)
1054         self._test_s3sam_modify(ldb, self.samba3)
1055
1056     def test_map(self):
1057         ldb = Ldb(self.ldburl)
1058         self.setup_data(self.templates, open(os.path.join(datadir, "provision_samba3sam_templates.ldif"), 'r').read())
1059         ldif = open(os.path.join(datadir, "provision_samba3sam.ldif"), 'r').read()
1060         ldb.add_ldif(substitute_var(ldif, self.samba4.substvars))
1061         self.setup_modules(ldb, self.samba3, self.samba4)
1062
1063         ldb = Ldb(self.ldburl)
1064         self._test_map_search(ldb, self.samba3, self.samba4)
1065         self._test_map_modify(ldb, self.samba3, self.samba4)
1066