PY3: change shebang to python3 in source4/dsdb dir
[vlendec/samba-autobuild/.git] / source4 / dsdb / tests / python / linked_attributes.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 from __future__ import print_function
5 import optparse
6 import sys
7 import os
8 import itertools
9
10 sys.path.insert(0, "bin/python")
11 import samba
12 from samba.tests.subunitrun import SubunitOptions, TestProgram
13
14 import samba.getopt as options
15
16 from samba.auth import system_session
17 import ldb
18 from samba.samdb import SamDB
19 from samba.dcerpc import misc
20
# Command line: the standard Samba, version, credentials and subunit option
# groups, followed by three switches specific to this script.
parser = optparse.OptionParser("linked_attributes.py [options] <host>")

sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

parser.add_option(
    '--delete-in-setup',
    action='store_true',
    help="cleanup in setup",
)
parser.add_option(
    '--no-cleanup',
    action='store_true',
    help="don't cleanup in teardown",
)
parser.add_option(
    '--no-reveal-internals',
    action='store_true',
    help="Only use windows compatible ldap controls",
)

opts, args = parser.parse_args()

# <host> is the one mandatory positional argument.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
50
51
class LATestException(Exception):
    """Exception specific to the linked attribute tests."""
54
55
56 class LATests(samba.tests.TestCase):
57
58     def setUp(self):
59         super(LATests, self).setUp()
60         self.samdb = SamDB(host, credentials=creds,
61                            session_info=system_session(lp), lp=lp)
62
63         self.base_dn = self.samdb.domain_dn()
64         self.ou = "OU=la,%s" % self.base_dn
65         if opts.delete_in_setup:
66             try:
67                 self.samdb.delete(self.ou, ['tree_delete:1'])
68             except ldb.LdbError as e:
69                 print("tried deleting %s, got error %s" % (self.ou, e))
70         self.samdb.add({'objectclass': 'organizationalUnit',
71                         'dn': self.ou})
72
73     def tearDown(self):
74         super(LATests, self).tearDown()
75         if not opts.no_cleanup:
76             self.samdb.delete(self.ou, ['tree_delete:1'])
77
78     def add_object(self, cn, objectclass, more_attrs={}):
79         dn = "CN=%s,%s" % (cn, self.ou)
80         attrs = {'cn': cn,
81                  'objectclass': objectclass,
82                  'dn': dn}
83         attrs.update(more_attrs)
84         self.samdb.add(attrs)
85
86         return dn
87
88     def add_objects(self, n, objectclass, prefix=None, more_attrs={}):
89         if prefix is None:
90             prefix = objectclass
91         dns = []
92         for i in range(n):
93             dns.append(self.add_object("%s%d" % (prefix, i + 1),
94                                        objectclass,
95                                        more_attrs=more_attrs))
96         return dns
97
98     def add_linked_attribute(self, src, dest, attr='member',
99                              controls=None):
100         m = ldb.Message()
101         m.dn = ldb.Dn(self.samdb, src)
102         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
103         self.samdb.modify(m, controls=controls)
104
105     def remove_linked_attribute(self, src, dest, attr='member',
106                                 controls=None):
107         m = ldb.Message()
108         m.dn = ldb.Dn(self.samdb, src)
109         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
110         self.samdb.modify(m, controls=controls)
111
112     def replace_linked_attribute(self, src, dest, attr='member',
113                                  controls=None):
114         m = ldb.Message()
115         m.dn = ldb.Dn(self.samdb, src)
116         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_REPLACE, attr)
117         self.samdb.modify(m, controls=controls)
118
119     def attr_search(self, obj, attr, scope=ldb.SCOPE_BASE, **controls):
120         if opts.no_reveal_internals:
121             if 'reveal_internals' in controls:
122                 del controls['reveal_internals']
123
124         controls = ['%s:%d' % (k, int(v)) for k, v in controls.items()]
125
126         res = self.samdb.search(obj,
127                                 scope=scope,
128                                 attrs=[attr],
129                                 controls=controls)
130         return res
131
132     def assert_links(self, obj, expected, attr, msg='', **kwargs):
133         res = self.attr_search(obj, attr, **kwargs)
134
135         if len(expected) == 0:
136             if attr in res[0]:
137                 self.fail("found attr '%s' in %s" % (attr, res[0]))
138             return
139
140         try:
141             results = [str(x) for x in res[0][attr]]
142         except KeyError:
143             self.fail("missing attr '%s' on %s" % (attr, obj))
144
145         expected = sorted(expected)
146         results = sorted(results)
147
148         if expected != results:
149             print(msg)
150             print("expected %s" % expected)
151             print("received %s" % results)
152
153         self.assertEqual(results, expected)
154
155     def assert_back_links(self, obj, expected, attr='memberOf', **kwargs):
156         self.assert_links(obj, expected, attr=attr,
157                           msg='back links do not match', **kwargs)
158
159     def assert_forward_links(self, obj, expected, attr='member', **kwargs):
160         self.assert_links(obj, expected, attr=attr,
161                           msg='forward links do not match', **kwargs)
162
163     def get_object_guid(self, dn):
164         res = self.samdb.search(dn,
165                                 scope=ldb.SCOPE_BASE,
166                                 attrs=['objectGUID'])
167         return str(misc.GUID(res[0]['objectGUID'][0]))
168
169     def assertRaisesLdbError(self, errcode, msg, f, *args, **kwargs):
170         """Assert a function raises a particular LdbError."""
171         try:
172             f(*args, **kwargs)
173         except ldb.LdbError as e:
174             (num, msg) = e.args
175             if num != errcode:
176                 lut = {v: k for k, v in vars(ldb).items()
177                        if k.startswith('ERR_') and isinstance(v, int)}
178                 self.fail("%s, expected "
179                           "LdbError %s, (%d) "
180                           "got %s (%d)" % (msg,
181                                            lut.get(errcode), errcode,
182                                            lut.get(num), num))
183         else:
184             lut = {v: k for k, v in vars(ldb).items()
185                    if k.startswith('ERR_') and isinstance(v, int)}
186             self.fail("%s, expected "
187                       "LdbError %s, (%d) "
188                       "but we got success" % (msg, lut.get(errcode), errcode))
189
190     def _test_la_backlinks(self, reveal=False):
191         tag = 'backlinks'
192         kwargs = {}
193         if reveal:
194             tag += '_reveal'
195             kwargs = {'reveal_internals': 0}
196
197         u1, u2 = self.add_objects(2, 'user', 'u_%s' % tag)
198         g1, g2 = self.add_objects(2, 'group', 'g_%s' % tag)
199
200         self.add_linked_attribute(g1, u1)
201         self.add_linked_attribute(g2, u1)
202         self.add_linked_attribute(g2, u2)
203
204         self.assert_back_links(u1, [g1, g2], **kwargs)
205         self.assert_back_links(u2, [g2], **kwargs)
206
207     def test_la_backlinks(self):
208         self._test_la_backlinks()
209
210     def test_la_backlinks_reveal(self):
211         if opts.no_reveal_internals:
212             print('skipping because --no-reveal-internals')
213             return
214         self._test_la_backlinks(True)
215
216     def _test_la_backlinks_delete_group(self, reveal=False):
217         tag = 'del_group'
218         kwargs = {}
219         if reveal:
220             tag += '_reveal'
221             kwargs = {'reveal_internals': 0}
222
223         u1, u2 = self.add_objects(2, 'user', 'u_' + tag)
224         g1, g2 = self.add_objects(2, 'group', 'g_' + tag)
225
226         self.add_linked_attribute(g1, u1)
227         self.add_linked_attribute(g2, u1)
228         self.add_linked_attribute(g2, u2)
229
230         self.samdb.delete(g2, ['tree_delete:1'])
231
232         self.assert_back_links(u1, [g1], **kwargs)
233         self.assert_back_links(u2, set(), **kwargs)
234
235     def test_la_backlinks_delete_group(self):
236         self._test_la_backlinks_delete_group()
237
238     def test_la_backlinks_delete_group_reveal(self):
239         if opts.no_reveal_internals:
240             print('skipping because --no-reveal-internals')
241             return
242         self._test_la_backlinks_delete_group(True)
243
244     def test_links_all_delete_group(self):
245         u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
246         g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
247         g2guid = self.get_object_guid(g2)
248
249         self.add_linked_attribute(g1, u1)
250         self.add_linked_attribute(g2, u1)
251         self.add_linked_attribute(g2, u2)
252
253         self.samdb.delete(g2)
254         self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
255                                show_deactivated_link=0)
256         self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
257                                show_deactivated_link=0)
258         self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
259                                   show_deactivated_link=0)
260         self.assert_forward_links('<GUID=%s>' % g2guid,
261                                   [], show_deleted=1, show_recycled=1,
262                                   show_deactivated_link=0)
263
264     def test_links_all_delete_group_reveal(self):
265         u1, u2 = self.add_objects(2, 'user', 'u_all_del_group_reveal')
266         g1, g2 = self.add_objects(2, 'group', 'g_all_del_group_reveal')
267         g2guid = self.get_object_guid(g2)
268
269         self.add_linked_attribute(g1, u1)
270         self.add_linked_attribute(g2, u1)
271         self.add_linked_attribute(g2, u2)
272
273         self.samdb.delete(g2)
274         self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
275                                show_deactivated_link=0,
276                                reveal_internals=0)
277         self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
278                                show_deactivated_link=0,
279                                reveal_internals=0)
280         self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
281                                   show_deactivated_link=0,
282                                   reveal_internals=0)
283         self.assert_forward_links('<GUID=%s>' % g2guid,
284                                   [], show_deleted=1, show_recycled=1,
285                                   show_deactivated_link=0,
286                                   reveal_internals=0)
287
288     def test_la_links_delete_link(self):
289         u1, u2 = self.add_objects(2, 'user', 'u_del_link')
290         g1, g2 = self.add_objects(2, 'group', 'g_del_link')
291
292         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
293                                 attrs=['uSNChanged'])
294         old_usn1 = int(res[0]['uSNChanged'][0])
295
296         self.add_linked_attribute(g1, u1)
297
298         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
299                                 attrs=['uSNChanged'])
300         new_usn1 = int(res[0]['uSNChanged'][0])
301
302         self.assertNotEqual(old_usn1, new_usn1, "USN should have incremented")
303
304         self.add_linked_attribute(g2, u1)
305         self.add_linked_attribute(g2, u2)
306
307         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
308                                 attrs=['uSNChanged'])
309         old_usn2 = int(res[0]['uSNChanged'][0])
310
311         self.remove_linked_attribute(g2, u1)
312
313         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
314                                 attrs=['uSNChanged'])
315         new_usn2 = int(res[0]['uSNChanged'][0])
316
317         self.assertNotEqual(old_usn2, new_usn2, "USN should have incremented")
318
319         self.assert_forward_links(g1, [u1])
320         self.assert_forward_links(g2, [u2])
321
322         self.add_linked_attribute(g2, u1)
323         self.assert_forward_links(g2, [u1, u2])
324         self.remove_linked_attribute(g2, u2)
325         self.assert_forward_links(g2, [u1])
326         self.remove_linked_attribute(g2, u1)
327         self.assert_forward_links(g2, [])
328         self.remove_linked_attribute(g1, [])
329         self.assert_forward_links(g1, [])
330
331         # removing a duplicate link in the same message should fail
332         self.add_linked_attribute(g2, [u1, u2])
333         self.assertRaises(ldb.LdbError,
334                           self.remove_linked_attribute, g2, [u1, u1])
335
336     def _test_la_links_delete_link_reveal(self):
337         u1, u2 = self.add_objects(2, 'user', 'u_del_link_reveal')
338         g1, g2 = self.add_objects(2, 'group', 'g_del_link_reveal')
339
340         self.add_linked_attribute(g1, u1)
341         self.add_linked_attribute(g2, u1)
342         self.add_linked_attribute(g2, u2)
343
344         self.remove_linked_attribute(g2, u1)
345
346         self.assert_forward_links(g2, [u1, u2], show_deleted=1,
347                                   show_recycled=1,
348                                   show_deactivated_link=0,
349                                   reveal_internals=0
350                                   )
351
352     def test_la_links_delete_link_reveal(self):
353         if opts.no_reveal_internals:
354             print('skipping because --no-reveal-internals')
355             return
356         self._test_la_links_delete_link_reveal()
357
358     def test_la_links_delete_user(self):
359         u1, u2 = self.add_objects(2, 'user', 'u_del_user')
360         g1, g2 = self.add_objects(2, 'group', 'g_del_user')
361
362         self.add_linked_attribute(g1, u1)
363         self.add_linked_attribute(g2, u1)
364         self.add_linked_attribute(g2, u2)
365
366         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
367                                 attrs=['uSNChanged'])
368         old_usn1 = int(res[0]['uSNChanged'][0])
369
370         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
371                                 attrs=['uSNChanged'])
372         old_usn2 = int(res[0]['uSNChanged'][0])
373
374         self.samdb.delete(u1)
375
376         self.assert_forward_links(g1, [])
377         self.assert_forward_links(g2, [u2])
378
379         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
380                                 attrs=['uSNChanged'])
381         new_usn1 = int(res[0]['uSNChanged'][0])
382
383         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
384                                 attrs=['uSNChanged'])
385         new_usn2 = int(res[0]['uSNChanged'][0])
386
387         # Assert the USN on the alternate object is unchanged
388         self.assertEqual(old_usn1, new_usn1)
389         self.assertEqual(old_usn2, new_usn2)
390
391     def test_la_links_delete_user_reveal(self):
392         u1, u2 = self.add_objects(2, 'user', 'u_del_user_reveal')
393         g1, g2 = self.add_objects(2, 'group', 'g_del_user_reveal')
394
395         self.add_linked_attribute(g1, u1)
396         self.add_linked_attribute(g2, u1)
397         self.add_linked_attribute(g2, u2)
398
399         self.samdb.delete(u1)
400
401         self.assert_forward_links(g2, [u2],
402                                   show_deleted=1, show_recycled=1,
403                                   show_deactivated_link=0,
404                                   reveal_internals=0)
405         self.assert_forward_links(g1, [],
406                                   show_deleted=1, show_recycled=1,
407                                   show_deactivated_link=0,
408                                   reveal_internals=0)
409
410     def test_multiple_links(self):
411         u1, u2, u3, u4 = self.add_objects(4, 'user', 'u_multiple_links')
412         g1, g2, g3, g4 = self.add_objects(4, 'group', 'g_multiple_links')
413
414         self.add_linked_attribute(g1, [u1, u2, u3, u4])
415         self.add_linked_attribute(g2, [u3, u1])
416         self.add_linked_attribute(g3, u2)
417
418         self.assertRaisesLdbError(ldb.ERR_ENTRY_ALREADY_EXISTS,
419                                   "adding duplicate values",
420                                   self.add_linked_attribute, g2,
421                                   [u1, u2, u3, u2])
422
423         self.assert_forward_links(g1, [u1, u2, u3, u4])
424         self.assert_forward_links(g2, [u3, u1])
425         self.assert_forward_links(g3, [u2])
426         self.assert_back_links(u1, [g2, g1])
427         self.assert_back_links(u2, [g3, g1])
428         self.assert_back_links(u3, [g2, g1])
429         self.assert_back_links(u4, [g1])
430
431         self.remove_linked_attribute(g2, [u1, u3])
432         self.remove_linked_attribute(g1, [u1, u3])
433
434         self.assert_forward_links(g1, [u2, u4])
435         self.assert_forward_links(g2, [])
436         self.assert_forward_links(g3, [u2])
437         self.assert_back_links(u1, [])
438         self.assert_back_links(u2, [g3, g1])
439         self.assert_back_links(u3, [])
440         self.assert_back_links(u4, [g1])
441
442         self.add_linked_attribute(g1, [u1, u3])
443         self.add_linked_attribute(g2, [u3, u1])
444         self.add_linked_attribute(g3, [u1, u3])
445
446         self.assert_forward_links(g1, [u1, u2, u3, u4])
447         self.assert_forward_links(g2, [u1, u3])
448         self.assert_forward_links(g3, [u1, u2, u3])
449         self.assert_back_links(u1, [g1, g2, g3])
450         self.assert_back_links(u2, [g3, g1])
451         self.assert_back_links(u3, [g3, g2, g1])
452         self.assert_back_links(u4, [g1])
453
454     def test_la_links_replace(self):
455         u1, u2, u3, u4 = self.add_objects(4, 'user', 'u_replace')
456         g1, g2, g3, g4 = self.add_objects(4, 'group', 'g_replace')
457
458         self.add_linked_attribute(g1, [u1, u2])
459         self.add_linked_attribute(g2, [u1, u3])
460         self.add_linked_attribute(g3, u1)
461
462         self.replace_linked_attribute(g1, [u2])
463         self.replace_linked_attribute(g2, [u2, u3])
464         self.replace_linked_attribute(g3, [u1, u3])
465         self.replace_linked_attribute(g4, [u4])
466
467         self.assert_forward_links(g1, [u2])
468         self.assert_forward_links(g2, [u3, u2])
469         self.assert_forward_links(g3, [u3, u1])
470         self.assert_forward_links(g4, [u4])
471         self.assert_back_links(u1, [g3])
472         self.assert_back_links(u2, [g1, g2])
473         self.assert_back_links(u3, [g2, g3])
474         self.assert_back_links(u4, [g4])
475
476         self.replace_linked_attribute(g1, [u1, u2, u3])
477         self.replace_linked_attribute(g2, [u1])
478         self.replace_linked_attribute(g3, [u2])
479         self.replace_linked_attribute(g4, [])
480
481         self.assert_forward_links(g1, [u1, u2, u3])
482         self.assert_forward_links(g2, [u1])
483         self.assert_forward_links(g3, [u2])
484         self.assert_forward_links(g4, [])
485         self.assert_back_links(u1, [g1, g2])
486         self.assert_back_links(u2, [g1, g3])
487         self.assert_back_links(u3, [g1])
488         self.assert_back_links(u4, [])
489
490         self.assertRaisesLdbError(ldb.ERR_ENTRY_ALREADY_EXISTS,
491                                   "replacing duplicate values",
492                                   self.replace_linked_attribute, g2,
493                                   [u1, u2, u3, u2])
494
495     def test_la_links_replace2(self):
496         users = self.add_objects(12, 'user', 'u_replace2')
497         g1, = self.add_objects(1, 'group', 'g_replace2')
498
499         self.add_linked_attribute(g1, users[:6])
500         self.assert_forward_links(g1, users[:6])
501         self.replace_linked_attribute(g1, users)
502         self.assert_forward_links(g1, users)
503         self.replace_linked_attribute(g1, users[6:])
504         self.assert_forward_links(g1, users[6:])
505         self.remove_linked_attribute(g1, users[6:9])
506         self.assert_forward_links(g1, users[9:])
507         self.remove_linked_attribute(g1, users[9:])
508         self.assert_forward_links(g1, [])
509
510     def test_la_links_permutations(self):
511         """Make sure the order in which we add links doesn't matter."""
512         users = self.add_objects(3, 'user', 'u_permutations')
513         groups = self.add_objects(6, 'group', 'g_permutations')
514
515         for g, p in zip(groups, itertools.permutations(users)):
516             self.add_linked_attribute(g, p)
517
518         # everyone should be in every group
519         for g in groups:
520             self.assert_forward_links(g, users)
521
522         for u in users:
523             self.assert_back_links(u, groups)
524
525         for g, p in zip(groups[::-1], itertools.permutations(users)):
526             self.replace_linked_attribute(g, p)
527
528         for g in groups:
529             self.assert_forward_links(g, users)
530
531         for u in users:
532             self.assert_back_links(u, groups)
533
534         for g, p in zip(groups, itertools.permutations(users)):
535             self.remove_linked_attribute(g, p)
536
537         for g in groups:
538             self.assert_forward_links(g, [])
539
540         for u in users:
541             self.assert_back_links(u, [])
542
543     def test_la_links_relaxed(self):
544         """Check that the relax control doesn't mess with linked attributes."""
545         relax_control = ['relax:0']
546
547         users = self.add_objects(10, 'user', 'u_relax')
548         groups = self.add_objects(3, 'group', 'g_relax',
549                                   more_attrs={'member': users[:2]})
550         g_relax1, g_relax2, g_uptight = groups
551
552         # g_relax1 has all users added at once
553         # g_relax2 gets them one at a time in reverse order
554         # g_uptight never relaxes
555
556         self.add_linked_attribute(g_relax1, users[2:5], controls=relax_control)
557
558         for u in reversed(users[2:5]):
559             self.add_linked_attribute(g_relax2, u, controls=relax_control)
560             self.add_linked_attribute(g_uptight, u)
561
562         for g in groups:
563             self.assert_forward_links(g, users[:5])
564
565             self.add_linked_attribute(g, users[5:7])
566             self.assert_forward_links(g, users[:7])
567
568             for u in users[7:]:
569                 self.add_linked_attribute(g, u)
570
571             self.assert_forward_links(g, users)
572
573         for u in users:
574             self.assert_back_links(u, groups)
575
576         # try some replacement permutations
577         import random
578         random.seed(1)
579         users2 = users[:]
580         for i in range(5):
581             random.shuffle(users2)
582             self.replace_linked_attribute(g_relax1, users2,
583                                           controls=relax_control)
584
585             self.assert_forward_links(g_relax1, users)
586
587         for i in range(5):
588             random.shuffle(users2)
589             self.remove_linked_attribute(g_relax2, users2,
590                                          controls=relax_control)
591             self.remove_linked_attribute(g_uptight, users2)
592
593             self.replace_linked_attribute(g_relax1, [], controls=relax_control)
594
595             random.shuffle(users2)
596             self.add_linked_attribute(g_relax2, users2,
597                                       controls=relax_control)
598             self.add_linked_attribute(g_uptight, users2)
599             self.replace_linked_attribute(g_relax1, users2,
600                                           controls=relax_control)
601
602             self.assert_forward_links(g_relax1, users)
603             self.assert_forward_links(g_relax2, users)
604             self.assert_forward_links(g_uptight, users)
605
606         for u in users:
607             self.assert_back_links(u, groups)
608
609     def test_add_all_at_once(self):
610         """All these other tests are creating linked attributes after the
611         objects are there. We want to test creating them all at once
612         using LDIF.
613         """
614         users = self.add_objects(7, 'user', 'u_all_at_once')
615         g1, g3 = self.add_objects(2, 'group', 'g_all_at_once',
616                                   more_attrs={'member': users})
617         (g2,) = self.add_objects(1, 'group', 'g_all_at_once2',
618                                  more_attrs={'member': users[:5]})
619
620         self.assertRaisesLdbError(ldb.ERR_ENTRY_ALREADY_EXISTS,
621                                   "adding multiple duplicate values",
622                                   self.add_objects, 1, 'group',
623                                   'g_with_duplicate_links',
624                                   more_attrs={'member': users[:5] + users[1:2]})
625
626         self.assert_forward_links(g1, users)
627         self.assert_forward_links(g2, users[:5])
628         self.assert_forward_links(g3, users)
629         for u in users[:5]:
630             self.assert_back_links(u, [g1, g2, g3])
631         for u in users[5:]:
632             self.assert_back_links(u, [g1, g3])
633
634         self.remove_linked_attribute(g2, users[0])
635         self.remove_linked_attribute(g2, users[1])
636         self.add_linked_attribute(g2, users[1])
637         self.add_linked_attribute(g2, users[5])
638         self.add_linked_attribute(g2, users[6])
639
640         self.assert_forward_links(g1, users)
641         self.assert_forward_links(g2, users[1:])
642
643         for u in users[1:]:
644             self.remove_linked_attribute(g2, u)
645         self.remove_linked_attribute(g1, users)
646
647         for u in users:
648             self.samdb.delete(u)
649
650         self.assert_forward_links(g1, [])
651         self.assert_forward_links(g2, [])
652         self.assert_forward_links(g3, [])
653
654     def test_one_way_attributes(self):
655         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
656                                   'e_one_way')
657         guid = self.get_object_guid(e2)
658
659         self.add_linked_attribute(e1, e2, attr="addressBookRoots")
660         self.assert_forward_links(e1, [e2], attr='addressBookRoots')
661
662         self.samdb.delete(e2)
663
664         res = self.samdb.search("<GUID=%s>" % guid,
665                                 scope=ldb.SCOPE_BASE,
666                                 controls=['show_deleted:1',
667                                           'show_recycled:1'])
668
669         new_dn = str(res[0].dn)
670         self.assert_forward_links(e1, [new_dn], attr='addressBookRoots')
671         self.assert_forward_links(e1, [new_dn],
672                                   attr='addressBookRoots',
673                                   show_deactivated_link=0)
674
675     def test_one_way_attributes_delete_link(self):
676         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
677                                   'e_one_way')
678         guid = self.get_object_guid(e2)
679
680         self.add_linked_attribute(e1, e2, attr="addressBookRoots")
681         self.assert_forward_links(e1, [e2], attr='addressBookRoots')
682
683         self.remove_linked_attribute(e1, e2, attr="addressBookRoots")
684
685         self.assert_forward_links(e1, [], attr='addressBookRoots')
686         self.assert_forward_links(e1, [], attr='addressBookRoots',
687                                   show_deactivated_link=0)
688
689     def test_pretend_one_way_attributes(self):
690         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
691                                   'e_one_way')
692         guid = self.get_object_guid(e2)
693
694         self.add_linked_attribute(e1, e2, attr="addressBookRoots2")
695         self.assert_forward_links(e1, [e2], attr='addressBookRoots2')
696
697         self.samdb.delete(e2)
698         res = self.samdb.search("<GUID=%s>" % guid,
699                                 scope=ldb.SCOPE_BASE,
700                                 controls=['show_deleted:1',
701                                           'show_recycled:1'])
702
703         new_dn = str(res[0].dn)
704
705         self.assert_forward_links(e1, [], attr='addressBookRoots2')
706         self.assert_forward_links(e1, [], attr='addressBookRoots2',
707                                   show_deactivated_link=0)
708
709     def test_pretend_one_way_attributes_delete_link(self):
710         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
711                                   'e_one_way')
712         guid = self.get_object_guid(e2)
713
714         self.add_linked_attribute(e1, e2, attr="addressBookRoots2")
715         self.assert_forward_links(e1, [e2], attr='addressBookRoots2')
716
717         self.remove_linked_attribute(e1, e2, attr="addressBookRoots2")
718
719         self.assert_forward_links(e1, [], attr='addressBookRoots2')
720         self.assert_forward_links(e1, [], attr='addressBookRoots2',
721                                   show_deactivated_link=0)
722
723
# A host argument without a URL scheme is opened through tdb:// when it
# names an existing file, and contacted over ldap:// otherwise.
if "://" not in host:
    host = ("tdb://%s" if os.path.isfile(host) else "ldap://%s") % host


TestProgram(module=__name__, opts=subunitopts)