gensec,
generate_random_password,
Ldb,
- )
+)
from samba.net import Net
from samba.netcmd import (
CommandError,
SuperCommand,
Option,
- )
-
+)
+from samba.compat import text_type
try:
import io
except ImportError as e:
gpgme_support = False
decrypt_samba_gpg_help = "Decrypt the SambaGPG password not supported, " + \
- "python-gpgme required"
+ "python-gpgme required"
disabled_virtual_attributes = {
- }
+}
virtual_attributes = {
"virtualClearTextUTF8": {
"flags": ldb.ATTR_FLAG_FORCE_BASE64_LDIF,
- },
+ },
"virtualClearTextUTF16": {
"flags": ldb.ATTR_FLAG_FORCE_BASE64_LDIF,
- },
+ },
"virtualSambaGPG": {
"flags": ldb.ATTR_FLAG_FORCE_BASE64_LDIF,
- },
- }
+ },
+}
get_random_bytes_fn = None
if get_random_bytes_fn is None:
# we can ignore the possible == at the end
# of the base64 string
# we just need to replace '+' by '.'
- b64salt = base64.b64encode(salt)[0:16].replace('+', '.')
+ b64salt = base64.b64encode(salt)[0:16].replace(b'+', b'.').decode('utf8')
crypt_salt = ""
if rounds != 0:
crypt_salt = "$%s$rounds=%s$%s$" % (alg, rounds, b64salt)
h = hashlib.sha1()
h = None
virtual_attributes["virtualSSHA"] = {
- }
+ }
except ImportError as e:
reason = "hashlib.sha1()"
if random_reason:
reason += " required"
disabled_virtual_attributes["virtualSSHA"] = {
"reason" : reason,
- }
+ }
for (alg, attr) in [("5", "virtualCryptSHA256"), ("6", "virtualCryptSHA512")]:
try:
v = get_crypt_value(alg, "")
v = None
virtual_attributes[attr] = {
- }
+ }
except ImportError as e:
reason = "crypt"
if random_reason:
reason += " required"
disabled_virtual_attributes[attr] = {
"reason" : reason,
- }
+ }
except NotImplementedError as e:
reason = "modern '$%s$' salt in crypt(3) required" % (alg)
disabled_virtual_attributes[attr] = {
"reason" : reason,
- }
+ }
# Add the wDigest virtual attributes, virtualWDigest01 to virtualWDigest29
for x in range(1, 30):
takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str,
- metavar="URL", dest="H"),
+ metavar="URL", dest="H"),
Option("--must-change-at-next-login",
- help="Force password to be changed on next login",
- action="store_true"),
+ help="Force password to be changed on next login",
+ action="store_true"),
Option("--random-password",
- help="Generate random password",
- action="store_true"),
+ help="Generate random password",
+ action="store_true"),
Option("--smartcard-required",
- help="Require a smartcard for interactive logons",
- action="store_true"),
+ help="Require a smartcard for interactive logons",
+ action="store_true"),
Option("--use-username-as-cn",
- help="Force use of username as user's CN",
- action="store_true"),
+ help="Force use of username as user's CN",
+ action="store_true"),
Option("--userou",
- help="DN of alternative location (without domainDN counterpart) to default CN=Users in which new user object will be created. E. g. 'OU=<OU name>'",
- type=str),
+ help="DN of alternative location (without domainDN counterpart) to default CN=Users in which new user object will be created. E. g. 'OU=<OU name>'",
+ type=str),
Option("--surname", help="User's surname", type=str),
Option("--given-name", help="User's given name", type=str),
Option("--initials", help="User's initials", type=str),
Option("--telephone-number", help="User's phone number", type=str),
Option("--physical-delivery-office", help="User's office location", type=str),
Option("--rfc2307-from-nss",
- help="Copy Unix user attributes from NSS (will be overridden by explicit UID/GID/GECOS/shell)",
- action="store_true"),
+ help="Copy Unix user attributes from NSS (will be overridden by explicit UID/GID/GECOS/shell)",
+ action="store_true"),
Option("--nis-domain", help="User's Unix/RFC2307 NIS domain", type=str),
Option("--unix-home", help="User's Unix/RFC2307 home directory",
- type=str),
+ type=str),
Option("--uid", help="User's Unix/RFC2307 username", type=str),
Option("--uid-number", help="User's Unix/RFC2307 numeric UID", type=int),
Option("--gid-number", help="User's Unix/RFC2307 primary GID number", type=int),
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, username, password=None, credopts=None, sambaopts=None,
versionopts=None, H=None, must_change_at_next_login=False,
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, username, credopts=None, sambaopts=None, versionopts=None,
H=None):
credentials=creds, lp=lp)
filter = ("(&(sAMAccountName=%s)(sAMAccountType=805306368))" %
- ldb.binary_encode(username))
+ ldb.binary_encode(username))
try:
res = samdb.search(base=samdb.domain_dn(),
takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str,
metavar="URL", dest="H"),
- ]
+ ]
takes_optiongroups = {
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, sambaopts=None, credopts=None, versionopts=None, H=None):
lp = sambaopts.get_loadparm()
Option("-H", "--URL", help="LDB URL for database or target server", type=str,
metavar="URL", dest="H"),
Option("--filter", help="LDAP Filter to set password on", type=str),
- ]
+ ]
takes_args = ["username?"]
Option("-H", "--URL", help="LDB URL for database or target server", type=str,
metavar="URL", dest="H"),
Option("--filter", help="LDAP Filter to set password on", type=str),
- ]
+ ]
takes_args = ["username?"]
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, username=None, sambaopts=None, credopts=None,
versionopts=None, filter=None, H=None):
takes_options = [
Option("--newpassword", help="New password", type=str),
- ]
+ ]
takes_optiongroups = {
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, credopts=None, sambaopts=None, versionopts=None,
- newpassword=None):
+ newpassword=None):
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
self.outf.write("Sorry, passwords do not match.\n")
try:
- net.change_password(password.encode('utf-8'))
+ if not isinstance(password, text_type):
+ password = password.decode('utf8')
+ net.change_password(password)
except Exception as msg:
# FIXME: catch more specific exception
raise CommandError("Failed to change password : %s" % msg)
help="Force password to be changed on next login",
action="store_true"),
Option("--random-password",
- help="Generate random password",
- action="store_true"),
+ help="Generate random password",
+ action="store_true"),
Option("--smartcard-required",
- help="Require a smartcard for interactive logons",
- action="store_true"),
+ help="Require a smartcard for interactive logons",
+ action="store_true"),
Option("--clear-smartcard-required",
- help="Don't require a smartcard for interactive logons",
- action="store_true"),
- ]
+ help="Don't require a smartcard for interactive logons",
+ action="store_true"),
+ ]
takes_args = ["username?"]
for h in up.hashes:
if (scheme_match is None and
- h.scheme == SCHEME and
- h.value.startswith(scheme_prefix)):
+ h.scheme == SCHEME and
+ h.value.startswith(scheme_prefix)):
scheme_match = h.value
if h.scheme == SCHEME and h.value.startswith(prefix):
return (h.value, scheme_match)
h.update(u8)
h.update(salt)
bv = h.digest() + salt
- v = "{SSHA}" + base64.b64encode(bv)
+ v = "{SSHA}" + base64.b64encode(bv).decode('utf8')
elif a == "virtualCryptSHA256":
rounds = get_rounds(attr_opts[a])
x = get_virtual_crypt_value(a, 5, rounds, username, account_name)
Option("--decrypt-samba-gpg",
help=decrypt_samba_gpg_help,
action="store_true", default=False, dest="decrypt_samba_gpg"),
- ]
+ ]
takes_args = ["username?"]
userPrincipalName and userAccountControl.
It recovers from LDAP disconnects and updates the cache in conservative way
-(in single steps after each succesfully processed change). An error from
+(in single steps after each successfully processed change). An error from
the script (specified by '--script') will result in fatal error and this
command will exit. But the cache state should be still valid and can be
resumed in the next "Sync Loop Run".
Option("--terminate",
help="Send a SIGTERM to an already running (daemon) process",
action="store_true", default=False, dest="terminate"),
- ]
+ ]
def run(self, cache_ldb_initialize=False, cache_ldb=None,
H=None, filter=None,
dirsync_filter = "(&" + \
"(objectClass=user)" + \
"(userAccountControl:%s:=%u)" % (
- ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT) + \
+ ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT) + \
"(!(sAMAccountName=krbtgt*))" + \
")"
self.sync_command = sync_command
add_ldif = "dn: %s\n" % self.cache_dn
add_ldif += "objectClass: userSyncPasswords\n"
- add_ldif += "samdbUrl:: %s\n" % base64.b64encode(self.samdb_url)
- add_ldif += "dirsyncFilter:: %s\n" % base64.b64encode(self.dirsync_filter)
+ add_ldif += "samdbUrl:: %s\n" % base64.b64encode(self.samdb_url).decode('utf8')
+ add_ldif += "dirsyncFilter:: %s\n" % base64.b64encode(self.dirsync_filter).decode('utf8')
for a in self.dirsync_attrs:
- add_ldif += "dirsyncAttribute:: %s\n" % base64.b64encode(a)
+ add_ldif += "dirsyncAttribute:: %s\n" % base64.b64encode(a).decode('utf8')
add_ldif += "dirsyncControl: %s\n" % self.dirsync_controls[0]
for a in self.password_attrs:
- add_ldif += "passwordAttribute:: %s\n" % base64.b64encode(a)
+ add_ldif += "passwordAttribute:: %s\n" % base64.b64encode(a).decode('utf8')
if self.decrypt_samba_gpg == True:
add_ldif += "decryptSambaGPG: TRUE\n"
else:
self.current_pid = None
self.outf.write("Initialized cache_ldb[%s]\n" % (cache_ldb))
msgs = self.cache.parse_ldif(add_ldif)
- changetype,msg = msgs.next()
+ changetype,msg = next(msgs)
ldif = self.cache.write_ldif(msg, ldb.CHANGETYPE_NONE)
self.outf.write("%s" % ldif)
else:
if self.lockfd != -1:
got_exclusive = False
# Try 5 times to get the exclusiv lock.
- for i in xrange(0, 5):
+ for i in range(0, 5):
try:
fcntl.lockf(self.lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB)
got_exclusive = True
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, username, credopts=None, sambaopts=None, versionopts=None,
H=None, editor=None):
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, username, credopts=None, sambaopts=None, versionopts=None,
H=None, user_attrs=None):
"sambaopts": options.SambaOptions,
"credopts": options.CredentialsOptions,
"versionopts": options.VersionOptions,
- }
+ }
def run(self, username, new_parent_dn, credopts=None, sambaopts=None,
versionopts=None, H=None):