SOURCE_EXTENSIONS = [ '.C', '.cpp', '.cxx', '.cc', '.c', '.m', '.mm' ]
+
def DirectoryOfThisScript():
return os.path.dirname( os.path.abspath( __file__ ) )
creds_user3 = copy.deepcopy(creds)
creds_user4 = copy.deepcopy(creds)
+
class BindTests(samba.tests.TestCase):
info_dc = None
# This should be fixed to work inline with Windows.
# The literal strings are in the case Windows uses.
# Windows appear to preserve casing of the RDN and uppercase the other keys.
+
+
class MatchRulesTests(samba.tests.TestCase):
def setUp(self):
super(MatchRulesTests, self).setUp()
expression="memberOf:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups)
self.assertEqual(len(res1), 0)
+
class MatchRuleConditionTests(samba.tests.TestCase):
def setUp(self):
super(MatchRuleConditionTests, self).setUp()
"@IDX_DN_GUID": [b"GUID"]
}
+
def tempdir():
import tempfile
try:
self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
# Run the SimpleLdb tests against an lmdb backend
+
+
class SimpleLdbLmdb(SimpleLdb):
def setUp(self):
def tearDown(self):
super(SimpleLdbLmdb, self).tearDown()
+
class SearchTests(LdbBaseTest):
def tearDown(self):
shutil.rmtree(self.testdir)
"@IDXATTR": [b"x", b"y", b"ou"]})
self.IDX = True
+
class IndexedCheckSearchTests(IndexedSearchTests):
"""Test searches using the index, to ensure the index doesn't
break things (full scan disabled)"""
self.IDXCHECK = True
super(IndexedCheckSearchTests, self).setUp()
+
class IndexedSearchDnFilterTests(SearchTests):
"""Test searches using the index, to ensure the index doesn't
break things"""
"@IDXATTR": [b"x", b"y", b"ou"]})
self.IDX = True
+
class IndexedAndOneLevelSearchTests(SearchTests):
"""Test searches using the index including @IDXONE, to ensure
the index doesn't break things"""
self.IDX = True
self.IDXONE = True
+
class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
"""Test searches using the index including @IDXONE, to ensure
the index doesn't break things (full scan disabled)"""
self.IDXCHECK = True
super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
+
class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
"""Test searches using the index including @IDXONE, to ensure
the index doesn't break things"""
self.IDX = True
self.IDXONE = True
+
class GUIDIndexedSearchTests(SearchTests):
"""Test searches using the index, to ensure the index doesn't
break things"""
self.IDX = True
self.IDXGUID = True
+
class GUIDAndOneLevelIndexedSearchTests(SearchTests):
"""Test searches using the index including @IDXONE, to ensure
the index doesn't break things"""
self.IDXGUID = True
self.IDXONE = True
+
class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
def setUp(self):
def tearDown(self):
super(AddModifyTestsLmdb, self).tearDown()
+
class IndexedAddModifyTests(AddModifyTests):
"""Test searches using the index, to ensure the index doesn't
break things"""
"x": "z", "y": "a",
"objectUUID": b"0123456789abcde2"})
+
class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
"""Test searches using the index, to ensure the index doesn't
break things"""
self.l.transaction_commit()
super(GUIDTransIndexedAddModifyTests, self).tearDown()
+
class TransIndexedAddModifyTests(IndexedAddModifyTests):
"""Test index behaviour insdie the transaction"""
self.l.transaction_commit()
super(TransIndexedAddModifyTests, self).tearDown()
+
class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
def setUp(self):
def tearDown(self):
super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
+
class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
def setUp(self):
def tearDown(self):
super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
+
class BadIndexTests(LdbBaseTest):
def setUp(self):
super(BadIndexTests, self).setUp()
super(GUIDBadIndexTests, self).setUp()
+
class DnTests(TestCase):
def setUp(self):
dn = ldb.Dn(self.ldb, '')
self.assertTrue(dn.is_null())
+
class LdbMsgTests(TestCase):
def setUp(self):
l = ldb.Ldb(self.filename)
self.assertEqual(["init"], ops)
+
class LdbResultTests(LdbBaseTest):
def setUp(self):
import talloc
import _test_pytalloc
+
def dummy_func():
pass
self.assertFalse(obj1 >= obj2)
self.assertFalse(obj1 > obj2)
+
class TallocBaseComparisonTests(unittest.TestCase):
def test_compare_same(self):
import _tevent
+
class BackendListTests(TestCase):
def test_backend_list(self):
from samba.dcerpc import samr, security
+
def display_lsa_string(str):
return str.string
+
def FillUserInfo(samr, dom_handle, users, level):
"""fill a user array with user information from samrQueryUserInfo"""
for i in range(len(users)):
users[i] = info
samr.Close(user_handle)
+
def toArray((handle, array, num_entries)):
ret = []
for x in range(num_entries):
print "Testing samr_Connect"
return samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
+
def test_LookupDomain(samr, handle, domain):
"""test the samr_LookupDomain interface"""
print "Testing samr_LookupDomain"
return samr.LookupDomain(handle, domain)
+
def test_OpenDomain(samr, handle, sid):
"""test the samr_OpenDomain interface"""
print "Testing samr_OpenDomain"
return samr.OpenDomain(handle, security.SEC_FLAG_MAXIMUM_ALLOWED, sid)
+
def test_EnumDomainUsers(samr, dom_handle):
"""test the samr_EnumDomainUsers interface"""
print "Testing samr_EnumDomainUsers"
for idx, user in users:
print "\t%s\t(%d)" % (user.string, idx)
+
def test_EnumDomainGroups(samr, dom_handle):
"""test the samr_EnumDomainGroups interface"""
print "Testing samr_EnumDomainGroups"
for idx, group in groups:
print "\t%s\t(%d)" % (group.string, idx)
+
def test_domain_ops(samr, dom_handle):
"""test domain specific ops"""
test_EnumDomainUsers(samr, dom_handle)
test_EnumDomainGroups(samr, dom_handle)
+
def test_EnumDomains(samr, handle):
"""test the samr_EnumDomains interface"""
print "Testing samr_EnumDomains"
print "Connecting to " + binding
conn = winreg.winreg(binding, sambaopts.get_loadparm())
+
def list_values(key):
(num_values, max_valnamelen, max_valbufsize) = conn.QueryInfoKey(key, winreg.String())[4:8]
for i in range(num_values):
# printf("\t\t0x%llx (%lld)\n", v.value, v.value)
# }
+
def list_path(key, path):
count = 0
(num_subkeys, max_subkeylen, max_subkeysize) = conn.QueryInfoKey(key, winreg.String())[1:4]
f.close()
MAX_NETBIOS_NAME_LEN = 15
+
+
def is_valid_netbios_char(c):
return (c.isalnum() or c in " !#$%&'()-.@^_{}~")
"""return a DN from a DNS name domain/forest root"""
return "DC=" + ",DC=".join(dnsdomain.split("."))
+
def current_unix_time():
return int(time.time())
+
def string_to_byte_array(string):
blob = [0] * len(string)
return blob
+
def arcfour_encrypt(key, data):
from samba.crypto import arcfour_crypt_blob
return arcfour_crypt_blob(data, key)
# Descriptors of naming contexts and other important objects
+
def sddl2binary(sddl_in, domain_sid, name_map):
sddl = "%s" % sddl_in
sec = security.descriptor.from_sddl(sddl, domain_sid)
return ndr_pack(sec)
+
def get_empty_descriptor(domain_sid, name_map={}):
sddl = ""
return sddl2binary(sddl, domain_sid, name_map)
# "get_schema_descriptor" is located in "schema.py"
+
def get_config_descriptor(domain_sid, name_map={}):
sddl = "O:EAG:EAD:(OA;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;ED)" \
"(OA;;CR;1131f6ab-9c07-11d1-f79f-00c04fc2dcd2;;ED)" \
"(OU;SA;CR;45ec5156-db7e-47bb-b53f-dbeb2d03c40f;;WD)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_config_partitions_descriptor(domain_sid, name_map={}):
sddl = "D:" \
"(A;;LCLORC;;;AU)" \
"(AU;CISA;WPCRCCDCWOWDSDDT;;;WD)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_config_sites_descriptor(domain_sid, name_map={}):
sddl = "D:" \
"(A;;RPLCLORC;;;AU)" \
"(OU;CIIOSA;WP;3e10944c-c354-11d0-aff8-0000f80367c1;b7b13124-b82e-11d0-afee-0000f80367c1;WD)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_config_ntds_quotas_descriptor(domain_sid, name_map={}):
sddl = "D:" \
"(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;EA)" \
"(OA;;CR;4ecc03fe-ffc0-4947-b630-eb672a8a9dbc;;WD)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_config_delete_protected1_descriptor(domain_sid, name_map={}):
sddl = "D:AI" \
"(A;;RPLCLORC;;;AU)" \
"(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_config_delete_protected1wd_descriptor(domain_sid, name_map={}):
sddl = "D:AI" \
"(A;;RPLCLORC;;;WD)" \
"(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_config_delete_protected2_descriptor(domain_sid, name_map={}):
sddl = "D:AI" \
"(A;;RPLCLORC;;;AU)" \
"(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_domain_descriptor(domain_sid, name_map={}):
sddl = "O:BAG:BAD:AI(OA;CIIO;RP;4c164200-20c0-11d0-a768-00aa006e0529;4828cc14-1437-45bc-9b07-ad6f015e5f28;RU)" \
"(OA;CIIO;RP;4c164200-20c0-11d0-a768-00aa006e0529;bf967aba-0de6-11d0-a285-00aa003049e2;RU)" \
"(AU;SA;CR;;;DU)(AU;SA;CR;;;BA)(AU;SA;WPWOWD;;;WD)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_domain_infrastructure_descriptor(domain_sid, name_map={}):
sddl = "D:" \
"(A;;RPLCLORC;;;AU)" \
"(AU;SA;WPCR;;;WD)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_domain_builtin_descriptor(domain_sid, name_map={}):
sddl = "D:" \
"(OA;CIIO;RP;4c164200-20c0-11d0-a768-00aa006e0529;4828cc14-1437-45bc-9b07-ad6f015e5f28;RU)" \
"(AU;SA;WPWOWD;;;WD)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_domain_computers_descriptor(domain_sid, name_map={}):
sddl = "D:" \
"(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
"S:"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_domain_users_descriptor(domain_sid, name_map={}):
sddl = "D:" \
"(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
"S:"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_managed_service_accounts_descriptor(domain_sid, name_map={}):
sddl = "D:" \
"(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
"S:"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_domain_controllers_descriptor(domain_sid, name_map={}):
sddl = "D:" \
"(A;;RPLCLORC;;;AU)" \
"(AU;CISA;WP;;;WD)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_domain_delete_protected1_descriptor(domain_sid, name_map={}):
sddl = "D:AI" \
"(A;;RPLCLORC;;;AU)" \
"(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_domain_delete_protected2_descriptor(domain_sid, name_map={}):
sddl = "D:AI" \
"(A;;RPLCLORC;;;AU)" \
"(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_dns_partition_descriptor(domain_sid, name_map={}):
sddl = "O:SYG:BAD:AI" \
"(OA;CIIO;RP;4c164200-20c0-11d0-a768-00aa006e0529;4828cc14-1437-45bc-9b07-ad6f015e5f28;RU)" \
"(AU;SA;CR;;;DU)(AU;SA;CR;;;BA)(AU;SA;WPWOWD;;;WD)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_dns_forest_microsoft_dns_descriptor(domain_sid, name_map={}):
sddl = "O:SYG:SYD:AI" \
"(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
"(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_dns_domain_microsoft_dns_descriptor(domain_sid, name_map={}):
sddl = "O:SYG:SYD:AI" \
"(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" \
"(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_paritions_crossref_subdomain_descriptor(domain_sid, name_map={}):
sddl = "O:SubdomainAdminsG:SubdomainAdminsD:AI" \
"(A;;RPWPCRCCLCLORCWOWDSW;;;SubdomainAdmins)" \
"(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
return sddl2binary(sddl, domain_sid, name_map)
+
def get_wellknown_sds(samdb):
# Then subcontainers
return subcontainers
+
def chunck_acl(acl):
"""Return separate ACE of an ACL
# to overcome the bug in pidl generated python bindings.
#
+
class ARecord(dnsserver.DNS_RPC_RECORD):
def __init__(self, ip_addr, serial=1, ttl=900, rank=dnsp.DNS_RANK_ZONE,
node_flag=0):
from samba.dcerpc.drsuapi import DRSUAPI_ATTID_name
import re
+
class drsException(Exception):
"""Base element for drs errors"""
samba.ensure_third_party_module("dns", "dnspython")
import dns.resolver
+
def uint32(v):
return ctypes.c_uint32(v).value
def import_file(name, location):
return imp.load_source(name, location)
+
def get_gp_ext_from_module(name, mod):
if mod:
for k, v in vars(mod).items():
return v
return None
+
def get_gp_client_side_extensions(logger, smb_conf):
user_exts = []
machine_exts = []
import os.path
from samba.gpclass import gp_ext_setter, gp_inf_ext
+
class inf_to_kdc_tdb(gp_ext_setter):
def mins_to_hours(self):
return '%d' % (int(self.val) / 60)
def __str__(self):
return 'Kerberos Policy'
+
class inf_to_ldb(gp_ext_setter):
'''This class takes the .inf file parameter (essentially a GPO file mapped
to a GUID), hashmaps it to the Samba parameter, which then uses an ldb
def __str__(self):
return 'System Access'
+
class gp_sec_ext(gp_inf_ext):
'''This class does the following two things:
1) Identifies the GPO if it has a certain kind of filepath,
ENFORCE = 2
UNAPPLY = 3
+
class gp_log:
''' Log settings overwritten by gpo apply
The gp_log is an xml file that stores a history of gpo changes (and the
''' Write gp_log changes to disk '''
self.gpostore.store(self.username, etree.tostring(self.gpdb, 'utf-8'))
+
class GPOStorage:
def __init__(self, log_file):
if os.path.isfile(log_file):
def __del__(self):
self.log.close()
+
class gp_ext(object):
__metaclass__ = ABCMeta
def __str__(self):
pass
+
class gp_ext_setter():
__metaclass__ = ABCMeta
def __str__(self):
pass
+
class gp_inf_ext(gp_ext):
@abstractmethod
def list(self, rootpath):
pass
''' Fetch the hostname of a writable DC '''
+
+
def get_dc_hostname(creds, lp):
net = Net(creds=creds, lp=lp)
cldap_ret = net.finddc(domain=lp.get('realm'), flags=(nbt.NBT_SERVER_LDAP |
return cldap_ret.pdc_dns_name
''' Fetch a list of GUIDs for applicable GPOs '''
+
+
def get_gpo_list(dc_hostname, creds, lp):
gpos = []
ads = gpo.ADS_STRUCT(dc_hostname, lp, creds)
return os.path.join(*dirs)
raise OSError(path)
+
def check_refresh_gpo_list(dc_hostname, lp, creds, gpos):
conn = smb.SMB(dc_hostname, 'sysvol', lp=lp, creds=creds, sign=True)
cache_path = lp.cache_path('gpo_cache')
continue
cache_gpo_dir(conn, cache_path, check_safe_path(gpo.file_sys_path))
+
def gpo_version(lp, path):
# gpo.gpo_get_sysvol_gpt_version() reads the GPT.INI from a local file,
# read from the gpo client cache.
gpt_path = lp.cache_path(os.path.join('gpo_cache', path))
return int(gpo.gpo_get_sysvol_gpt_version(gpt_path)[1])
+
def apply_gp(lp, creds, test_ldb, logger, store, gp_extensions):
gp_db = store.get_gplog(creds.get_username())
dc_hostname = get_dc_hostname(creds, lp)
store.store(guid, '%i' % version)
store.commit()
+
def unapply_log(gp_db):
while True:
item = gp_db.apply_log_pop()
else:
break
+
def unapply_gp(lp, creds, test_ldb, logger, store, gp_extensions):
gp_db = store.get_gplog(creds.get_username())
gp_db.state(GPOSTATE.UNAPPLY)
gp_db.delete(str(attr_obj), attr[0])
gp_db.commit()
+
def parse_gpext_conf(smb_conf):
lp = LoadParm()
if smb_conf is not None:
parser.read(ext_conf)
return lp, parser
+
def atomic_write_conf(lp, parser):
ext_conf = lp.state_path('gpext.conf')
with NamedTemporaryFile(delete=False, dir=os.path.dirname(ext_conf)) as f:
parser.write(f)
os.rename(f.name, ext_conf)
+
def check_guid(guid):
# Check for valid guid with curly braces
if guid[0] != '{' or guid[-1] != '}' or len(guid) != 38:
return False
return True
+
def register_gp_extension(guid, name, path,
smb_conf=None, machine=True, user=True):
# Check that the module exists
return True
+
def list_gp_extensions(smb_conf=None):
_, parser = parse_gpext_conf(smb_conf)
results = {}
results[guid]['UserPolicy'] = not int(parser.get(guid, 'NoUserPolicy'))
return results
+
def unregister_gp_extension(guid, smb_conf=None):
if not check_guid(guid):
return False
m2.setdefault(k2, {})[k1] = dist
return m2
+
def full_matrix(rows,
utf8=False,
colour=None,
from __future__ import absolute_import
from .samdb import SamDB
+
class Hostconfig(object):
"""Aggregate object that contains all information about the configuration
of a Samba host."""
import ldb
import samba
+
class IDmapDB(samba.Ldb):
"""The IDmap database."""
import os
import tempfile
+
class DCJoinException(Exception):
def __init__(self, msg):
ctx.do_join()
logger.info("Joined domain %s (SID %s) as a DC" % (ctx.domain_name, ctx.domsid))
+
def join_clone(logger=None, server=None, creds=None, lp=None,
targetdir=None, domain=None, include_secrets=False,
dns_backend="NONE"):
logger.info("Cloned domain %s (SID %s)" % (ctx.domain_name, ctx.domsid))
return ctx
+
def join_subdomain(logger=None, server=None, creds=None, lp=None, site=None,
netbios_name=None, targetdir=None, parent_domain=None, dnsdomain=None,
netbios_domain=None, machinepass=None, adminpass=None, use_ntvfs=False,
import re
+
def __read_folded_line(f, buffer):
"""Read a line from an LDIF file, unfolding it"""
line = buffer
# Will not match options after the attribute type.
attr_type_re = re.compile("^([A-Za-z][A-Za-z0-9-]*):")
+
def __read_raw_entries(f):
"""Read an LDIF entry, only unfolding lines"""
if l == "":
break
+
def fix_dn(dn):
"""Fix a string DN to use ${CONFIGDN}"""
else:
return dn
+
def __write_ldif_one(entry):
"""Write out entry as LDIF"""
out = []
return "\n".join(out)
+
def __transform_entry(entry):
"""Perform required transformations to the Microsoft-provided LDIF"""
return entry
+
def read_ms_ldif(filename):
"""Read and transform Microsoft-provided LDIF file."""
"systemauxiliaryclass", "systemmaycontain", "systemmustcontain",
"systemposssuperiors"])
+
def __read_folded_line(f, buffer):
""" reads a line from an LDIF file, unfolding it"""
line = buffer
else:
return dn
+
def __convert_bitfield(key, value):
"""Evaluate the OR expression in 'value'"""
assert(isinstance(value, string_types))
return str(o)
+
def __write_ldif_one(entry):
"""Write out entry as LDIF"""
out = []
return "\n".join(out)
+
def __transform_entry(entry, objectClass):
"""Perform transformations required to convert the LDIF-like schema
file entries to LDIF, including Samba-specific stuff."""
return entry
+
def __parse_schema_file(filename, objectClass):
"""Load and transform a schema file."""
import markdown
import xml.etree.ElementTree as ET
+
def innertext(tag):
return (tag.text or '') + \
''.join(innertext(e) for e in tag) + \
(tag.tail or '')
+
def read_ms_markdown(in_file, out_folder):
"""Read Github documentation-derived schema files."""
raise TypeError("%r is not a NDR object" % object)
return ndr_print()
+
def ndr_pack_in(object, bigendian=False, ndr64=False):
"""Pack the input of an NDR function object.
import sys, traceback
import textwrap
+
class Option(optparse.Option):
pass
# This help formatter does text wrapping and preserves newlines
+
+
class PlainHelpFormatter(optparse.IndentedHelpFormatter):
def format_description(self, description=""):
desc_width = self.width - self.current_indent
else:
return ""
+
class Command(object):
"""A samba-tool command."""
for msg in res:
self.outf.write("%s\n" % msg.get("samaccountname", idx=0))
+
class cmd_computer_show(Command):
"""Display a computer AD object.
computer_ldif = samdb.write_ldif(msg, ldb.CHANGETYPE_NONE)
self.outf.write(computer_ldif)
+
class cmd_computer_move(Command):
"""Move a computer to an organizational unit/container."""
from samba.dnsserver import ARecord, AAAARecord, PTRRecord, CNameRecord, NSRecord, MXRecord, SOARecord, SRVRecord, TXTRecord
+
def dns_connect(server, lp, creds):
if server.lower() == 'localhost':
server = '127.0.0.1'
action="store_true")
]
+
def get_testparm_var(testparm, smbconf, varname):
errfile = open(os.devnull, 'w')
p = subprocess.Popen([testparm, '-s', '-l',
else:
raise CommandError("invalid argument: '%s' (choose from 'show', 'raise')" % subcommand)
+
class cmd_domain_passwordsettings_show(Command):
"""Display current password settings for the domain."""
self.message("Account lockout threshold (attempts): %d" % cur_account_lockout_threshold)
self.message("Reset account lockout after (mins): %d" % cur_reset_account_lockout_after)
+
class cmd_domain_passwordsettings_set(Command):
"""Set password settings.
msgs.append("All changes applied successfully!")
self.message("\n".join(msgs))
+
class cmd_domain_passwordsettings(SuperCommand):
"""Manage password policy settings."""
subcommands["show"] = cmd_domain_passwordsettings_show()
subcommands["set"] = cmd_domain_passwordsettings_set()
+
class cmd_domain_classicupgrade(Command):
"""Upgrade from Samba classic (NT4-like) database to Samba AD DC database.
hidden = True
+
class LocalDCCredentialsOptions(options.CredentialsOptions):
def __init__(self, parser):
options.CredentialsOptions.__init__(self, parser, special_name="local-dc")
+
class DomainTrustCommand(Command):
"""List domain trusts."""
d.domain_sid, collision_string))
return
+
class cmd_domain_trust_list(DomainTrustCommand):
"""List domain trusts."""
"Name[%s]" % self.netr_DomainTrust_to_name(t)))
return
+
class cmd_domain_trust_show(DomainTrustCommand):
"""Show trusted domain details."""
return
+
class cmd_domain_trust_create(DomainTrustCommand):
"""Create a domain or forest trust."""
self.outf.write("Success.\n")
return
+
class cmd_domain_trust_delete(DomainTrustCommand):
"""Delete a domain trust."""
return
+
class cmd_domain_trust_validate(DomainTrustCommand):
"""Validate a domain trust."""
return
+
class cmd_domain_trust_namespaces(DomainTrustCommand):
"""Manage forest trust namespaces."""
tln=local_tdo_info.domain_name.string)
return
+
class cmd_domain_tombstones_expunge(Command):
"""Expunge tombstones from the database.
subcommands["validate"] = cmd_domain_trust_validate()
subcommands["namespaces"] = cmd_domain_trust_namespaces()
+
class cmd_domain_tombstones(SuperCommand):
"""Domain tombstone and recycled object management."""
subcommands = {}
subcommands["expunge"] = cmd_domain_tombstones_expunge()
+
class ldif_schema_update:
"""Helper class for applying LDIF schema updates"""
return 1
+
class cmd_domain_schema_upgrade(Command):
"""Domain schema upgrading"""
if error_encountered:
raise CommandError('Failed to upgrade schema')
+
class cmd_domain_functional_prep(Command):
"""Domain functional level preparation"""
if error_encountered:
raise CommandError('Failed to perform functional prep')
+
class cmd_domain(SuperCommand):
"""Domain management."""
except Exception as e:
raise CommandError("DRS connection to %s failed" % ctx.server, e)
+
def samdb_connect(ctx):
'''make a ldap connection to the server'''
try:
except Exception as e:
raise CommandError("LDAP connection to %s failed" % ctx.server, e)
+
def drs_errmsg(werr):
'''return "was successful" or an error string'''
(ecode, estring) = werr
DEFAULT_SHOWREPL_FORMAT = 'classic'
+
class cmd_drs_showrepl(Command):
"""Show replication status."""
Option
)
+
class cmd_forest_show(Command):
"""Display forest settings.
except KeyError:
self.outf.write("%s: <NO VALUE>\n" % attr)
+
class cmd_forest_set(Command):
"""Modify forest settings.
objectdn = "CN=Directory Service,CN=Windows NT,CN=Services,CN=Configuration"
attributes = ['dsheuristics']
+
class cmd_forest_set_directory_service_dsheuristics(cmd_forest_set):
"""Set the value of dsheuristics on the Directory Service.
objectdn = "CN=Directory Service,CN=Windows NT,CN=Services,CN=Configuration"
attribute = 'dsheuristics'
+
class cmd_forest_directory_service(SuperCommand):
"""Forest configuration partition management."""
subcommands["show"] = cmd_forest_show_directory_service()
subcommands["dsheuristics"] = cmd_forest_set_directory_service_dsheuristics()
+
class cmd_forest(SuperCommand):
"""Forest management."""
)
from samba.samdb import SamDB
+
def get_fsmo_roleowner(samdb, roledn, role):
"""Gets the owner of an FSMO role
outf.write("This DC already has the '%s' FSMO role\n" % role)
return False
+
def transfer_role(outf, role, samdb):
"""Transfer standard FSMO role. """
outf.write("This DC already has the '%s' FSMO role\n" % role)
return False
+
class cmd_fsmo_seize(Command):
"""Seize the role."""
except CommandError as e:
self.message("%s: * %s" % (long_name, e.message))
+
class cmd_fsmo_transfer(Command):
"""Transfer the role."""
smb.FILE_ATTRIBUTE_ARCHIVE | \
smb.FILE_ATTRIBUTE_HIDDEN
+
def copy_directory_remote_to_local(conn, remotedir, localdir):
if not os.path.isdir(localdir):
os.mkdir(localdir)
for msg in res:
self.outf.write("%s\n" % msg.get("samaccountname", idx=0))
+
class cmd_group_list_members(Command):
"""List all members of an AD group.
except Exception as e:
raise CommandError('Failed to list members of "%s" group ' % groupname, e)
+
class cmd_group_move(Command):
"""Move a group to an organizational unit/container.
self.outf.write('Moved group "%s" into "%s"\n' %
(groupname, full_new_parent_dn))
+
class cmd_group_show(Command):
"""Display a group AD object.
user_ldif = samdb.write_ldif(msg, ldb.CHANGETYPE_NONE)
self.outf.write(user_ldif)
+
class cmd_group(SuperCommand):
"""Group management."""
global summary
summary = {}
+
class LDAPBase(object):
def __init__(self, host, creds, lp,
except KeyError:
pass
+
class Descriptor(object):
def __init__(self, connection, dn, outf=sys.stdout, errf=sys.stderr):
self.outf = outf
res += 8 * " " + ace + "\n"
return (self_aces == [] and other_aces == [], res)
+
class LDAPObject(object):
def __init__(self, connection, dn, summary, filter_list,
outf=sys.stdout, errf=sys.stderr):
from samba.netcmd import SuperCommand
+
class cache_loader(dict):
"""
We only load subcommand tools if they are actually used.
Command,
)
+
class cmd_time(Command):
"""Retrieve the time on a server.
if dosinfo:
self.outf.write(ndr_print(dosinfo))
+
class cmd_ntacl_get(Command):
"""Get ACLs of a file."""
synopsis = "%prog <file> [options]"
lp.get("realm").lower(), samdb.domain_dn(),
lp, use_ntvfs=use_ntvfs)
+
class cmd_ntacl_sysvolcheck(Command):
"""Check sysvol ACLs match defaults (including correct ACLs on GPOs)."""
synopsis = "%prog <file> [options]"
from samba import dsdb
from operator import attrgetter
+
class cmd_rename(Command):
"""Rename an organizational unit.
self.outf.write('Renamed ou "%s" to "%s"\n' % (full_old_ou_dn,
full_new_ou_dn))
+
class cmd_move(Command):
"""Move an organizational unit.
self.outf.write('Moved ou "%s" into "%s"\n' %
(full_old_ou_dn, full_new_parent_dn))
+
class cmd_create(Command):
"""Create an organizational unit.
self.outf.write('Created ou "%s"\n' % full_ou_dn)
+
class cmd_listobjects(Command):
"""List all objects in an organizational unit.
raise CommandError('Failed to list contents of ou "%s"' %
full_ou_dn, e)
+
class cmd_list(Command):
"""List all organizational units.
msg.dn.remove_base_components(len(domain_dn))
self.outf.write("%s\n" % str(msg.dn))
+
class cmd_delete(Command):
"""Delete an organizational unit.
from samba.netcmd import Command, CommandError, Option
from samba.messaging import Messaging
+
class cmd_processes(Command):
"""List processes (to aid debugging on systems without setproctitle)."""
from samba.drs_utils import drs_Replicate
import sys
+
class RODCException(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return "%s: %s" % (self.__class__.__name__, self.value)
+
class NamingError(RODCException):
pass
+
class ReplicationError(RODCException):
pass
+
class cmd_rodc_preload(Command):
"""Preload accounts for an RODC. Multiple accounts may be requested."""
Option
)
+
class cmd_schema_attribute_modify(Command):
"""Modify attribute settings in the schema partition.
samdb.set_schema_update_now()
self.outf.write("modified %s" % attr_dn)
+
class cmd_schema_attribute_show(Command):
"""Show details about an attribute from the schema.
user_ldif = samdb.write_ldif(res[0], ldb.CHANGETYPE_NONE)
self.outf.write(user_ldif)
+
class cmd_schema_attribute_show_oc(Command):
"""Show what objectclasses MAY or MUST contain an attribute.
user_ldif = samdb.write_ldif(msg, ldb.CHANGETYPE_NONE)
self.outf.write(user_ldif)
+
class cmd_schema_attribute(SuperCommand):
"""Query and manage attributes in the schema partition."""
subcommands = {}
subcommands["show"] = cmd_schema_attribute_show()
subcommands["show_oc"] = cmd_schema_attribute_show_oc()
+
class cmd_schema_objectclass(SuperCommand):
"""Query and manage objectclasses in the schema partition."""
subcommands = {}
subcommands["show"] = cmd_schema_objectclass_show()
+
class cmd_schema(SuperCommand):
"""Schema querying and management."""
"set-site": cmd_sites_subnet_set_site(),
}
+
class cmd_sites(SuperCommand):
"""Sites management."""
subcommands = {}
import samba.getopt as options
from samba.netcmd import Command, CommandError, Option
+
class cmd_testparm(Command):
"""Syntax check the configuration file."""
except ImportError as e:
pass
+
def check_random():
if get_random_bytes_fn is not None:
return None
return "Crypto.Random or M2Crypto.Rand required"
+
def get_random_bytes(num):
random_reason = check_random()
if random_reason is not None:
raise ImportError(random_reason)
return get_random_bytes_fn(num)
+
def get_crypt_value(alg, utf8pw, rounds=0):
algs = {
"5": {"length": 43},
# i.e. options = "rounds=20;other=ignored;" will return 20
# if the rounds option is not found or the value is not a number, 0 is returned
# which indicates that the default number of rounds should be used.
+
+
def get_rounds(options):
if not options:
return 0
if len(disabled_virtual_attributes) != 0:
virtual_attributes_help += "Unsupported virtual attributes: %s" % ", ".join(sorted(disabled_virtual_attributes.keys()))
+
class cmd_user_create(Command):
"""Create a new user.
raise CommandError("%s: %s" % (command, msg))
self.outf.write("Changed password OK\n")
+
class GetPasswordCommand(Command):
def __init__(self):
return password_attrs
+
class cmd_user_getpassword(GetPasswordCommand):
"""Get the password fields of a user/computer account.
self.outf.write("%s" % ldif)
self.outf.write("Got password OK\n")
+
class cmd_user_syncpasswords(GetPasswordCommand):
"""Sync the password of user accounts.
update_pid(None)
return
+
class cmd_user_edit(Command):
"""Modify User AD object.
self.outf.write("Modified User '%s' successfully\n" % username)
+
class cmd_user_show(Command):
"""Display a user AD object.
user_ldif = samdb.write_ldif(msg, ldb.CHANGETYPE_NONE)
self.outf.write(user_ldif)
+
class cmd_user_move(Command):
"""Move a user to an organizational unit/container.
self.outf.write('Moved user "%s" into "%s"\n' %
(username, full_new_parent_dn))
+
class cmd_user(SuperCommand):
"""User management."""
else:
raise XattrBackendError("Invalid xattr backend choice %s" %backend)
+
def getdosinfo(lp, file):
try:
attribute = samba.xattr_native.wrap_getxattr(file,
return ndr_unpack(xattr.DOSATTRIB, attribute)
+
def getntacl(lp, file, backend=None, eadbfile=None, direct_db_access=True, service=None):
if direct_db_access:
(backend_obj, dbname) = checkset_backend(lp, backend, eadbfile)
return names
+
def make_smbconf(smbconf, hostname, domain, realm, targetdir,
serverrole=None, eadb=False, use_ntvfs=False, lp=None,
global_param=None):
privilege_ldb.erase()
privilege_ldb.load_ldif_file_add(setup_path("provision_privilege.ldif"))
+
def setup_encrypted_secrets_key(path):
"""Setup the encrypted secrets key file.
POLICIES_ACL = "O:LAG:BAD:P(A;OICI;0x001f01ff;;;BA)(A;OICI;0x001200a9;;;SO)(A;OICI;0x001f01ff;;;SY)(A;OICI;0x001200a9;;;AU)(A;OICI;0x001301bf;;;PA)"
SYSVOL_SERVICE = "sysvol"
+
def set_dir_acl(path, acl, lp, domsid, use_ntvfs, passdb, service=SYSVOL_SERVICE):
setntacl(lp, path, acl, domsid, use_ntvfs=use_ntvfs, skip_invalid_chown=True, passdb=passdb, service=service)
for root, dirs, files in os.walk(path, topdown=False):
# Set acls on Policy folder and policies folders
set_gpos_acl(sysvol, dnsdomain, domainsid, domaindn, samdb, lp, use_ntvfs, passdb=s4_passdb)
+
def acl_type(direct_db_access):
if direct_db_access:
return "DB"
else:
return "VFS"
+
def check_dir_acl(path, acl, lp, domainsid, direct_db_access):
fsacl = getntacl(lp, path, direct_db_access=direct_db_access, service=SYSVOL_SERVICE)
fsacl_sddl = fsacl.as_sddl(domainsid)
else:
samdb.transaction_commit()
+
def directory_create_or_exists(path, mode=0o755):
if not os.path.exists(path):
try:
else:
raise ProvisioningError("Failed to create directory %s: %s" % (path, e.strerror))
+
def determine_host_ip(logger, lp, hostip=None):
if hostip is None:
logger.info("Looking up IPv4 addresses")
return hostip
+
def determine_host_ip6(logger, lp, hostip6=None):
if hostip6 is None:
logger.info("Looking up IPv6 addresses")
return hostip6
+
def provision(logger, session_info, smbconf=None,
targetdir=None, samdb_fill=FILL_FULL, realm=None, rootdn=None,
domaindn=None, schemadn=None, configdn=None, serverdn=None,
from samba import is_heimdal_built
import os
+
def create_kdc_conf(kdcconf, realm, domain, logdir):
if is_heimdal_built():
from samba.samdb import get_default_backend_store
+
def get_domainguid(samdb, domaindn):
res = samdb.search(base=domaindn, scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
domainguid = str(ndr_unpack(misc.GUID, res[0]["objectGUID"][0]))
msg["dnsRecord"] = ldb.MessageElement(record, ldb.FLAG_MOD_ADD, "dnsRecord")
samdb.add(msg)
+
def add_at_record(samdb, container_dn, prefix, hostname, dnsdomain, hostip, hostip6):
fqdn_hostname = "%s.%s" % (hostname, dnsdomain)
from samba.dcerpc.dnsp import DNS_TYPE_NS, DNS_TYPE_A, DNS_TYPE_AAAA, \
DNS_TYPE_CNAME, DNS_TYPE_SRV, DNS_TYPE_PTR
+
class DemoteException(Exception):
"""Base element for demote errors"""
if remove_sysvol_obj:
remove_sysvol_references(samdb, logger, dc_name)
+
def offline_remove_ntds_dc(samdb,
logger,
ntds_dn,
import samba.samba3.passdb
from samba.samba3 import param as s3param
+
def fetch_uint32(db, key):
try:
data = db[key]
# idmap version determines auto-conversion
IDMAP_VERSION_V2 = 2
+
class IdmapDatabase(DbDatabase):
"""Samba 3 ID map database reader."""
def get_default_backend_store():
return "tdb"
+
class SamDB(samba.Ldb):
"""The SAM database."""
from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL
import os
+
def get_schema_descriptor(domain_sid, name_map={}):
sddl = "O:SAG:SAD:AI(OA;;CR;e12b56b6-0a95-11d1-adbb-00c04fd8d5cd;;SA)" \
"(OA;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;ED)" \
from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, LdbError
from sites import SiteNotFoundException
+
class SubnetException(Exception):
"""Base element for Subnet errors"""
pass
samdb.delete(dnsubnet)
+
def rename_subnet(samdb, configDn, subnet_name, new_name):
"""Rename a subnet.
else:
raise
+
def set_subnet_site(samdb, configDn, subnet_name, site_name):
"""Assign a subnet to a site.
import subprocess
import os
+
def tdb_copy(file1, file2, readonly=False):
"""Copy tdb file using tdbbackup utility and rename it
"""
HEXDUMP_FILTER = bytearray([x if ((len(repr(chr(x))) == 3) and (x < 127)) else ord('.') for x in range(256)])
+
class TestCase(unittest.TestCase):
"""A Samba test case."""
cmdline_credentials = None
+
class RpcInterfaceTestCase(TestCase):
"""DCE/RPC Test case."""
return s
+
class BlackboxTestCase(TestCaseInTempDir):
"""Base test case for blackbox tests."""
(num, errstr) = error.args
assert num == ldb.ERR_NO_SUCH_OBJECT, "ldb.delete() failed: %s" % errstr
+
def create_test_ou(samdb, name):
"""Creates a unique OU for the test"""
from samba import auth
import samba.tests
+
class AuthSystemSessionTests(samba.tests.TestCase):
def setUp(self):
self.assertTrue(self.system_session.security_token.is_system())
self.assertFalse(self.system_session.security_token.is_anonymous())
+
class AuthAdminSessionTests(samba.tests.TestCase):
def setUp(self):
import ldb
import shutil, os
+
class SambaDnsUpdateTests(samba.tests.BlackboxTestCase):
"""Blackbox test case for samba_dnsupdate."""
from samba import arcfour_encrypt, string_to_byte_array
from samba.tests import TestCase, TestCaseInTempDir
+
class SubstituteVarTestCase(TestCase):
def test_empty(self):
self.assertRaises(Exception, samba.check_all_substituted,
"Not subsituted: ${FOOBAR}")
+
class ArcfourTestCase(TestCase):
def test_arcfour_direct(self):
crypt_calculated = arcfour_encrypt(key, plain)
self.assertEquals(crypt_expected, crypt_calculated)
+
class StringToByteArrayTestCase(TestCase):
def test_byte_array(self):
calculated = string_to_byte_array('\xda\x91Z\xb0l\xd7\xb9\xcf\x99')
self.assertEquals(expected, calculated)
+
class LdbExtensionTests(TestCaseInTempDir):
def test_searchone(self):
from samba.compat import PY3
from samba.dcerpc import misc
+
class CredentialsTests(samba.tests.TestCaseInTempDir):
def setUp(self):
import talloc
import gc
+
class ArrayTests(samba.tests.TestCase):
def setUp(self):
from samba.dcerpc import ClientConnection
import samba.tests
+
class BareTestCase(samba.tests.TestCase):
def test_bare(self):
from samba.netcmd.dns import ARecord, AAAARecord, PTRRecord, CNameRecord, NSRecord, MXRecord, SRVRecord, TXTRecord
from samba import sd_utils, descriptor
+
class DnsserverTests(RpcInterfaceTestCase):
@classmethod
from samba.dcerpc import server_id, misc, srvsvc, samr
import samba.tests
+
class IntegerTests(samba.tests.TestCase):
def test_uint32_into_hyper(self):
global_ndr_print = False
global_hexdump = False
+
class TestDCERPC_BIND(RawDCERPCTest):
def setUp(self):
from samba.tests import TestCase
from samba.ndr import ndr_pack, ndr_unpack, ndr_unpack_out
+
class RawDCERPCTest(TestCase):
"""A raw DCE/RPC Test case."""
talloc.enable_null_tracking()
+
class RpcTests(object):
'''test type behaviour of pidl generated python RPC code'''
from samba.dcerpc import unixinfo
from samba.tests import RpcInterfaceTestCase
+
class UnixinfoTests(RpcInterfaceTestCase):
def setUp(self):
from samba import tests
from samba.param import LoadParm
+
def open_bytes(filename):
if sys.version_info[0] == 3:
return open(filename, errors='ignore')
else:
return open(filename, 'rb')
+
class DCKeytabTests(tests.TestCase):
def setUp(self):
super(DCKeytabTests, self).setUp()
creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
+
def make_txt_record(records):
rdata_txt = dns.txt_record()
s_list = dnsp.string_list()
VERBOSE = False
+
def debug(msg):
if VERBOSE:
sys.stdout.flush()
t = Timer(timeout, self.really_handle, [data, socket])
t.start()
+
def main():
global SERVER_ID
host, port, SERVER_ID = sys.argv[1:]
import subprocess
import xml.etree.ElementTree as ET
+
class TestCase(samba.tests.TestCaseInTempDir):
def _format_message(self, parameters, message):
yield name, default_text, context, param_type
p.close()
+
class SmbDotConfTests(TestCase):
# defines the cases where the defaults may differ from the documentation
import samba
import uuid
+
class DsdbTests(TestCase):
def setUp(self):
str(part_dn) + "," + str(domain_dn)),
self.samdb.normalize_dn_in_domain(part_dn))
+
class DsdbFullScanTests(TestCase):
def setUp(self):
import gc
import time
+
class DsdbLockTestCase(SamDBTestCase):
def test_db_lock1(self):
basedn = self.samdb.get_default_basedn()
import samba.tests
+
class SchemaAttributesTestCase(samba.tests.TestCase):
def setUp(self):
from samba import gensec, auth
import samba.tests
+
class GensecTests(samba.tests.TestCase):
def setUp(self):
)
import samba.tests
+
class KerberosOptionTests(samba.tests.TestCase):
def test_parse_true(self):
dspath = 'CN=Policies,CN=System,DC=addom,DC=samba,DC=example,DC=com'
gpt_data = '[General]\nVersion=%d'
+
class GPOTests(tests.TestCase):
def setUp(self):
super(GPOTests, self).setUp()
from samba.dcerpc import drsuapi, misc, dns
from samba.credentials import Credentials
+
def get_logger(name="subunit"):
"""Get a logger object."""
import logging
logger.addHandler(logging.StreamHandler(sys.stderr))
return logger
+
class JoinTestCase(DNSTKeyTest):
def setUp(self):
self.server = samba.tests.env_get_var_value("SERVER")
'CN=LOCALVAMPIREDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'],
}
+
class KCCTests(samba.tests.TestCase):
def setUp(self):
super(KCCTests, self).setUp()
MACHINE_NAME = "krb5credstest"
+
class PyKrb5CredentialsTests(TestCase):
def setUp(self):
import sys
import os
+
class LibsmbTestCase(samba.tests.TestCase):
class OpenClose(threading.Thread):
# the python bindings for LoadParm objects map (by default) to a single global
# object in the underlying C code. E.g. if we create 2 different LoadParm
# objects in python, really they're just the same object underneath.
+
+
class LoadParmTest(TestCaseInTempDir):
def test_global_loadparm(self):
for samba.dcerpc.lsa.String
"""
+
class LsaStringTests(TestCase):
def test_default_constructor(self):
from samba import NTSTATUSError, ntstatus
import ctypes
+
class NetJoinTests(samba.tests.TestCaseInTempDir):
def setUp(self):
from samba import NTSTATUSError, ntstatus
import ctypes
+
class NetJoinNoSpnegoTests(samba.tests.TestCaseInTempDir):
def setUp(self):
import samba, os, random, sys
from samba import netbios
+
class NetBiosTests(samba.tests.TestCase):
def setUp(self):
super(NetBiosTests, self).setUp()
from samba.netcmd.main import cmd_sambatool
import samba.tests
+
class NetCmdTestCase(samba.tests.TestCase):
def run_netcmd(self, cmd_klass, args, retcode=0):
Tests whether the netlogon service is running
"""
+
class NetlogonServiceTests(TestCase):
def setUp(self):
Tests behaviour when NTLM is disabled
"""
+
class NtlmDisabledTests(TestCase):
def setUp(self):
import pypamtest
import os
+
class SimplePamTests(samba.tests.TestCase):
def test_authenticate(self):
domain = os.environ["DOMAIN"]
import pypamtest
import os
+
class PasswordExpirePamTests(samba.tests.TestCase):
def test_auth_expire_warning(self):
domain = os.environ["DOMAIN"]
# Get named package from the passed supplemental credentials
#
# returns the package and it's position within the supplemental credentials
+
+
def get_package(sc, name):
if sc is None:
return None
# Calculate the MD5 password digest from the supplied user, realm and password
#
+
+
def calc_digest(user, realm, password):
data = "%s:%s:%s" % (user, realm, password)
from samba.dcerpc import drsblobs
import binascii
+
class PassWordHashFl2008Tests(PassWordHashTests):
def setUp(self):
from samba.tests.pso import PasswordSettings
import samba
+
class PassWordHashGpgmeTests(PassWordHashTests):
def setUp(self):
import binascii
import os
+
def attid_equal(a1, a2):
return (a1 & 0xffffffff) == (a2 & 0xffffffff)
+
class PassWordHashLDAPTests(PassWordHashTests):
def setUp(self):
from samba import check_password_quality
from samba.tests import TestCase, TestCaseInTempDir
+
class PasswordQualityTests(TestCase):
def test_check_password_quality(self):
self.assertFalse(check_password_quality(""),
import samba.tests
from samba.samdb import SamDB
+
class PasswordCommon:
@staticmethod
return auth.user_session(self.samdb, lp_ctx=self.lp, dn=dn,
session_info_flags=flags)
+
class UnixSessionedPosixAclMappingTests(PosixAclMappingTests):
"""
Run same test suite with session enabled.
import samba.tests
from samba.tests import env_loadparm, TestCase
+
def create_dummy_secretsdb(path, lp=None):
"""Create a dummy secrets database for use in tests.
if os.path.exists(secrets_tdb_path):
os.unlink(secrets_tdb_path)
+
class FindNssTests(TestCase):
"""Test findnss() function."""
MACHINE_NAME = "PCTM"
USER_NAME = "PCTU"
+
class PyCredentialsTests(TestCase):
def setUp(self):
#
# Build the logon data required by NetrLogonSamLogonWithFlags
+
+
def samlogon_logon_info(domain_name, computer_name, creds,
flags=CLI_CRED_NTLMv2_AUTH):
#
# Build the samlogon target info.
+
+
def samlogon_target(domain_name, computer_name):
target_info = ntlmssp.AV_PAIR_LIST()
target_info.count = 3
if os.path.exists(DATADIR):
break
+
class IdmapDbTestCase(TestCase):
def setUp(self):
break
return open(os.path.join(datadir, filename), 'r').read()
+
def ldb_debug(l, text):
print(text)
from samba.ndr import ndr_unpack, ndr_pack
from samba.dcerpc import dnsp
+
class ComputerCmdTestCase(SambaToolCmdTest):
"""Tests for samba-tool computer subcommands"""
computers = []
from samba.dcerpc import dnsp
from samba.tests.samba_tool.base import SambaToolCmdTest
+
class DnsCmdTestCase(SambaToolCmdTest):
def setUp(self):
super(DnsCmdTestCase, self).setUp()
import ldb
from samba.tests.samba_tool.base import SambaToolCmdTest
+
class ForestCmdTestCase(SambaToolCmdTest):
"""Tests for samba-tool dsacl subcommands"""
samdb = None
import os, ldb
from samba.tests.samba_tool.base import SambaToolCmdTest
+
class FsmoCmdTestCase(SambaToolCmdTest):
"""Test for samba-tool fsmo show subcommand"""
from samba.tests.samba_tool.base import SambaToolCmdTest
import shutil
+
def has_difference(path1, path2, binary=True, xml=True, sortlines=False):
"""Use this function to determine if the GPO backup differs from another.
return None
+
class GpoCmdTestCase(SambaToolCmdTest):
"""Tests for samba-tool time subcommands"""
dsdb
)
+
class GroupCmdTestCase(SambaToolCmdTest):
"""Tests for samba-tool group subcommands"""
groups = []
import os, ldb
from samba.tests.samba_tool.base import SambaToolCmdTest
+
class JoinCmdTestCase(SambaToolCmdTest):
"""Test for samba-tool fsmo show subcommand"""
from samba.tests.samba_tool.base import SambaToolCmdTest
import random
+
class NtACLCmdSysvolTestCase(SambaToolCmdTest):
"""Tests for samba-tool ntacl sysvol* subcommands"""
self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertEquals(out, "", "Shouldn't be any output messages")
+
class NtACLCmdGetSetTestCase(SambaToolCmdTest):
"""Tests for samba-tool ntacl get/set subcommands"""
import ldb
from samba.tests.samba_tool.base import SambaToolCmdTest
+
class OUCmdTestCase(SambaToolCmdTest):
"""Tests for samba-tool ou subcommands"""
ous = []
import os
from samba.tests.samba_tool.base import SambaToolCmdTest
+
class ProcessCmdTestCase(SambaToolCmdTest):
"""Tests for samba-tool process subcommands"""
import os
import shutil
+
class ProvisionPasswordTestCase(SambaToolCmdTest):
"""Test for password validation in domain provision subcommand"""
from samba.credentials import Credentials
from samba.auth import system_session
+
class RodcCmdTestCase(SambaToolCmdTest):
def setUp(self):
super(RodcCmdTestCase, self).setUp()
import ldb
from samba.tests.samba_tool.base import SambaToolCmdTest
+
class SchemaCmdTestCase(SambaToolCmdTest):
"""Tests for samba-tool dsacl subcommands"""
samdb = None
from time import localtime, strptime, mktime
from samba.tests.samba_tool.base import SambaToolCmdTest
+
class TimeCmdTestCase(SambaToolCmdTest):
"""Tests for samba-tool time subcommands"""
from samba.ndr import ndr_unpack
from samba.dcerpc import drsblobs
+
class UserCmdTestCase(SambaToolCmdTest):
"""Tests for samba-tool user subcommands"""
users = []
dsdb
)
+
class UserCheckPwdTestCase(SambaToolCmdTest):
"""Tests for samba-tool user subcommands"""
users = []
# Get the value of an attribute from the output string
# Note: Does not correctly handle values spanning multiple lines,
# which is acceptable for it's usage in these tests.
+
+
def _get_attribute(out, name):
p = re.compile("^" + name + ":\s+(\S+)")
for line in out.split("\n"):
return m.group(1)
return ""
+
class UserCmdCryptShaTestCase(SambaToolCmdTest):
"""
Tests for samba-tool user subcommands generation of the virtualCryptSHA256
# Calculate the MD5 password digest from the supplied user, realm and password
#
+
+
def calc_digest(user, realm, password):
data = "%s:%s:%s" % (user, realm, password)
if isinstance(data, text_type):
test_dir = os.path.join(addom, 'testing_%d' % random.randint(0, 0xFFFF))
test_file = os.path.join(test_dir, 'testing').replace('/', '\\')
+
class SMBTests(samba.tests.TestCase):
def setUp(self):
super(SMBTests, self).setUp()
import samba.tests
from samba import strcasecmp_m, strstr_m
+
def signum(a):
if a < 0:
return -1
b.encode('utf-8'))),
expect)
+
class strstr_m_Tests(samba.tests.TestCase):
"""strstr_m tests in simple ASCII and unicode strings"""
import samba.tests
from samba.dcerpc import security
+
def dummymessage(a=None, b=None):
pass
smb_conf_path = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "ad_dc_ntvfs", "etc/smb.conf")
+
class UpgradeProvisionBasicLdbHelpersTestCase(TestCaseInTempDir):
"""Some simple tests for individual functions in the provisioning code.
"""
import shutil
import os
+
class XattrTests(TestCase):
def _tmpfilename(self):
'Could not add posix attrs for AD entry for sid=%s, (%s)',
str(sid), str(e))
+
def add_ad_posix_idmap_entry(samdb, sid, xid, xid_type, logger):
"""Create idmap entry
for (value_name, (value_type, value_data)) in samba3_regdb.values(key).items():
key_handle.set_value(value_name, value_type, value_data)
+
def get_posix_attr_from_ldap_backend(logger, ldb_object, base_dn, user, attr):
"""Get posix attributes from a samba3 ldap backend
:param ldbs: a list of ldb connection objects
paths = provision_paths_from_lp(lp, lp.get("realm"))
return paths
+
def update_policyids(names, samdb):
"""Update policy ids that could have changed after sam update
"oEMInformation")
samdb.modify(delta)
+
def update_gpo(paths, samdb, names, lp, message):
"""Create missing GPO file object if needed
"""
if not os.path.isdir(dir):
create_gpo_struct(dir)
+
def increment_calculated_keyversion_number(samdb, rootdn, hashDns):
"""For a given hash associating dn and a number, this function will
update the replPropertyMetaData of each dn in the hash, so that the
samdb.set_attribute_replmetadata_version(str(e.dn),
"unicodePwd",
version, True)
+
+
def delta_update_basesamdb(refsampath, sampath, creds, session, lp, message):
"""Update the provision container db: sam.ldb
This function is aimed for alpha9 and newer;
expr = "%s)" %expr
return expr
+
def update_machine_account_password(samdb, secrets_ldb, names):
"""Update (change) the password of the current DC both in the SAM db and in
secret one
raise ProvisioningError("Unable to find a Secure Channel"
"of type SEC_CHAN_BDC")
+
def update_dns_account_password(samdb, secrets_ldb, names):
"""Update (change) the password of the dns both in the SAM db and in
secret one
secrets_ldb.modify(msg)
+
def update_krbtgt_account_password(samdb):
"""Update (change) the password of the krbtgt account
samdb.modify(msg)
+
def search_constructed_attrs_stored(samdb, rootdn, attrs):
"""Search a given sam DB for calculated attributes that are
still stored in the db.
return hashAtt
+
def findprovisionrange(samdb, basedn):
""" Find ranges of usn grouped by invocation id and then by timestamp
rouned at 1 minute
return (hash_id, nb_obj)
+
def print_provision_ranges(dic, limit_print, dest, samdb_path, invocationid):
""" print the differents ranges passed as parameter
ldif = "dn: @PROVISION\nprovisionnerID: %s\n%s" % (invocationid, ldif)
open(file, 'w').write(ldif)
+
def int64range2str(value):
"""Display the int64 range stored in value as xxx-yyy
from __future__ import print_function
+
def render_placeholder(environ, start_response):
"""Send the user a simple placeholder about missing SWAT."""
status = '200 OK'
'fail': [("fail", 'echo failing && /bin/false', "text/plain")]
}
+
def do_print(msg):
print("%s" % msg)
sys.stdout.flush()
sys.stderr.flush()
+
def run_cmd(cmd, dir=".", show=None, output=False, checkfail=True):
if show is None:
show = options.verbose
os.dup2(0, 1)
os.dup2(0, 2)
+
def write_pidfile(fname):
'''write a pid file, cleanup on exit'''
f = open(fname, mode='w')
(rebase_remote, rebase_branch),
show=True, dir=test_master)
+
def push_to(push_url, push_branch="master"):
push_remote = "pushto"
do_print("Pushing to %s" % push_url)
parser.add_option("", "--restrict-tests", help="run as make test with this TESTS= regex",
default='')
+
def send_email(subject, text, log_tar):
if options.email is None:
do_print("not sending email because the recipient is not set")
s.set_debuglevel(1)
s.quit()
+
def email_failure(status, failed_task, failed_stage, failed_tag, errstr,
elapsed_time, log_base=None, add_log_tail=True):
'''send an email to options.email about the failure'''
% (options.branch, platform.node(), failed_task, failed_stage),
text, logs)
+
def email_success(elapsed_time, log_base=None):
'''send an email to options.email about a successful build'''
user = os.getenv("USER")
else:
return call(cmd, shell=True, cwd=dir)
+
def find_git_root():
'''get to the top of the git repo'''
p = os.getcwd()
f.write("exit 0\n")
f.close()
+
def cleanup():
run_cmd("git bisect reset", dir=gitroot)
os.unlink(f.name)
if options.output is None:
parser.error("No output file specified")
+
def iterate_all(path):
"""Iterate and yield all the parameters.
"ustring" : "_STRING",
}
+
def generate_functions(path_in, path_out):
f = open(path_out, 'w')
try:
'ustring' : 'char *',
}
+
def make_s3_param_proto(path_in, path_out):
file_out = open(path_out, 'w')
try:
finally:
file_out.close()
+
def get_header(path):
header = os.path.basename(path).upper()
header = header.replace(".", "_").replace("\\", "_").replace("-", "_")
return "__%s__" % header
+
def make_param_defs(path_in, path_out, scope):
file_out = open(path_out, 'w')
try:
"ustring" : "P_USTRING",
}
+
def make_param_table(path_in, path_out):
file_out = open(path_out, 'w')
try:
import subprocess
import sys
+
def srcdir():
return os.path.normpath(os.getenv("SRCDIR", os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")))
+
def source4dir():
return os.path.normpath(os.path.join(srcdir(), "source4"))
+
def source3dir():
return os.path.normpath(os.path.join(srcdir(), "source3"))
+
def bindir():
return os.path.normpath(os.getenv("BINDIR", "./bin"))
+
def binpath(name):
return os.path.join(bindir(), name)
'testsuite-success', 'testsuite-error',
'uxsuccess', 'testsuite-uxsuccess'])
+
class TestsuiteEnabledTestResult(unittest.TestResult):
def start_testsuite(self, name):
from Configure import conf
+
@conf
def CHECK_SAMBA3_CHARSET(conf, crossbuild=False):
'''Check for default charsets for Samba3
wbinfo = sys.argv[1]
netcmd = sys.argv[2]
+
def flush_cache(sids=[], uids=[], gids=[]):
for sid in sids:
os.system(netcmd + (" cache del IDMAP/SID2XID/%s" % (sid)))
for gids in gids:
os.system(netcmd + (" cache del IDMAP/GID2SID/%s" % (gid)))
+
def fill_cache(inids, idtype='gid'):
for inid in inids:
if inid is None:
# Check the list produced by the sids-to-xids call with the
# singular variant (sid-to-xid) for each sid in turn.
+
+
def check_singular(sids, ids, idtype='gid'):
i = 0
for sid in sids:
# Check the list produced by the sids-to-xids call with the
# multiple variant (sid-to-xid) for each sid in turn.
+
+
def check_multiple(sids, idtypes):
sids2xids = subprocess.Popen([wbinfo, '--sids-to-unix-ids=' + ','.join(sids)],
stdout=subprocess.PIPE).communicate()[0].strip()
'--option=torture:writetimeupdatedelay=500000',
])
+
def plansmbtorture4testsuite(name, env, options, description=''):
if description == '':
modname = "samba3.%s" % (name, )
import sys
from optparse import OptionParser
+
class ReadChildError(Exception):
pass
+
class WriteChildError(Exception):
pass
+
def readLine(pipe):
"""readLine(pipe) -> str
Read a line from the child's pipe, returns the string read.
return buf[:newline]
+
def writeLine(pipe, buf):
"""writeLine(pipe, buf) -> nul
Write a line to the child's pipe.
raise WriteChildError()
os.write(pipe, "\n")
+
def parseCommandLine():
"""parseCommandLine() -> (opts, ntlm_auth_path)
Parse the command line.
else:
objectclass = None
+
def uniq_list(alist):
"""return a unique list"""
set = {}
schema_base = rootDse["schemaNamingContext"][0]
+
def possible_inferiors_search(db, oc):
"""return the possible inferiors via a search for the possibleInferiors attribute"""
res = db.search(base=schema_base,
classinfo[oc]["SUPCLASSES"] = list
return list
+
def auxclasses(classinfo, oclist):
list = []
if oclist == []:
list.extend(list2)
return list
+
def subclasses(classinfo, oclist):
list = []
for oc in oclist:
list.extend(classinfo[oc]["SUBCLASSES"])
return list
+
def posssuperiors(classinfo, oclist):
list = []
for oc in oclist:
list.extend(list2)
return list
+
def pull_classinfo(db):
"""At startup we build a classinfo[] dictionary that holds all the information needed to construct the possible inferiors"""
classinfo = {}
return classinfo
+
def is_in_list(list, c):
for a in list:
if c == a:
return True
return False
+
def possible_inferiors_constructed(db, classinfo, c):
list = []
for oc in classinfo:
list.sort()
return list
+
def test_class(db, classinfo, oc):
"""test to see if one objectclass returns the correct possibleInferiors"""
print("test: objectClass.%s" % oc)
else:
print("success: objectClass.%s" % oc)
+
def get_object_classes(db):
"""return a list of all object classes"""
list = []
# Tests start here
#
+
class AclTests(samba.tests.TestCase):
def setUp(self):
del self.ldb_admin
# tests on ldap add operations
+
+
class AclAddTests(AclTests):
def setUp(self):
self.fail()
# tests on ldap modify operations
+
+
class AclModifyTests(AclTests):
def setUp(self):
self.fail()
# enable these when we have search implemented
+
+
class AclSearchTests(AclTests):
def setUp(self):
# tests on ldap delete operations
+
+
class AclDeleteTests(AclTests):
def setUp(self):
self.fail()
# tests on ldap rename operations
+
+
class AclRenameTests(AclTests):
def setUp(self):
(num, _) = e34.args
self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+
class AclExtendedTests(AclTests):
def setUp(self):
self.assertEqual(len(res), 1)
self.assertTrue("nTSecurityDescriptor" in res[0].keys())
+
class AclUndeleteTests(AclTests):
def setUp(self):
(num, _) = e38.args
self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
+
class AclSPNTests(AclTests):
def setUp(self):
test_number = 0
active_links = set()
+
class UserTests(samba.tests.TestCase):
def add_if_possible(self, *args, **kwargs):
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
+
class UserTests(samba.tests.TestCase):
def setUp(self):
class PerfTestException(Exception):
pass
+
class UserTests(samba.tests.TestCase):
def setUp(self):
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
+
class BaseDeleteTests(samba.tests.TestCase):
def GUID_string(self, guid):
(num, _) = e15.args
self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
+
class BasicTreeDeleteTests(BasicDeleteTests):
def setUp(self):
# Tests start here
#
+
class DirsyncBaseTests(samba.tests.TestCase):
def setUp(self):
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
+
class BasicTests(samba.tests.TestCase):
def setUp(self):
self.assertTrue(len(res[0]["msTSExpireDate"]) == 1)
self.assertEquals(res[0]["msTSExpireDate"][0], v_get)
+
class BaseDnTests(samba.tests.TestCase):
def setUp(self):
# set SCALE = 100 for normal test, or 1 for testing the test.
SCALE = 100
+
class UserTests(samba.tests.TestCase):
def get_file_blob(self, filename):
creds = credopts.get_credentials(lp)
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+
class LDAPNotificationTest(samba.tests.TestCase):
def setUp(self):
# Tests start here
#
+
class PasswordTests(password_lockout_base.BasePasswordTestCase):
def setUp(self):
self.host = host
import time
+
class BasePasswordTestCase(PasswordTestCase):
def _open_samr_user(self, res):
self.assertTrue("objectSid" in res[0])
# Tests start here
#
+
class PasswordTests(PasswordTestCase):
def setUp(self):
import password_lockout_base
+
def passwd_encode(pw):
return base64.b64encode(('"%s"' % pw).encode('utf-16-le')).decode('utf8')
return res[0]['serverReference'][0]
+
class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
counter = itertools.count(1).next
userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
msDSUserAccountControlComputed=0)
+
class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
counter = itertools.count(1).next
self._test_multiple_logon(self.lockout1ntlm_creds)
+
def main():
global RODC, RWDC, CREDS, LP
parser = optparse.OptionParser(
creds = credopts.get_credentials(lp)
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+
class SamTests(samba.tests.TestCase):
def setUp(self):
# Tests start here
#
+
class DescriptorTests(samba.tests.TestCase):
def get_users_domain_dn(self, name):
# Default descriptor tests #####################################################################
+
class OwnerGroupDescriptorTests(DescriptorTests):
def deleteAll(self):
########################################################################################
# Inheritance tests for DACL
+
class DaclDescriptorTests(DescriptorTests):
def deleteAll(self):
controls=["extended_dn:1:0", "sd_flags:1:0", "search_options:1:1"])
self.assertFalse("nTSecurityDescriptor" in res[0])
+
class RightsAttributesTests(DescriptorTests):
def deleteAll(self):
self.assertTrue("displayName" in res[0]["allowedAttributesEffective"])
self.assertTrue("managedBy" in res[0]["allowedAttributesEffective"])
+
class SdAutoInheritTests(DescriptorTests):
def deleteAll(self):
delete_force(self.ldb_admin, self.sub_dn)
# Tests start here
#
+
class SitesBaseTests(samba.tests.TestCase):
def setUp(self):
creds = credopts.get_credentials(lp)
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+
def closure(vSet, wSet, aSet):
for edge in aSet:
start, end = edge
wSet.add(end)
closure(vSet, wSet, aSet)
+
class StaticTokenTest(samba.tests.TestCase):
def setUp(self):
print("difference : %s" % sidset1.difference(sidset2))
self.fail(msg="calculated groups don't match against user PAC tokenGroups")
+
class DynamicTokenTest(samba.tests.TestCase):
def get_creds(self, target_username, target_password):
self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
self._expected_user_restore_metadata())
+
class RestoreUserPwdObjectTestCase(RestoredObjectAttributesBaseTestCase):
"""Test cases for delete/reanimate user objects with password"""
self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
self._expected_userpw_restore_metadata())
+
class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
"""Test different scenarios for delete/reanimate group objects"""
_swig_property = property
except NameError:
pass # Python < 2.2 doesn't have 'property'.
+
+
def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
if (name == "thisown"): return self.this.own(value)
if (name == "this"):
else:
raise AttributeError("You cannot add attributes to %s" % self)
+
def _swig_setattr(self, class_type, name, value):
return _swig_setattr_nondynamic(self, class_type, name, value, 0)
+
def _swig_getattr(self, class_type, name):
if (name == "thisown"): return self.this.own()
method = class_type.__swig_getmethods__.get(name, None)
if method: return method(self)
raise AttributeError(name)
+
def _swig_repr(self):
try: strthis = "proxy of " + self.this.__repr__()
except: strthis = ""
WBEM_ConnectServer = _wmi.WBEM_ConnectServer
+
+
class IUnknown(object):
thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')
__repr__ = _swig_repr
IUnknown_swigregister = _wmi.IUnknown_swigregister
IUnknown_swigregister(IUnknown)
+
class IWbemServices(object):
thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')
__repr__ = _swig_repr
IWbemServices_swigregister = _wmi.IWbemServices_swigregister
IWbemServices_swigregister(IWbemServices)
+
class IEnumWbemClassObject(object):
thisown = _swig_property(lambda x: x.this.own(), lambda x, v: x.this.own(v), doc='The membership flag')
__repr__ = _swig_repr
subgraph = {}
+
def add_deps(node):
if node in graph and node not in subgraph:
subgraph[node] = graph[node]
opts = parser.parse_args()[0]
+
def printdirsync(ctl):
arr = ctl.split(':')
if arr[0] == 'dirsync':
return (pfm.ctr, pfm_schi)
+
def _samdb_fetch_schi(samdb):
"""Fetch schemaInfo stored in SamDB using LDB connection"""
res = samdb.search(base=samdb.get_schema_basedn(), expression="", scope=SCOPE_BASE, attrs=["*"])
pfm_schi.marker = 0xFF;
return pfm_schi
+
def _drs_fetch_pfm(server, samdb, creds, lp):
"""Fetch prefixMap using DRS interface"""
binding_str = "ncacn_ip_tcp:%s[print,seal]" % server
pfm.num_mappings -= 1
return (pfm, pfm_schi)
+
def _pfm_verify(drs_pfm, ldb_pfm):
errors = []
if drs_pfm.num_mappings != ldb_pfm.num_mappings:
errors.append("[%2d] differences in (%s)" % (i, it_err))
return errors
+
def _pfm_schi_verify(drs_schi, ldb_schi):
errors = []
print(drs_schi.revision)
from ldif import LDIFWriter
+
class globals:
def __init__(self):
self.global_objs = {}
continue
self.global_objs = {}
+
def attid_equal(a1, a2):
return (a1 & 0xffffffff) == (a2 & 0xffffffff)
# Tests start here
#
+
class SpeedTest(samba.tests.TestCase):
def find_domain_sid(self, ldb):
for dn in dn_list:
delete_force(self.ldb_admin, dn)
+
class SpeedTestAddDel(SpeedTest):
def setUp(self):
"""
self.run_bundle(10000)
+
class AclSearchSpeedTest(SpeedTest):
def setUp(self):
print("OPTIONS %s" % " ".join(smbtorture4_options), file=sys.stderr)
+
def plansmbtorture4testsuite(name, env, options, modname=None):
return selftesthelpers.plansmbtorture4testsuite(name, env, options,
target='samba4', modname=modname)
subunitrun = valgrindify(python) + " " + os.path.join(samba4srcdir, "scripting/bin/subunitrun")
if extra_python is not None:
subunitrun3 = valgrindify(extra_python) + " " + os.path.join(samba4srcdir, "scripting/bin/subunitrun")
+
+
def planoldpythontestsuite(env, module, name=None, extra_path=[], environ={}, extra_args=[], py3_compatible=False):
environ = dict(environ)
py_path = list(extra_path)
from samba.dcerpc import drsuapi
+
class DrsCracknamesTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
super(DrsCracknamesTestCase, self).setUp()
)
from samba.compat import cmp_fn
+
class DrsBaseTestCase(SambaToolCmdTest):
"""Base class implementation for all DRS python tests.
It is intended to provide common initialization and
import drs_base
+
class DrsFsmoTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
from samba.compat import cmp_to_key_fn
from samba.compat import cmp_fn
+
def _linked_attribute_compare(la1, la2):
"""See CompareLinks() in MS-DRSR section 4.1.10.5.17"""
la1, la1_target = la1
self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
+
class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
super(DrsReplicaPrefixMapTestCase, self).setUp()
pfm.ctr.num_mappings += 1
return pfm.ctr
+
class DrsReplicaSyncSortTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
super(DrsReplicaSyncSortTestCase, self).setUp()
from samba.dcerpc import drsuapi, security
from samba.credentials import DONT_USE_KERBEROS
+
class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
"""Confirm the behaviour of DsGetNCChanges for unprivileged users"""
import time
+
class LATestException(Exception):
pass
import drs_base, ldb
from samba.dcerpc.drsuapi import *
+
class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase):
def _ds_bind(self, server_name):
import random
import time
+
def drs_get_rodc_partial_attribute_set(samdb, samdb1, exceptions=[]):
'''get a list of attributes for RODC replication'''
partial_attribute_set = drsuapi.DsPartialAttributeSet()
partial_attribute_set.num_attids = len(attids)
return partial_attribute_set
+
class DrsRodcTestCase(drs_base.DrsBaseTestCase):
"""Intended as a semi-black box test case for replication involving
an RODC."""
from samba.drs_utils import drs_DsBind
from samba import dsdb
+
class DrsReplSchemaTestCase(drs_base.DrsBaseTestCase):
# prefix for all objects created
from ldb import (
SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT)
+
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
"""Intended as a black box test case for DsReplicaSync
implementation. It should test the behavior of this
from ldb import (
SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT)
+
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
"""Intended as a black box test case for DsReplicaSync
implementation. It should test the behavior of this
from samba.ndr import ndr_pack
from samba.dcerpc import security
+
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
"""Intended as a semi-black box test case for DsGetNCChanges
implementation for extended operations. It should be testing
import ldb
import drs_base
+
class SambaToolDrsTests(drs_base.DrsBaseTestCase):
"""Blackbox test case for samba-tool drs."""
# Tests start here
#
+
class Libnet_SetPwdTest(samba.tests.TestCase):
########################################################################################
import optparse
import wintest
+
def set_libpath(t):
t.putenv("LD_LIBRARY_PATH", "${PREFIX}/lib")
+
def set_krb5_conf(t):
t.run_cmd("mkdir -p ${PREFIX}/etc")
t.write_file("${PREFIX}/etc/krb5.conf",
t.putenv("KRB5_CONFIG", '${PREFIX}/etc/krb5.conf')
+
def build_s3(t):
'''build samba3'''
t.info('Building s3')
t.run_cmd('rm -rf ${PREFIX}')
t.run_cmd('make install')
+
def start_s3(t):
t.info('Starting Samba3')
t.chdir("${PREFIX}")
t.run_cmd(['sbin/smbd', "-D"])
t.port_wait("${INTERFACE_IP}", 139)
+
def test_wbinfo(t):
t.info('Testing wbinfo')
t.chdir('${PREFIX}')
child.sendline("cd ..")
child.sendline("rmdir testdir")
+
def create_shares(t):
t.info("Adding test shares")
t.chdir('${PREFIX}')
panic action = xterm -e gdb --pid %d
''')
+
def join_as_member(t, vm):
'''join a windows domain as a member server'''
t.setwinvars(vm)
t.cmd_contains("host -t A ${HOSTNAME}.${WIN_REALM}",
['${HOSTNAME}.${WIN_REALM} has address'])
+
def create_root_account(t, vm):
t.setwinvars(vm)
t.info("Creating 'root' account for testing Samba3 member server")
child.sendline("user edit disabled root no")
child.expect("Set root's disabled flag")
+
def test_join_as_member(t, vm):
'''test the domain join'''
t.setwinvars(vm)
import sys, os
import wintest, pexpect, time, subprocess
+
def set_krb5_conf(t):
t.putenv("KRB5_CONFIG", '${PREFIX}/private/krb5.conf')
+
def build_s4(t):
'''build samba4'''
t.info('Building s4')
'--option', 'panic action=gnome-terminal -e "gdb --pid %d"', '--option', 'max protocol=nt1'])
t.port_wait("${INTERFACE_IP}", 139)
+
def test_smbclient(t):
'''test smbclient against localhost'''
t.info('Testing smbclient')
t.cmd_contains("host -t A ${HOSTNAME}.${LCREALM}",
['${HOSTNAME}.${LCREALM} has address'])
+
def test_kerberos(t):
'''test that kerberos is OK'''
t.info("Testing kerberos")
t.run_winjoin(t, "${LCREALM}")
+
def test_winjoin(t, vm):
t.info("Checking the windows join is OK")
smbclient = t.getvar("smbclient")
child = t.open_telnet("${WIN_HOSTNAME}", "${WIN_DOMAIN}\\administrator", "${WIN_PASS}", set_time=True)
t.get_ipconfig(child)
+
def join_as_dc(t, vm):
'''join a windows domain as a DC'''
t.setwinvars(vm)
import optparse
import sys, os, time, re
+
class wintest():
'''testing of Samba against windows VMs'''