s4/linked_attribute tests: remove unused code
[nivanova/samba-autobuild/.git] / source4 / dsdb / tests / python / linked_attributes.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 import optparse
5 import sys
6 import os
7 import itertools
8
9 sys.path.insert(0, "bin/python")
10 import samba
11 from samba.tests.subunitrun import SubunitOptions, TestProgram
12
13 import samba.getopt as options
14
15 from samba.auth import system_session
16 import ldb
17 from samba.samdb import SamDB
18 from samba.dcerpc import misc
19
# Command-line handling.  The script takes the usual Samba option groups plus
# three test-specific switches, and requires one positional argument: the
# host (or database file) the tests run against.
parser = optparse.OptionParser("linked_attributes.py [options] <host>")

sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

# Test-specific switches; each one is a plain boolean flag read through
# `opts` by the test case further down in this file.
parser.add_option(
    '--delete-in-setup',
    action='store_true',
    help="cleanup in setup")
parser.add_option(
    '--no-cleanup',
    action='store_true',
    help="don't cleanup in teardown")
parser.add_option(
    '--no-reveal-internals',
    action='store_true',
    help="Only use windows compatible ldap controls")

opts, args = parser.parse_args()

# The host argument is mandatory: without it, show the usage line and stop.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
49
50
class LATestException(Exception):
    """Exception type declared for the linked-attribute tests.

    It adds nothing to ``Exception``; no code visible in this file raises it.
    """
