ddacc04f18af172ff3fe5cd25960299f5540f7a5
[samba.git] / source4 / dsdb / tests / python / linked_attributes.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 import optparse
5 import sys
6 import os
7 import itertools
8
9 sys.path.insert(0, "bin/python")
10 import samba
11 from samba.tests.subunitrun import SubunitOptions, TestProgram
12
13 import samba.getopt as options
14
15 from samba.auth import system_session
16 import ldb
17 from samba.samdb import SamDB
18 from samba.dcerpc import misc
19
# Build the option parser out of the standard Samba option groups.
parser = optparse.OptionParser("linked_attributes.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

# Switches specific to this test script.
parser.add_option('--delete-in-setup', action='store_true',
                  help="cleanup in setup")

parser.add_option('--no-cleanup', action='store_true',
                  help="don't cleanup in teardown")

parser.add_option('--no-reveal-internals', action='store_true',
                  help="Only use windows compatible ldap controls")

opts, args = parser.parse_args()

# The <host> positional argument is mandatory.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
49
50
class LATestException(Exception):
    """Exception type reserved for the linked attribute tests."""
53
54
55 class LATests(samba.tests.TestCase):
56
57     def setUp(self):
58         super(LATests, self).setUp()
59         self.samdb = SamDB(host, credentials=creds,
60                            session_info=system_session(lp), lp=lp)
61
62         self.base_dn = self.samdb.domain_dn()
63         self.testbase = "CN=LATests,%s" % self.base_dn
64         if opts.delete_in_setup:
65             try:
66                 self.samdb.delete(self.testbase, ['tree_delete:1'])
67             except ldb.LdbError as e:
68                 print("tried deleting %s, got error %s" % (self.testbase, e))
69         self.samdb.add({'objectclass': 'container',
70                         'dn': self.testbase})
71
72     def tearDown(self):
73         super(LATests, self).tearDown()
74         if not opts.no_cleanup:
75             self.samdb.delete(self.testbase, ['tree_delete:1'])
76
77     def add_object(self, cn, objectclass, more_attrs=None):
78         if more_attrs is None:
79             more_attrs = {}
80
81         dn = "CN=%s,%s" % (cn, self.testbase)
82         attrs = {'cn': cn,
83                  'objectclass': objectclass,
84                  'dn': dn}
85         attrs.update(more_attrs)
86         self.samdb.add(attrs)
87
88         return dn
89
90     def add_objects(self, n, objectclass, prefix=None, more_attrs=None):
91         if more_attrs is None:
92             more_attrs = {}
93         if prefix is None:
94             prefix = objectclass
95         dns = []
96         for i in range(n):
97             dns.append(self.add_object("%s%d" % (prefix, i + 1),
98                                        objectclass,
99                                        more_attrs=more_attrs))
100         return dns
101
102     def add_linked_attribute(self, src, dest, attr='member',
103                              controls=None):
104         m = ldb.Message()
105         m.dn = ldb.Dn(self.samdb, src)
106         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
107         self.samdb.modify(m, controls=controls)
108
109     def remove_linked_attribute(self, src, dest, attr='member',
110                                 controls=None):
111         m = ldb.Message()
112         m.dn = ldb.Dn(self.samdb, src)
113         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
114         self.samdb.modify(m, controls=controls)
115
116     def replace_linked_attribute(self, src, dest, attr='member',
117                                  controls=None):
118         m = ldb.Message()
119         m.dn = ldb.Dn(self.samdb, src)
120         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_REPLACE, attr)
121         self.samdb.modify(m, controls=controls)
122
123     def attr_search(self, obj, attr, scope=ldb.SCOPE_BASE, **controls):
124         if opts.no_reveal_internals:
125             if 'reveal_internals' in controls:
126                 del controls['reveal_internals']
127
128         controls = ['%s:%d' % (k, int(v)) for k, v in controls.items()]
129
130         res = self.samdb.search(obj,
131                                 scope=scope,
132                                 attrs=[attr],
133                                 controls=controls)
134         return res
135
136     def assert_links(self, obj, expected, attr, msg='', **kwargs):
137         res = self.attr_search(obj, attr, **kwargs)
138
139         if len(expected) == 0:
140             if attr in res[0]:
141                 self.fail("found attr '%s' in %s" % (attr, res[0]))
142             return
143
144         try:
145             results = [str(x) for x in res[0][attr]]
146         except KeyError:
147             self.fail("missing attr '%s' on %s" % (attr, obj))
148
149         expected = sorted(expected)
150         results = sorted(results)
151
152         if expected != results:
153             print(msg)
154             print("expected %s" % expected)
155             print("received %s" % results)
156
157         self.assertEqual(results, expected)
158
159     def assert_back_links(self, obj, expected, attr='memberOf', **kwargs):
160         self.assert_links(obj, expected, attr=attr,
161                           msg='back links do not match', **kwargs)
162
163     def assert_forward_links(self, obj, expected, attr='member', **kwargs):
164         self.assert_links(obj, expected, attr=attr,
165                           msg='forward links do not match', **kwargs)
166
167     def get_object_guid(self, dn):
168         res = self.samdb.search(dn,
169                                 scope=ldb.SCOPE_BASE,
170                                 attrs=['objectGUID'])
171         return str(misc.GUID(res[0]['objectGUID'][0]))
172
173     def _test_la_backlinks(self, reveal=False):
174         tag = 'backlinks'
175         kwargs = {}
176         if reveal:
177             tag += '_reveal'
178             kwargs = {'reveal_internals': 0}
179
180         u1, u2 = self.add_objects(2, 'user', 'u_%s' % tag)
181         g1, g2 = self.add_objects(2, 'group', 'g_%s' % tag)
182
183         self.add_linked_attribute(g1, u1)
184         self.add_linked_attribute(g2, u1)
185         self.add_linked_attribute(g2, u2)
186
187         self.assert_back_links(u1, [g1, g2], **kwargs)
188         self.assert_back_links(u2, [g2], **kwargs)
189
    def test_la_backlinks(self):
        """Run the back link check without the reveal_internals control."""
        self._test_la_backlinks()
