import os
import time
+import base64
import ldb
from samba.tests.samba_tool.base import SambaToolCmdTest
from samba import (
+ credentials,
nttime2unix,
dsdb
)
+from samba.ndr import ndr_unpack
+from samba.dcerpc import drsblobs
class UserCmdTestCase(SambaToolCmdTest):
"""Tests for samba-tool user subcommands"""
def setUp(self):
super(UserCmdTestCase, self).setUp()
self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.users = []
self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
for user in self.users:
(result, out, err) = user["createUserFn"](user)
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertIn("User '%s' created successfully" % user["name"], out)
# try to delete all the 4 users we just added
for user in self.users:
(result, out, err) = self.runsubcmd("user", "delete", user["name"])
- self.assertCmdSuccess(result, "Can we delete users")
+ self.assertCmdSuccess(result, out, err, "Can we delete users")
found = self._find_user(user["name"])
self.assertIsNone(found)
# test adding users with --use-username-as-cn
for user in self.users:
- (result, out, err) = self.runsubcmd("user", "add", user["name"], user["password"],
+ (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
"--use-username-as-cn",
"--surname=%s" % user["surname"],
"--given-name=%s" % user["given-name"],
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertIn("User '%s' created successfully" % user["name"], out)
self.assertEquals("%s" % found.get("cn"), "%(name)s" % user)
self.assertEquals("%s" % found.get("name"), "%(name)s" % user)
+ def _verify_supplementalCredentials(self, ldif,
+ min_packages=3,
+ max_packages=6):
+ msgs = self.samdb.parse_ldif(ldif)
+ (changetype, obj) = next(msgs)
+ self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
+ sc_blob = obj["supplementalCredentials"][0]
+ sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
+
+ self.assertGreaterEqual(sc.sub.num_packages,
+ min_packages, "min_packages check")
+ self.assertLessEqual(sc.sub.num_packages,
+ max_packages, "max_packages check")
+
+ if max_packages == 0:
+ return
+
+ def find_package(packages, name, start_idx=0):
+ for i in range(start_idx, len(packages)):
+ if packages[i].name == name:
+ return (i, packages[i])
+ return (None, None)
+
+ # The ordering is this
+ #
+ # Primary:Kerberos-Newer-Keys (optional)
+ # Primary:Kerberos
+ # Primary:WDigest
+ # Primary:CLEARTEXT (optional)
+ # Primary:SambaGPG (optional)
+ #
+ # And the 'Packages' package is insert before the last
+ # other package.
+
+ nidx = 0
+ (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
+ self.assertIsNotNone(pp, "Packages required")
+ self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
+ "Packages needs to be at num_packages - 1")
+
+ (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
+ start_idx=nidx)
+ if knidx is not None:
+ self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
+ nidx = nidx + 1
+ if nidx == pidx:
+ nidx = nidx + 1
+
+ (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
+ start_idx=nidx)
+ self.assertIsNotNone(pp, "Primary:Kerberos required")
+ self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
+ nidx = nidx + 1
+ if nidx == pidx:
+ nidx = nidx + 1
+
+ (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
+ start_idx=nidx)
+ self.assertIsNotNone(pp, "Primary:WDigest required")
+ self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
+ nidx = nidx + 1
+ if nidx == pidx:
+ nidx = nidx + 1
+
+ (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
+ start_idx=nidx)
+ if cidx is not None:
+ self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
+ nidx = nidx + 1
+ if nidx == pidx:
+ nidx = nidx + 1
+
+ (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
+ start_idx=nidx)
+ if gidx is not None:
+ self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
+ nidx = nidx + 1
+ if nidx == pidx:
+ nidx = nidx + 1
+
+ self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
def test_setpassword(self):
for user in self.users:
"--newpassword=%s" % newpasswd,
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- # self.assertCmdSuccess(result, "Ensure setpassword runs")
+ self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
self.assertEquals(err,"","setpassword with url")
self.assertMatch(out, "Changed password OK", "setpassword with url")
+ attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
+ (result, out, err) = self.runsubcmd("user", "syncpasswords",
+ "--cache-ldb-initialize",
+ "--attributes=%s" % attributes,
+ "--decrypt-samba-gpg")
+ self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
+ self.assertEqual(err,"","getpassword without url")
+ cache_attrs = {
+ "objectClass": { "value": "userSyncPasswords" },
+ "samdbUrl": { },
+ "dirsyncFilter": { },
+ "dirsyncAttribute": { },
+ "dirsyncControl": { "value": "dirsync:1:0:0"},
+ "passwordAttribute": { },
+ "decryptSambaGPG": { },
+ "currentTime": { },
+ }
+ for a in cache_attrs.keys():
+ v = cache_attrs[a].get("value", "")
+ self.assertMatch(out, "%s: %s" % (a, v),
+ "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
+
+ (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
+ self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
+ self.assertEqual(err,"","syncpasswords --no-wait")
+ self.assertMatch(out, "dirsync_loop(): results 0",
+ "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
+ for user in self.users:
+ self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
+ "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
+
for user in self.users:
newpasswd = self.randomPass()
+ creds = credentials.Credentials()
+ creds.set_anonymous()
+ creds.set_password(newpasswd)
+ nthash = creds.get_nt_hash()
+ unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
+ virtualClearTextUTF8 = base64.b64encode(newpasswd).decode('utf8')
+ virtualClearTextUTF16 = base64.b64encode(unicode(newpasswd, 'utf-8').encode('utf-16-le')).decode('utf8')
+
(result, out, err) = self.runsubcmd("user", "setpassword",
user["name"],
"--newpassword=%s" % newpasswd)
- # self.assertCmdSuccess(result, "Ensure setpassword runs")
+ self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
self.assertEquals(err,"","setpassword without url")
self.assertMatch(out, "Changed password OK", "setpassword without url")
+ (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
+ self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
+ self.assertEqual(err,"","syncpasswords --no-wait")
+ self.assertMatch(out, "dirsync_loop(): results 0",
+ "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
+ self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
+ "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
+ self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
+ "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
+ self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
+ "getpassword unicodePwd: out[%s]" % out)
+ self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
+ "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
+ self.assertMatch(out, "supplementalCredentials:: ",
+ "getpassword supplementalCredentials: out[%s]" % out)
+ if "virtualSambaGPG:: " in out:
+ self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
+ "getpassword virtualClearTextUTF8: out[%s]" % out)
+ self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
+ "getpassword virtualClearTextUTF16: out[%s]" % out)
+ self.assertMatch(out, "virtualSSHA: ",
+ "getpassword virtualSSHA: out[%s]" % out)
+
+ (result, out, err) = self.runsubcmd("user", "getpassword",
+ user["name"],
+ "--attributes=%s" % attributes,
+ "--decrypt-samba-gpg")
+ self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
+ self.assertEqual(err,"","getpassword without url")
+ self.assertMatch(out, "Got password OK", "getpassword without url")
+ self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
+ "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
+ self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
+ "getpassword unicodePwd: out[%s]" % out)
+ self.assertMatch(out, "supplementalCredentials:: ",
+ "getpassword supplementalCredentials: out[%s]" % out)
+ self._verify_supplementalCredentials(out.replace("\nGot password OK\n", ""))
+ if "virtualSambaGPG:: " in out:
+ self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
+ "getpassword virtualClearTextUTF8: out[%s]" % out)
+ self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
+ "getpassword virtualClearTextUTF16: out[%s]" % out)
+ self.assertMatch(out, "virtualSSHA: ",
+ "getpassword virtualSSHA: out[%s]" % out)
+
for user in self.users:
newpasswd = self.randomPass()
(result, out, err) = self.runsubcmd("user", "setpassword",
"--must-change-at-next-login",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- # self.assertCmdSuccess(result, "Ensure setpassword runs")
+ self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
self.assertEquals(err,"","setpassword with forced change")
self.assertMatch(out, "Changed password OK", "setpassword with forced change")
def test_setexpiry(self):
- twodays = time.time() + (2 * 24 * 60 * 60)
-
for user in self.users:
+ twodays = time.time() + (2 * 24 * 60 * 60)
+
(result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
"--days=2",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result, "Can we run setexpiry with names")
+ self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
- for user in self.users:
found = self._find_user(user["name"])
expires = nttime2unix(int("%s" % found.get("accountExpires")))
# now run the expiration based on a filter
fourdays = time.time() + (4 * 24 * 60 * 60)
(result, out, err) = self.runsubcmd("user", "setexpiry",
- "--filter", "(&(objectClass=user)(company=comp2))",
- "--days=4",
- "-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result, "Can we run setexpiry with a filter")
+ "--filter", "(&(objectClass=user)(company=comp2))",
+ "--days=4",
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
for user in self.users:
found = self._find_user(user["name"])
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result, "Error running list")
+ self.assertCmdSuccess(result, out, err, "Error running list")
search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
(ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
name = userobj.get("samaccountname", idx=0)
found = self.assertMatch(out, name,
"user '%s' not found" % name)
+
+ def test_show(self):
+ for user in self.users:
+ (result, out, err) = self.runsubcmd(
+ "user", "show", user["name"],
+ "--attributes=sAMAccountName,company",
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"],
+ os.environ["DC_PASSWORD"]))
+ self.assertCmdSuccess(result, out, err, "Error running show")
+
+ expected_out = """dn: CN=%s %s,CN=Users,%s
+company: %s
+sAMAccountName: %s
+
+""" % (user["given-name"], user["surname"], self.samdb.domain_dn(),
+ user["company"], user["name"])
+
+ self.assertEqual(out, expected_out,
+ "Unexpected show output for user '%s'" %
+ user["name"])
+
+ def test_move(self):
+ full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest"))
+ (result, out, err) = self.runsubcmd("ou", "create", full_ou_dn)
+ self.assertCmdSuccess(result, out, err)
+ self.assertEquals(err, "", "There shouldn't be any error message")
+ self.assertIn('Created ou "%s"' % full_ou_dn, out)
+
+ for user in self.users:
+ (result, out, err) = self.runsubcmd(
+ "user", "move", user["name"], full_ou_dn)
+ self.assertCmdSuccess(result, out, err, "Error running move")
+ self.assertIn('Moved user "%s" into "%s"' %
+ (user["name"], full_ou_dn), out)
+
+ # Should fail as users objects are in OU
+ (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
+ self.assertCmdFail(result)
+ self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
+ "(it has %d children)!") % len(self.users), err)
+
+ for user in self.users:
+ new_dn = "CN=Users,%s" % self.samdb.domain_dn()
+ (result, out, err) = self.runsubcmd(
+ "user", "move", user["name"], new_dn)
+ self.assertCmdSuccess(result, out, err, "Error running move")
+ self.assertIn('Moved user "%s" into "%s"' %
+ (user["name"], new_dn), out)
+
+ (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
+ self.assertCmdSuccess(result, out, err,
+ "Failed to delete ou '%s'" % full_ou_dn)
+
def test_getpwent(self):
try:
import pwd
return
-# samba-tool user add command didn't support users with empty gecos if none is
+# samba-tool user create command didn't support users with empty gecos if none is
# specified on the command line and the user hasn't one in the passwd file it
# will fail, so let's add some contents
"loginShell": u[6],
})
# check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
- (result, out, err) = self.runsubcmd("user", "add", user["name"], user["password"],
- "--surname=%s" % user["surname"],
- "--given-name=%s" % user["given-name"],
- "--job-title=%s" % user["job-title"],
- "--department=%s" % user["department"],
- "--description=%s" % user["description"],
- "--company=%s" % user["company"],
- "--gecos=%s" % user["gecos"],
- "--rfc2307-from-nss",
- "-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
+ "--surname=%s" % user["surname"],
+ "--given-name=%s" % user["given-name"],
+ "--job-title=%s" % user["job-title"],
+ "--department=%s" % user["department"],
+ "--description=%s" % user["description"],
+ "--company=%s" % user["company"],
+ "--gecos=%s" % user["gecos"],
+ "--rfc2307-from-nss",
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- msg = "command should return %s" % err
- self.assertCmdSuccess(result, msg)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertIn("User '%s' created successfully" % user["name"], out)
user = self._randomPosixUser({"name": u[0]})
# create a user with posix attributes from nss but override all of them with the
# random ones just obtained
- (result, out, err) = self.runsubcmd("user", "add", user["name"], user["password"],
- "--surname=%s" % user["surname"],
- "--given-name=%s" % user["given-name"],
- "--job-title=%s" % user["job-title"],
- "--department=%s" % user["department"],
- "--description=%s" % user["description"],
- "--company=%s" % user["company"],
- "--rfc2307-from-nss",
- "--gecos=%s" % user["gecos"],
- "--login-shell=%s" % user["loginShell"],
- "--uid=%s" % user["uid"],
- "--uid-number=%s" % user["uidNumber"],
- "--gid-number=%s" % user["gidNumber"],
- "-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
+ "--surname=%s" % user["surname"],
+ "--given-name=%s" % user["given-name"],
+ "--job-title=%s" % user["job-title"],
+ "--department=%s" % user["department"],
+ "--description=%s" % user["description"],
+ "--company=%s" % user["company"],
+ "--rfc2307-from-nss",
+ "--gecos=%s" % user["gecos"],
+ "--login-shell=%s" % user["loginShell"],
+ "--uid=%s" % user["uid"],
+ "--uid-number=%s" % user["uidNumber"],
+ "--gid-number=%s" % user["gidNumber"],
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- msg = "command should return %s" % err
- self.assertCmdSuccess(result, msg)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertIn("User '%s' created successfully" % user["name"], out)
"description": self.randomName(count=100),
"createUserFn": self._create_user,
"checkUserFn": self._check_user,
- }
+ }
user.update(base)
return user
self._check_user(user)
def _create_user(self, user):
- return self.runsubcmd("user", "add", user["name"], user["password"],
- "--surname=%s" % user["surname"],
- "--given-name=%s" % user["given-name"],
- "--job-title=%s" % user["job-title"],
- "--department=%s" % user["department"],
- "--description=%s" % user["description"],
- "--company=%s" % user["company"],
- "-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ return self.runsubcmd("user", "create", user["name"], user["password"],
+ "--surname=%s" % user["surname"],
+ "--given-name=%s" % user["given-name"],
+ "--job-title=%s" % user["job-title"],
+ "--department=%s" % user["department"],
+ "--description=%s" % user["description"],
+ "--company=%s" % user["company"],
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
def _create_posix_user(self, user):
""" create a new user with RFC2307 attributes """
return self.runsubcmd("user", "create", user["name"], user["password"],
- "--surname=%s" % user["surname"],
- "--given-name=%s" % user["given-name"],
- "--job-title=%s" % user["job-title"],
- "--department=%s" % user["department"],
- "--description=%s" % user["description"],
- "--company=%s" % user["company"],
- "--gecos=%s" % user["gecos"],
- "--login-shell=%s" % user["loginShell"],
- "--uid=%s" % user["uid"],
- "--uid-number=%s" % user["uidNumber"],
- "--gid-number=%s" % user["gidNumber"],
- "-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ "--surname=%s" % user["surname"],
+ "--given-name=%s" % user["given-name"],
+ "--job-title=%s" % user["job-title"],
+ "--department=%s" % user["department"],
+ "--description=%s" % user["description"],
+ "--company=%s" % user["company"],
+ "--gecos=%s" % user["gecos"],
+ "--login-shell=%s" % user["loginShell"],
+ "--uid=%s" % user["uid"],
+ "--uid-number=%s" % user["uidNumber"],
+ "--gid-number=%s" % user["gidNumber"],
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
def _find_user(self, name):
search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
userlist = self.samdb.search(base=self.samdb.domain_dn(),
- scope=ldb.SCOPE_SUBTREE,
- expression=search_filter, attrs=[])
+ scope=ldb.SCOPE_SUBTREE,
+ expression=search_filter, attrs=[])
if userlist:
return userlist[0]
else: