d40192badc7a427d37277376f7fc9fad2a726628
[kai/samba-autobuild/.git] / source4 / dsdb / tests / python / linked_attributes.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 from __future__ import print_function
5 import optparse
6 import sys
7 import os
8 import itertools
9
10 sys.path.insert(0, "bin/python")
11 import samba
12 from samba.tests.subunitrun import SubunitOptions, TestProgram
13
14 import samba.getopt as options
15
16 from samba.auth import system_session
17 import ldb
18 from samba.samdb import SamDB
19 from samba.dcerpc import misc
20
21 parser = optparse.OptionParser("linked_attributes.py [options] <host>")
22 sambaopts = options.SambaOptions(parser)
23 parser.add_option_group(sambaopts)
24 parser.add_option_group(options.VersionOptions(parser))
25 # use command line creds if available
26 credopts = options.CredentialsOptions(parser)
27 parser.add_option_group(credopts)
28 subunitopts = SubunitOptions(parser)
29 parser.add_option_group(subunitopts)
30
31 parser.add_option('--delete-in-setup', action='store_true',
32                   help="cleanup in setup")
33
34 parser.add_option('--no-cleanup', action='store_true',
35                   help="don't cleanup in teardown")
36
37 parser.add_option('--no-reveal-internals', action='store_true',
38                   help="Only use windows compatible ldap controls")
39
40 opts, args = parser.parse_args()
41
42 if len(args) < 1:
43     parser.print_usage()
44     sys.exit(1)
45
46 host = args[0]
47
48 lp = sambaopts.get_loadparm()
49 creds = credopts.get_credentials(lp)
50
51
52 class LATestException(Exception):
53     pass
54
55
56 class LATests(samba.tests.TestCase):
57
58     def setUp(self):
59         super(LATests, self).setUp()
60         self.samdb = SamDB(host, credentials=creds,
61                            session_info=system_session(lp), lp=lp)
62
63         self.base_dn = self.samdb.domain_dn()
64         self.ou = "OU=la,%s" % self.base_dn
65         if opts.delete_in_setup:
66             try:
67                 self.samdb.delete(self.ou, ['tree_delete:1'])
68             except ldb.LdbError as e:
69                 print("tried deleting %s, got error %s" % (self.ou, e))
70         self.samdb.add({'objectclass': 'organizationalUnit',
71                         'dn': self.ou})
72
73     def tearDown(self):
74         super(LATests, self).tearDown()
75         if not opts.no_cleanup:
76             self.samdb.delete(self.ou, ['tree_delete:1'])
77
78     def add_object(self, cn, objectclass, more_attrs={}):
79         dn = "CN=%s,%s" % (cn, self.ou)
80         attrs = {'cn': cn,
81                  'objectclass': objectclass,
82                  'dn': dn}
83         attrs.update(more_attrs)
84         self.samdb.add(attrs)
85
86         return dn
87
88     def add_objects(self, n, objectclass, prefix=None, more_attrs={}):
89         if prefix is None:
90             prefix = objectclass
91         dns = []
92         for i in range(n):
93             dns.append(self.add_object("%s%d" % (prefix, i + 1),
94                                        objectclass,
95                                        more_attrs=more_attrs))
96         return dns
97
98     def add_linked_attribute(self, src, dest, attr='member',
99                              controls=None):
100         m = ldb.Message()
101         m.dn = ldb.Dn(self.samdb, src)
102         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
103         self.samdb.modify(m, controls=controls)
104
105     def remove_linked_attribute(self, src, dest, attr='member',
106                                 controls=None):
107         m = ldb.Message()
108         m.dn = ldb.Dn(self.samdb, src)
109         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
110         self.samdb.modify(m, controls=controls)
111
112     def replace_linked_attribute(self, src, dest, attr='member',
113                                  controls=None):
114         m = ldb.Message()
115         m.dn = ldb.Dn(self.samdb, src)
116         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_REPLACE, attr)
117         self.samdb.modify(m, controls=controls)
118
119     def attr_search(self, obj, attr, scope=ldb.SCOPE_BASE, **controls):
120         if opts.no_reveal_internals:
121             if 'reveal_internals' in controls:
122                 del controls['reveal_internals']
123
124         controls = ['%s:%d' % (k, int(v)) for k, v in controls.items()]
125
126         res = self.samdb.search(obj,
127                                 scope=scope,
128                                 attrs=[attr],
129                                 controls=controls)
130         return res
131
132     def assert_links(self, obj, expected, attr, msg='', **kwargs):
133         res = self.attr_search(obj, attr, **kwargs)
134
135         if len(expected) == 0:
136             if attr in res[0]:
137                 self.fail("found attr '%s' in %s" % (attr, res[0]))
138             return
139
140         try:
141             results = list([x[attr] for x in res][0])
142         except KeyError:
143             self.fail("missing attr '%s' on %s" % (attr, obj))
144
145         expected = sorted(expected)
146         results = sorted(results)
147
148         if expected != results:
149             print(msg)
150             print("expected %s" % expected)
151             print("received %s" % results)
152
153         self.assertEqual(results, expected)
154
155     def assert_back_links(self, obj, expected, attr='memberOf', **kwargs):
156         self.assert_links(obj, expected, attr=attr,
157                           msg='back links do not match', **kwargs)
158
159     def assert_forward_links(self, obj, expected, attr='member', **kwargs):
160         self.assert_links(obj, expected, attr=attr,
161                           msg='forward links do not match', **kwargs)
162
163     def get_object_guid(self, dn):
164         res = self.samdb.search(dn,
165                                 scope=ldb.SCOPE_BASE,
166                                 attrs=['objectGUID'])
167         return str(misc.GUID(res[0]['objectGUID'][0]))
168
169     def assertRaisesLdbError(self, errcode, msg, f, *args, **kwargs):
170         """Assert a function raises a particular LdbError."""
171         try:
172             f(*args, **kwargs)
173         except ldb.LdbError as e:
174             (num, msg) = e.args
175             if num != errcode:
176                 lut = {v: k for k, v in vars(ldb).iteritems()
177                        if k.startswith('ERR_') and isinstance(v, int)}
178                 self.fail("%s, expected "
179                           "LdbError %s, (%d) "
180                           "got %s (%d)" % (msg,
181                                            lut.get(errcode), errcode,
182                                            lut.get(num), num))
183         else:
184             lut = {v: k for k, v in vars(ldb).iteritems()
185                    if k.startswith('ERR_') and isinstance(v, int)}
186             self.fail("%s, expected "
187                       "LdbError %s, (%d) "
188                       "but we got success" % (msg, lut.get(errcode), errcode))
189
190     def _test_la_backlinks(self, reveal=False):
191         tag = 'backlinks'
192         kwargs = {}
193         if reveal:
194             tag += '_reveal'
195             kwargs = {'reveal_internals': 0}
196
197         u1, u2 = self.add_objects(2, 'user', 'u_%s' % tag)
198         g1, g2 = self.add_objects(2, 'group', 'g_%s' % tag)
199
200         self.add_linked_attribute(g1, u1)
201         self.add_linked_attribute(g2, u1)
202         self.add_linked_attribute(g2, u2)
203
204         self.assert_back_links(u1, [g1, g2], **kwargs)
205         self.assert_back_links(u2, [g2], **kwargs)
206
207     def test_la_backlinks(self):
208         self._test_la_backlinks()
209
210     def test_la_backlinks_reveal(self):
211         if opts.no_reveal_internals:
212             print('skipping because --no-reveal-internals')
213             return
214         self._test_la_backlinks(True)
215
216     def _test_la_backlinks_delete_group(self, reveal=False):
217         tag = 'del_group'
218         kwargs = {}
219         if reveal:
220             tag += '_reveal'
221             kwargs = {'reveal_internals': 0}
222
223         u1, u2 = self.add_objects(2, 'user', 'u_' + tag)
224         g1, g2 = self.add_objects(2, 'group', 'g_' + tag)
225
226         self.add_linked_attribute(g1, u1)
227         self.add_linked_attribute(g2, u1)
228         self.add_linked_attribute(g2, u2)
229
230         self.samdb.delete(g2, ['tree_delete:1'])
231
232         self.assert_back_links(u1, [g1], **kwargs)
233         self.assert_back_links(u2, set(), **kwargs)
234
235     def test_la_backlinks_delete_group(self):
236         self._test_la_backlinks_delete_group()
237
238     def test_la_backlinks_delete_group_reveal(self):
239         if opts.no_reveal_internals:
240             print('skipping because --no-reveal-internals')
241             return
242         self._test_la_backlinks_delete_group(True)
243
244     def test_links_all_delete_group(self):
245         u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
246         g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
247         g2guid = self.get_object_guid(g2)
248
249         self.add_linked_attribute(g1, u1)
250         self.add_linked_attribute(g2, u1)
251         self.add_linked_attribute(g2, u2)
252
253         self.samdb.delete(g2)
254         self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
255                                show_deactivated_link=0)
256         self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
257                                show_deactivated_link=0)
258         self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
259                                   show_deactivated_link=0)
260         self.assert_forward_links('<GUID=%s>' % g2guid,
261                                   [], show_deleted=1, show_recycled=1,
262                                   show_deactivated_link=0)
263
264     def test_links_all_delete_group_reveal(self):
265         u1, u2 = self.add_objects(2, 'user', 'u_all_del_group_reveal')
266         g1, g2 = self.add_objects(2, 'group', 'g_all_del_group_reveal')
267         g2guid = self.get_object_guid(g2)
268
269         self.add_linked_attribute(g1, u1)
270         self.add_linked_attribute(g2, u1)
271         self.add_linked_attribute(g2, u2)
272
273         self.samdb.delete(g2)
274         self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
275                                show_deactivated_link=0,
276                                reveal_internals=0)
277         self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
278                                show_deactivated_link=0,
279                                reveal_internals=0)
280         self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
281                                   show_deactivated_link=0,
282                                   reveal_internals=0)
283         self.assert_forward_links('<GUID=%s>' % g2guid,
284                                   [], show_deleted=1, show_recycled=1,
285                                   show_deactivated_link=0,
286                                   reveal_internals=0)
287
288     def test_la_links_delete_link(self):
289         u1, u2 = self.add_objects(2, 'user', 'u_del_link')
290         g1, g2 = self.add_objects(2, 'group', 'g_del_link')
291
292         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
293                                 attrs=['uSNChanged'])
294         old_usn1 = int(res[0]['uSNChanged'][0])
295
296         self.add_linked_attribute(g1, u1)
297
298         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
299                                 attrs=['uSNChanged'])
300         new_usn1 = int(res[0]['uSNChanged'][0])
301
302         self.assertNotEqual(old_usn1, new_usn1, "USN should have incremented")
303
304         self.add_linked_attribute(g2, u1)
305         self.add_linked_attribute(g2, u2)
306
307         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
308                                 attrs=['uSNChanged'])
309         old_usn2 = int(res[0]['uSNChanged'][0])
310
311         self.remove_linked_attribute(g2, u1)
312
313         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
314                                 attrs=['uSNChanged'])
315         new_usn2 = int(res[0]['uSNChanged'][0])
316
317         self.assertNotEqual(old_usn2, new_usn2, "USN should have incremented")
318
319         self.assert_forward_links(g1, [u1])
320         self.assert_forward_links(g2, [u2])
321
322         self.add_linked_attribute(g2, u1)
323         self.assert_forward_links(g2, [u1, u2])
324         self.remove_linked_attribute(g2, u2)
325         self.assert_forward_links(g2, [u1])
326         self.remove_linked_attribute(g2, u1)
327         self.assert_forward_links(g2, [])
328         self.remove_linked_attribute(g1, [])
329         self.assert_forward_links(g1, [])
330
331         # removing a duplicate link in the same message should fail
332         self.add_linked_attribute(g2, [u1, u2])
333         self.assertRaises(ldb.LdbError,
334                           self.remove_linked_attribute,g2, [u1, u1])
335
336     def _test_la_links_delete_link_reveal(self):
337         u1, u2 = self.add_objects(2, 'user', 'u_del_link_reveal')
338         g1, g2 = self.add_objects(2, 'group', 'g_del_link_reveal')
339
340         self.add_linked_attribute(g1, u1)
341         self.add_linked_attribute(g2, u1)
342         self.add_linked_attribute(g2, u2)
343
344         self.remove_linked_attribute(g2, u1)
345
346         self.assert_forward_links(g2, [u1, u2], show_deleted=1,
347                                   show_recycled=1,
348                                   show_deactivated_link=0,
349                                   reveal_internals=0
350         )
351
352     def test_la_links_delete_link_reveal(self):
353         if opts.no_reveal_internals:
354             print('skipping because --no-reveal-internals')
355             return
356         self._test_la_links_delete_link_reveal()
357
358     def test_la_links_delete_user(self):
359         u1, u2 = self.add_objects(2, 'user', 'u_del_user')
360         g1, g2 = self.add_objects(2, 'group', 'g_del_user')
361
362         self.add_linked_attribute(g1, u1)
363         self.add_linked_attribute(g2, u1)
364         self.add_linked_attribute(g2, u2)
365
366         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
367                                 attrs=['uSNChanged'])
368         old_usn1 = int(res[0]['uSNChanged'][0])
369
370         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
371                                 attrs=['uSNChanged'])
372         old_usn2 = int(res[0]['uSNChanged'][0])
373
374         self.samdb.delete(u1)
375
376         self.assert_forward_links(g1, [])
377         self.assert_forward_links(g2, [u2])
378
379         res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
380                                 attrs=['uSNChanged'])
381         new_usn1 = int(res[0]['uSNChanged'][0])
382
383         res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
384                                 attrs=['uSNChanged'])
385         new_usn2 = int(res[0]['uSNChanged'][0])
386
387         # Assert the USN on the alternate object is unchanged
388         self.assertEqual(old_usn1, new_usn1)
389         self.assertEqual(old_usn2, new_usn2)
390
391     def test_la_links_delete_user_reveal(self):
392         u1, u2 = self.add_objects(2, 'user', 'u_del_user_reveal')
393         g1, g2 = self.add_objects(2, 'group', 'g_del_user_reveal')
394
395         self.add_linked_attribute(g1, u1)
396         self.add_linked_attribute(g2, u1)
397         self.add_linked_attribute(g2, u2)
398
399         self.samdb.delete(u1)
400
401         self.assert_forward_links(g2, [u2],
402                                   show_deleted=1, show_recycled=1,
403                                   show_deactivated_link=0,
404                                   reveal_internals=0)
405         self.assert_forward_links(g1, [],
406                                   show_deleted=1, show_recycled=1,
407                                   show_deactivated_link=0,
408                                   reveal_internals=0)
409
410     def test_multiple_links(self):
411         u1, u2, u3, u4 = self.add_objects(4, 'user', 'u_multiple_links')
412         g1, g2, g3, g4 = self.add_objects(4, 'group', 'g_multiple_links')
413
414         self.add_linked_attribute(g1, [u1, u2, u3, u4])
415         self.add_linked_attribute(g2, [u3, u1])
416         self.add_linked_attribute(g3, u2)
417
418         self.assertRaisesLdbError(ldb.ERR_ENTRY_ALREADY_EXISTS,
419                                   "adding duplicate values",
420                                   self.add_linked_attribute, g2,
421                                   [u1, u2, u3, u2])
422
423         self.assert_forward_links(g1, [u1, u2, u3, u4])
424         self.assert_forward_links(g2, [u3, u1])
425         self.assert_forward_links(g3, [u2])
426         self.assert_back_links(u1, [g2, g1])
427         self.assert_back_links(u2, [g3, g1])
428         self.assert_back_links(u3, [g2, g1])
429         self.assert_back_links(u4, [g1])
430
431         self.remove_linked_attribute(g2, [u1, u3])
432         self.remove_linked_attribute(g1, [u1, u3])
433
434         self.assert_forward_links(g1, [u2, u4])
435         self.assert_forward_links(g2, [])
436         self.assert_forward_links(g3, [u2])
437         self.assert_back_links(u1, [])
438         self.assert_back_links(u2, [g3, g1])
439         self.assert_back_links(u3, [])
440         self.assert_back_links(u4, [g1])
441
442         self.add_linked_attribute(g1, [u1, u3])
443         self.add_linked_attribute(g2, [u3, u1])
444         self.add_linked_attribute(g3, [u1, u3])
445
446         self.assert_forward_links(g1, [u1, u2, u3, u4])
447         self.assert_forward_links(g2, [u1, u3])
448         self.assert_forward_links(g3, [u1, u2, u3])
449         self.assert_back_links(u1, [g1, g2, g3])
450         self.assert_back_links(u2, [g3, g1])
451         self.assert_back_links(u3, [g3, g2, g1])
452         self.assert_back_links(u4, [g1])
453
454     def test_la_links_replace(self):
455         u1, u2, u3, u4 = self.add_objects(4, 'user', 'u_replace')
456         g1, g2, g3, g4 = self.add_objects(4, 'group', 'g_replace')
457
458         self.add_linked_attribute(g1, [u1, u2])
459         self.add_linked_attribute(g2, [u1, u3])
460         self.add_linked_attribute(g3, u1)
461
462         self.replace_linked_attribute(g1, [u2])
463         self.replace_linked_attribute(g2, [u2, u3])
464         self.replace_linked_attribute(g3, [u1, u3])
465         self.replace_linked_attribute(g4, [u4])
466
467         self.assert_forward_links(g1, [u2])
468         self.assert_forward_links(g2, [u3, u2])
469         self.assert_forward_links(g3, [u3, u1])
470         self.assert_forward_links(g4, [u4])
471         self.assert_back_links(u1, [g3])
472         self.assert_back_links(u2, [g1, g2])
473         self.assert_back_links(u3, [g2, g3])
474         self.assert_back_links(u4, [g4])
475
476         self.replace_linked_attribute(g1, [u1, u2, u3])
477         self.replace_linked_attribute(g2, [u1])
478         self.replace_linked_attribute(g3, [u2])
479         self.replace_linked_attribute(g4, [])
480
481         self.assert_forward_links(g1, [u1, u2, u3])
482         self.assert_forward_links(g2, [u1])
483         self.assert_forward_links(g3, [u2])
484         self.assert_forward_links(g4, [])
485         self.assert_back_links(u1, [g1, g2])
486         self.assert_back_links(u2, [g1, g3])
487         self.assert_back_links(u3, [g1])
488         self.assert_back_links(u4, [])
489
490         self.assertRaisesLdbError(ldb.ERR_ENTRY_ALREADY_EXISTS,
491                                   "replacing duplicate values",
492                                   self.replace_linked_attribute, g2,
493                                   [u1, u2, u3, u2])
494
495
496     def test_la_links_replace2(self):
497         users = self.add_objects(12, 'user', 'u_replace2')
498         g1, = self.add_objects(1, 'group', 'g_replace2')
499
500         self.add_linked_attribute(g1, users[:6])
501         self.assert_forward_links(g1, users[:6])
502         self.replace_linked_attribute(g1, users)
503         self.assert_forward_links(g1, users)
504         self.replace_linked_attribute(g1, users[6:])
505         self.assert_forward_links(g1, users[6:])
506         self.remove_linked_attribute(g1, users[6:9])
507         self.assert_forward_links(g1, users[9:])
508         self.remove_linked_attribute(g1, users[9:])
509         self.assert_forward_links(g1, [])
510
511     def test_la_links_permutations(self):
512         """Make sure the order in which we add links doesn't matter."""
513         users = self.add_objects(3, 'user', 'u_permutations')
514         groups = self.add_objects(6, 'group', 'g_permutations')
515
516         for g, p in zip(groups, itertools.permutations(users)):
517             self.add_linked_attribute(g, p)
518
519         # everyone should be in every group
520         for g in groups:
521             self.assert_forward_links(g, users)
522
523         for u in users:
524             self.assert_back_links(u, groups)
525
526         for g, p in zip(groups[::-1], itertools.permutations(users)):
527             self.replace_linked_attribute(g, p)
528
529         for g in groups:
530             self.assert_forward_links(g, users)
531
532         for u in users:
533             self.assert_back_links(u, groups)
534
535         for g, p in zip(groups, itertools.permutations(users)):
536             self.remove_linked_attribute(g, p)
537
538         for g in groups:
539             self.assert_forward_links(g, [])
540
541         for u in users:
542             self.assert_back_links(u, [])
543
544     def test_la_links_relaxed(self):
545         """Check that the relax control doesn't mess with linked attributes."""
546         relax_control = ['relax:0']
547
548         users = self.add_objects(10, 'user', 'u_relax')
549         groups = self.add_objects(3, 'group', 'g_relax',
550                                   more_attrs={'member': users[:2]})
551         g_relax1, g_relax2, g_uptight = groups
552
553         # g_relax1 has all users added at once
554         # g_relax2 gets them one at a time in reverse order
555         # g_uptight never relaxes
556
557         self.add_linked_attribute(g_relax1, users[2:5], controls=relax_control)
558
559         for u in reversed(users[2:5]):
560             self.add_linked_attribute(g_relax2, u, controls=relax_control)
561             self.add_linked_attribute(g_uptight, u)
562
563         for g in groups:
564             self.assert_forward_links(g, users[:5])
565
566             self.add_linked_attribute(g, users[5:7])
567             self.assert_forward_links(g, users[:7])
568
569             for u in users[7:]:
570                 self.add_linked_attribute(g, u)
571
572             self.assert_forward_links(g, users)
573
574         for u in users:
575             self.assert_back_links(u, groups)
576
577         # try some replacement permutations
578         import random
579         random.seed(1)
580         users2 = users[:]
581         for i in range(5):
582             random.shuffle(users2)
583             self.replace_linked_attribute(g_relax1, users2,
584                                           controls=relax_control)
585
586             self.assert_forward_links(g_relax1, users)
587
588         for i in range(5):
589             random.shuffle(users2)
590             self.remove_linked_attribute(g_relax2, users2,
591                                          controls=relax_control)
592             self.remove_linked_attribute(g_uptight, users2)
593
594             self.replace_linked_attribute(g_relax1, [], controls=relax_control)
595
596             random.shuffle(users2)
597             self.add_linked_attribute(g_relax2, users2,
598                                       controls=relax_control)
599             self.add_linked_attribute(g_uptight, users2)
600             self.replace_linked_attribute(g_relax1, users2,
601                                           controls=relax_control)
602
603             self.assert_forward_links(g_relax1, users)
604             self.assert_forward_links(g_relax2, users)
605             self.assert_forward_links(g_uptight, users)
606
607         for u in users:
608             self.assert_back_links(u, groups)
609
610     def test_add_all_at_once(self):
611         """All these other tests are creating linked attributes after the
612         objects are there. We want to test creating them all at once
613         using LDIF.
614         """
615         users = self.add_objects(7, 'user', 'u_all_at_once')
616         g1, g3 = self.add_objects(2, 'group', 'g_all_at_once',
617                                   more_attrs={'member': users})
618         (g2,) = self.add_objects(1, 'group', 'g_all_at_once2',
619                                  more_attrs={'member': users[:5]})
620
621         self.assertRaisesLdbError(ldb.ERR_ENTRY_ALREADY_EXISTS,
622                                   "adding multiple duplicate values",
623                                   self.add_objects, 1, 'group',
624                                   'g_with_duplicate_links',
625                                   more_attrs={'member': users[:5] + users[1:2]})
626
627         self.assert_forward_links(g1, users)
628         self.assert_forward_links(g2, users[:5])
629         self.assert_forward_links(g3, users)
630         for u in users[:5]:
631             self.assert_back_links(u, [g1, g2, g3])
632         for u in users[5:]:
633             self.assert_back_links(u, [g1, g3])
634
635         self.remove_linked_attribute(g2, users[0])
636         self.remove_linked_attribute(g2, users[1])
637         self.add_linked_attribute(g2, users[1])
638         self.add_linked_attribute(g2, users[5])
639         self.add_linked_attribute(g2, users[6])
640
641         self.assert_forward_links(g1, users)
642         self.assert_forward_links(g2, users[1:])
643
644         for u in users[1:]:
645             self.remove_linked_attribute(g2, u)
646         self.remove_linked_attribute(g1, users)
647
648         for u in users:
649             self.samdb.delete(u)
650
651         self.assert_forward_links(g1, [])
652         self.assert_forward_links(g2, [])
653         self.assert_forward_links(g3, [])
654
655     def test_one_way_attributes(self):
656         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
657                                   'e_one_way')
658         guid = self.get_object_guid(e2)
659
660         self.add_linked_attribute(e1, e2, attr="addressBookRoots")
661         self.assert_forward_links(e1, [e2], attr='addressBookRoots')
662
663         self.samdb.delete(e2)
664
665         res = self.samdb.search("<GUID=%s>" % guid,
666                                 scope=ldb.SCOPE_BASE,
667                                 controls=['show_deleted:1',
668                                           'show_recycled:1'])
669
670         new_dn = str(res[0].dn)
671         self.assert_forward_links(e1, [new_dn], attr='addressBookRoots')
672         self.assert_forward_links(e1, [new_dn],
673                                   attr='addressBookRoots',
674                                   show_deactivated_link=0)
675
676     def test_one_way_attributes_delete_link(self):
677         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
678                                   'e_one_way')
679         guid = self.get_object_guid(e2)
680
681         self.add_linked_attribute(e1, e2, attr="addressBookRoots")
682         self.assert_forward_links(e1, [e2], attr='addressBookRoots')
683
684         self.remove_linked_attribute(e1, e2, attr="addressBookRoots")
685
686         self.assert_forward_links(e1, [], attr='addressBookRoots')
687         self.assert_forward_links(e1, [], attr='addressBookRoots',
688                                   show_deactivated_link=0)
689
690     def test_pretend_one_way_attributes(self):
691         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
692                                   'e_one_way')
693         guid = self.get_object_guid(e2)
694
695         self.add_linked_attribute(e1, e2, attr="addressBookRoots2")
696         self.assert_forward_links(e1, [e2], attr='addressBookRoots2')
697
698         self.samdb.delete(e2)
699         res = self.samdb.search("<GUID=%s>" % guid,
700                                 scope=ldb.SCOPE_BASE,
701                                 controls=['show_deleted:1',
702                                           'show_recycled:1'])
703
704         new_dn = str(res[0].dn)
705
706         self.assert_forward_links(e1, [], attr='addressBookRoots2')
707         self.assert_forward_links(e1, [], attr='addressBookRoots2',
708                                   show_deactivated_link=0)
709
710     def test_pretend_one_way_attributes_delete_link(self):
711         e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
712                                   'e_one_way')
713         guid = self.get_object_guid(e2)
714
715         self.add_linked_attribute(e1, e2, attr="addressBookRoots2")
716         self.assert_forward_links(e1, [e2], attr='addressBookRoots2')
717
718         self.remove_linked_attribute(e1, e2, attr="addressBookRoots2")
719
720         self.assert_forward_links(e1, [], attr='addressBookRoots2')
721         self.assert_forward_links(e1, [], attr='addressBookRoots2',
722                                   show_deactivated_link=0)
723
724 if "://" not in host:
725     if os.path.isfile(host):
726         host = "tdb://%s" % host
727     else:
728         host = "ldap://%s" % host
729
730
731 TestProgram(module=__name__, opts=subunitopts)