#
# Unit tests for dirsync control
# Copyright (C) Matthieu Patou <mat@matws.net> 2011
-#
+# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import sys
sys.path.insert(0, "bin/python")
import samba
-samba.ensure_external_module("testtools", "testtools")
-samba.ensure_external_module("subunit", "subunit/python")
+from samba.tests.subunitrun import TestProgram, SubunitOptions
import samba.getopt as options
import base64
from ldb import LdbError, SCOPE_BASE
from ldb import Message, MessageElement, Dn
from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
-from samba.dcerpc import security, misc, drsblobs
+from samba.dcerpc import security, misc, drsblobs, security
from samba.ndr import ndr_unpack, ndr_pack
from samba.auth import system_session
from samba import gensec, sd_utils
from samba.samdb import SamDB
-from samba.credentials import Credentials
+from samba.credentials import Credentials, DONT_USE_KERBEROS
import samba.tests
from samba.tests import delete_force
-from subunit.run import SubunitTestRunner
-import unittest
parser = optparse.OptionParser("dirsync.py [options] <host>")
sambaopts = options.SambaOptions(parser)
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
+subunitopts = SubunitOptions(parser)
+parser.add_option_group(subunitopts)
opts, args = parser.parse_args()
if len(args) < 1:
parser.print_usage()
sys.exit(1)
-host = args[0]
+host = args.pop()
if not "://" in host:
ldaphost = "ldap://%s" % host
ldapshost = "ldaps://%s" % host
def setUp(self):
super(DirsyncBaseTests, self).setUp()
- self.ldb_admin = ldb
- self.base_dn = ldb.domain_dn()
- self.domain_sid = security.dom_sid(ldb.get_domain_sid())
- self.user_pass = "samba123@AAA"
+ self.ldb_admin = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
+ self.base_dn = self.ldb_admin.domain_dn()
+ self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
+ self.user_pass = samba.generate_random_password(12, 16)
self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
- self.sd_utils = sd_utils.SDUtils(ldb)
+ self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
#used for anonymous login
print "baseDN: %s" % self.base_dn
creds_tmp.set_workstation(creds.get_workstation())
creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
| gensec.FEATURE_SEAL)
+ creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
return ldb_target
self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
- mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
+ mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
+ str(user_sid))
self.sd_utils.dacl_add_ace(self.base_dn, mod)
# add admins to the Domain Admins group
- self.ldb_admin.add_remove_group_members("Domain Admins", self.admin_user,
+ self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
add_members_operation=True)
def tearDown(self):
self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
try:
self.ldb_admin.deletegroup("testgroup")
- except:
+ except Exception:
pass
#def test_dirsync_errors(self):
-
def test_dirsync_supported(self):
"""Test the basic of the dirsync is supported"""
self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
self.ldb_simple.search(self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
- except LdbError,l:
+ except LdbError as l:
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
def test_parentGUID_referrals(self):
def test_ok_not_rootdc(self):
"""Test if it's ok to do dirsync on another NC that is not the root DC"""
- try:
- res = self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
- expression="samaccountname=*",
- controls=["dirsync:1:0:1"])
- except:
- self.assertTrue(False)
+ self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
+ expression="samaccountname=*",
+ controls=["dirsync:1:0:1"])
def test_dirsync_errors(self):
"""Test if dirsync returns the correct LDAP errors in case of pb"""
self.ldb_simple.search(self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
- except LdbError,l:
+ except LdbError as l:
print l
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
self.ldb_simple.search("CN=Users,%s" % self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
- except LdbError,l:
+ except LdbError as l:
print l
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
self.ldb_simple.search("CN=Users,%s" % self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:1:1"])
- except LdbError,l:
+ except LdbError as l:
print l
self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
- except LdbError,l:
+ except LdbError as l:
print l
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
self.ldb_admin.search("CN=Users,%s" % self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
- except LdbError,l:
+ except LdbError as l:
print l
self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
self.ldb_admin.search("CN=Users,%s" % self.base_dn,
expression="samaccountname=*",
controls=["dirsync:1:1:1"])
- except LdbError,l:
+ except LdbError as l:
print l
self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
-
-
-
def test_dirsync_attributes(self):
"""Check behavior with some attributes """
res = self.ldb_admin.search(self.base_dn,
# We don't return an entry if asked for objectGUID
res = self.ldb_admin.search(self.base_dn,
- expression="dn=%s" % self.base_dn,
+ expression="(distinguishedName=%s)" % str(self.base_dn),
attrs=["objectGUID"],
controls=["dirsync:1:0:1"])
self.assertEquals(len(res.msgs), 0)
# a request on the root of a NC didn't return parentGUID
res = self.ldb_admin.search(self.base_dn,
- expression="dn=%s" % self.base_dn,
+ expression="(distinguishedName=%s)" % str(self.base_dn),
attrs=["name"],
controls=["dirsync:1:0:1"])
self.assertTrue(res.msgs[0].get("objectGUID") != None)
def test_dirsync_with_controls(self):
"""Check that dirsync return correct informations when dealing with the NC"""
res = self.ldb_admin.search(self.base_dn,
- expression="(dn=%s)" % str(self.base_dn),
+ expression="(distinguishedName=%s)" % str(self.base_dn),
attrs=["name"],
controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
def test_dirsync_basenc(self):
"""Check that dirsync return correct informations when dealing with the NC"""
res = self.ldb_admin.search(self.base_dn,
- expression="(dn=%s)" % str(self.base_dn),
+ expression="(distinguishedName=%s)" % str(self.base_dn),
attrs=["name"],
controls=["dirsync:1:0:10000"])
self.assertEqual(len(res.msgs), 1)
self.assertEqual(len(res.msgs[0]), 3)
res = self.ldb_admin.search(self.base_dn,
- expression="(dn=%s)" % str(self.base_dn),
+ expression="(distinguishedName=%s)" % str(self.base_dn),
attrs=["ntSecurityDescriptor"],
controls=["dirsync:1:0:10000"])
self.assertEqual(len(res.msgs), 1)
ctl[2] = "1"
ctl[3] = "10000"
control1 = str(":".join(ctl))
- self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
+ self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
add_members_operation=True)
res = self.ldb_simple.search(self.base_dn,
control1 = str(":".join(ctl))
# remove the user from the group
- self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
+ self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
add_members_operation=False)
res = self.ldb_simple.search(self.base_dn,
self.assertEqual(len(res[0].get("member")), size )
self.ldb_admin.newgroup("testgroup")
- self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
+ self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
add_members_operation=True)
res = self.ldb_admin.search(self.base_dn,
ctl[3] = "10000"
control1 = str(":".join(ctl))
- self.ldb_admin.add_remove_group_members("testgroup", self.simple_user,
+ self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
add_members_operation=False)
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
controls=controls)
+
class ExtendedDirsyncTests(SimpleDirsyncTests):
+
def test_dirsync_linkedattributes(self):
flag_incr_linked = 2147483648
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
ctl[2] = "%d" % flag_incr_linked
ctl[3] = "10000"
control1 = str(":".join(ctl))
- self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
+ self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
add_members_operation=True)
- self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
+ self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
add_members_operation=True)
control1 = str(":".join(ctl))
# remove the user from the group
- self.ldb_admin.add_remove_group_members("Administrators", self.simple_user,
+ self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
add_members_operation=False)
res = self.ldb_admin.search(self.base_dn,
ctl[3] = "10000"
control2 = str(":".join(ctl))
- self.ldb_admin.add_remove_group_members("Administrators", self.dirsync_user,
+ self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
add_members_operation=False)
res = self.ldb_admin.search(self.base_dn,
self.assertEqual(str(res[0].dn), "")
-ldb = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
+if not getattr(opts, "listtests", False):
+ lp = sambaopts.get_loadparm()
+ samba.tests.cmdline_credentials = credopts.get_credentials(lp)
-runner = SubunitTestRunner()
-rc = 0
-#
-if not runner.run(unittest.makeSuite(SimpleDirsyncTests)).wasSuccessful():
- rc = 1
-if not runner.run(unittest.makeSuite(ExtendedDirsyncTests)).wasSuccessful():
- rc = 1
-sys.exit(rc)
+TestProgram(module=__name__, opts=subunitopts)
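Review bot: The access-denied checks in test_dirsync_supported and test_dirsync_errors assert only inside `except LdbError as l:`, so as far as the visible lines show, a search that the server wrongly allows raises nothing and the test still passes. The `try:` lines and whatever follows each handler are outside the visible hunks, so confirm this first. If there is no else branch, each block that is expected to be refused should gain `else: self.fail("dirsync search unexpectedly succeeded")`, so that a regression granting DirSync to an unprivileged account is caught. That would also make explicit which of the `ldb_simple`, `ldb_dirsync` and `ldb_admin` searches must be refused, which the current pattern leaves ambiguous. Matching the error with `str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1` ties the check to message text; comparing the numeric code from `l.args` with ldb's `ERR_INSUFFICIENT_ACCESS_RIGHTS` constant would be sturdier, assuming the binding exposes the code there.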