def setUp(self):
super(UserCmdTestCase, self).setUp()
self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.users = []
self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
nidx = nidx + 1
(kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
- start_idx=nidx)
+ start_idx=nidx)
self.assertIsNotNone(pp, "Primary:Kerberos required")
self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
nidx = nidx + 1
nidx = nidx + 1
(cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
- start_idx=nidx)
+ start_idx=nidx)
if cidx is not None:
self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
nidx = nidx + 1
for a in cache_attrs.keys():
v = cache_attrs[a].get("value", "")
self.assertMatch(out, "%s: %s" % (a, v),
- "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
+ "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
(result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
self.assertEqual(err,"","syncpasswords --no-wait")
self.assertMatch(out, "dirsync_loop(): results 0",
- "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
+ "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
for user in self.users:
self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
- "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
+ "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
for user in self.users:
newpasswd = self.randomPass()
creds.set_anonymous()
creds.set_password(newpasswd)
nthash = creds.get_nt_hash()
- unicodePwd = base64.b64encode(creds.get_nt_hash())
- virtualClearTextUTF8 = base64.b64encode(newpasswd)
- virtualClearTextUTF16 = base64.b64encode(unicode(newpasswd, 'utf-8').encode('utf-16-le'))
+ unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
+ virtualClearTextUTF8 = base64.b64encode(newpasswd).decode('utf8')
+ virtualClearTextUTF16 = base64.b64encode(unicode(newpasswd, 'utf-8').encode('utf-16-le')).decode('utf8')
(result, out, err) = self.runsubcmd("user", "setpassword",
user["name"],
self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
self.assertEqual(err,"","syncpasswords --no-wait")
self.assertMatch(out, "dirsync_loop(): results 0",
- "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
+ "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
- "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
+ "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
- "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
+ "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
- "getpassword unicodePwd: out[%s]" % out)
+ "getpassword unicodePwd: out[%s]" % out)
self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
- "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
+ "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
self.assertMatch(out, "supplementalCredentials:: ",
- "getpassword supplementalCredentials: out[%s]" % out)
+ "getpassword supplementalCredentials: out[%s]" % out)
if "virtualSambaGPG:: " in out:
self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
- "getpassword virtualClearTextUTF8: out[%s]" % out)
+ "getpassword virtualClearTextUTF8: out[%s]" % out)
self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
- "getpassword virtualClearTextUTF16: out[%s]" % out)
+ "getpassword virtualClearTextUTF16: out[%s]" % out)
self.assertMatch(out, "virtualSSHA: ",
- "getpassword virtualSSHA: out[%s]" % out)
+ "getpassword virtualSSHA: out[%s]" % out)
(result, out, err) = self.runsubcmd("user", "getpassword",
user["name"],
self.assertEqual(err,"","getpassword without url")
self.assertMatch(out, "Got password OK", "getpassword without url")
self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
- "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
+ "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
- "getpassword unicodePwd: out[%s]" % out)
+ "getpassword unicodePwd: out[%s]" % out)
self.assertMatch(out, "supplementalCredentials:: ",
- "getpassword supplementalCredentials: out[%s]" % out)
+ "getpassword supplementalCredentials: out[%s]" % out)
self._verify_supplementalCredentials(out.replace("\nGot password OK\n", ""))
if "virtualSambaGPG:: " in out:
self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
- "getpassword virtualClearTextUTF8: out[%s]" % out)
+ "getpassword virtualClearTextUTF8: out[%s]" % out)
self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
- "getpassword virtualClearTextUTF16: out[%s]" % out)
+ "getpassword virtualClearTextUTF16: out[%s]" % out)
self.assertMatch(out, "virtualSSHA: ",
- "getpassword virtualSSHA: out[%s]" % out)
+ "getpassword virtualSSHA: out[%s]" % out)
for user in self.users:
newpasswd = self.randomPass()
def test_setexpiry(self):
- twodays = time.time() + (2 * 24 * 60 * 60)
-
for user in self.users:
+ twodays = time.time() + (2 * 24 * 60 * 60)
+
(result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
"--days=2",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
- for user in self.users:
found = self._find_user(user["name"])
expires = nttime2unix(int("%s" % found.get("accountExpires")))
# now run the expiration based on a filter
fourdays = time.time() + (4 * 24 * 60 * 60)
(result, out, err) = self.runsubcmd("user", "setexpiry",
- "--filter", "(&(objectClass=user)(company=comp2))",
- "--days=4",
- "-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ "--filter", "(&(objectClass=user)(company=comp2))",
+ "--days=4",
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
for user in self.users:
"--attributes=sAMAccountName,company",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
- os.environ["DC_PASSWORD"]))
+ os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Error running show")
expected_out = """dn: CN=%s %s,CN=Users,%s
sAMAccountName: %s
""" % (user["given-name"], user["surname"], self.samdb.domain_dn(),
- user["company"], user["name"])
+ user["company"], user["name"])
self.assertEqual(out, expected_out,
"Unexpected show output for user '%s'" %
})
# check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
(result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
- "--surname=%s" % user["surname"],
- "--given-name=%s" % user["given-name"],
- "--job-title=%s" % user["job-title"],
- "--department=%s" % user["department"],
- "--description=%s" % user["description"],
- "--company=%s" % user["company"],
- "--gecos=%s" % user["gecos"],
- "--rfc2307-from-nss",
- "-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ "--surname=%s" % user["surname"],
+ "--given-name=%s" % user["given-name"],
+ "--job-title=%s" % user["job-title"],
+ "--department=%s" % user["department"],
+ "--description=%s" % user["description"],
+ "--company=%s" % user["company"],
+ "--gecos=%s" % user["gecos"],
+ "--rfc2307-from-nss",
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
# create a user with posix attributes from nss but override all of them with the
# random ones just obtained
(result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
- "--surname=%s" % user["surname"],
- "--given-name=%s" % user["given-name"],
- "--job-title=%s" % user["job-title"],
- "--department=%s" % user["department"],
- "--description=%s" % user["description"],
- "--company=%s" % user["company"],
- "--rfc2307-from-nss",
- "--gecos=%s" % user["gecos"],
- "--login-shell=%s" % user["loginShell"],
- "--uid=%s" % user["uid"],
- "--uid-number=%s" % user["uidNumber"],
- "--gid-number=%s" % user["gidNumber"],
- "-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ "--surname=%s" % user["surname"],
+ "--given-name=%s" % user["given-name"],
+ "--job-title=%s" % user["job-title"],
+ "--department=%s" % user["department"],
+ "--description=%s" % user["description"],
+ "--company=%s" % user["company"],
+ "--rfc2307-from-nss",
+ "--gecos=%s" % user["gecos"],
+ "--login-shell=%s" % user["loginShell"],
+ "--uid=%s" % user["uid"],
+ "--uid-number=%s" % user["uidNumber"],
+ "--gid-number=%s" % user["gidNumber"],
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
"description": self.randomName(count=100),
"createUserFn": self._create_user,
"checkUserFn": self._check_user,
- }
+ }
user.update(base)
return user
def _create_user(self, user):
return self.runsubcmd("user", "create", user["name"], user["password"],
- "--surname=%s" % user["surname"],
- "--given-name=%s" % user["given-name"],
- "--job-title=%s" % user["job-title"],
- "--department=%s" % user["department"],
- "--description=%s" % user["description"],
- "--company=%s" % user["company"],
- "-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ "--surname=%s" % user["surname"],
+ "--given-name=%s" % user["given-name"],
+ "--job-title=%s" % user["job-title"],
+ "--department=%s" % user["department"],
+ "--description=%s" % user["description"],
+ "--company=%s" % user["company"],
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
def _create_posix_user(self, user):
""" create a new user with RFC2307 attributes """
return self.runsubcmd("user", "create", user["name"], user["password"],
- "--surname=%s" % user["surname"],
- "--given-name=%s" % user["given-name"],
- "--job-title=%s" % user["job-title"],
- "--department=%s" % user["department"],
- "--description=%s" % user["description"],
- "--company=%s" % user["company"],
- "--gecos=%s" % user["gecos"],
- "--login-shell=%s" % user["loginShell"],
- "--uid=%s" % user["uid"],
- "--uid-number=%s" % user["uidNumber"],
- "--gid-number=%s" % user["gidNumber"],
- "-H", "ldap://%s" % os.environ["DC_SERVER"],
- "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
+ "--surname=%s" % user["surname"],
+ "--given-name=%s" % user["given-name"],
+ "--job-title=%s" % user["job-title"],
+ "--department=%s" % user["department"],
+ "--description=%s" % user["description"],
+ "--company=%s" % user["company"],
+ "--gecos=%s" % user["gecos"],
+ "--login-shell=%s" % user["loginShell"],
+ "--uid=%s" % user["uid"],
+ "--uid-number=%s" % user["uidNumber"],
+ "--gid-number=%s" % user["gidNumber"],
+ "-H", "ldap://%s" % os.environ["DC_SERVER"],
+ "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
def _find_user(self, name):
search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
userlist = self.samdb.search(base=self.samdb.domain_dn(),
- scope=ldb.SCOPE_SUBTREE,
- expression=search_filter, attrs=[])
+ scope=ldb.SCOPE_SUBTREE,
+ expression=search_filter, attrs=[])
if userlist:
return userlist[0]
else: