2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
9 sys.path.insert(0, "bin/python")
11 from samba.tests.subunitrun import SubunitOptions, TestProgram
13 import samba.getopt as options
15 from samba.auth import system_session
17 from samba.samdb import SamDB
18 from samba.dcerpc import misc
20 parser = optparse.OptionParser("linked_attributes.py [options] <host>")
21 sambaopts = options.SambaOptions(parser)
22 parser.add_option_group(sambaopts)
23 parser.add_option_group(options.VersionOptions(parser))
24 # use command line creds if available
25 credopts = options.CredentialsOptions(parser)
26 parser.add_option_group(credopts)
27 subunitopts = SubunitOptions(parser)
28 parser.add_option_group(subunitopts)
30 parser.add_option('--delete-in-setup', action='store_true',
31 help="cleanup in setup")
33 parser.add_option('--no-cleanup', action='store_true',
34 help="don't cleanup in teardown")
36 parser.add_option('--no-reveal-internals', action='store_true',
37 help="Only use windows compatible ldap controls")
39 opts, args = parser.parse_args()
47 lp = sambaopts.get_loadparm()
48 creds = credopts.get_credentials(lp)
51 class LATestException(Exception):
55 class LATests(samba.tests.TestCase):
58 super(LATests, self).setUp()
59 self.samdb = SamDB(host, credentials=creds,
60 session_info=system_session(lp), lp=lp)
62 self.base_dn = self.samdb.domain_dn()
63 self.ou = "OU=la,%s" % self.base_dn
64 if opts.delete_in_setup:
66 self.samdb.delete(self.ou, ['tree_delete:1'])
67 except ldb.LdbError, e:
68 print "tried deleting %s, got error %s" % (self.ou, e)
69 self.samdb.add({'objectclass': 'organizationalUnit',
73 super(LATests, self).tearDown()
74 if not opts.no_cleanup:
75 self.samdb.delete(self.ou, ['tree_delete:1'])
77 def delete_user(self, user):
78 self.samdb.delete(user['dn'])
79 del self.users[self.users.index(user)]
81 def add_object(self, cn, objectclass):
82 dn = "CN=%s,%s" % (cn, self.ou)
83 self.samdb.add({'cn': cn,
84 'objectclass': objectclass,
89 def add_objects(self, n, objectclass, prefix=None):
94 dns.append(self.add_object("%s%d" % (prefix, i + 1),
98 def add_linked_attribute(self, src, dest, attr='member'):
100 m.dn = ldb.Dn(self.samdb, src)
101 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
104 def remove_linked_attribute(self, src, dest, attr='member'):
106 m.dn = ldb.Dn(self.samdb, src)
107 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
110 def replace_linked_attribute(self, src, dest, attr='member'):
112 m.dn = ldb.Dn(self.samdb, src)
113 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_REPLACE, attr)
116 def attr_search(self, obj, expected, attr, scope=ldb.SCOPE_BASE,
118 if opts.no_reveal_internals:
119 if 'reveal_internals' in controls:
120 del controls['reveal_internals']
122 controls = ['%s:%d' % (k, int(v)) for k, v in controls.items()]
124 res = self.samdb.search(obj,
130 def assert_links(self, obj, expected, attr, sorted=False, msg='',
132 res = self.attr_search(obj, expected, attr, **kwargs)
134 if len(expected) == 0:
136 self.fail("found attr '%s' in %s" % (attr, res[0]))
140 results = list([x[attr] for x in res][0])
142 self.fail("missing attr '%s' on %s" % (attr, obj))
145 results = set(results)
146 expected = set(expected)
148 if expected != results:
150 print "expected %s" % expected
151 print "received %s" % results
153 self.assertEqual(results, expected)
155 def assert_back_links(self, obj, expected, attr='memberOf', **kwargs):
156 self.assert_links(obj, expected, attr=attr,
157 msg='back links do not match', **kwargs)
159 def assert_forward_links(self, obj, expected, attr='member', **kwargs):
160 self.assert_links(obj, expected, attr=attr,
161 msg='forward links do not match', **kwargs)
163 def get_object_guid(self, dn):
164 res = self.samdb.search(dn,
165 scope=ldb.SCOPE_BASE,
166 attrs=['objectGUID'])
167 return str(misc.GUID(res[0]['objectGUID'][0]))
169 def _test_la_backlinks(self, reveal=False):
174 kwargs = {'reveal_internals': 0}
176 u1, u2 = self.add_objects(2, 'user', 'u_%s' % tag)
177 g1, g2 = self.add_objects(2, 'group', 'g_%s' % tag)
179 self.add_linked_attribute(g1, u1)
180 self.add_linked_attribute(g2, u1)
181 self.add_linked_attribute(g2, u2)
183 self.assert_back_links(u1, [g1, g2], **kwargs)
184 self.assert_back_links(u2, [g2], **kwargs)
186 def test_la_backlinks(self):
187 self._test_la_backlinks()
189 def test_la_backlinks_reveal(self):
190 if opts.no_reveal_internals:
191 print 'skipping because --no-reveal-internals'
193 self._test_la_backlinks(True)
195 def _test_la_backlinks_delete_group(self, reveal=False):
200 kwargs = {'reveal_internals': 0}
202 u1, u2 = self.add_objects(2, 'user', 'u_' + tag)
203 g1, g2 = self.add_objects(2, 'group', 'g_' + tag)
205 self.add_linked_attribute(g1, u1)
206 self.add_linked_attribute(g2, u1)
207 self.add_linked_attribute(g2, u2)
209 self.samdb.delete(g2, ['tree_delete:1'])
211 self.assert_back_links(u1, [g1], **kwargs)
212 self.assert_back_links(u2, set(), **kwargs)
214 def test_la_backlinks_delete_group(self):
215 self._test_la_backlinks_delete_group()
217 def test_la_backlinks_delete_group_reveal(self):
218 if opts.no_reveal_internals:
219 print 'skipping because --no-reveal-internals'
221 self._test_la_backlinks_delete_group(True)
223 def test_links_all_delete_group(self):
224 u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
225 g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
226 g2guid = self.get_object_guid(g2)
228 self.add_linked_attribute(g1, u1)
229 self.add_linked_attribute(g2, u1)
230 self.add_linked_attribute(g2, u2)
232 self.samdb.delete(g2)
233 self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
234 show_deactivated_link=0)
235 self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
236 show_deactivated_link=0)
237 self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
238 show_deactivated_link=0)
239 self.assert_forward_links('<GUID=%s>' % g2guid,
240 [], show_deleted=1, show_recycled=1,
241 show_deactivated_link=0)
243 def test_links_all_delete_group_reveal(self):
244 u1, u2 = self.add_objects(2, 'user', 'u_all_del_group_reveal')
245 g1, g2 = self.add_objects(2, 'group', 'g_all_del_group_reveal')
246 g2guid = self.get_object_guid(g2)
248 self.add_linked_attribute(g1, u1)
249 self.add_linked_attribute(g2, u1)
250 self.add_linked_attribute(g2, u2)
252 self.samdb.delete(g2)
253 self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1,
254 show_deactivated_link=0,
256 self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1,
257 show_deactivated_link=0,
259 self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1,
260 show_deactivated_link=0,
262 self.assert_forward_links('<GUID=%s>' % g2guid,
263 [], show_deleted=1, show_recycled=1,
264 show_deactivated_link=0,
267 def test_la_links_delete_link(self):
268 u1, u2 = self.add_objects(2, 'user', 'u_del_link')
269 g1, g2 = self.add_objects(2, 'group', 'g_del_link')
271 res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
272 attrs=['uSNChanged'])
273 old_usn1 = int(res[0]['uSNChanged'][0])
275 self.add_linked_attribute(g1, u1)
277 res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
278 attrs=['uSNChanged'])
279 new_usn1 = int(res[0]['uSNChanged'][0])
281 self.assertNotEqual(old_usn1, new_usn1, "USN should have incremented")
283 self.add_linked_attribute(g2, u1)
284 self.add_linked_attribute(g2, u2)
286 res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
287 attrs=['uSNChanged'])
288 old_usn2 = int(res[0]['uSNChanged'][0])
290 self.remove_linked_attribute(g2, u1)
292 res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
293 attrs=['uSNChanged'])
294 new_usn2 = int(res[0]['uSNChanged'][0])
296 self.assertNotEqual(old_usn2, new_usn2, "USN should have incremented")
298 self.assert_forward_links(g1, [u1])
299 self.assert_forward_links(g2, [u2])
301 self.add_linked_attribute(g2, u1)
302 self.assert_forward_links(g2, [u1, u2])
303 self.remove_linked_attribute(g2, u2)
304 self.assert_forward_links(g2, [u1])
305 self.remove_linked_attribute(g2, u1)
306 self.assert_forward_links(g2, [])
307 self.remove_linked_attribute(g1, [])
308 self.assert_forward_links(g1, [])
310 def _test_la_links_delete_link_reveal(self):
311 u1, u2 = self.add_objects(2, 'user', 'u_del_link_reveal')
312 g1, g2 = self.add_objects(2, 'group', 'g_del_link_reveal')
314 self.add_linked_attribute(g1, u1)
315 self.add_linked_attribute(g2, u1)
316 self.add_linked_attribute(g2, u2)
318 self.remove_linked_attribute(g2, u1)
320 self.assert_forward_links(g2, [u1, u2], show_deleted=1,
322 show_deactivated_link=0,
326 def test_la_links_delete_link_reveal(self):
327 if opts.no_reveal_internals:
328 print 'skipping because --no-reveal-internals'
330 self._test_la_links_delete_link_reveal()
332 def test_la_links_delete_user(self):
333 u1, u2 = self.add_objects(2, 'user', 'u_del_user')
334 g1, g2 = self.add_objects(2, 'group', 'g_del_user')
336 self.add_linked_attribute(g1, u1)
337 self.add_linked_attribute(g2, u1)
338 self.add_linked_attribute(g2, u2)
340 res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
341 attrs=['uSNChanged'])
342 old_usn1 = int(res[0]['uSNChanged'][0])
344 res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
345 attrs=['uSNChanged'])
346 old_usn2 = int(res[0]['uSNChanged'][0])
348 self.samdb.delete(u1)
350 self.assert_forward_links(g1, [])
351 self.assert_forward_links(g2, [u2])
353 res = self.samdb.search(g1, scope=ldb.SCOPE_BASE,
354 attrs=['uSNChanged'])
355 new_usn1 = int(res[0]['uSNChanged'][0])
357 res = self.samdb.search(g2, scope=ldb.SCOPE_BASE,
358 attrs=['uSNChanged'])
359 new_usn2 = int(res[0]['uSNChanged'][0])
361 # Assert the USN on the alternate object is unchanged
362 self.assertEqual(old_usn1, new_usn1)
363 self.assertEqual(old_usn2, new_usn2)
365 def test_la_links_delete_user_reveal(self):
366 u1, u2 = self.add_objects(2, 'user', 'u_del_user_reveal')
367 g1, g2 = self.add_objects(2, 'group', 'g_del_user_reveal')
369 self.add_linked_attribute(g1, u1)
370 self.add_linked_attribute(g2, u1)
371 self.add_linked_attribute(g2, u2)
373 self.samdb.delete(u1)
375 self.assert_forward_links(g2, [u2],
376 show_deleted=1, show_recycled=1,
377 show_deactivated_link=0,
379 self.assert_forward_links(g1, [],
380 show_deleted=1, show_recycled=1,
381 show_deactivated_link=0,
384 def test_multiple_links(self):
385 u1, u2, u3, u4 = self.add_objects(4, 'user', 'u_multiple_links')
386 g1, g2, g3 = self.add_objects(3, 'group', 'g_multiple_links')
388 self.add_linked_attribute(g1, [u1, u2, u3, u4])
389 self.add_linked_attribute(g2, [u3, u1])
390 self.add_linked_attribute(g3, u2)
392 self.assert_forward_links(g1, [u1, u2, u3, u4])
393 self.assert_forward_links(g2, [u3, u1])
394 self.assert_forward_links(g3, [u2])
395 self.assert_back_links(u1, [g2, g1])
396 self.assert_back_links(u2, [g3, g1])
397 self.assert_back_links(u3, [g2, g1])
398 self.assert_back_links(u4, [g1])
400 self.remove_linked_attribute(g2, [u1, u3])
401 self.remove_linked_attribute(g1, [u1, u3])
403 self.assert_forward_links(g1, [u2, u4])
404 self.assert_forward_links(g2, [])
405 self.assert_forward_links(g3, [u2])
406 self.assert_back_links(u1, [])
407 self.assert_back_links(u2, [g3, g1])
408 self.assert_back_links(u3, [])
409 self.assert_back_links(u4, [g1])
411 self.add_linked_attribute(g1, [u1, u3])
412 self.add_linked_attribute(g2, [u3, u1])
413 self.add_linked_attribute(g3, [u1, u3])
415 self.assert_forward_links(g1, [u1, u2, u3, u4])
416 self.assert_forward_links(g2, [u1, u3])
417 self.assert_forward_links(g3, [u1, u2, u3])
418 self.assert_back_links(u1, [g1, g2, g3])
419 self.assert_back_links(u2, [g3, g1])
420 self.assert_back_links(u3, [g3, g2, g1])
421 self.assert_back_links(u4, [g1])
423 def test_la_links_replace(self):
424 u1, u2, u3, u4 = self.add_objects(4, 'user', 'u_replace')
425 g1, g2, g3, g4 = self.add_objects(4, 'group', 'g_replace')
427 self.add_linked_attribute(g1, [u1, u2])
428 self.add_linked_attribute(g2, [u1, u3])
429 self.add_linked_attribute(g3, u1)
431 self.replace_linked_attribute(g1, [u2])
432 self.replace_linked_attribute(g2, [u2, u3])
433 self.replace_linked_attribute(g3, [u1, u3])
434 self.replace_linked_attribute(g4, [u4])
436 self.assert_forward_links(g1, [u2])
437 self.assert_forward_links(g2, [u3, u2])
438 self.assert_forward_links(g3, [u3, u1])
439 self.assert_forward_links(g4, [u4])
440 self.assert_back_links(u1, [g3])
441 self.assert_back_links(u2, [g1, g2])
442 self.assert_back_links(u3, [g2, g3])
443 self.assert_back_links(u4, [g4])
445 self.replace_linked_attribute(g1, [u1, u2, u3])
446 self.replace_linked_attribute(g2, [u1])
447 self.replace_linked_attribute(g3, [u2])
448 self.replace_linked_attribute(g4, [])
450 self.assert_forward_links(g1, [u1, u2, u3])
451 self.assert_forward_links(g2, [u1])
452 self.assert_forward_links(g3, [u2])
453 self.assert_forward_links(g4, [])
454 self.assert_back_links(u1, [g1, g2])
455 self.assert_back_links(u2, [g1, g3])
456 self.assert_back_links(u3, [g1])
457 self.assert_back_links(u4, [])
460 def test_la_links_replace2(self):
461 users = self.add_objects(12, 'user', 'u_replace2')
462 g1, = self.add_objects(1, 'group', 'g_replace2')
464 self.add_linked_attribute(g1, users[:6])
465 self.assert_forward_links(g1, users[:6])
466 self.replace_linked_attribute(g1, users)
467 self.assert_forward_links(g1, users)
468 self.replace_linked_attribute(g1, users[6:])
469 self.assert_forward_links(g1, users[6:])
470 self.remove_linked_attribute(g1, users[6:9])
471 self.assert_forward_links(g1, users[9:])
472 self.remove_linked_attribute(g1, users[9:])
473 self.assert_forward_links(g1, [])
475 def test_la_links_permutations(self):
476 """Make sure the order in which we add links doesn't matter."""
477 users = self.add_objects(3, 'user', 'u_permutations')
478 groups = self.add_objects(6, 'group', 'g_permutations')
480 for g, p in zip(groups, itertools.permutations(users)):
481 self.add_linked_attribute(g, p)
483 # everyone should be in every group
485 self.assert_forward_links(g, users)
488 self.assert_back_links(u, groups)
490 for g, p in zip(groups[::-1], itertools.permutations(users)):
491 self.replace_linked_attribute(g, p)
494 self.assert_forward_links(g, users)
497 self.assert_back_links(u, groups)
499 for g, p in zip(groups, itertools.permutations(users)):
500 self.remove_linked_attribute(g, p)
503 self.assert_forward_links(g, [])
506 self.assert_back_links(u, [])
508 def test_one_way_attributes(self):
509 e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
511 guid = self.get_object_guid(e2)
513 self.add_linked_attribute(e1, e2, attr="addressBookRoots")
514 self.assert_forward_links(e1, [e2], attr='addressBookRoots')
516 self.samdb.delete(e2)
518 res = self.samdb.search("<GUID=%s>" % guid,
519 scope=ldb.SCOPE_BASE,
520 controls=['show_deleted:1',
523 new_dn = str(res[0].dn)
524 self.assert_forward_links(e1, [new_dn], attr='addressBookRoots')
525 self.assert_forward_links(e1, [new_dn],
526 attr='addressBookRoots',
527 show_deactivated_link=0)
529 def test_one_way_attributes_delete_link(self):
530 e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
532 guid = self.get_object_guid(e2)
534 self.add_linked_attribute(e1, e2, attr="addressBookRoots")
535 self.assert_forward_links(e1, [e2], attr='addressBookRoots')
537 self.remove_linked_attribute(e1, e2, attr="addressBookRoots")
539 self.assert_forward_links(e1, [], attr='addressBookRoots')
540 self.assert_forward_links(e1, [], attr='addressBookRoots',
541 show_deactivated_link=0)
543 def test_pretend_one_way_attributes(self):
544 e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
546 guid = self.get_object_guid(e2)
548 self.add_linked_attribute(e1, e2, attr="addressBookRoots2")
549 self.assert_forward_links(e1, [e2], attr='addressBookRoots2')
551 self.samdb.delete(e2)
552 res = self.samdb.search("<GUID=%s>" % guid,
553 scope=ldb.SCOPE_BASE,
554 controls=['show_deleted:1',
557 new_dn = str(res[0].dn)
559 self.assert_forward_links(e1, [], attr='addressBookRoots2')
560 self.assert_forward_links(e1, [], attr='addressBookRoots2',
561 show_deactivated_link=0)
563 def test_pretend_one_way_attributes_delete_link(self):
564 e1, e2 = self.add_objects(2, 'msExchConfigurationContainer',
566 guid = self.get_object_guid(e2)
568 self.add_linked_attribute(e1, e2, attr="addressBookRoots2")
569 self.assert_forward_links(e1, [e2], attr='addressBookRoots2')
571 self.remove_linked_attribute(e1, e2, attr="addressBookRoots2")
573 self.assert_forward_links(e1, [], attr='addressBookRoots2')
574 self.assert_forward_links(e1, [], attr='addressBookRoots2',
575 show_deactivated_link=0)
577 if "://" not in host:
578 if os.path.isfile(host):
579 host = "tdb://%s" % host
581 host = "ldap://%s" % host
584 TestProgram(module=__name__, opts=subunitopts)