192
193     def test_la_backlinks_reveal(self):
194         if opts.no_reveal_internals:
195             print('skipping because --no-reveal-internals')
196             return
197         self._test_la_backlinks(True)
198
199     def _test_la_backlinks_delete_group(self, reveal=False):
200         tag = 'del_group'
201         kwargs = {}
202         if reveal:
203             tag += '_reveal'
204             kwargs = {'reveal_internals': 0}
205
206         u1, u2 = self.add_objects(2, 'user', 'u_' + tag)
207         g1, g2 = self.add_objects(2, 'group', 'g_' + tag)
208
209         self.add_linked_attribute(g1, u1)
210         self.add_linked_attribute(g2, u1)
211         self.add_linked_attribute(g2, u2)
212
213         self.samdb.delete(g2, ['tree_delete:1'])
214
215         self.assert_back_links(u1, [g1], **kwargs)
216         self.assert_back_links(u2, set(), **kwargs)
217
    def test_la_backlinks_delete_group(self):
        """Run the group-deletion back link check without reveal_internals."""
        self._test_la_backlinks_delete_group()
220
221     def test_la_backlinks_delete_group_reveal(self):
222         if opts.no_reveal_internals:
223             print('skipping because --no-reveal-internals')
224             return
225         self._test_la_backlinks_delete_group(True)
226
227     def test_links_all_delete_group(self):
228         u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
229         g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
230         g2guid = self.get_object_guid(g2)
231
232         self.add_linked_attribute(g1, u1)
233         self.add_linked_attribute(g2, u1)
234         self.add_linked_attribute(g2, u2)
235
236         self.samdb.delete(g2)
237         self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
238                                show_deactivated_link=0)
239         self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
240                                show_deactivated_link=0)
241         self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
242                                   show_deactivated_link=0)
243         self.assert_forward_links('<GUID=%s>' % g2guid,
244                                   [], show_deleted=1, show_recycled=1,
245                                   show_deactivated_link=0)
246
247     def test_links_all_delete_group_reveal(self):
248         u1, u2 = self.add_objects(2, 'user', 'u_all_del_group_reveal')
249         g1, g2 = self.add_objects(2, 'group', 'g_all_del_group_reveal')
250         g2guid = self.get_object_guid(g2)
251
252         self.add_linked_attribute(g1, u1)
253         self.add_linked_attribute(g2, u1)
254         self.add_linked_attribute(g2, u2)
255
256         self.samdb.delete(g2)
257         self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
258                                show_deactivated_link=0,
259                                reveal_internals=0)
260         self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
261                                show_deactivated_link=0,
262                                reveal_internals=0)
263         self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
264                                   show_deactivated_link=0,
265                                   reveal_internals=0)
266         self.assert_forward_links('<GUID=%s>' % g2guid,
267                                   [], show_deleted=1, show_recycled=1,
268                                   show_deactivated_link=0,
269                                   reveal_internals=0)
270
271     def test_la_links_delete_link(self):
272         u1, u2 = self.add_objects(2, 'user', 'u_del_link')
273         g1, g2 = self.add_objects(2, 'group', 'g_del_link')
274
275         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
276                                 attrs=['uSNChanged'])
277         old_usn1 = int(res[0]['uSNChanged'][0])
278
279         self.add_linked_attribute(g1, u1)
280
281         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
282                                 attrs=['uSNChanged'])
283         new_usn1 = int(res[0]['uSNChanged'][0])
284
285         self.assertNotEqual(old_usn1, new_usn1, "USN should have incremented")
286
287         self.add_linked_attribute(g2, u1)
288         self.add_linked_attribute(g2, u2)
289
290         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
291                                 attrs=['uSNChanged'])
292         old_usn2 = int(res[0]['uSNChanged'][0])
293
294         self.remove_linked_attribute(g2, u1)
295
296         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
297                                 attrs=['uSNChanged'])
298         new_usn2 = int(res[0]['uSNChanged'][0])
299
300         self.assertNotEqual(old_usn2, new_usn2, "USN should have incremented")
301
302         self.assert_forward_links(g1, [u1])
303         self.assert_forward_links(g2, [u2])
304
305         self.add_linked_attribute(g2, u1)
306         self.assert_forward_links(g2, [u1, u2])
307         self.remove_linked_attribute(g2, u2)
308         self.assert_forward_links(g2, [u1])
309         self.remove_linked_attribute(g2, u1)
310         self.assert_forward_links(g2, [])
311         self.remove_linked_attribute(g1, [])
312         self.assert_forward_links(g1, [])
313
314         # removing a duplicate link in the same message should fail
315         self.add_linked_attribute(g2, [u1, u2])
316         self.assertRaises(ldb.LdbError,
317                           self.remove_linked_attribute, g2, [u1, u1])
318
319     def _test_la_links_delete_link_reveal(self):
320         u1, u2 = self.add_objects(2, 'user', 'u_del_link_reveal')
321         g1, g2 = self.add_objects(2, 'group', 'g_del_link_reveal')
322
323         self.add_linked_attribute(g1, u1)
324         self.add_linked_attribute(g2, u1)
325         self.add_linked_attribute(g2, u2)
326
327         self.remove_linked_attribute(g2, u1)
328
329         self.assert_forward_links(g2, [u1, u2], show_deleted=1,
330                                   show_recycled=1,
331                                   show_deactivated_link=0,
332                                   reveal_internals=0
333                                   )
334
335     def test_la_links_delete_link_reveal(self):
336         if opts.no_reveal_internals:
337             print('skipping because --no-reveal-internals')
338             return
339         self._test_la_links_delete_link_reveal()
340
341     def test_la_links_delete_user(self):
342         u1, u2 = self.add_objects(2, 'user', 'u_del_user')
343         g1, g2 = self.add_objects(2, 'group', 'g_del_user')
344
345         self.add_linked_attribute(g1, u1)
346         self.add_linked_attribute(g2, u1)
347         self.add_linked_attribute(g2, u2)
348
349         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
350                                 attrs=['uSNChanged'])
351         old_usn1 = int(res[0]['uSNChanged'][0])
352
353         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
354                                 attrs=['uSNChanged'])
355         old_usn2 = int(res[0]['uSNChanged'][0])
356
357         self.samdb.delete(u1)
358
359         self.assert_forward_links(g1, [])
360         self.assert_forward_links(g2, [u2])
361
362         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
363                                 attrs=['uSNChanged'])
364         new_usn1 = int(res[0]['uSNChanged'][0])
365
366         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
367                                 attrs=['uSNChanged'])
368         new_usn2 = int(res[0]['uSNChanged'][0])
369
370         # Assert the USN on the alternate object is unchanged
371         self.assertEqual(old_usn1, new_usn1)
372         self.assertEqual(old_usn2, new_usn2)
373
374     def test_la_links_delete_user_reveal(self):
375         u1, u2 = self.add_objects(2, 'user', 'u_del_user_reveal')
376         g1, g2 = self.add_objects(2, 'group', 'g_del_user_reveal')
377
378         self.add_linked_attribute(g1, u1)
379         self.add_linked_attribute(g2, u1)
380         self.add_linked_attribute(g2, u2)
381
382         self.samdb.delete(u1)
383
384         self.assert_forward_links(g2, [u2],
385                                   show_deleted=1, show_recycled=1,
386                                   show_deactivated_link=0,
387                                   reveal_internals=0)
388         self.assert_forward_links(g1, [],
389                                   show_deleted=1, show_recycled=1,
390                                   show_deactivated_link=0,
391                                   reveal_internals=0)
392
393     def test_multiple_links(self):
394         u1, u2, u3, u4 = self.add_objects(4, 'user', 'u_multiple_links')
395         g1, g2, g3, g4 = self.add_objects(4, 'group', 'g_multiple_links')
396
397         self.add_linked_attribute(g1, [u1, u2, u3, u4])
398         self.add_linked_attribute(g2, [u3, u1])
399         self.add_linked_attribute(g3, u2)
400
401         self.assertRaisesLdbError(ldb.ERR_ENTRY_ALREADY_EXISTS,
402                                   "adding duplicate values",
403                                   self.add_linked_attribute, g2,
404                                   [u1, u2, u3, u2])
405
406         self.assert_forward_links(g1, [u1, u2, u3, u4])
407         self.assert_forward_links(g2, [u3, u1])
408         self.assert_forward_links(g3, [u2])
409         self.assert_back_links(u1, [g2, g1])
410         self.assert_back_links(u2, [g3, g1])
411         self.assert_back_links(u3, [g2, g1])
412         self.assert_back_links(u4, [g1])
413
414         self.remove_linked_attribute(g2, [u1, u3])
415         self.remove_linked_attribute(g1, [u1, u3])
416
417         self.assert_forward_links(g1, [u2, u4])
418         self.assert_forward_links(g2, [])
419         self.assert_forward_links(g3, [u2])
420         self.assert_back_links(u1, [])
421         self.assert_back_links(u2, [g3, g1])
422         self.assert_back_links(u3, [])
423         self.assert_back_links(u4, [g1])
424
425         self.add_linked_attribute(g1, [u1, u3])
426         self.add_linked_attribute(g2, [u3, u1])
427         self.add_linked_attribute(g3, [u1, u3])
428
429         self.assert_forward_links(g1, [u1, u2, u3, u4])
430         self.assert_forward_links(g2, [u1, u3])
431         self.assert_forward_links(g3, [u1, u2, u3])
432         self.assert_back_links(u1, [g1, g2, g3])
433         self.assert_back_links(u2, [g3, g1])
434         self.assert_back_links(u3, [g3, g2, g1])
435         self.assert_back_links(u4, [g1])
436
437     def test_la_links_replace(self):
438         u1, u2, u3, u4 = self.add_objects(4, 'user', 'u_replace')
439         g1, g2, g3, g4 = self.add_objects(4, 'group', 'g_replace')
440
441         self.add_linked_attribute(g1, [u1, u2])
442         self.add_linked_attribute(g2, [u1, u3])
443         self.add_linked_attribute(g3, u1)
444
445         self.replace_linked_attribute(g1, [u2])
446         self.replace_linked_attribute(g2, [u2, u3])
447         self.replace_linked_attribute(g3, [u1, u3])
448         self.replace_linked_attribute(g4, [u4])
449
450         self.assert_forward_links(g1, [u2])
451         self.assert_forward_links(g2, [u3, u2])
452         self.assert_forward_links(g3, [u3, u1])
453         self.assert_forward_links(g4, [u4])
454         self.assert_back_links(u1, [g3])
455         self.assert_back_links(u2, [g1, g2])
456         self.assert_back_links(u3, [g2, g3])
457         self.assert_back_links(u4, [g4])
458
459         self.replace_linked_attribute(g1, [u1, u2, u3])
460         self.replace_linked_attribute(g2, [u1])
461         self.replace_linked_attribute(g3, [u2])
462         self.replace_linked_attribute(g4, [])
463
464         self.assert_forward_links(g1, [u1, u2, u3])
465         self.assert_forward_links(g2, [u1])
466         self.assert_forward_links(g3, [u2])
467         self.assert_forward_links(g4, [])
468         self.assert_back_links(u1, [g1, g2])
469         self.assert_back_links(u2, [g1, g3])
470         self.assert_back_links(u3, [g1])
471         self.assert_back_links(u4, [])
472
473         self.assertRaisesLdbError(ldb.ERR_ENTRY_ALREADY_EXISTS,
474                                   "replacing duplicate values",
475                                   self.replace_linked_attribute, g2,
476                                   [u1, u2, u3, u2])
477
478     def test_la_links_replace2(self):
479         users = self.add_objects(12, 'user', 'u_replace2')
480         g1, = self.add_objects(1, 'group', 'g_replace2')
481
482         self.add_linked_attribute(g1, users[:6])
483         self.assert_forward_links(g1, users[:6])
484         self.replace_linked_attribute(g1, users)
485         self.assert_forward_links(g1, users)
486         self.replace_linked_attribute(g1, users[6:])
487         self.assert_forward_links(g1, users[6:])
488         self.remove_linked_attribute(g1, users[6:9])
489         self.assert_forward_links(g1, users[9:])
490         self.remove_linked_attribute(g1, users[9:])
491         self.assert_forward_links(g1, [])
492
493     def test_la_links_permutations(self):
494         """Make sure the order in which we add links doesn't matter."""
495         users = self.add_objects(3, 'user', 'u_permutations')
496         groups = self.add_objects(6, 'group', 'g_permutations')
497
498         for g, p in zip(groups, itertools.permutations(users)):
499             self.add_linked_attribute(g, p)
500
501         # everyone should be in every group
502         for g in groups:
503             self.assert_forward_links(g, users)
504
505         for u in users:
506             self.assert_back_links(u, groups)
507
508         for g, p in zip(groups[::-1], itertools.permutations(users)):
509             self.replace_linked_attribute(g, p)
510
511         for g in groups:
512             self.assert_forward_links(g, users)
513
514         for u in users:
515             self.assert_back_links(u, groups)
516
517         for g, p in zip(groups, itertools.permutations(users)):
518             self.remove_linked_attribute(g, p)
519
520         for g in groups:
521             self.assert_forward_links(g, [])
522
523         for u in users:
524             self.assert_back_links(u, [])
525
526     def test_la_links_relaxed(self):
527         """Check that the relax control doesn't mess with linked attributes."""
528         relax_control = ['relax:0']
529
530         users = self.add_objects(10, 'user', 'u_relax')
531         groups = self.add_objects(3, 'group', 'g_relax',
532                                   more_attrs={'member': users[:2]})
533         g_relax1, g_relax2, g_uptight = groups
534
535         # g_relax1 has all users added at once
536         # g_relax2 gets them one at a time in reverse order
537         # g_uptight never relaxes
538
539         self.add_linked_attribute(g_relax1, users[2:5], controls=relax_control)
540
541         for u in reversed(users[2:5]):
542             self.add_linked_attribute(g_relax2, u, controls=relax_control)
543             self.add_linked_attribute(g_uptight, u)
544
545         for g in groups:
546             self.assert_forward_links(g, users[:5])
547
548             self.add_linked_attribute(g, users[5:7])
549             self.assert_forward_links(g, users[:7])
550
551             for u in users[7:]:
552                 self.add_linked_attribute(g, u)
553
554             self.assert_forward_links(g, users)
555
556         for u in users:
557             self.assert_back_links(u, groups)
558
559         # try some replacement permutations
560         import random
561         random.seed(1)
562         users2 = users[:]
563         for i in range(5):
564             random.shuffle(users2)
565             self.replace_linked_attribute(g_relax1, users2,
566                                           controls=relax_control)
567
568             self.assert_forward_links(g_relax1, users)
569
570         for i in range(5):
571             random.shuffle(users2)
572             self.remove_linked_attribute(g_relax2, users2,
573                                          controls=relax_control)
574             self.remove_linked_attribute(g_uptight, users2)
575
576             self.replace_linked_attribute(g_relax1, [], controls=relax_control)
577
578             random.shuffle(users2)
579             self.add_linked_attribute(g_relax2, users2,
580                                       controls=relax_control)
581             self.add_linked_attribute(g_uptight, users2)
582             self.replace_linked_attribute(g_relax1, users2,
583                                           controls=relax_control)
584
585             self.assert_forward_links(g_relax1, users)
586             self.assert_forward_links(g_relax2, users)
587             self.assert_forward_links(g_uptight, users)
588
589         for u in users:
590             self.assert_back_links(u, groups)
591
592     def test_add_all_at_once(self):
593         """All these other tests are creating linked attributes after the
594         objects are there. We want to test creating them all at once
595         using LDIF.
596         """
597         users = self.add_objects(7, 'user', 'u_all_at_once')
598         g1, g3 = self.add_objects(2, 'group', 'g_all_at_once',
599                                   more_attrs={'member': users})
600         (g2,) = self.add_objects(1, 'group', 'g_all_at_once2',
601                                  more_attrs={'member': users[:5]})
602
603         self.assertRaisesLdbError(ldb.ERR_ENTRY_ALREADY_EXISTS,
604                                   "adding multiple duplicate values",
605                                   self.add_objects, 1, 'group',
606                                   'g_with_duplicate_links',
607                                   more_attrs={'member': users[:5] + users[1:2]})
608
609         self.assert_forward_links(g1, users)
610         self.assert_forward_links(g2, users[:5])
611         self.assert_forward_links(g3, users)
612         for u in users[:5]:
613             self.assert_back_links(u, [g1, g2, g3])
614         for u in users[5:]:
615             self.assert_back_links(u, [g1, g3])
616
617         self.remove_linked_attribute(g2, users[0])
618         self.remove_linked_attribute(g2, users[1])
619         self.add_linked_attribute(g2, users[1])
620         self.add_linked_attribute(g2, users[5])
621         self.add_linked_attribute(g2, users[6])
622
623         self.assert_forward_links(g1, users)
624         self.assert_forward_links(g2, users[1:])
625
626         for u in users[1:]:
627             self.remove_linked_attribute(g2, u)
628         self.remove_linked_attribute(g1, users)
629
630         for u in users:
631             self.samdb.delete(u)
632
633         self.assert_forward_links(g1, [])
634         self.assert_forward_links(g2, [])
635         self.assert_forward_links(g3, [])
636
637     def test_one_way_attributes(self):
638         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
639                                   'e_one_way')
640         guid = self.get_object_guid(e2)
641
642         self.add_linked_attribute(e1, e2, attr="addressBookRoots")
643         self.assert_forward_links(e1, [e2], attr='addressBookRoots')
644
645         self.samdb.delete(e2)
646
647         res = self.samdb.search("<GUID=%s>" % guid,
648                                 scope=ldb.SCOPE_BASE,
649                                 controls=['show_deleted:1',
650                                           'show_recycled:1'])
651
652         new_dn = str(res[0].dn)
653         self.assert_forward_links(e1, [new_dn], attr='addressBookRoots')
654         self.assert_forward_links(e1, [new_dn],
655                                   attr='addressBookRoots',
656                                   show_deactivated_link=0)
657
658     def test_one_way_attributes_delete_link(self):
659         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
660                                   'e_one_way')
661         guid = self.get_object_guid(e2)
662
663         self.add_linked_attribute(e1, e2, attr="addressBookRoots")
664         self.assert_forward_links(e1, [e2], attr='addressBookRoots')
665
666         self.remove_linked_attribute(e1, e2, attr="addressBookRoots")
667
668         self.assert_forward_links(e1, [], attr='addressBookRoots')
669         self.assert_forward_links(e1, [], attr='addressBookRoots',
670                                   show_deactivated_link=0)
671
672     def test_pretend_one_way_attributes(self):
673         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
674                                   'e_one_way')
675         guid = self.get_object_guid(e2)
676
677         self.add_linked_attribute(e1, e2, attr="addressBookRoots2")
678         self.assert_forward_links(e1, [e2], attr='addressBookRoots2')
679
680         self.samdb.delete(e2)
681         res = self.samdb.search("<GUID=%s>" % guid,
682                                 scope=ldb.SCOPE_BASE,
683                                 controls=['show_deleted:1',
684                                           'show_recycled:1'])
685
686         new_dn = str(res[0].dn)
687
688         self.assert_forward_links(e1, [], attr='addressBookRoots2')
689         self.assert_forward_links(e1, [], attr='addressBookRoots2',
690                                   show_deactivated_link=0)
691
692     def test_pretend_one_way_attributes_delete_link(self):
693         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
694                                   'e_one_way')
695         guid = self.get_object_guid(e2)
696
697         self.add_linked_attribute(e1, e2, attr="addressBookRoots2")
698         self.assert_forward_links(e1, [e2], attr='addressBookRoots2')
699
700         self.remove_linked_attribute(e1, e2, attr="addressBookRoots2")
701
702         self.assert_forward_links(e1, [], attr='addressBookRoots2')
703         self.assert_forward_links(e1, [], attr='addressBookRoots2',
704                                   show_deactivated_link=0)
705
706
707     def test_self_link(self):
708         e1, = self.add_objects(1, 'group',
709                               'e_self_link')
710
711         guid = self.get_object_guid(e1)
712         self.add_linked_attribute(e1, e1, attr="member")
713         self.assert_forward_links(e1, [e1], attr='member')
714         self.assert_back_links(e1, [e1], attr='memberOf')
715
716         try:
717             self.samdb.delete(e1)
718         except ldb.LdbError:
719             # Cope with the current bug to make this a failure
720             self.remove_linked_attribute(e1, e1, attr="member")
721             self.samdb.delete(e1)
722             self.fail("could not delete object with link to itself")
723
724         self.assert_forward_links('<GUID=%s>' % guid, [], attr='member',
725                                   show_deleted=1)
726         self.assert_forward_links('<GUID=%s>' % guid, [], attr='member',
727                                   show_deactivated_link=0,
728                                   show_deleted=1)
729         self.assert_back_links('<GUID=%s>' % guid, [], attr='memberOf',
730                                show_deleted=1)
731
# A bare host argument gets a URL scheme: tdb:// when it names an
# existing file, ldap:// otherwise.
if "://" not in host:
    host = ("tdb://%s" if os.path.isfile(host) else "ldap://%s") % host


TestProgram(module=__name__, opts=subunitopts)