for x in range(num_entries):
ret.append((array.entries[x].idx, array.entries[x].name))
return ret
-
+
def test_Connect(samr):
"""test the samr_Connect interface"""
samr.Close(dom_handle)
if len(sys.argv) != 2:
- print "Usage: samr.js <BINDING>"
- sys.exit(1)
+ print "Usage: samr.js <BINDING>"
+ sys.exit(1)
binding = sys.argv[1]
# This help formatter does text wrapping and preserves newlines
class PlainHelpFormatter(optparse.IndentedHelpFormatter):
def format_description(self,description=""):
- desc_width = self.width - self.current_indent
- indent = " "*self.current_indent
- paragraphs = description.split('\n')
- wrapped_paragraphs = [
- textwrap.fill(p,
- desc_width,
- initial_indent=indent,
- subsequent_indent=indent)
- for p in paragraphs]
- result = "\n".join(wrapped_paragraphs) + "\n"
- return result
+ desc_width = self.width - self.current_indent
+ indent = " "*self.current_indent
+ paragraphs = description.split('\n')
+ wrapped_paragraphs = [
+ textwrap.fill(p,
+ desc_width,
+ initial_indent=indent,
+ subsequent_indent=indent)
+ for p in paragraphs]
+ result = "\n".join(wrapped_paragraphs) + "\n"
+ return result
def format_epilog(self, epilog):
if epilog:
undetermined_max_args = False
for i, arg in enumerate(self.takes_args):
if arg[-1] != "?" and arg[-1] != "*":
- min_args += 1
+ min_args += 1
if arg[-1] == "+" or arg[-1] == "*":
- undetermined_max_args = True
+ undetermined_max_args = True
else:
- max_args += 1
+ max_args += 1
if (len(args) < min_args) or (not undetermined_max_args and len(args) > max_args):
parser.print_usage()
return -1
return ""
try:
- import samba.dckeytab
+ import samba.dckeytab
except ImportError:
- cmd_domain_export_keytab = None
+ cmd_domain_export_keytab = None
else:
- class cmd_domain_export_keytab(Command):
- """Dump Kerberos keys of the domain into a keytab."""
+ class cmd_domain_export_keytab(Command):
+ """Dump Kerberos keys of the domain into a keytab."""
- synopsis = "%prog <keytab> [options]"
+ synopsis = "%prog <keytab> [options]"
- takes_optiongroups = {
- "sambaopts": options.SambaOptions,
- "credopts": options.CredentialsOptions,
- "versionopts": options.VersionOptions,
- }
+ takes_optiongroups = {
+ "sambaopts": options.SambaOptions,
+ "credopts": options.CredentialsOptions,
+ "versionopts": options.VersionOptions,
+ }
- takes_options = [
- Option("--principal", help="extract only this principal", type=str),
- ]
+ takes_options = [
+ Option("--principal", help="extract only this principal", type=str),
+ ]
- takes_args = ["keytab"]
+ takes_args = ["keytab"]
- def run(self, keytab, credopts=None, sambaopts=None, versionopts=None, principal=None):
- lp = sambaopts.get_loadparm()
- net = Net(None, lp)
- net.export_keytab(keytab=keytab, principal=principal)
+ def run(self, keytab, credopts=None, sambaopts=None, versionopts=None, principal=None):
+ lp = sambaopts.get_loadparm()
+ net = Net(None, lp)
+ net.export_keytab(keytab=keytab, principal=principal)
class cmd_domain_info(Command):
takes_options.extend(common_provision_join_options)
if samba.is_ntvfs_fileserver_built():
- takes_options.extend(common_ntvfs_options)
+ takes_options.extend(common_ntvfs_options)
takes_args = ["domain", "role?"]
def netr_DomainTrust_to_name(self, t):
if t.trust_type == lsa.LSA_TRUST_TYPE_DOWNLEVEL:
- return t.netbios_name
+ return t.netbios_name
return t.dns_name
primary = None
primary_parent = None
for _t in a:
- if _t.trust_flags & netlogon.NETR_TRUST_FLAG_PRIMARY:
- primary = _t
- if not _t.trust_flags & netlogon.NETR_TRUST_FLAG_TREEROOT:
- primary_parent = a[_t.parent_index]
- break
+ if _t.trust_flags & netlogon.NETR_TRUST_FLAG_PRIMARY:
+ primary = _t
+ if not _t.trust_flags & netlogon.NETR_TRUST_FLAG_TREEROOT:
+ primary_parent = a[_t.parent_index]
+ break
if t.trust_flags & netlogon.NETR_TRUST_FLAG_IN_FOREST:
if t is primary_parent:
self.outf.write("Stored uPNSuffixes attributes[%d]:\n" % len(stored_upn_vals))
for v in stored_upn_vals:
- self.outf.write("TLN: %-32s DNS[*.%s]\n" % ("", v))
+ self.outf.write("TLN: %-32s DNS[*.%s]\n" % ("", v))
self.outf.write("Stored msDS-SPNSuffixes attributes[%d]:\n" % len(stored_spn_vals))
for v in stored_spn_vals:
- self.outf.write("TLN: %-32s DNS[*.%s]\n" % ("", v))
+ self.outf.write("TLN: %-32s DNS[*.%s]\n" % ("", v))
if not require_update:
return
self.outf.write("Update uPNSuffixes attributes[%d]:\n" % len(update_upn_vals))
for v in update_upn_vals:
- self.outf.write("TLN: %-32s DNS[*.%s]\n" % ("", v))
+ self.outf.write("TLN: %-32s DNS[*.%s]\n" % ("", v))
self.outf.write("Update msDS-SPNSuffixes attributes[%d]:\n" % len(update_spn_vals))
for v in update_spn_vals:
- self.outf.write("TLN: %-32s DNS[*.%s]\n" % ("", v))
+ self.outf.write("TLN: %-32s DNS[*.%s]\n" % ("", v))
update_msg = ldb.Message()
update_msg.dn = stored_msg.dn
self.outf.write("Failed search of base=%s\n" % self.search_base)
raise
for x in res:
- dn_list.append(x["dn"].get_linearized())
+ dn_list.append(x["dn"].get_linearized())
#
global summary
#
dnsdomain = netbiosname.lower()
if rootdn is None:
- rootdn = domaindn
+ rootdn = domaindn
if configdn is None:
configdn = "CN=Configuration," + rootdn
except LdbError as e5:
(enum, estr) = e5.args
if enum == ldb.ERR_NO_SUCH_OBJECT:
- raise DemoteException("Given DN %s doesn't exist" % ntds_dn)
+ raise DemoteException("Given DN %s doesn't exist" % ntds_dn)
else:
raise
if (len(msgs) == 0):
for parameter in root:
name = parameter.attrib.get('name')
if parameter.attrib.get('removed') == "1":
- continue
+ continue
yield name
syn = parameter.findall('synonym')
if syn is not None:
name = parameter.attrib.get("name")
param_type = parameter.attrib.get("type")
if parameter.attrib.get('removed') == "1":
- continue
+ continue
values = parameter.findall("value")
defaults = []
for value in values:
elif context == "S":
section = "test"
else:
- self.fail("%s has no valid context" % param)
+ self.fail("%s has no valid context" % param)
p = subprocess.Popen(program + ["-s", self.smbconf,
"--section-name", section, "--parameter-name", param],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self.topdir).communicate()
elif context == "S":
section = "test"
else:
- self.fail("%s has no valid context" % param)
+ self.fail("%s has no valid context" % param)
p = subprocess.Popen(program + ["-s", self.smbconf,
"--section-name", section, "--parameter-name", param,
"--option", "%s = %s" % (param, default)],
elif context == "S":
section = "test"
else:
- self.fail("%s has no valid context" % param)
+ self.fail("%s has no valid context" % param)
value_to_use = arbitrary.get(param_type)
if value_to_use is None:
def generate_functions(path_in, path_out):
f = open(path_out, 'w')
try:
- f.write('/* This file was automatically generated by generate_param.py. DO NOT EDIT */\n\n')
- for parameter in iterate_all(options.filename):
+ f.write('/* This file was automatically generated by generate_param.py. DO NOT EDIT */\n\n')
+ for parameter in iterate_all(options.filename):
# filter out parameteric options
if ':' in parameter['name']:
continue
output_string += 'const '
param_type = mapping.get(parameter['type'])
if param_type is None:
- raise Exception(parameter['name'] + " has an invalid context " + parameter['context'])
+ raise Exception(parameter['name'] + " has an invalid context " + parameter['context'])
output_string += param_type
output_string += "lp_%s" % parameter['function']
output_string += 'const '
param_type = mapping.get(parameter['type'])
if param_type is None:
- raise Exception(parameter['name'] + " has an invalid context " + parameter['context'])
+ raise Exception(parameter['name'] + " has an invalid context " + parameter['context'])
output_string += param_type
output_string += "lpcfg_%s" % parameter['function']
else:
raise Exception(parameter['name'] + " has an invalid param type " + parameter['type'])
-
+
file_out.write(output_string)
finally:
file_out.close()
file_out.write("struct loadparm_service \n")
file_out.write("{\n")
file_out.write("\tbool autoloaded;\n")
-
+
for parameter in iterate_all(path_in):
# filter out parameteric options
if ':' in parameter['name']:
output_string = "\t"
param_type = mapping.get(parameter['type'])
if param_type is None:
- raise Exception(parameter['name'] + " has an invalid context " + parameter['context'])
+ raise Exception(parameter['name'] + " has an invalid context " + parameter['context'])
output_string += param_type
output_string += " %s;\n" % parameter['function']
self.seen_output = True
test = self._add_prefix(test)
if self.strip_ok_output:
- self.output = ""
+ self.output = ""
self._ops.startTest(test)
print("Testing correct behavior on nonaccessible search base")
try:
- self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
- scope=SCOPE_BASE)
+ self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
+ scope=SCOPE_BASE)
except LdbError as e18:
(num, _) = e18.args
self.assertEquals(num, ERR_NO_SUCH_OBJECT)
# same as for join_RODC, but do not set any SPNs
def create_rodc(self, ctx):
- ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
- ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
- ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
-
- ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
- "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
- "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
- "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
- "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
- ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
-
- mysid = ctx.get_mysid()
- admin_dn = "<SID=%s>" % mysid
- ctx.managedby = admin_dn
-
- ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
- samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
- samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
-
- ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
- ctx.secure_channel_type = misc.SEC_CHAN_RODC
- ctx.RODC = True
- ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
- drsuapi.DRSUAPI_DRS_PER_SYNC |
- drsuapi.DRSUAPI_DRS_GET_ANC |
- drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
- drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
-
- ctx.join_add_objects()
+ ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
+ ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
+ ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
+
+ ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
+ "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
+ "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
+ ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
+
+ mysid = ctx.get_mysid()
+ admin_dn = "<SID=%s>" % mysid
+ ctx.managedby = admin_dn
+
+ ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
+ samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
+ samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
+
+ ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
+ ctx.secure_channel_type = misc.SEC_CHAN_RODC
+ ctx.RODC = True
+ ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
+ drsuapi.DRSUAPI_DRS_PER_SYNC |
+ drsuapi.DRSUAPI_DRS_GET_ANC |
+ drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
+ drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+
+ ctx.join_add_objects()
def create_dc(self, ctx):
ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
expression="samaccountname=*",
controls=["dirsync:1:0:1"])
except LdbError as l:
- self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
+ self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
def test_parentGUID_referrals(self):
res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
systemOnly: FALSE
"""
try:
- self.ldb.add_ldif(ldif)
- self.fail()
+ self.ldb.add_ldif(ldif)
+ self.fail()
except LdbError as e1:
- (num, _) = e1.args
- self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+ (num, _) = e1.args
+ self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
ldif = """
dn: CN=%s,%s""" % (class_name, self.schema_dn) + """
m["userAccountControl"] = ldb.MessageElement(str(samba.dsdb.UF_SERVER_TRUST_ACCOUNT),
ldb.FLAG_MOD_REPLACE, "userAccountControl")
try:
- self.samdb.modify(m)
- self.fail()
+ self.samdb.modify(m)
+ self.fail()
except LdbError as e10:
- (enum, estr) = e10.args
- self.assertEqual(ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS, enum)
+ (enum, estr) = e10.args
+ self.assertEqual(ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS, enum)
m = ldb.Message()
m.dn = res[0].dn
if len(args) != 1:
import os
if not "DC_SERVER" in os.environ.keys():
- parser.error("You must supply a server")
+ parser.error("You must supply a server")
args.append(os.environ["DC_SERVER"])
if creds.is_anonymous():
def add_attr(self, dn, attname, vals):
if dn not in self.global_objs:
- self.global_objs[dn] = {}
+ self.global_objs[dn] = {}
self.global_objs[dn][attname] = vals
def print_all(self):
for dn, obj in self.global_objs.items():
- self.ldif.unparse(dn, obj)
- continue
+ self.ldif.unparse(dn, obj)
+ continue
self.global_objs = {}
def attid_equal(a1,a2):
gls = globals()
try:
- f = open(cookie_file, 'r')
- store_blob = f.read()
- f.close()
-
- store_hdr = store_blob[0:28]
- (store_version, \
- store_dn_len, store_dn_ofs, \
- store_hwm_len, store_hwm_ofs, \
- store_utdv_len, store_utdv_ofs) = \
- struct.unpack("<LLLLLLL", store_hdr)
-
- store_dn = store_blob[store_dn_ofs:store_dn_ofs+store_dn_len]
- store_hwm_blob = store_blob[store_hwm_ofs:store_hwm_ofs+store_hwm_len]
- store_utdv_blob = store_blob[store_utdv_ofs:store_utdv_ofs+store_utdv_len]
-
- store_hwm = ndr_unpack(drsuapi.DsReplicaHighWaterMark, store_hwm_blob)
- store_utdv = ndr_unpack(drsblobs.replUpToDateVectorBlob, store_utdv_blob)
-
- assert store_dn == dn
- #print "%s" % ndr_print(store_hwm)
- #print "%s" % ndr_print(store_utdv)
+ f = open(cookie_file, 'r')
+ store_blob = f.read()
+ f.close()
+
+ store_hdr = store_blob[0:28]
+ (store_version, \
+ store_dn_len, store_dn_ofs, \
+ store_hwm_len, store_hwm_ofs, \
+ store_utdv_len, store_utdv_ofs) = \
+ struct.unpack("<LLLLLLL", store_hdr)
+
+ store_dn = store_blob[store_dn_ofs:store_dn_ofs+store_dn_len]
+ store_hwm_blob = store_blob[store_hwm_ofs:store_hwm_ofs+store_hwm_len]
+ store_utdv_blob = store_blob[store_utdv_ofs:store_utdv_ofs+store_utdv_len]
+
+ store_hwm = ndr_unpack(drsuapi.DsReplicaHighWaterMark, store_hwm_blob)
+ store_utdv = ndr_unpack(drsblobs.replUpToDateVectorBlob, store_utdv_blob)
+
+ assert store_dn == dn
+ #print "%s" % ndr_print(store_hwm)
+ #print "%s" % ndr_print(store_utdv)
except Exception:
- store_dn = dn
- store_hwm = drsuapi.DsReplicaHighWaterMark()
- store_hwm.tmp_highest_usn = 0
- store_hwm.reserved_usn = 0
- store_hwm.highest_usn = 0
- store_utdv = None
+ store_dn = dn
+ store_hwm = drsuapi.DsReplicaHighWaterMark()
+ store_hwm.tmp_highest_usn = 0
+ store_hwm.reserved_usn = 0
+ store_hwm.highest_usn = 0
+ store_utdv = None
binding_str = "ncacn_ip_tcp:%s[spnego,seal]" % server
def _create_rodc(self, ctx):
- ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
- ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
- ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
-
- ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
- "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
- "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
- "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
- "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
- ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
-
- mysid = ctx.get_mysid()
- admin_dn = "<SID=%s>" % mysid
- ctx.managedby = admin_dn
-
- ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
- samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
- samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
-
- ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
- ctx.secure_channel_type = misc.SEC_CHAN_RODC
- ctx.RODC = True
- ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
- drsuapi.DRSUAPI_DRS_PER_SYNC |
- drsuapi.DRSUAPI_DRS_GET_ANC |
- drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
- drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
-
- ctx.join_add_objects()
+ ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
+ ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
+ ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
+
+ ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
+ "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
+ "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
+ ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
+
+ mysid = ctx.get_mysid()
+ admin_dn = "<SID=%s>" % mysid
+ ctx.managedby = admin_dn
+
+ ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
+ samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
+ samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
+
+ ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
+ ctx.secure_channel_type = misc.SEC_CHAN_RODC
+ ctx.RODC = True
+ ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
+ drsuapi.DRSUAPI_DRS_PER_SYNC |
+ drsuapi.DRSUAPI_DRS_GET_ANC |
+ drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
+ drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+
+ ctx.join_add_objects()