53
54
class LATests(samba.tests.TestCase):
    """Linked-attribute tests run against the database named by ``host``.

    Each test creates its objects under a scratch ``OU=la`` container that is
    added in setUp() and (unless --no-cleanup is given) tree-deleted again in
    tearDown().  Most tests link groups to users through ``member`` and then
    check both the forward attribute and the ``memberOf`` back link.
    """

    def setUp(self):
        """Connect to the database and create the scratch OU.

        With --delete-in-setup, a left-over OU from an earlier run is
        tree-deleted first; a failure of that delete is only reported.
        """
        super(LATests, self).setUp()
        self.samdb = SamDB(host, credentials=creds,
                           session_info=system_session(lp), lp=lp)

        self.base_dn = self.samdb.domain_dn()
        self.ou = "OU=la,%s" % self.base_dn
        if opts.delete_in_setup:
            try:
                self.samdb.delete(self.ou, ['tree_delete:1'])
            except ldb.LdbError as e:
                # Best effort only: the add below still runs afterwards.
                print("tried deleting %s, got error %s" % (self.ou, e))
        self.samdb.add({'objectclass': 'organizationalUnit',
                        'dn': self.ou})

    def tearDown(self):
        """Tree-delete the scratch OU unless --no-cleanup was given."""
        super(LATests, self).tearDown()
        if not opts.no_cleanup:
            self.samdb.delete(self.ou, ['tree_delete:1'])

    def delete_user(self, user):
        """Delete the object at ``user['dn']`` and forget it locally.

        If the instance keeps a ``self.users`` list, the first entry equal
        to `user` is removed from it (ValueError if it is not present).
        Nothing in this class creates that list, so its absence is
        tolerated instead of raising AttributeError after the delete.
        """
        self.samdb.delete(user['dn'])
        users = getattr(self, 'users', None)
        if users is not None:
            users.remove(user)

    def add_object(self, cn, objectclass):
        """Add an object named `cn` under the scratch OU and return its DN."""
        dn = "CN=%s,%s" % (cn, self.ou)
        self.samdb.add({'cn': cn,
                        'objectclass': objectclass,
                        'dn': dn})

        return dn

    def add_objects(self, n, objectclass, prefix=None):
        """Add `n` objects named ``<prefix>1`` .. ``<prefix>n``.

        `prefix` defaults to the object class name.  Returns the list of
        DNs in creation order.
        """
        if prefix is None:
            prefix = objectclass
        return [self.add_object("%s%d" % (prefix, i + 1), objectclass)
                for i in range(n)]

    def _modify_linked_attribute(self, src, dest, attr, flag):
        """Send one modify on `src` setting `attr` to `dest` with `flag`."""
        m = ldb.Message()
        m.dn = ldb.Dn(self.samdb, src)
        m[attr] = ldb.MessageElement(dest, flag, attr)
        self.samdb.modify(m)

    def add_linked_attribute(self, src, dest, attr='member'):
        """Add value(s) `dest` to `attr` on `src` (FLAG_MOD_ADD)."""
        self._modify_linked_attribute(src, dest, attr, ldb.FLAG_MOD_ADD)

    def remove_linked_attribute(self, src, dest, attr='member'):
        """Delete value(s) `dest` from `attr` on `src` (FLAG_MOD_DELETE)."""
        self._modify_linked_attribute(src, dest, attr, ldb.FLAG_MOD_DELETE)

    def replace_linked_attribute(self, src, dest, attr='member'):
        """Replace `attr` on `src` with `dest` (FLAG_MOD_REPLACE)."""
        self._modify_linked_attribute(src, dest, attr, ldb.FLAG_MOD_REPLACE)

    def attr_search(self, obj, expected, attr, scope=ldb.SCOPE_BASE,
                    **controls):
        """Search `obj` for `attr`, passing `controls` as LDAP controls.

        Each keyword ``name=value`` becomes the control string
        ``"name:<int(value)>"``.  With --no-reveal-internals the
        ``reveal_internals`` control is dropped.  `expected` is accepted
        for signature compatibility but is not used here.
        """
        if opts.no_reveal_internals:
            controls.pop('reveal_internals', None)

        control_strings = ['%s:%d' % (k, int(v))
                           for k, v in controls.items()]

        return self.samdb.search(obj,
                                 scope=scope,
                                 attrs=[attr],
                                 controls=control_strings)

    def assert_links(self, obj, expected, attr, sorted=False, msg='',
                     **kwargs):
        """Assert that `attr` on `obj` holds exactly the values `expected`.

        An empty `expected` means the attribute must be absent from the
        first search result.  Unless `sorted` is true, both sides are
        compared as sets, so order and duplicates are ignored.  `msg` is
        printed together with both values when they differ.  Remaining
        keyword arguments are forwarded to attr_search().

        Note: the `sorted` parameter shadows the builtin; the name is kept
        because callers may pass it by keyword.
        """
        res = self.attr_search(obj, expected, attr, **kwargs)

        if not expected:
            if attr in res[0]:
                self.fail("found attr '%s' in %s" % (attr, res[0]))
            return

        try:
            results = list(res[0][attr])
        except KeyError:
            self.fail("missing attr '%s' on %s" % (attr, obj))

        if not sorted:
            results = set(results)
            expected = set(expected)

        if expected != results:
            print(msg)
            # Wrap in a 1-tuple so a tuple-valued `expected` formats as a
            # single value instead of being unpacked by the % operator.
            print("expected %s" % (expected,))
            print("received %s" % (results,))

        self.assertEqual(results, expected)

    def assert_back_links(self, obj, expected, attr='memberOf', **kwargs):
        """assert_links() with back-link defaults (`memberOf`)."""
        self.assert_links(obj, expected, attr=attr,
                          msg='back links do not match', **kwargs)

    def assert_forward_links(self, obj, expected, attr='member', **kwargs):
        """assert_links() with forward-link defaults (`member`)."""
        self.assert_links(obj, expected, attr=attr,
                          msg='forward links do not match', **kwargs)

    def get_object_guid(self, dn):
        """Return the objectGUID of `dn` as a string."""
        res = self.samdb.search(dn,
                                scope=ldb.SCOPE_BASE,
                                attrs=['objectGUID'])
        return str(misc.GUID(res[0]['objectGUID'][0]))

    def _usn_changed(self, dn):
        """Return the current uSNChanged of `dn` as an int."""
        res = self.samdb.search(dn, scope=ldb.SCOPE_BASE,
                                attrs=['uSNChanged'])
        return int(res[0]['uSNChanged'][0])

    def _test_la_backlinks(self, reveal=False):
        """Link two users into two groups and check the users' back links.

        With `reveal`, the checks also pass ``reveal_internals=0``.
        """
        tag = 'backlinks'
        kwargs = {}
        if reveal:
            tag += '_reveal'
            kwargs = {'reveal_internals': 0}

        u1, u2 = self.add_objects(2, 'user', 'u_%s' % tag)
        g1, g2 = self.add_objects(2, 'group', 'g_%s' % tag)

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        self.assert_back_links(u1, [g1, g2], **kwargs)
        self.assert_back_links(u2, [g2], **kwargs)

    def test_la_backlinks(self):
        self._test_la_backlinks()

    def test_la_backlinks_reveal(self):
        if opts.no_reveal_internals:
            print('skipping because --no-reveal-internals')
            return
        self._test_la_backlinks(True)

    def _test_la_backlinks_delete_group(self, reveal=False):
        """Check back links after one of the linking groups is deleted.

        With `reveal`, the checks also pass ``reveal_internals=0``.
        """
        tag = 'del_group'
        kwargs = {}
        if reveal:
            tag += '_reveal'
            kwargs = {'reveal_internals': 0}

        u1, u2 = self.add_objects(2, 'user', 'u_' + tag)
        g1, g2 = self.add_objects(2, 'group', 'g_' + tag)

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        self.samdb.delete(g2, ['tree_delete:1'])

        self.assert_back_links(u1, [g1], **kwargs)
        self.assert_back_links(u2, set(), **kwargs)

    def test_la_backlinks_delete_group(self):
        self._test_la_backlinks_delete_group()

    def test_la_backlinks_delete_group_reveal(self):
        if opts.no_reveal_internals:
            print('skipping because --no-reveal-internals')
            return
        self._test_la_backlinks_delete_group(True)

    def test_links_all_delete_group(self):
        """Check links in both directions after deleting a group.

        The deleted group is looked up by GUID with the show_deleted,
        show_recycled and show_deactivated_link controls.
        """
        u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
        g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
        g2guid = self.get_object_guid(g2)

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        self.samdb.delete(g2)
        self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
                               show_deactivated_link=0)
        self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
                               show_deactivated_link=0)
        self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
                                  show_deactivated_link=0)
        self.assert_forward_links('<GUID=%s>' % g2guid,
                                  [], show_deleted=1, show_recycled=1,
                                  show_deactivated_link=0)

    def test_links_all_delete_group_reveal(self):
        """As test_links_all_delete_group, also passing reveal_internals."""
        u1, u2 = self.add_objects(2, 'user', 'u_all_del_group_reveal')
        g1, g2 = self.add_objects(2, 'group', 'g_all_del_group_reveal')
        g2guid = self.get_object_guid(g2)

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        self.samdb.delete(g2)
        self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
                               show_deactivated_link=0,
                               reveal_internals=0)
        self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
                               show_deactivated_link=0,
                               reveal_internals=0)
        self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
                                  show_deactivated_link=0,
                                  reveal_internals=0)
        self.assert_forward_links('<GUID=%s>' % g2guid,
                                  [], show_deleted=1, show_recycled=1,
                                  show_deactivated_link=0,
                                  reveal_internals=0)

    def test_la_links_delete_link(self):
        """Adding or removing a link must change the group's uSNChanged."""
        u1, u2 = self.add_objects(2, 'user', 'u_del_link')
        g1, g2 = self.add_objects(2, 'group', 'g_del_link')

        old_usn1 = self._usn_changed(g1)

        self.add_linked_attribute(g1, u1)

        new_usn1 = self._usn_changed(g1)

        self.assertNotEqual(old_usn1, new_usn1, "USN should have incremented")

        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        old_usn2 = self._usn_changed(g2)

        self.remove_linked_attribute(g2, u1)

        new_usn2 = self._usn_changed(g2)

        self.assertNotEqual(old_usn2, new_usn2, "USN should have incremented")

        self.assert_forward_links(g1, [u1])
        self.assert_forward_links(g2, [u2])

        self.add_linked_attribute(g2, u1)
        self.assert_forward_links(g2, [u1, u2])
        self.remove_linked_attribute(g2, u2)
        self.assert_forward_links(g2, [u1])
        self.remove_linked_attribute(g2, u1)
        self.assert_forward_links(g2, [])
        # Deleting with an empty value list is expected to leave g1 with
        # no members at all.
        self.remove_linked_attribute(g1, [])
        self.assert_forward_links(g1, [])

    def _test_la_links_delete_link_reveal(self):
        """A removed link must still be listed when reveal_internals is sent."""
        u1, u2 = self.add_objects(2, 'user', 'u_del_link_reveal')
        g1, g2 = self.add_objects(2, 'group', 'g_del_link_reveal')

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        self.remove_linked_attribute(g2, u1)

        self.assert_forward_links(g2, [u1, u2], show_deleted=1,
                                  show_recycled=1,
                                  show_deactivated_link=0,
                                  reveal_internals=0)

    def test_la_links_delete_link_reveal(self):
        if opts.no_reveal_internals:
            print('skipping because --no-reveal-internals')
            return
        self._test_la_links_delete_link_reveal()

    def test_la_links_delete_user(self):
        """Deleting a user drops it from groups without touching their USN."""
        u1, u2 = self.add_objects(2, 'user', 'u_del_user')
        g1, g2 = self.add_objects(2, 'group', 'g_del_user')

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        old_usn1 = self._usn_changed(g1)
        old_usn2 = self._usn_changed(g2)

        self.samdb.delete(u1)

        self.assert_forward_links(g1, [])
        self.assert_forward_links(g2, [u2])

        new_usn1 = self._usn_changed(g1)
        new_usn2 = self._usn_changed(g2)

        # Assert the USN on the alternate object is unchanged
        self.assertEqual(old_usn1, new_usn1)
        self.assertEqual(old_usn2, new_usn2)

    def test_la_links_delete_user_reveal(self):
        """Forward links after a user delete, with the extra controls set."""
        u1, u2 = self.add_objects(2, 'user', 'u_del_user_reveal')
        g1, g2 = self.add_objects(2, 'group', 'g_del_user_reveal')

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        self.samdb.delete(u1)

        self.assert_forward_links(g2, [u2],
                                  show_deleted=1, show_recycled=1,
                                  show_deactivated_link=0,
                                  reveal_internals=0)
        self.assert_forward_links(g1, [],
                                  show_deleted=1, show_recycled=1,
                                  show_deactivated_link=0,
                                  reveal_internals=0)

    def test_multiple_links(self):
        """Add and remove several link values per modify request."""
        u1, u2, u3, u4 = self.add_objects(4, 'user', 'u_multiple_links')
        g1, g2, g3 = self.add_objects(3, 'group', 'g_multiple_links')

        self.add_linked_attribute(g1, [u1, u2, u3, u4])
        self.add_linked_attribute(g2, [u3, u1])
        self.add_linked_attribute(g3, u2)

        self.assert_forward_links(g1, [u1, u2, u3, u4])
        self.assert_forward_links(g2, [u3, u1])
        self.assert_forward_links(g3, [u2])
        self.assert_back_links(u1, [g2, g1])
        self.assert_back_links(u2, [g3, g1])
        self.assert_back_links(u3, [g2, g1])
        self.assert_back_links(u4, [g1])

        self.remove_linked_attribute(g2, [u1, u3])
        self.remove_linked_attribute(g1, [u1, u3])

        self.assert_forward_links(g1, [u2, u4])
        self.assert_forward_links(g2, [])
        self.assert_forward_links(g3, [u2])
        self.assert_back_links(u1, [])
        self.assert_back_links(u2, [g3, g1])
        self.assert_back_links(u3, [])
        self.assert_back_links(u4, [g1])

        self.add_linked_attribute(g1, [u1, u3])
        self.add_linked_attribute(g2, [u3, u1])
        self.add_linked_attribute(g3, [u1, u3])

        self.assert_forward_links(g1, [u1, u2, u3, u4])
        self.assert_forward_links(g2, [u1, u3])
        self.assert_forward_links(g3, [u1, u2, u3])
        self.assert_back_links(u1, [g1, g2, g3])
        self.assert_back_links(u2, [g3, g1])
        self.assert_back_links(u3, [g3, g2, g1])
        self.assert_back_links(u4, [g1])

    def test_la_links_replace(self):
        """Replace link sets, including on a group with no links yet."""
        u1, u2, u3, u4 = self.add_objects(4, 'user', 'u_replace')
        g1, g2, g3, g4 = self.add_objects(4, 'group', 'g_replace')

        self.add_linked_attribute(g1, [u1, u2])
        self.add_linked_attribute(g2, [u1, u3])
        self.add_linked_attribute(g3, u1)

        self.replace_linked_attribute(g1, [u2])
        self.replace_linked_attribute(g2, [u2, u3])
        self.replace_linked_attribute(g3, [u1, u3])
        self.replace_linked_attribute(g4, [u4])

        self.assert_forward_links(g1, [u2])
        self.assert_forward_links(g2, [u3, u2])
        self.assert_forward_links(g3, [u3, u1])
        self.assert_forward_links(g4, [u4])
        self.assert_back_links(u1, [g3])
        self.assert_back_links(u2, [g1, g2])
        self.assert_back_links(u3, [g2, g3])
        self.assert_back_links(u4, [g4])

        self.replace_linked_attribute(g1, [u1, u2, u3])
        self.replace_linked_attribute(g2, [u1])
        self.replace_linked_attribute(g3, [u2])
        self.replace_linked_attribute(g4, [])

        self.assert_forward_links(g1, [u1, u2, u3])
        self.assert_forward_links(g2, [u1])
        self.assert_forward_links(g3, [u2])
        self.assert_forward_links(g4, [])
        self.assert_back_links(u1, [g1, g2])
        self.assert_back_links(u2, [g1, g3])
        self.assert_back_links(u3, [g1])
        self.assert_back_links(u4, [])

    def test_la_links_replace2(self):
        """Grow, shift and shrink one group's member list of 12 users."""
        users = self.add_objects(12, 'user', 'u_replace2')
        g1, = self.add_objects(1, 'group', 'g_replace2')

        self.add_linked_attribute(g1, users[:6])
        self.assert_forward_links(g1, users[:6])
        self.replace_linked_attribute(g1, users)
        self.assert_forward_links(g1, users)
        self.replace_linked_attribute(g1, users[6:])
        self.assert_forward_links(g1, users[6:])
        self.remove_linked_attribute(g1, users[6:9])
        self.assert_forward_links(g1, users[9:])
        self.remove_linked_attribute(g1, users[9:])
        self.assert_forward_links(g1, [])

    def test_la_links_permutations(self):
        """Make sure the order in which we add links doesn't matter."""
        users = self.add_objects(3, 'user', 'u_permutations')
        groups = self.add_objects(6, 'group', 'g_permutations')

        # 3 users have 6 permutations: one ordering per group.
        for g, p in zip(groups, itertools.permutations(users)):
            self.add_linked_attribute(g, p)

        # everyone should be in every group
        for g in groups:
            self.assert_forward_links(g, users)

        for u in users:
            self.assert_back_links(u, groups)

        # Replace every group's members with a different ordering.
        for g, p in zip(groups[::-1], itertools.permutations(users)):
            self.replace_linked_attribute(g, p)

        for g in groups:
            self.assert_forward_links(g, users)

        for u in users:
            self.assert_back_links(u, groups)

        for g, p in zip(groups, itertools.permutations(users)):
            self.remove_linked_attribute(g, p)

        for g in groups:
            self.assert_forward_links(g, [])

        for u in users:
            self.assert_back_links(u, [])

    def test_one_way_attributes(self):
        """addressBookRoots must follow its target to the target's new DN.

        After the target is deleted it is found again by GUID, and the
        source's attribute is expected to hold that object's current DN.
        """
        e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
                                  'e_one_way')
        guid = self.get_object_guid(e2)

        self.add_linked_attribute(e1, e2, attr="addressBookRoots")
        self.assert_forward_links(e1, [e2], attr='addressBookRoots')

        self.samdb.delete(e2)

        res = self.samdb.search("<GUID=%s>" % guid,
                                scope=ldb.SCOPE_BASE,
                                controls=['show_deleted:1',
                                          'show_recycled:1'])

        new_dn = str(res[0].dn)
        self.assert_forward_links(e1, [new_dn], attr='addressBookRoots')
        self.assert_forward_links(e1, [new_dn],
                                  attr='addressBookRoots',
                                  show_deactivated_link=0)

    def test_one_way_attributes_delete_link(self):
        """Removing an addressBookRoots value leaves no link behind."""
        e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
                                  'e_one_way')

        self.add_linked_attribute(e1, e2, attr="addressBookRoots")
        self.assert_forward_links(e1, [e2], attr='addressBookRoots')

        self.remove_linked_attribute(e1, e2, attr="addressBookRoots")

        self.assert_forward_links(e1, [], attr='addressBookRoots')
        self.assert_forward_links(e1, [], attr='addressBookRoots',
                                  show_deactivated_link=0)

    def test_pretend_one_way_attributes(self):
        """addressBookRoots2 must be emptied when its target is deleted."""
        e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
                                  'e_one_way')
        guid = self.get_object_guid(e2)

        self.add_linked_attribute(e1, e2, attr="addressBookRoots2")
        self.assert_forward_links(e1, [e2], attr='addressBookRoots2')

        self.samdb.delete(e2)
        res = self.samdb.search("<GUID=%s>" % guid,
                                scope=ldb.SCOPE_BASE,
                                controls=['show_deleted:1',
                                          'show_recycled:1'])

        # The deleted object must still be reachable by GUID.
        self.assertEqual(len(res), 1)

        self.assert_forward_links(e1, [], attr='addressBookRoots2')
        self.assert_forward_links(e1, [], attr='addressBookRoots2',
                                  show_deactivated_link=0)

    def test_pretend_one_way_attributes_delete_link(self):
        """Removing an addressBookRoots2 value leaves no link behind."""
        e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
                                  'e_one_way')

        self.add_linked_attribute(e1, e2, attr="addressBookRoots2")
        self.assert_forward_links(e1, [e2], attr='addressBookRoots2')

        self.remove_linked_attribute(e1, e2, attr="addressBookRoots2")

        self.assert_forward_links(e1, [], attr='addressBookRoots2')
        self.assert_forward_links(e1, [], attr='addressBookRoots2',
                                  show_deactivated_link=0)
576
# A bare host argument gets a URL scheme: an existing file is opened as a
# tdb database, anything else is treated as an LDAP server name.
if "://" not in host:
    host = ("tdb://%s" if os.path.isfile(host) else "ldap://%s") % host


# Hand control to the subunit test runner for the tests in this module.
TestProgram(module=__name__, opts=subunitopts)