gensec,
generate_random_password,
Ldb,
- )
+)
from samba.net import Net
from samba.netcmd import (
CommandError,
SuperCommand,
Option,
- )
-
+)
+from samba.compat import text_type
try:
import io
except ImportError as e:
gpgme_support = False
decrypt_samba_gpg_help = "Decrypt the SambaGPG password not supported, " + \
- "python-gpgme required"
+ "python-gpgme required"
disabled_virtual_attributes = {
- }
+}
virtual_attributes = {
"virtualClearTextUTF8": {
"flags": ldb.ATTR_FLAG_FORCE_BASE64_LDIF,
- },
+ },
"virtualClearTextUTF16": {
"flags": ldb.ATTR_FLAG_FORCE_BASE64_LDIF,
- },
+ },
"virtualSambaGPG": {
"flags": ldb.ATTR_FLAG_FORCE_BASE64_LDIF,
- },
- }
+ },
+}
get_random_bytes_fn = None
if get_random_bytes_fn is None:
# we can ignore the possible == at the end
# of the base64 string
# we just need to replace '+' by '.'
- b64salt = base64.b64encode(salt)[0:16].replace('+', '.')
+ b64salt = base64.b64encode(salt)[0:16].replace(b'+', b'.').decode('utf8')
crypt_salt = ""
if rounds != 0:
crypt_salt = "$%s$rounds=%s$%s$" % (alg, rounds, b64salt)
h = hashlib.sha1()
h = None
virtual_attributes["virtualSSHA"] = {
- }
+ }
except ImportError as e:
reason = "hashlib.sha1()"
if random_reason:
reason += " and " + random_reason
reason += " required"
disabled_virtual_attributes["virtualSSHA"] = {
- "reason" : reason,
- }
+ "reason": reason,
+ }
for (alg, attr) in [("5", "virtualCryptSHA256"), ("6", "virtualCryptSHA512")]:
try:
v = get_crypt_value(alg, "")
v = None
virtual_attributes[attr] = {
- }
+ }
except ImportError as e:
reason = "crypt"
if random_reason:
reason += " and " + random_reason
reason += " required"
disabled_virtual_attributes[attr] = {
- "reason" : reason,
- }
+ "reason": reason,
+ }
except NotImplementedError as e:
reason = "modern '$%s$' salt in crypt(3) required" % (alg)
disabled_virtual_attributes[attr] = {
- "reason" : reason,
- }
+ "reason": reason,
+ }
# Add the wDigest virtual attributes, virtualWDigest01 to virtualWDigest29
for x in range(1, 30):
takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str,
- metavar="URL", dest="H"),
+ metavar="URL", dest="H"),
Option("--must-change-at-next-login",
- help="Force password to be changed on next login",
- action="store_true"),
+ help="Force password to be changed on next login",
+ action="store_true"),
Option("--random-password",
- help="Generate random password",
- action="store_true"),
+ help="Generate random password",
+ action="store_true"),
Option("--smartcard-required",
- help="Require a smartcard for interactive logons",
- action="store_true"),
+ help="Require a smartcard for interactive logons",
+ action="store_true"),
Option("--use-username-as-cn",
- help="Force use of username as user's CN",
- action="store_true"),
+ help="Force use of username as user's CN",
+ action="store_true"),
Option("--userou",
- help="DN of alternative location (without domainDN counterpart) to default CN=Users in which new user object will be created. E. g. 'OU=<OU name>'",
- type=str),
+ help="DN of alternative location (without domainDN counterpart) to default CN=Users in which new user object will be created. E. g. 'OU=<OU name>'",
+ type=str),
Option("--surname", help="User's surname", type=str),
Option("--given-name", help="User's given name", type=str),
Option("--initials", help="User's initials", type=str),
Option("--telephone-number", help="User's phone number", type=str),
Option("--physical-delivery-office", help="User's office location", type=str),
Option("--rfc2307-from-nss",
- help="Copy Unix user attributes from NSS (will be overridden by explicit UID/GID/GECOS/shell)",
- action="store_true"),
+ help="Copy Unix user attributes from NSS (will be overridden by explicit UID/GID/GECOS/shell)",
+ action="store_true"),
Option("--nis-domain", help="User's Unix/RFC2307 NIS domain", type=str),
Option("--unix-home", help="User's Unix/RFC2307 home directory",
- type=str),
+ type=str),
Option("--uid", help="User's Unix/RFC2307 username", type=str),
Option("--uid-number", help="User's Unix/RFC2307 numeric UID", type=int),
Option("--gid-number", help="User's Unix/RFC2307 primary GID number", type=int),
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, username, password=None, credopts=None, sambaopts=None,
versionopts=None, H=None, must_change_at_next_login=False,
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, username, credopts=None, sambaopts=None, versionopts=None,
H=None):
credentials=creds, lp=lp)
filter = ("(&(sAMAccountName=%s)(sAMAccountType=805306368))" %
- username)
+ ldb.binary_encode(username))
try:
res = samdb.search(base=samdb.domain_dn(),
takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str,
metavar="URL", dest="H"),
- ]
+ ]
takes_optiongroups = {
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, sambaopts=None, credopts=None, versionopts=None, H=None):
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp, fallback_machine=True)
samdb = SamDB(url=H, session_info=system_session(),
- credentials=creds, lp=lp)
+ credentials=creds, lp=lp)
domain_dn = samdb.domain_dn()
res = samdb.search(domain_dn, scope=ldb.SCOPE_SUBTREE,
- expression=("(&(objectClass=user)(userAccountControl:%s:=%u))"
- % (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT)),
- attrs=["samaccountname"])
+ expression=("(&(objectClass=user)(userAccountControl:%s:=%u))"
+ % (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT)),
+ attrs=["samaccountname"])
if (len(res) == 0):
return
Option("-H", "--URL", help="LDB URL for database or target server", type=str,
metavar="URL", dest="H"),
Option("--filter", help="LDAP Filter to set password on", type=str),
- ]
+ ]
takes_args = ["username?"]
creds = credopts.get_credentials(lp, fallback_machine=True)
samdb = SamDB(url=H, session_info=system_session(),
- credentials=creds, lp=lp)
+ credentials=creds, lp=lp)
try:
samdb.enable_account(filter)
except Exception as msg:
Option("-H", "--URL", help="LDB URL for database or target server", type=str,
metavar="URL", dest="H"),
Option("--filter", help="LDAP Filter to set password on", type=str),
- ]
+ ]
takes_args = ["username?"]
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, username=None, sambaopts=None, credopts=None,
versionopts=None, filter=None, H=None):
creds = credopts.get_credentials(lp, fallback_machine=True)
samdb = SamDB(url=H, session_info=system_session(),
- credentials=creds, lp=lp)
+ credentials=creds, lp=lp)
try:
samdb.disable_account(filter)
except Exception as msg:
creds = credopts.get_credentials(lp)
samdb = SamDB(url=H, session_info=system_session(),
- credentials=creds, lp=lp)
+ credentials=creds, lp=lp)
try:
- samdb.setexpiry(filter, days*24*3600, no_expiry_req=noexpiry)
+ samdb.setexpiry(filter, days * 24 * 3600, no_expiry_req=noexpiry)
except Exception as msg:
# FIXME: Catch more specific exception
raise CommandError("Failed to set expiry for user '%s': %s" % (
takes_options = [
Option("--newpassword", help="New password", type=str),
- ]
+ ]
takes_optiongroups = {
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, credopts=None, sambaopts=None, versionopts=None,
- newpassword=None):
+ newpassword=None):
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
self.outf.write("Sorry, passwords do not match.\n")
try:
- net.change_password(password.encode('utf-8'))
+ if not isinstance(password, text_type):
+ password = password.decode('utf8')
+ net.change_password(password)
except Exception as msg:
# FIXME: catch more specific exception
raise CommandError("Failed to change password : %s" % msg)
help="Force password to be changed on next login",
action="store_true"),
Option("--random-password",
- help="Generate random password",
- action="store_true"),
+ help="Generate random password",
+ action="store_true"),
Option("--smartcard-required",
- help="Require a smartcard for interactive logons",
- action="store_true"),
+ help="Require a smartcard for interactive logons",
+ action="store_true"),
Option("--clear-smartcard-required",
- help="Don't require a smartcard for interactive logons",
- action="store_true"),
- ]
+ help="Don't require a smartcard for interactive logons",
+ action="store_true"),
+ ]
takes_args = ["username?"]
domain, dns_domain):
if i == 1:
user = account_name
- realm= domain
+ realm = domain
elif i == 2:
user = account_name.lower()
realm = domain.lower()
digests = ndr_unpack(drsblobs.package_PrimaryWDigestBlob,
primary_wdigest)
try:
- digest = binascii.hexlify(bytearray(digests.hashes[i-1].hash))
+ digest = binascii.hexlify(bytearray(digests.hashes[i - 1].hash))
return "%s:%s:%s" % (user, realm, digest)
except IndexError:
return None
for h in up.hashes:
if (scheme_match is None and
- h.scheme == SCHEME and
- h.value.startswith(scheme_prefix)):
+ h.scheme == SCHEME and
+ h.value.startswith(scheme_prefix)):
scheme_match = h.value
if h.scheme == SCHEME and h.value.startswith(prefix):
return (h.value, scheme_match)
h.update(u8)
h.update(salt)
bv = h.digest() + salt
- v = "{SSHA}" + base64.b64encode(bv)
+ v = "{SSHA}" + base64.b64encode(bv).decode('utf8')
elif a == "virtualCryptSHA256":
rounds = get_rounds(attr_opts[a])
x = get_virtual_crypt_value(a, 5, rounds, username, account_name)
Option("--decrypt-samba-gpg",
help=decrypt_samba_gpg_help,
action="store_true", default=False, dest="decrypt_samba_gpg"),
- ]
+ ]
takes_args = ["username?"]
userPrincipalName and userAccountControl.
It recovers from LDAP disconnects and updates the cache in conservative way
-(in single steps after each succesfully processed change). An error from
+(in single steps after each successfully processed change). An error from
the script (specified by '--script') will result in fatal error and this
command will exit. But the cache state should be still valid and can be
resumed in the next "Sync Loop Run".
Option("--terminate",
help="Send a SIGTERM to an already running (daemon) process",
action="store_true", default=False, dest="terminate"),
- ]
+ ]
def run(self, cache_ldb_initialize=False, cache_ldb=None,
H=None, filter=None,
dirsync_filter = "(&" + \
"(objectClass=user)" + \
"(userAccountControl:%s:=%u)" % (
- ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT) + \
+ ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT) + \
"(!(sAMAccountName=krbtgt*))" + \
")"
self.sync_command = sync_command
add_ldif = "dn: %s\n" % self.cache_dn
add_ldif += "objectClass: userSyncPasswords\n"
- add_ldif += "samdbUrl:: %s\n" % base64.b64encode(self.samdb_url)
- add_ldif += "dirsyncFilter:: %s\n" % base64.b64encode(self.dirsync_filter)
+ add_ldif += "samdbUrl:: %s\n" % base64.b64encode(self.samdb_url).decode('utf8')
+ add_ldif += "dirsyncFilter:: %s\n" % base64.b64encode(self.dirsync_filter).decode('utf8')
for a in self.dirsync_attrs:
- add_ldif += "dirsyncAttribute:: %s\n" % base64.b64encode(a)
+ add_ldif += "dirsyncAttribute:: %s\n" % base64.b64encode(a).decode('utf8')
add_ldif += "dirsyncControl: %s\n" % self.dirsync_controls[0]
for a in self.password_attrs:
- add_ldif += "passwordAttribute:: %s\n" % base64.b64encode(a)
+ add_ldif += "passwordAttribute:: %s\n" % base64.b64encode(a).decode('utf8')
if self.decrypt_samba_gpg == True:
add_ldif += "decryptSambaGPG: TRUE\n"
else:
self.current_pid = None
self.outf.write("Initialized cache_ldb[%s]\n" % (cache_ldb))
msgs = self.cache.parse_ldif(add_ldif)
- changetype,msg = msgs.next()
+ changetype,msg = next(msgs)
ldif = self.cache.write_ldif(msg, ldb.CHANGETYPE_NONE)
self.outf.write("%s" % ldif)
else:
del dirsync_obj[a]
dirsync_obj["# %s::" % a] = ["REDACTED SECRET ATTRIBUTE"]
dirsync_ldif = self.samdb.write_ldif(dirsync_obj, ldb.CHANGETYPE_NONE)
- log_msg("# Dirsync[%d] %s %s\n%s" %(idx, guid, sid, dirsync_ldif))
+ log_msg("# Dirsync[%d] %s %s\n%s" % (idx, guid, sid, dirsync_ldif))
obj = self.get_account_attributes(self.samdb,
username="%s" % sid,
basedn="<GUID=%s>" % guid,
if self.lockfd != -1:
got_exclusive = False
# Try 5 times to get the exclusiv lock.
- for i in xrange(0, 5):
+ for i in range(0, 5):
try:
fcntl.lockf(self.lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB)
got_exclusive = True
if self.current_pid is not None:
log_msg("currentPid: %d\n" % self.current_pid)
- modify_ldif = "dn: %s\n" % (self.cache_dn)
+ modify_ldif = "dn: %s\n" % (self.cache_dn)
modify_ldif += "changetype: modify\n"
modify_ldif += "replace: currentPid\n"
if self.current_pid is not None:
self.dirsync_controls = [str(res_controls[0]),"extended_dn:1:0"]
log_msg("dirsyncControls: %r\n" % self.dirsync_controls)
- modify_ldif = "dn: %s\n" % (self.cache_dn)
+ modify_ldif = "dn: %s\n" % (self.cache_dn)
modify_ldif += "changetype: modify\n"
modify_ldif += "replace: dirsyncControl\n"
modify_ldif += "dirsyncControl: %s\n" % (self.dirsync_controls[0])
add_ldif += "currentTime: %s\n" % ldb.timestring(int(time.time()))
self.cache.add_ldif(add_ldif)
else:
- modify_ldif = "dn: %s\n" % (dn)
+ modify_ldif = "dn: %s\n" % (dn)
modify_ldif += "changetype: modify\n"
modify_ldif += "replace: lastCookie\n"
modify_ldif += "lastCookie: %s\n" % (lastCookie)
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, username, credopts=None, sambaopts=None, versionopts=None,
H=None, editor=None):
line = line[2:]
plus_lines.append(line)
- user_ldif="dn: %s\n" % user_dn
+ user_ldif = "dn: %s\n" % user_dn
user_ldif += "changetype: modify\n"
for line in minus_lines:
attr, val = line.split(':', 1)
- search_attr="%s:" % attr
+ search_attr = "%s:" % attr
if not re.search(r'^' + search_attr, str(plus_lines)):
user_ldif += "delete: %s\n" % attr
user_ldif += "%s: %s\n" % (attr, val)
for line in plus_lines:
attr, val = line.split(':', 1)
- search_attr="%s:" % attr
+ search_attr = "%s:" % attr
if re.search(r'^' + search_attr, str(minus_lines)):
user_ldif += "replace: %s\n" % attr
user_ldif += "%s: %s\n" % (attr, val)
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, username, credopts=None, sambaopts=None, versionopts=None,
H=None, user_attrs=None):
type=str, metavar="URL", dest="H"),
]
- takes_args = [ "username", "new_parent_dn" ]
+ takes_args = ["username", "new_parent_dn"]
takes_optiongroups = {
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, username, new_parent_dn, credopts=None, sambaopts=None,
versionopts=None, H=None):
(new_parent_dn, e.message))
full_new_user_dn = ldb.Dn(samdb, str(user_dn))
- full_new_user_dn.remove_base_components(len(user_dn)-1)
+ full_new_user_dn.remove_base_components(len(user_dn) - 1)
full_new_user_dn.add_base(full_new_parent_dn)
try: