1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 from samba.tests.samba_tool.base import SambaToolCmdTest
29 from samba.ndr import ndr_unpack
30 from samba.dcerpc import drsblobs
31 from samba.common import get_bytes
32 from samba.common import get_string
33 from samba.tests import env_loadparm
36 class UserCmdTestCase(SambaToolCmdTest):
37 """Tests for samba-tool user subcommands"""
43 self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
44 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
46 # Modify the default template homedir
47 lp = self.get_loadparm()
48 self.template_homedir = lp.get('template homedir')
49 lp.set('template homedir', '/home/test/%D/%U')
52 self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
53 self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
54 self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"}))
55 self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"}))
56 self.users.append(self._randomPosixUser({"name": "posixuser1"}))
57 self.users.append(self._randomPosixUser({"name": "posixuser2"}))
58 self.users.append(self._randomPosixUser({"name": "posixuser3"}))
59 self.users.append(self._randomPosixUser({"name": "posixuser4"}))
60 self.users.append(self._randomUnixUser({"name": "unixuser1"}))
61 self.users.append(self._randomUnixUser({"name": "unixuser2"}))
62 self.users.append(self._randomUnixUser({"name": "unixuser3"}))
63 self.users.append(self._randomUnixUser({"name": "unixuser4"}))
65 # Make sure users don't exist
66 for user in self.users:
67 if self._find_user(user["name"]):
68 self.runsubcmd("user", "delete", user["name"])
70 # setup the 12 users and ensure they are correct
71 for user in self.users:
72 (result, out, err) = user["createUserFn"](user)
74 self.assertCmdSuccess(result, out, err)
75 self.assertEqual(err, "", "Shouldn't be any error messages")
76 if 'unix' in user["name"]:
77 self.assertIn("Modified User '%s' successfully" % user["name"],
80 self.assertIn("User '%s' added successfully" % user["name"],
83 user["checkUserFn"](user)
87 # clean up all the left over users, just in case
88 for user in self.users:
89 if self._find_user(user["name"]):
90 self.runsubcmd("user", "delete", user["name"])
92 # second run of this test
93 # the cache is still there and '--cache-ldb-initialize'
95 cachedb = lp.private_path("user-syncpasswords-cache.ldb")
96 if os.path.exists(cachedb):
98 lp.set('template homedir', self.template_homedir)
def test_newuser(self):
    """Exercise duplicate creation, deletion and --use-username-as-cn.

    Operates on every entry of ``self.users``:

    1. creating the user a second time must fail with
       LDAP_ENTRY_ALREADY_EXISTS;
    2. deleting the user must succeed and leave no entry behind;
    3. re-creating the user with ``--use-username-as-cn`` must succeed
       and produce an entry whose ``cn`` and ``name`` equal the username.
    """
    # try to add all the users again, this should fail
    for user in self.users:
        (result, out, err) = self._create_user(user)
        self.assertCmdFail(result, "Ensure that create user fails")
        self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)

    # try to delete every user held in self.users
    for user in self.users:
        (result, out, err) = self.runsubcmd("user", "delete", user["name"])
        self.assertCmdSuccess(result, out, err, "Can we delete users")
        found = self._find_user(user["name"])
        self.assertIsNone(found)

    # The connection arguments are the same for every user, so build
    # them once instead of on each loop iteration.
    ldap_url = "ldap://%s" % os.environ["DC_SERVER"]
    auth_arg = "-U%s%%%s" % (os.environ["DC_USERNAME"],
                             os.environ["DC_PASSWORD"])

    # test adding users with --use-username-as-cn
    for user in self.users:
        (result, out, err) = self.runsubcmd(
            "user", "create", user["name"], user["password"],
            "--use-username-as-cn",
            "--surname=%s" % user["surname"],
            "--given-name=%s" % user["given-name"],
            "--job-title=%s" % user["job-title"],
            "--department=%s" % user["department"],
            "--description=%s" % user["description"],
            "--company=%s" % user["company"],
            "-H", ldap_url,
            auth_arg)

        self.assertCmdSuccess(result, out, err)
        self.assertEqual(err, "", "Shouldn't be any error messages")
        self.assertIn("User '%s' added successfully" % user["name"], out)

        found = self._find_user(user["name"])
        # Fail with a clear message instead of an AttributeError on
        # found.get() if the user cannot be looked up after creation.
        self.assertIsNotNone(found,
                             "user '%s' not found after create" % user["name"])

        self.assertEqual("%s" % found.get("cn"), "%(name)s" % user)
        self.assertEqual("%s" % found.get("name"), "%(name)s" % user)
136 def test_newuser_weak_password(self):
137 # Ensure that when we try to create a user over LDAP (thus no
138 # transactions) and the password is too weak, we do not get a
139 # half-created account.
141 def cleanup_user(username):
143 self.samdb.deleteuser(username)
144 except Exception as err:
146 if 'Unable to find user' not in estr:
149 server = os.environ['DC_SERVER']
150 dc_username = os.environ['DC_USERNAME']
151 dc_password = os.environ['DC_PASSWORD']
153 username = self.randomName()
156 self.addCleanup(cleanup_user, username)
158 # Try to add the user and ensure it fails.
159 result, out, err = self.runsubcmd('user', 'add',
161 '-H', f'ldap://{server}',
162 f'-U{dc_username}%{dc_password}')
163 self.assertCmdFail(result)
164 self.assertIn('Failed to add user', err)
165 self.assertIn('LDAP_CONSTRAINT_VIOLATION', err)
166 self.assertIn(f'{werror.WERR_PASSWORD_RESTRICTION:08X}', err)
168 # Now search for the user, and make sure we don't find anything.
169 res = self.samdb.search(self.samdb.domain_dn(),
170 expression=f'(sAMAccountName={username})',
171 scope=ldb.SCOPE_SUBTREE)
172 self.assertEqual(0, len(res), 'expected not to find the user')
# Check the supplementalCredentials blob carried in an LDIF record.
#
# ldif is parsed with self.samdb.parse_ldif() and only the first record
# is inspected.  The record must contain a supplementalCredentials
# attribute; its first value is unpacked as a
# drsblobs.supplementalCredentialsBlob.  The number of packages in the
# blob must lie between min_packages and max_packages, after which the
# position of each known package is checked.
174 def _verify_supplementalCredentials(self, ldif,
177 msgs = self.samdb.parse_ldif(ldif)
178 (changetype, obj) = next(msgs)
180 self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
181 sc_blob = obj["supplementalCredentials"][0]
182 sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
184 self.assertGreaterEqual(sc.sub.num_packages,
185 min_packages, "min_packages check")
186 self.assertLessEqual(sc.sub.num_packages,
187 max_packages, "max_packages check")
189 if max_packages == 0:
# Linear search from start_idx; yields (index, package) for the first
# entry whose .name equals name.
192 def find_package(packages, name, start_idx=0):
193 for i in range(start_idx, len(packages)):
194 if packages[i].name == name:
195 return (i, packages[i])
198 # The ordering is this
200 # Primary:Kerberos-Newer-Keys (optional)
203 # Primary:CLEARTEXT (optional)
204 # Primary:SambaGPG (optional)
206 # And the 'Packages' package is insert before the last
210 (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
211 self.assertIsNotNone(pp, "Packages required")
212 self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
213 "Packages needs to be at num_packages - 1")
215 (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
217 if knidx is not None:
218 self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
223 (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
# NOTE(review): the next assertion tests pp (the "Packages" lookup result
# already asserted above), not kp from the lookup just made, so it cannot
# fail here -- looks like a copy-paste slip; confirm and switch to kp.
225 self.assertIsNotNone(pp, "Primary:Kerberos required")
226 self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
231 (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
# NOTE(review): same pattern as above -- this tests pp rather than wp;
# confirm and switch to wp.
233 self.assertIsNotNone(pp, "Primary:WDigest required")
234 self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
239 (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
242 self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
247 (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
250 self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
# Every package must have been accounted for by the checks above.
255 self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
257 def test_setpassword(self):
258 expect_nt_hash = bool(int(os.environ.get("EXPECT_NT_HASH", "1")))
260 for user in self.users:
261 newpasswd = self.random_password(16)
262 (result, out, err) = self.runsubcmd("user", "setpassword",
264 "--newpassword=%s" % newpasswd,
265 "-H", "ldap://%s" % os.environ["DC_SERVER"],
266 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
267 self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
268 self.assertEqual(err, "", "setpassword with url")
269 self.assertMatch(out, "Changed password OK", "setpassword with url")
271 attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
272 (result, out, err) = self.runsubcmd("user", "syncpasswords",
273 "--cache-ldb-initialize",
274 "--attributes=%s" % attributes,
275 "--decrypt-samba-gpg")
276 self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
277 self.assertEqual(err, "", "getpassword without url")
279 "objectClass": {"value": "userSyncPasswords"},
282 "dirsyncAttribute": {},
283 "dirsyncControl": {"value": "dirsync:1:0:0"},
284 "passwordAttribute": {},
285 "decryptSambaGPG": {},
288 for a in cache_attrs.keys():
289 v = cache_attrs[a].get("value", "")
290 self.assertMatch(out, "%s: %s" % (a, v),
291 "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
293 (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
294 self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
295 self.assertEqual(err, "", "syncpasswords --no-wait")
296 self.assertMatch(out, "dirsync_loop(): results 0",
297 "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
298 for user in self.users:
299 self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
300 "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
302 for user in self.users:
303 newpasswd = self.random_password(16)
304 creds = credentials.Credentials()
305 creds.set_anonymous()
306 creds.set_password(newpasswd)
307 unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
308 virtualClearTextUTF8 = base64.b64encode(get_bytes(newpasswd)).decode('utf8')
309 virtualClearTextUTF16 = base64.b64encode(get_string(newpasswd).encode('utf-16-le')).decode('utf8')
311 (result, out, err) = self.runsubcmd("user", "setpassword",
313 "--newpassword=%s" % newpasswd)
314 self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
315 self.assertEqual(err, "", "setpassword without url")
316 self.assertMatch(out, "Changed password OK", "setpassword without url")
318 (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
319 self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
320 self.assertEqual(err, "", "syncpasswords --no-wait")
321 self.assertMatch(out, "dirsync_loop(): results 0",
322 "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
323 self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
324 "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
325 self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
326 "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
327 if expect_nt_hash or "virtualSambaGPG:: " in out:
328 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
329 "getpassword unicodePwd: out[%s]" % out)
331 self.assertNotIn("unicodePwd:: %s" % unicodePwd, out)
332 self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
333 "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
334 self.assertMatch(out, "supplementalCredentials:: ",
335 "getpassword supplementalCredentials: out[%s]" % out)
336 if "virtualSambaGPG:: " in out:
337 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
338 "getpassword virtualClearTextUTF8: out[%s]" % out)
339 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
340 "getpassword virtualClearTextUTF16: out[%s]" % out)
341 self.assertMatch(out, "virtualSSHA: ",
342 "getpassword virtualSSHA: out[%s]" % out)
344 (result, out, err) = self.runsubcmd("user", "getpassword",
346 "--attributes=%s" % attributes,
347 "--decrypt-samba-gpg")
348 self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
349 self.assertEqual(err, "Any available password returned OK\n", "getpassword without url")
350 self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
351 "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
352 if expect_nt_hash or "virtualSambaGPG:: " in out:
353 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
354 "getpassword unicodePwd: out[%s]" % out)
356 self.assertNotIn("unicodePwd:: %s" % unicodePwd, out)
357 self.assertMatch(out, "supplementalCredentials:: ",
358 "getpassword supplementalCredentials: out[%s]" % out)
359 self._verify_supplementalCredentials(out)
360 if "virtualSambaGPG:: " in out:
361 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
362 "getpassword virtualClearTextUTF8: out[%s]" % out)
363 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
364 "getpassword virtualClearTextUTF16: out[%s]" % out)
365 self.assertMatch(out, "virtualSSHA: ",
366 "getpassword virtualSSHA: out[%s]" % out)
368 for user in self.users:
369 newpasswd = self.random_password(16)
370 (result, out, err) = self.runsubcmd("user", "setpassword",
372 "--newpassword=%s" % newpasswd,
373 "--must-change-at-next-login",
374 "-H", "ldap://%s" % os.environ["DC_SERVER"],
375 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
376 self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
377 self.assertEqual(err, "", "setpassword with forced change")
378 self.assertMatch(out, "Changed password OK", "setpassword with forced change")
380 def test_setexpiry(self):
381 for user in self.users:
382 twodays = time.time() + (2 * 24 * 60 * 60)
384 (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
386 "-H", "ldap://%s" % os.environ["DC_SERVER"],
387 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
388 self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
389 self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
391 found = self._find_user(user["name"])
393 expires = nttime2unix(int("%s" % found.get("accountExpires")))
394 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
396 # TODO: re-enable this after the filter case is sorted out
397 if "filters are broken, bail now":
400 # now run the expiration based on a filter
401 fourdays = time.time() + (4 * 24 * 60 * 60)
402 (result, out, err) = self.runsubcmd("user", "setexpiry",
403 "--filter", "(&(objectClass=user)(company=comp2))",
405 "-H", "ldap://%s" % os.environ["DC_SERVER"],
406 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
407 self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
409 for user in self.users:
410 found = self._find_user(user["name"])
411 if ("%s" % found.get("company")) == "comp2":
412 expires = nttime2unix(int("%s" % found.get("accountExpires")))
413 self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time")
415 expires = nttime2unix(int("%s" % found.get("accountExpires")))
416 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
419 (result, out, err) = self.runsubcmd("user", "list",
420 "-H", "ldap://%s" % os.environ["DC_SERVER"],
421 "-U%s%%%s" % (os.environ["DC_USERNAME"],
422 os.environ["DC_PASSWORD"]))
423 self.assertCmdSuccess(result, out, err, "Error running list")
425 search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
426 (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
428 userlist = self.samdb.search(base=self.samdb.domain_dn(),
429 scope=ldb.SCOPE_SUBTREE,
430 expression=search_filter,
431 attrs=["samaccountname"])
433 self.assertTrue(len(userlist) > 0, "no users found in samdb")
435 for userobj in userlist:
436 name = str(userobj.get("samaccountname", idx=0))
437 self.assertMatch(out, name,
438 "user '%s' not found" % name)
440 # Test: samba-tool user list --locked-only
441 # This test does not verify that the command lists the locked user, it just
442 # tests that it does not list unlocked users. The functional test, which
443 # lists locked users, is located in the 'samba4.ldap.password_lockout' test
444 # in source4/dsdb/tests/python/password_lockout.py
445 def test_list_locked(self):
446 (result, out, err) = self.runsubcmd("user", "list",
447 "-H", "ldap://%s" % os.environ["DC_SERVER"],
448 "-U%s%%%s" % (os.environ["DC_USERNAME"],
449 os.environ["DC_PASSWORD"]),
451 self.assertCmdSuccess(result, out, err, "Error running list")
453 search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
454 (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
456 userlist = self.samdb.search(base=self.samdb.domain_dn(),
457 scope=ldb.SCOPE_SUBTREE,
458 expression=search_filter,
459 attrs=["samaccountname"])
461 for userobj in userlist:
462 name = str(userobj.get("samaccountname", idx=0))
463 self.assertNotIn(name, out,
464 "user '%s' is incorrectly listed as locked" % name)
466 def test_list_base_dn(self):
468 (result, out, err) = self.runsubcmd("user", "list", "-b", base_dn,
469 "-H", "ldap://%s" % os.environ["DC_SERVER"],
470 "-U%s%%%s" % (os.environ["DC_USERNAME"],
471 os.environ["DC_PASSWORD"]))
472 self.assertCmdSuccess(result, out, err, "Error running list")
474 search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
475 (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
477 userlist = self.samdb.search(base=self.samdb.normalize_dn_in_domain(base_dn),
478 scope=ldb.SCOPE_SUBTREE,
479 expression=search_filter,
480 attrs=["samaccountname"])
482 self.assertTrue(len(userlist) > 0, "no users found in samdb")
484 for userobj in userlist:
485 name = str(userobj.get("samaccountname", idx=0))
486 self.assertMatch(out, name,
487 "user '%s' not found" % name)
489 def test_list_full_dn(self):
490 (result, out, err) = self.runsubcmd("user", "list", "--full-dn",
491 "-H", "ldap://%s" % os.environ["DC_SERVER"],
492 "-U%s%%%s" % (os.environ["DC_USERNAME"],
493 os.environ["DC_PASSWORD"]))
494 self.assertCmdSuccess(result, out, err, "Error running list")
496 search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
497 (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
499 userlist = self.samdb.search(base=self.samdb.domain_dn(),
500 scope=ldb.SCOPE_SUBTREE,
501 expression=search_filter,
504 self.assertTrue(len(userlist) > 0, "no users found in samdb")
506 for userobj in userlist:
507 name = str(userobj.get("dn", idx=0))
508 self.assertMatch(out, name,
509 "user '%s' not found" % name)
511 def test_list_hide_expired(self):
512 expire_username = "expireUser"
513 expire_user = self._randomUser({"name": expire_username})
514 self._create_user(expire_user)
516 (result, out, err) = self.runsubcmd(
521 "ldap://%s" % os.environ["DC_SERVER"],
522 "-U%s%%%s" % (os.environ["DC_USERNAME"],
523 os.environ["DC_PASSWORD"]))
524 self.assertCmdSuccess(result, out, err, "Error running list")
525 self.assertTrue(expire_username in out,
526 "user '%s' not found" % expire_username)
528 # user will be expired one second ago
529 self.samdb.setexpiry(
530 "(sAMAccountname=%s)" % expire_username,
534 (result, out, err) = self.runsubcmd(
539 "ldap://%s" % os.environ["DC_SERVER"],
540 "-U%s%%%s" % (os.environ["DC_USERNAME"],
541 os.environ["DC_PASSWORD"]))
542 self.assertCmdSuccess(result, out, err, "Error running list")
543 self.assertFalse(expire_username in out,
544 "user '%s' found" % expire_username)
546 self.samdb.deleteuser(expire_username)
548 def test_list_hide_disabled(self):
549 disable_username = "disableUser"
550 disable_user = self._randomUser({"name": disable_username})
551 self._create_user(disable_user)
553 (result, out, err) = self.runsubcmd(
558 "ldap://%s" % os.environ["DC_SERVER"],
559 "-U%s%%%s" % (os.environ["DC_USERNAME"],
560 os.environ["DC_PASSWORD"]))
561 self.assertCmdSuccess(result, out, err, "Error running list")
562 self.assertTrue(disable_username in out,
563 "user '%s' not found" % disable_username)
565 self.samdb.disable_account("(sAMAccountname=%s)" % disable_username)
567 (result, out, err) = self.runsubcmd(
572 "ldap://%s" % os.environ["DC_SERVER"],
573 "-U%s%%%s" % (os.environ["DC_USERNAME"],
574 os.environ["DC_PASSWORD"]))
575 self.assertCmdSuccess(result, out, err, "Error running list")
576 self.assertFalse(disable_username in out,
577 "user '%s' found" % disable_username)
579 self.samdb.deleteuser(disable_username)
582 for user in self.users:
583 (result, out, err) = self.runsubcmd(
584 "user", "show", user["name"],
585 "--attributes=sAMAccountName,company",
586 "-H", "ldap://%s" % os.environ["DC_SERVER"],
587 "-U%s%%%s" % (os.environ["DC_USERNAME"],
588 os.environ["DC_PASSWORD"]))
589 self.assertCmdSuccess(result, out, err, "Error running show")
591 expected_out = """dn: CN=%s %s,CN=Users,%s
595 """ % (user["given-name"], user["surname"], self.samdb.domain_dn(),
596 user["company"], user["name"])
598 self.assertEqual(out, expected_out,
599 "Unexpected show output for user '%s'" %
603 "name", # test that invalid values are just ignored
610 "lastLogonTimestamp",
612 "msDS-UserPasswordExpiryTimeComputed",
617 for ta in time_attrs:
619 for fm in ["GeneralizedTime", "UnixTime", "TimeSpec"]:
620 attrs.append("%s;format=%s" % (ta, fm))
622 (result, out, err) = self.runsubcmd(
623 "user", "show", user["name"],
624 "--attributes=%s" % ",".join(attrs),
625 "-H", "ldap://%s" % os.environ["DC_SERVER"],
626 "-U%s%%%s" % (os.environ["DC_USERNAME"],
627 os.environ["DC_PASSWORD"]))
628 self.assertCmdSuccess(result, out, err,
629 "Error running show --attributes=%s"
632 self.assertIn(";format=GeneralizedTime", out)
633 self.assertIn(";format=UnixTime", out)
634 self.assertIn(";format=TimeSpec", out)
636 self.assertIn("name: ", out)
637 self.assertNotIn("name;format=GeneralizedTime: ", out)
638 self.assertNotIn("name;format=UnixTime: ", out)
639 self.assertNotIn("name;format=TimeSpec: ", out)
641 self.assertIn("whenCreated: 20", out)
642 self.assertIn("whenCreated;format=GeneralizedTime: 20", out)
643 self.assertIn("whenCreated;format=UnixTime: 1", out)
644 self.assertIn("whenCreated;format=TimeSpec: 1", out)
646 self.assertIn("whenChanged: 20", out)
647 self.assertIn("whenChanged;format=GeneralizedTime: 20", out)
648 self.assertIn("whenChanged;format=UnixTime: 1", out)
649 self.assertIn("whenChanged;format=TimeSpec: 1", out)
651 self.assertIn("accountExpires: 9223372036854775807", out)
652 self.assertNotIn("accountExpires;format=GeneralizedTime: ", out)
653 self.assertNotIn("accountExpires;format=UnixTime: ", out)
654 self.assertNotIn("accountExpires;format=TimeSpec: ", out)
656 self.assertIn("badPasswordTime: 0", out)
657 self.assertNotIn("badPasswordTime;format=GeneralizedTime: ", out)
658 self.assertNotIn("badPasswordTime;format=UnixTime: ", out)
659 self.assertNotIn("badPasswordTime;format=TimeSpec: ", out)
661 self.assertIn("lastLogoff: 0", out)
662 self.assertNotIn("lastLogoff;format=GeneralizedTime: ", out)
663 self.assertNotIn("lastLogoff;format=UnixTime: ", out)
664 self.assertNotIn("lastLogoff;format=TimeSpec: ", out)
666 self.assertIn("lastLogon: 0", out)
667 self.assertNotIn("lastLogon;format=GeneralizedTime: ", out)
668 self.assertNotIn("lastLogon;format=UnixTime: ", out)
669 self.assertNotIn("lastLogon;format=TimeSpec: ", out)
671 # If a specified attribute is not available on a user object
672 # it's silently omitted.
673 self.assertNotIn("lastLogonTimestamp:", out)
674 self.assertNotIn("lockoutTime:", out)
676 self.assertIn("msDS-UserPasswordExpiryTimeComputed: 1", out)
677 self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime: 20", out)
678 self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime: 1", out)
679 self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec: 1", out)
681 self.assertIn("pwdLastSet: 1", out)
682 self.assertIn("pwdLastSet;format=GeneralizedTime: 20", out)
683 self.assertIn("pwdLastSet;format=UnixTime: 1", out)
684 self.assertIn("pwdLastSet;format=TimeSpec: 1", out)
686 out_msgs = self.samdb.parse_ldif(out)
687 out_msg = next(out_msgs)[1]
689 self.assertIn("whenCreated", out_msg)
690 when_created_str = str(out_msg["whenCreated"][0])
691 self.assertIn("whenCreated;format=GeneralizedTime", out_msg)
692 self.assertEqual(str(out_msg["whenCreated;format=GeneralizedTime"][0]), when_created_str)
693 when_created_time = ldb.string_to_time(when_created_str)
694 self.assertIn("whenCreated;format=UnixTime", out_msg)
695 self.assertEqual(str(out_msg["whenCreated;format=UnixTime"][0]), str(when_created_time))
696 self.assertIn("whenCreated;format=TimeSpec", out_msg)
697 self.assertEqual(str(out_msg["whenCreated;format=TimeSpec"][0]),
698 "%d.000000000" % (when_created_time))
700 self.assertIn("whenChanged", out_msg)
701 when_changed_str = str(out_msg["whenChanged"][0])
702 self.assertIn("whenChanged;format=GeneralizedTime", out_msg)
703 self.assertEqual(str(out_msg["whenChanged;format=GeneralizedTime"][0]), when_changed_str)
704 when_changed_time = ldb.string_to_time(when_changed_str)
705 self.assertIn("whenChanged;format=UnixTime", out_msg)
706 self.assertEqual(str(out_msg["whenChanged;format=UnixTime"][0]), str(when_changed_time))
707 self.assertIn("whenChanged;format=TimeSpec", out_msg)
708 self.assertEqual(str(out_msg["whenChanged;format=TimeSpec"][0]),
709 "%d.000000000" % (when_changed_time))
711 self.assertIn("pwdLastSet;format=GeneralizedTime", out_msg)
712 pwd_last_set_str = str(out_msg["pwdLastSet;format=GeneralizedTime"][0])
713 pwd_last_set_time = ldb.string_to_time(pwd_last_set_str)
714 self.assertIn("pwdLastSet;format=UnixTime", out_msg)
715 self.assertEqual(str(out_msg["pwdLastSet;format=UnixTime"][0]), str(pwd_last_set_time))
716 self.assertIn("pwdLastSet;format=TimeSpec", out_msg)
717 self.assertIn("%d." % pwd_last_set_time, str(out_msg["pwdLastSet;format=TimeSpec"][0]))
718 self.assertNotIn(".000000000", str(out_msg["pwdLastSet;format=TimeSpec"][0]))
720 # assert that the pwd has been set in the minute after user creation
721 self.assertGreaterEqual(pwd_last_set_time, when_created_time)
722 self.assertLess(pwd_last_set_time, when_created_time + 60)
724 self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime", out_msg)
725 pwd_expires_str = str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime"][0])
726 pwd_expires_time = ldb.string_to_time(pwd_expires_str)
727 self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime", out_msg)
728 self.assertEqual(str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=UnixTime"][0]), str(pwd_expires_time))
729 self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec", out_msg)
730 self.assertIn("%d." % pwd_expires_time, str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0]))
731 self.assertNotIn(".000000000", str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0]))
733 # assert that the pwd expires after it was set
734 self.assertGreater(pwd_expires_time, pwd_last_set_time)
737 full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest_usr"))
738 self.addCleanup(self.samdb.delete, full_ou_dn, ["tree_delete:1"])
740 (result, out, err) = self.runsubcmd("ou", "add", full_ou_dn)
741 self.assertCmdSuccess(result, out, err)
742 self.assertEqual(err, "", "There shouldn't be any error message")
743 self.assertIn('Added ou "%s"' % full_ou_dn, out)
745 for user in self.users:
746 (result, out, err) = self.runsubcmd(
747 "user", "move", user["name"], full_ou_dn)
748 self.assertCmdSuccess(result, out, err, "Error running move")
749 self.assertIn('Moved user "%s" into "%s"' %
750 (user["name"], full_ou_dn), out)
752 # Should fail as users objects are in OU
753 (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
754 self.assertCmdFail(result)
755 self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
756 "(it has %d children)!") % len(self.users), err)
758 for user in self.users:
759 new_dn = "CN=Users,%s" % self.samdb.domain_dn()
760 (result, out, err) = self.runsubcmd(
761 "user", "move", user["name"], new_dn)
762 self.assertCmdSuccess(result, out, err, "Error running move")
763 self.assertIn('Moved user "%s" into "%s"' %
764 (user["name"], new_dn), out)
766 def test_rename_surname_initials_givenname(self):
767 """rename the existing surname and given name and add missing
768 initials, then remove them, for all users"""
769 for user in self.users:
770 new_givenname = "new_given_name_of_" + user["name"]
772 new_surname = "new_surname_of_" + user["name"]
773 found = self._find_user(user["name"])
774 old_cn = str(found.get("cn"))
776 # rename given name, initials and surname
777 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
778 "--surname=%s" % new_surname,
779 "--initials=%s" % new_initials,
780 "--given-name=%s" % new_givenname)
781 self.assertCmdSuccess(result, out, err)
782 self.assertEqual(err, "", "Shouldn't be any error messages")
783 self.assertIn('successfully', out)
785 found = self._find_user(user["name"])
786 self.assertEqual("%s" % found.get("givenName"), new_givenname)
787 self.assertEqual("%s" % found.get("initials"), new_initials)
788 self.assertEqual("%s" % found.get("sn"), new_surname)
789 self.assertEqual("%s" % found.get("name"),
790 "%s %s. %s" % (new_givenname, new_initials, new_surname))
791 self.assertEqual("%s" % found.get("cn"),
792 "%s %s. %s" % (new_givenname, new_initials, new_surname))
794 # remove given name, initials and surname
795 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
799 self.assertCmdSuccess(result, out, err)
800 self.assertEqual(err, "", "Shouldn't be any error messages")
801 self.assertIn('successfully', out)
803 found = self._find_user(user["name"])
804 self.assertEqual(found.get("givenName"), None)
805 self.assertEqual(found.get("initials"), None)
806 self.assertEqual(found.get("sn"), None)
807 self.assertEqual("%s" % found.get("cn"), user["name"])
809 # reset changes (initials are removed)
810 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
811 "--surname=%(surname)s" % user,
812 "--given-name=%(given-name)s" % user)
813 self.assertCmdSuccess(result, out, err)
816 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
817 "--force-new-cn=%s" % old_cn)
819 def test_rename_cn_samaccountname(self):
820 """rename and try to remove the cn and the samaccount of all users"""
821 for user in self.users:
822 new_cn = "new_cn_of_" + user["name"]
823 new_samaccountname = "new_samaccount_of_" + user["name"]
824 new_surname = "new_surname_of_" + user["name"]
827 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
828 "--samaccountname=%s"
829 % new_samaccountname,
830 "--force-new-cn=%s" % new_cn)
831 self.assertCmdSuccess(result, out, err)
832 self.assertEqual(err, "", "Shouldn't be any error messages")
833 self.assertIn('successfully', out)
835 found = self._find_user(new_samaccountname)
836 self.assertEqual("%s" % found.get("cn"), new_cn)
837 self.assertEqual("%s" % found.get("sAMAccountName"),
840 # changing the surname has no effect on the cn
841 (result, out, err) = self.runsubcmd("user", "rename", new_samaccountname,
842 "--surname=%s" % new_surname)
843 self.assertCmdSuccess(result, out, err)
845 found = self._find_user(new_samaccountname)
846 self.assertEqual("%s" % found.get("cn"), new_cn)
848 # trying to remove cn (throws an error)
849 (result, out, err) = self.runsubcmd("user", "rename",
852 self.assertCmdFail(result)
853 self.assertIn('Failed to rename user', err)
854 self.assertIn("delete protected attribute", err)
856 # trying to remove the samaccountname (throws an error)
857 (result, out, err) = self.runsubcmd("user", "rename",
860 self.assertCmdFail(result)
861 self.assertIn('Failed to rename user', err)
862 self.assertIn('delete protected attribute', err)
864 # reset changes (cn must be the name)
865 (result, out, err) = self.runsubcmd("user", "rename", new_samaccountname,
866 "--samaccountname=%(name)s"
868 "--force-new-cn=%(name)s" % user)
869 self.assertCmdSuccess(result, out, err)
def test_rename_standard_cn(self):
    """Reset the CN of all users to the standard value.

    Two cases are checked for every user in ``self.users``:

    * without given name, initials and surname the reset CN equals the
      account name,
    * with all three set the reset CN is
      ``"<given name> <initials>. <surname>"``.

    The original surname and given name are put back at the end.
    """
    for user in self.users:
        new_cn = "new_cn_of_" + user["name"]
        new_givenname = "new_given_name_of_" + user["name"]
        # a short value; it only has to round-trip into the expected CN
        new_initials = "A"
        new_surname = "new_surname_of_" + user["name"]

        # set a different cn
        (result, out, err) = self.runsubcmd("user", "rename", user["name"],
                                            "--force-new-cn=%s" % new_cn)
        self.assertCmdSuccess(result, out, err)

        # remove given name, initials and surname
        (result, out, err) = self.runsubcmd("user", "rename", user["name"],
                                            "--given-name=",
                                            "--initials=",
                                            "--surname=")
        self.assertCmdSuccess(result, out, err)

        # reset the CN (no given name, initials or surname --> samaccountname)
        (result, out, err) = self.runsubcmd("user", "rename", user["name"],
                                            "--reset-cn")
        self.assertCmdSuccess(result, out, err)
        self.assertEqual(err, "", "Shouldn't be any error messages")
        self.assertIn('successfully', out)

        found = self._find_user(user["name"])
        self.assertEqual("%s" % found.get("cn"), user["name"])

        # set given name, initials and surname and set different cn
        (result, out, err) = self.runsubcmd("user", "rename", user["name"],
                                            "--force-new-cn=%s" % new_cn,
                                            "--surname=%s" % new_surname,
                                            "--initials=%s" % new_initials,
                                            "--given-name=%s" % new_givenname)
        self.assertCmdSuccess(result, out, err)

        # reset the CN (given name, initials or surname are given --> given name)
        (result, out, err) = self.runsubcmd("user", "rename", user["name"],
                                            "--reset-cn")
        self.assertCmdSuccess(result, out, err)
        self.assertEqual(err, "", "Shouldn't be any error messages")
        self.assertIn('successfully', out)

        found = self._find_user(user["name"])
        self.assertEqual("%s" % found.get("cn"),
                         "%s %s. %s" % (new_givenname, new_initials, new_surname))

        # reset changes (initials are removed, cn goes back to the standard)
        (result, out, err) = self.runsubcmd("user", "rename", user["name"],
                                            "--reset-cn",
                                            "--initials=",
                                            "--surname=%(surname)s" % user,
                                            "--given-name=%(given-name)s" % user)
        self.assertCmdSuccess(result, out, err)
def test_rename_mailaddress_displayname(self):
    """Set and then remove the mail address and display name of all users.

    After setting, the ``mail`` and ``displayName`` attributes must hold
    the new values; after removing, both attributes must be absent from
    the user's entry.
    """
    for user in self.users:
        new_mail = "new_mailaddress_of_" + user["name"]
        new_displayname = "new displayname of " + user["name"]

        # change mail and displayname
        (result, out, err) = self.runsubcmd("user", "rename", user["name"],
                                            "--mail-address=%s"
                                            % new_mail,
                                            "--display-name=%s"
                                            % new_displayname)
        self.assertCmdSuccess(result, out, err)
        self.assertEqual(err, "", "Shouldn't be any error messages")
        self.assertIn('successfully', out)

        found = self._find_user(user["name"])
        self.assertEqual("%s" % found.get("mail"), new_mail)
        self.assertEqual("%s" % found.get("displayName"), new_displayname)

        # remove mail and displayname (an empty value clears the attribute)
        (result, out, err) = self.runsubcmd("user", "rename", user["name"],
                                            "--mail-address=",
                                            "--display-name=")
        self.assertCmdSuccess(result, out, err)
        self.assertEqual(err, "", "Shouldn't be any error messages")
        self.assertIn('successfully', out)

        found = self._find_user(user["name"])
        self.assertEqual(found.get("mail"), None)
        self.assertEqual(found.get("displayName"), None)
def test_rename_upn(self):
    """Rename the userPrincipalName of all users.

    For every user in ``self.users`` the test checks that a UPN with an
    unknown suffix is rejected, that a UPN using the suffix of the
    current UPN is accepted and stored, and that an empty UPN is
    rejected. The original UPN is restored afterwards.
    """
    for user in self.users:
        found = self._find_user(user["name"])
        old_upn = "%s" % found.get("userPrincipalName")
        valid_suffix = old_upn.split('@')[1]   # samba.example.com

        valid_new_upn = "new_%s@%s" % (user["name"], valid_suffix)
        # interpolate the name; concatenating with "+" would leave a
        # literal "%s" in front of the "@"
        invalid_new_upn = "%s@invalid.suffix" % user["name"]

        # trying to set invalid upn
        (result, out, err) = self.runsubcmd("user", "rename", user["name"],
                                            "--upn=%s"
                                            % invalid_new_upn)
        self.assertCmdFail(result)
        self.assertIn('is not a valid upn', err)

        # set valid upn
        (result, out, err) = self.runsubcmd("user", "rename", user["name"],
                                            "--upn=%s"
                                            % valid_new_upn)
        self.assertCmdSuccess(result, out, err)
        self.assertEqual(err, "", "Shouldn't be any error messages")
        self.assertIn('successfully', out)

        found = self._find_user(user["name"])
        self.assertEqual("%s" % found.get("userPrincipalName"), valid_new_upn)

        # trying to remove upn
        (result, out, err) = self.runsubcmd("user", "rename", user["name"],
                                            "--upn=")
        self.assertCmdFail(result)
        self.assertIn('is not a valid upn', err)

        # reset upn
        (result, out, err) = self.runsubcmd("user", "rename", user["name"],
                                            "--upn=%s" % old_upn)
        self.assertCmdSuccess(result, out, err)
def test_getpwent(self):
    """Check ``user create --rfc2307-from-nss`` against the local passwd entry.

    The passwd entry of the current effective UID supplies the account
    name and the expected posix attributes. The test runs twice:

    1. posix attributes come from NSS (only ``--gecos`` is passed
       explicitly) and must match the values read via ``pwd``;
    2. ``--rfc2307-from-nss`` is combined with explicit posix options,
       which must override the NSS values.

    The test is skipped when the ``pwd`` module is unavailable or the
    effective UID has no passwd entry.
    """
    try:
        import pwd
    except ImportError:
        self.skipTest("Skipping getpwent test, no 'pwd' module available")
        return

    # get the current user's data for the test
    uid = os.geteuid()
    try:
        u = pwd.getpwuid(uid)
    except KeyError:
        self.skipTest("Skipping getpwent test, current EUID not found in NSS")
        return

    # samba-tool user create command didn't support users with empty gecos if none is
    # specified on the command line and the user hasn't one in the passwd file it
    # will fail, so let's add some contents
    gecos = u[4]
    if not gecos:
        gecos = "Foo GECOS"

    # passwd tuple layout: 0 name, 2 uid, 3 gid, 4 gecos, 6 shell
    user = self._randomPosixUser({
        "name": u[0],
        "uid": u[0],
        "uidNumber": u[2],
        "gidNumber": u[3],
        "gecos": gecos,
        "loginShell": u[6],
    })

    # Remove user if it already exists
    if self._find_user(u[0]):
        self.runsubcmd("user", "delete", u[0])
    # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
    (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
                                        "--surname=%s" % user["surname"],
                                        "--given-name=%s" % user["given-name"],
                                        "--job-title=%s" % user["job-title"],
                                        "--department=%s" % user["department"],
                                        "--description=%s" % user["description"],
                                        "--company=%s" % user["company"],
                                        "--gecos=%s" % user["gecos"],
                                        "--rfc2307-from-nss",
                                        "-H", "ldap://%s" % os.environ["DC_SERVER"],
                                        "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))

    self.assertCmdSuccess(result, out, err)
    self.assertEqual(err, "", "Shouldn't be any error messages")
    self.assertIn("User '%s' added successfully" % user["name"], out)

    self._check_posix_user(user)
    self.runsubcmd("user", "delete", user["name"])

    # Check if overriding the attributes from NSS with explicit values works
    #
    # get a user with all random posix attributes
    user = self._randomPosixUser({"name": u[0]})

    # Remove user if it already exists
    if self._find_user(u[0]):
        self.runsubcmd("user", "delete", u[0])
    # create a user with posix attributes from nss but override all of them with the
    # random ones just obtained
    (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
                                        "--surname=%s" % user["surname"],
                                        "--given-name=%s" % user["given-name"],
                                        "--job-title=%s" % user["job-title"],
                                        "--department=%s" % user["department"],
                                        "--description=%s" % user["description"],
                                        "--company=%s" % user["company"],
                                        "--rfc2307-from-nss",
                                        "--gecos=%s" % user["gecos"],
                                        "--login-shell=%s" % user["loginShell"],
                                        "--uid=%s" % user["uid"],
                                        "--uid-number=%s" % user["uidNumber"],
                                        "--gid-number=%s" % user["gidNumber"],
                                        "-H", "ldap://%s" % os.environ["DC_SERVER"],
                                        "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))

    self.assertCmdSuccess(result, out, err)
    self.assertEqual(err, "", "Shouldn't be any error messages")
    self.assertIn("User '%s' added successfully" % user["name"], out)

    self._check_posix_user(user)
    self.runsubcmd("user", "delete", user["name"])
1088 # Test: samba-tool user unlock
1089 # This test does not verify that the command unlocks the user, it just
1090 # tests the command itself. The unlock test, which unlocks locked users,
1091 # is located in the 'samba4.ldap.password_lockout' test in
1092 # source4/dsdb/tests/python/password_lockout.py
def test_unlock(self):
    """Exercise the ``user unlock`` command itself.

    Three situations are covered: unlocking a nonexistent user (must
    fail), unlocking with the credentials of an unprivileged user (must
    fail with LDAP_INSUFFICIENT_ACCESS_RIGHTS) and unlocking each of the
    regular test users (must succeed without error output).
    """
    # try to unlock a nonexistent user, this should fail
    nonexistentusername = "userdoesnotexist"
    (result, out, err) = self.runsubcmd(
        "user", "unlock", nonexistentusername)
    self.assertCmdFail(result, "Ensure that unlock nonexistent user fails")
    self.assertIn("Failed to unlock user '%s'" % nonexistentusername, err)
    self.assertIn("Unable to find user", err)

    # try to unlock with insufficient permissions, this should fail
    unprivileged_username = "unprivilegedunlockuser"
    unlocktest_username = "usertounlock"
    password = "Passw0rd"

    self.runsubcmd("user", "add", unprivileged_username, password)
    self.runsubcmd("user", "add", unlocktest_username, password)

    try:
        (result, out, err) = self.runsubcmd(
            "user", "unlock", unlocktest_username,
            "-H", "ldap://%s" % os.environ["DC_SERVER"],
            "-U%s%%%s" % (unprivileged_username,
                          password))
        self.assertCmdFail(result, "Fail with LDAP_INSUFFICIENT_ACCESS_RIGHTS")
        self.assertIn("Failed to unlock user '%s'" % unlocktest_username, err)
        self.assertIn("LDAP error 50 LDAP_INSUFFICIENT_ACCESS_RIGHTS", err)
    finally:
        # remove the temporary accounts even when an assertion above
        # fails, so they do not linger in the directory
        self.runsubcmd("user", "delete", unprivileged_username)
        self.runsubcmd("user", "delete", unlocktest_username)

    # run unlock against test users
    for user in self.users:
        (result, out, err) = self.runsubcmd(
            "user", "unlock", user["name"])
        self.assertCmdSuccess(result, out, err, "Error running user unlock")
        self.assertEqual(err, "", "Shouldn't be any error messages")
def _randomUser(self, base=None):
    """Build a user template filled with random attribute values.

    :param base: optional dict of attributes that override the random
        values (for example a fixed ``name`` or ``company``)
    :return: dict describing the user; besides the attribute values it
        carries ``createUserFn`` and ``checkUserFn``, the bound methods
        for creating and verifying this kind of user
    """
    if base is None:
        base = {}
    user = {
        "name": self.randomName(),
        "password": self.random_password(16),
        "surname": self.randomName(),
        "given-name": self.randomName(),
        "job-title": self.randomName(),
        "department": self.randomName(),
        "company": self.randomName(),
        "description": self.randomName(count=100),
        "createUserFn": self._create_user,
        "checkUserFn": self._check_user,
    }
    # caller-supplied values win over the random defaults
    user.update(base)
    return user
def _randomPosixUser(self, base=None):
    """Build a random user template with additional RFC2307 attributes.

    The template is wired to ``_create_posix_user`` and
    ``_check_posix_user``.

    :param base: optional dict of attributes that override the random
        values, including the posix ones
    :return: dict describing the user
    """
    if base is None:
        base = {}
    user = self._randomUser({})
    posixAttributes = {
        "uid": self.randomName(),
        "loginShell": self.randomName(),
        "gecos": self.randomName(),
        "uidNumber": self.randomXid(),
        "gidNumber": self.randomXid(),
        "createUserFn": self._create_posix_user,
        "checkUserFn": self._check_posix_user,
    }
    user.update(posixAttributes)
    # apply the overrides last so they also win over the posix defaults
    user.update(base)
    return user
def _randomUnixUser(self, base=None):
    """Build a random user template with additional RFC2307 attributes.

    The attribute set matches the posix template, but this one is wired
    to ``_create_unix_user`` and ``_check_unix_user``.

    :param base: optional dict of attributes that override the random
        values, including the posix ones
    :return: dict describing the user
    """
    if base is None:
        base = {}
    user = self._randomUser({})
    posixAttributes = {
        "uidNumber": self.randomXid(),
        "gidNumber": self.randomXid(),
        "uid": self.randomName(),
        "loginShell": self.randomName(),
        "gecos": self.randomName(),
        "createUserFn": self._create_unix_user,
        "checkUserFn": self._check_unix_user,
    }
    user.update(posixAttributes)
    # apply the overrides last so they also win over the posix defaults
    user.update(base)
    return user
def _check_user(self, user):
    """Assert that the SamDB entry of a user agrees with its template dict."""
    entry = self._find_user(user["name"])

    # the stored name is expected to be "<given-name> <surname>"
    self.assertEqual("%s" % entry.get("name"),
                     "%(given-name)s %(surname)s" % user)

    # (directory attribute, key in the template) pairs, checked in order
    attribute_map = (
        ("title", "job-title"),
        ("company", "company"),
        ("description", "description"),
        ("department", "department"),
    )
    for attribute, template_key in attribute_map:
        self.assertEqual("%s" % entry.get(attribute), user[template_key])
def _check_posix_user(self, user):
    """Assert that a posix user's SamDB entry agrees with its template dict.

    The RFC2307 attributes are compared first, then the common
    attributes via ``_check_user``.
    """
    entry = self._find_user(user["name"])

    # (attribute, compare-as-text); the numeric ids are formatted to
    # text before comparing, the other values are compared as they are
    posix_attributes = (
        ("loginShell", False),
        ("gecos", False),
        ("uidNumber", True),
        ("gidNumber", True),
        ("uid", False),
    )
    for attribute, as_text in posix_attributes:
        stored = "%s" % entry.get(attribute)
        wanted = user[attribute]
        if as_text:
            wanted = "%s" % wanted
        self.assertEqual(stored, wanted)

    self._check_user(user)
def _check_unix_user(self, user):
    """Assert that a unix user's SamDB entry agrees with its template dict.

    Besides the RFC2307 attributes this checks that
    ``unixHomeDirectory`` contains ``/home/test/``, then compares the
    common attributes via ``_check_user``.
    """
    found = self._find_user(user["name"])

    self.assertEqual("%s" % found.get("loginShell"), user["loginShell"])
    self.assertEqual("%s" % found.get("gecos"), user["gecos"])
    self.assertEqual("%s" % found.get("uidNumber"), "%s" %
                     user["uidNumber"])
    self.assertEqual("%s" % found.get("gidNumber"), "%s" %
                     user["gidNumber"])
    self.assertEqual("%s" % found.get("uid"), user["uid"])
    self.assertIn('/home/test/', "%s" % found.get("unixHomeDirectory"))
    self._check_user(user)
def _create_user(self, user):
    """Create a user from its template dict with ``user add``.

    Connects to the DC named by the ``DC_SERVER`` environment variable
    with the ``DC_USERNAME``/``DC_PASSWORD`` credentials and returns
    whatever ``runsubcmd`` returns.
    """
    argv = ["user", "add", user["name"], user["password"]]

    # each of these options is named exactly like its template key
    for key in ("surname", "given-name", "job-title",
                "department", "description", "company"):
        argv.append("--%s=%s" % (key, user[key]))

    argv.append("-H")
    argv.append("ldap://%s" % os.environ["DC_SERVER"])
    argv.append("-U%s%%%s" % (os.environ["DC_USERNAME"],
                              os.environ["DC_PASSWORD"]))
    return self.runsubcmd(*argv)
def _create_posix_user(self, user):
    """Create a user including its RFC2307 attributes with ``user create``.

    Connects to the DC named by the ``DC_SERVER`` environment variable
    with the ``DC_USERNAME``/``DC_PASSWORD`` credentials and returns
    whatever ``runsubcmd`` returns.
    """
    # (command line option, key in the template dict), in call order
    option_keys = (
        ("surname", "surname"),
        ("given-name", "given-name"),
        ("job-title", "job-title"),
        ("department", "department"),
        ("description", "description"),
        ("company", "company"),
        ("gecos", "gecos"),
        ("login-shell", "loginShell"),
        ("uid", "uid"),
        ("uid-number", "uidNumber"),
        ("gid-number", "gidNumber"),
    )

    argv = ["user", "create", user["name"], user["password"]]
    for option, key in option_keys:
        argv.append("--%s=%s" % (option, user[key]))
    argv.append("-H")
    argv.append("ldap://%s" % os.environ["DC_SERVER"])
    argv.append("-U%s%%%s" % (os.environ["DC_USERNAME"],
                              os.environ["DC_PASSWORD"]))
    return self.runsubcmd(*argv)
def _create_unix_user(self, user):
    """Create a plain user, then add RFC2307 attributes to it.

    The account is created through ``_create_user`` (its result is not
    inspected); the value returned is the result of the following
    ``user addunixattrs`` call.
    """
    self._create_user(user)

    # the uid number is passed positionally, the rest as options
    argv = ["user", "addunixattrs", user["name"], "%s" % user["uidNumber"]]
    for option, key in (("gid-number", "gidNumber"),
                        ("gecos", "gecos"),
                        ("login-shell", "loginShell"),
                        ("uid", "uid")):
        argv.append("--%s=%s" % (option, user[key]))
    argv.append("-H")
    argv.append("ldap://%s" % os.environ["DC_SERVER"])
    argv.append("-U%s%%%s" % (os.environ["DC_USERNAME"],
                              os.environ["DC_PASSWORD"]))
    return self.runsubcmd(*argv)
1265 def _find_user(self, name):
1266 search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
1267 userlist = self.samdb.search(base=self.samdb.domain_dn(),
1268 scope=ldb.SCOPE_SUBTREE,
1269 expression=search_filter)