1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 from samba.tests.samba_tool.base import SambaToolCmdTest
28 from samba.ndr import ndr_unpack
29 from samba.dcerpc import drsblobs
30 from samba.compat import get_bytes
31 from samba.compat import get_string
32 from samba.tests import env_loadparm
35 class UserCmdTestCase(SambaToolCmdTest):
36 """Tests for samba-tool user subcommands"""
41 super(UserCmdTestCase, self).setUp()
42 self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
43 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
45 self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
46 self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
47 self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"}))
48 self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"}))
49 self.users.append(self._randomPosixUser({"name": "posixuser1"}))
50 self.users.append(self._randomPosixUser({"name": "posixuser2"}))
51 self.users.append(self._randomPosixUser({"name": "posixuser3"}))
52 self.users.append(self._randomPosixUser({"name": "posixuser4"}))
53 self.users.append(self._randomUnixUser({"name": "unixuser1"}))
54 self.users.append(self._randomUnixUser({"name": "unixuser2"}))
55 self.users.append(self._randomUnixUser({"name": "unixuser3"}))
56 self.users.append(self._randomUnixUser({"name": "unixuser4"}))
58 # setup the 12 users and ensure they are correct
59 for user in self.users:
60 (result, out, err) = user["createUserFn"](user)
62 self.assertCmdSuccess(result, out, err)
63 self.assertEquals(err, "", "Shouldn't be any error messages")
64 if 'unix' in user["name"]:
65 self.assertIn("Modified User '%s' successfully" % user["name"],
68 self.assertIn("User '%s' created successfully" % user["name"],
71 user["checkUserFn"](user)
74 super(UserCmdTestCase, self).tearDown()
75 # clean up all the left over users, just in case
76 for user in self.users:
77 if self._find_user(user["name"]):
78 self.runsubcmd("user", "delete", user["name"])
80 # second run of this test
81 # the cache is still there and '--cache-ldb-initialize'
83 cachedb = lp.private_path("user-syncpasswords-cache.ldb")
84 if os.path.exists(cachedb):
87 def test_newuser(self):
88 # try to add all the users again, this should fail
89 for user in self.users:
90 (result, out, err) = self._create_user(user)
91 self.assertCmdFail(result, "Ensure that create user fails")
92 self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)
94 # try to delete all the 4 users we just added
95 for user in self.users:
96 (result, out, err) = self.runsubcmd("user", "delete", user["name"])
97 self.assertCmdSuccess(result, out, err, "Can we delete users")
98 found = self._find_user(user["name"])
99 self.assertIsNone(found)
101 # test adding users with --use-username-as-cn
102 for user in self.users:
103 (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
104 "--use-username-as-cn",
105 "--surname=%s" % user["surname"],
106 "--given-name=%s" % user["given-name"],
107 "--job-title=%s" % user["job-title"],
108 "--department=%s" % user["department"],
109 "--description=%s" % user["description"],
110 "--company=%s" % user["company"],
111 "-H", "ldap://%s" % os.environ["DC_SERVER"],
112 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
114 self.assertCmdSuccess(result, out, err)
115 self.assertEquals(err, "", "Shouldn't be any error messages")
116 self.assertIn("User '%s' created successfully" % user["name"], out)
118 found = self._find_user(user["name"])
120 self.assertEquals("%s" % found.get("cn"), "%(name)s" % user)
121 self.assertEquals("%s" % found.get("name"), "%(name)s" % user)
123 def _verify_supplementalCredentials(self, ldif,
126 msgs = self.samdb.parse_ldif(ldif)
127 (changetype, obj) = next(msgs)
129 self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
130 sc_blob = obj["supplementalCredentials"][0]
131 sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
133 self.assertGreaterEqual(sc.sub.num_packages,
134 min_packages, "min_packages check")
135 self.assertLessEqual(sc.sub.num_packages,
136 max_packages, "max_packages check")
138 if max_packages == 0:
141 def find_package(packages, name, start_idx=0):
142 for i in range(start_idx, len(packages)):
143 if packages[i].name == name:
144 return (i, packages[i])
147 # The ordering is this
149 # Primary:Kerberos-Newer-Keys (optional)
152 # Primary:CLEARTEXT (optional)
153 # Primary:SambaGPG (optional)
155 # And the 'Packages' package is insert before the last
159 (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
160 self.assertIsNotNone(pp, "Packages required")
161 self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
162 "Packages needs to be at num_packages - 1")
164 (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
166 if knidx is not None:
167 self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
172 (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
174 self.assertIsNotNone(pp, "Primary:Kerberos required")
175 self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
180 (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
182 self.assertIsNotNone(pp, "Primary:WDigest required")
183 self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
188 (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
191 self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
196 (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
199 self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
204 self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
206 def test_setpassword(self):
207 for user in self.users:
208 newpasswd = self.random_password(16)
209 (result, out, err) = self.runsubcmd("user", "setpassword",
211 "--newpassword=%s" % newpasswd,
212 "-H", "ldap://%s" % os.environ["DC_SERVER"],
213 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
214 self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
215 self.assertEquals(err, "", "setpassword with url")
216 self.assertMatch(out, "Changed password OK", "setpassword with url")
218 attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
219 (result, out, err) = self.runsubcmd("user", "syncpasswords",
220 "--cache-ldb-initialize",
221 "--attributes=%s" % attributes,
222 "--decrypt-samba-gpg")
223 self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
224 self.assertEqual(err, "", "getpassword without url")
226 "objectClass": {"value": "userSyncPasswords"},
229 "dirsyncAttribute": {},
230 "dirsyncControl": {"value": "dirsync:1:0:0"},
231 "passwordAttribute": {},
232 "decryptSambaGPG": {},
235 for a in cache_attrs.keys():
236 v = cache_attrs[a].get("value", "")
237 self.assertMatch(out, "%s: %s" % (a, v),
238 "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
240 (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
241 self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
242 self.assertEqual(err, "", "syncpasswords --no-wait")
243 self.assertMatch(out, "dirsync_loop(): results 0",
244 "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
245 for user in self.users:
246 self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
247 "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
249 for user in self.users:
250 newpasswd = self.random_password(16)
251 creds = credentials.Credentials()
252 creds.set_anonymous()
253 creds.set_password(newpasswd)
254 nthash = creds.get_nt_hash()
255 unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
256 virtualClearTextUTF8 = base64.b64encode(get_bytes(newpasswd)).decode('utf8')
257 virtualClearTextUTF16 = base64.b64encode(get_string(newpasswd).encode('utf-16-le')).decode('utf8')
259 (result, out, err) = self.runsubcmd("user", "setpassword",
261 "--newpassword=%s" % newpasswd)
262 self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
263 self.assertEquals(err, "", "setpassword without url")
264 self.assertMatch(out, "Changed password OK", "setpassword without url")
266 (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
267 self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
268 self.assertEqual(err, "", "syncpasswords --no-wait")
269 self.assertMatch(out, "dirsync_loop(): results 0",
270 "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
271 self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
272 "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
273 self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
274 "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
275 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
276 "getpassword unicodePwd: out[%s]" % out)
277 self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
278 "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
279 self.assertMatch(out, "supplementalCredentials:: ",
280 "getpassword supplementalCredentials: out[%s]" % out)
281 if "virtualSambaGPG:: " in out:
282 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
283 "getpassword virtualClearTextUTF8: out[%s]" % out)
284 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
285 "getpassword virtualClearTextUTF16: out[%s]" % out)
286 self.assertMatch(out, "virtualSSHA: ",
287 "getpassword virtualSSHA: out[%s]" % out)
289 (result, out, err) = self.runsubcmd("user", "getpassword",
291 "--attributes=%s" % attributes,
292 "--decrypt-samba-gpg")
293 self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
294 self.assertEqual(err, "", "getpassword without url")
295 self.assertMatch(out, "Got password OK", "getpassword without url")
296 self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
297 "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
298 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
299 "getpassword unicodePwd: out[%s]" % out)
300 self.assertMatch(out, "supplementalCredentials:: ",
301 "getpassword supplementalCredentials: out[%s]" % out)
302 self._verify_supplementalCredentials(out.replace("\nGot password OK\n", ""))
303 if "virtualSambaGPG:: " in out:
304 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
305 "getpassword virtualClearTextUTF8: out[%s]" % out)
306 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
307 "getpassword virtualClearTextUTF16: out[%s]" % out)
308 self.assertMatch(out, "virtualSSHA: ",
309 "getpassword virtualSSHA: out[%s]" % out)
311 for user in self.users:
312 newpasswd = self.random_password(16)
313 (result, out, err) = self.runsubcmd("user", "setpassword",
315 "--newpassword=%s" % newpasswd,
316 "--must-change-at-next-login",
317 "-H", "ldap://%s" % os.environ["DC_SERVER"],
318 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
319 self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
320 self.assertEquals(err, "", "setpassword with forced change")
321 self.assertMatch(out, "Changed password OK", "setpassword with forced change")
323 def test_setexpiry(self):
324 for user in self.users:
325 twodays = time.time() + (2 * 24 * 60 * 60)
327 (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
329 "-H", "ldap://%s" % os.environ["DC_SERVER"],
330 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
331 self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
332 self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
334 found = self._find_user(user["name"])
336 expires = nttime2unix(int("%s" % found.get("accountExpires")))
337 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
339 # TODO: renable this after the filter case is sorted out
340 if "filters are broken, bail now":
343 # now run the expiration based on a filter
344 fourdays = time.time() + (4 * 24 * 60 * 60)
345 (result, out, err) = self.runsubcmd("user", "setexpiry",
346 "--filter", "(&(objectClass=user)(company=comp2))",
348 "-H", "ldap://%s" % os.environ["DC_SERVER"],
349 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
350 self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
352 for user in self.users:
353 found = self._find_user(user["name"])
354 if ("%s" % found.get("company")) == "comp2":
355 expires = nttime2unix(int("%s" % found.get("accountExpires")))
356 self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time")
358 expires = nttime2unix(int("%s" % found.get("accountExpires")))
359 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
362 (result, out, err) = self.runsubcmd("user", "list",
363 "-H", "ldap://%s" % os.environ["DC_SERVER"],
364 "-U%s%%%s" % (os.environ["DC_USERNAME"],
365 os.environ["DC_PASSWORD"]))
366 self.assertCmdSuccess(result, out, err, "Error running list")
368 search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
369 (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
371 userlist = self.samdb.search(base=self.samdb.domain_dn(),
372 scope=ldb.SCOPE_SUBTREE,
373 expression=search_filter,
374 attrs=["samaccountname"])
376 self.assertTrue(len(userlist) > 0, "no users found in samdb")
378 for userobj in userlist:
379 name = str(userobj.get("samaccountname", idx=0))
380 found = self.assertMatch(out, name,
381 "user '%s' not found" % name)
384 for user in self.users:
385 (result, out, err) = self.runsubcmd(
386 "user", "show", user["name"],
387 "--attributes=sAMAccountName,company",
388 "-H", "ldap://%s" % os.environ["DC_SERVER"],
389 "-U%s%%%s" % (os.environ["DC_USERNAME"],
390 os.environ["DC_PASSWORD"]))
391 self.assertCmdSuccess(result, out, err, "Error running show")
393 expected_out = """dn: CN=%s %s,CN=Users,%s
397 """ % (user["given-name"], user["surname"], self.samdb.domain_dn(),
398 user["company"], user["name"])
400 self.assertEqual(out, expected_out,
401 "Unexpected show output for user '%s'" %
405 full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest"))
406 (result, out, err) = self.runsubcmd("ou", "create", full_ou_dn)
407 self.assertCmdSuccess(result, out, err)
408 self.assertEquals(err, "", "There shouldn't be any error message")
409 self.assertIn('Created ou "%s"' % full_ou_dn, out)
411 for user in self.users:
412 (result, out, err) = self.runsubcmd(
413 "user", "move", user["name"], full_ou_dn)
414 self.assertCmdSuccess(result, out, err, "Error running move")
415 self.assertIn('Moved user "%s" into "%s"' %
416 (user["name"], full_ou_dn), out)
418 # Should fail as users objects are in OU
419 (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
420 self.assertCmdFail(result)
421 self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
422 "(it has %d children)!") % len(self.users), err)
424 for user in self.users:
425 new_dn = "CN=Users,%s" % self.samdb.domain_dn()
426 (result, out, err) = self.runsubcmd(
427 "user", "move", user["name"], new_dn)
428 self.assertCmdSuccess(result, out, err, "Error running move")
429 self.assertIn('Moved user "%s" into "%s"' %
430 (user["name"], new_dn), out)
432 (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
433 self.assertCmdSuccess(result, out, err,
434 "Failed to delete ou '%s'" % full_ou_dn)
436 def test_getpwent(self):
440 self.skipTest("Skipping getpwent test, no 'pwd' module available")
443 # get the current user's data for the test
446 u = pwd.getpwuid(uid)
448 self.skipTest("Skipping getpwent test, current EUID not found in NSS")
452 # samba-tool user create command didn't support users with empty gecos if none is
453 # specified on the command line and the user hasn't one in the passwd file it
454 # will fail, so let's add some contents
457 if (gecos is None or len(gecos) == 0):
459 user = self._randomPosixUser({
467 # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
468 (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
469 "--surname=%s" % user["surname"],
470 "--given-name=%s" % user["given-name"],
471 "--job-title=%s" % user["job-title"],
472 "--department=%s" % user["department"],
473 "--description=%s" % user["description"],
474 "--company=%s" % user["company"],
475 "--gecos=%s" % user["gecos"],
476 "--rfc2307-from-nss",
477 "-H", "ldap://%s" % os.environ["DC_SERVER"],
478 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
480 self.assertCmdSuccess(result, out, err)
481 self.assertEquals(err, "", "Shouldn't be any error messages")
482 self.assertIn("User '%s' created successfully" % user["name"], out)
484 self._check_posix_user(user)
485 self.runsubcmd("user", "delete", user["name"])
487 # Check if overriding the attributes from NSS with explicit values works
489 # get a user with all random posix attributes
490 user = self._randomPosixUser({"name": u[0]})
491 # create a user with posix attributes from nss but override all of them with the
492 # random ones just obtained
493 (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
494 "--surname=%s" % user["surname"],
495 "--given-name=%s" % user["given-name"],
496 "--job-title=%s" % user["job-title"],
497 "--department=%s" % user["department"],
498 "--description=%s" % user["description"],
499 "--company=%s" % user["company"],
500 "--rfc2307-from-nss",
501 "--gecos=%s" % user["gecos"],
502 "--login-shell=%s" % user["loginShell"],
503 "--uid=%s" % user["uid"],
504 "--uid-number=%s" % user["uidNumber"],
505 "--gid-number=%s" % user["gidNumber"],
506 "-H", "ldap://%s" % os.environ["DC_SERVER"],
507 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
509 self.assertCmdSuccess(result, out, err)
510 self.assertEquals(err, "", "Shouldn't be any error messages")
511 self.assertIn("User '%s' created successfully" % user["name"], out)
513 self._check_posix_user(user)
514 self.runsubcmd("user", "delete", user["name"])
516 def _randomUser(self, base={}):
517 """create a user with random attribute values, you can specify base attributes"""
519 "name": self.randomName(),
520 "password": self.random_password(16),
521 "surname": self.randomName(),
522 "given-name": self.randomName(),
523 "job-title": self.randomName(),
524 "department": self.randomName(),
525 "company": self.randomName(),
526 "description": self.randomName(count=100),
527 "createUserFn": self._create_user,
528 "checkUserFn": self._check_user,
533 def _randomPosixUser(self, base={}):
534 """create a user with random attribute values and additional RFC2307
535 attributes, you can specify base attributes"""
536 user = self._randomUser({})
539 "uid": self.randomName(),
540 "loginShell": self.randomName(),
541 "gecos": self.randomName(),
542 "uidNumber": self.randomXid(),
543 "gidNumber": self.randomXid(),
544 "createUserFn": self._create_posix_user,
545 "checkUserFn": self._check_posix_user,
547 user.update(posixAttributes)
551 def _randomUnixUser(self, base={}):
552 """create a user with random attribute values and additional RFC2307
553 attributes, you can specify base attributes"""
554 user = self._randomUser({})
557 "uidNumber": self.randomXid(),
558 "gidNumber": self.randomXid(),
559 "uid": self.randomName(),
560 "loginShell": self.randomName(),
561 "gecos": self.randomName(),
562 "createUserFn": self._create_unix_user,
563 "checkUserFn": self._check_unix_user,
565 user.update(posixAttributes)
569 def _check_user(self, user):
570 """ check if a user from SamDB has the same attributes as its template """
571 found = self._find_user(user["name"])
573 self.assertEquals("%s" % found.get("name"), "%(given-name)s %(surname)s" % user)
574 self.assertEquals("%s" % found.get("title"), user["job-title"])
575 self.assertEquals("%s" % found.get("company"), user["company"])
576 self.assertEquals("%s" % found.get("description"), user["description"])
577 self.assertEquals("%s" % found.get("department"), user["department"])
579 def _check_posix_user(self, user):
580 """ check if a posix_user from SamDB has the same attributes as its template """
581 found = self._find_user(user["name"])
583 self.assertEquals("%s" % found.get("loginShell"), user["loginShell"])
584 self.assertEquals("%s" % found.get("gecos"), user["gecos"])
585 self.assertEquals("%s" % found.get("uidNumber"), "%s" % user["uidNumber"])
586 self.assertEquals("%s" % found.get("gidNumber"), "%s" % user["gidNumber"])
587 self.assertEquals("%s" % found.get("uid"), user["uid"])
588 self._check_user(user)
590 def _check_unix_user(self, user):
591 """ check if a unix_user from SamDB has the same attributes as its
593 found = self._find_user(user["name"])
595 self.assertEquals("%s" % found.get("loginShell"), user["loginShell"])
596 self.assertEquals("%s" % found.get("gecos"), user["gecos"])
597 self.assertEquals("%s" % found.get("uidNumber"), "%s" %
599 self.assertEquals("%s" % found.get("gidNumber"), "%s" %
601 self.assertEquals("%s" % found.get("uid"), user["uid"])
602 self._check_user(user)
604 def _create_user(self, user):
605 return self.runsubcmd("user", "create", user["name"], user["password"],
606 "--surname=%s" % user["surname"],
607 "--given-name=%s" % user["given-name"],
608 "--job-title=%s" % user["job-title"],
609 "--department=%s" % user["department"],
610 "--description=%s" % user["description"],
611 "--company=%s" % user["company"],
612 "-H", "ldap://%s" % os.environ["DC_SERVER"],
613 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
615 def _create_posix_user(self, user):
616 """ create a new user with RFC2307 attributes """
617 return self.runsubcmd("user", "create", user["name"], user["password"],
618 "--surname=%s" % user["surname"],
619 "--given-name=%s" % user["given-name"],
620 "--job-title=%s" % user["job-title"],
621 "--department=%s" % user["department"],
622 "--description=%s" % user["description"],
623 "--company=%s" % user["company"],
624 "--gecos=%s" % user["gecos"],
625 "--login-shell=%s" % user["loginShell"],
626 "--uid=%s" % user["uid"],
627 "--uid-number=%s" % user["uidNumber"],
628 "--gid-number=%s" % user["gidNumber"],
629 "-H", "ldap://%s" % os.environ["DC_SERVER"],
630 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
632 def _create_unix_user(self, user):
633 """ Add RFC2307 attributes to a user"""
634 self._create_user(user)
635 return self.runsubcmd("user", "addunixattrs", user["name"],
636 "%s" % user["uidNumber"],
637 "--gid-number=%s" % user["gidNumber"],
638 "--gecos=%s" % user["gecos"],
639 "--login-shell=%s" % user["loginShell"],
640 "--uid=%s" % user["uid"],
641 "-H", "ldap://%s" % os.environ["DC_SERVER"],
642 "-U%s%%%s" % (os.environ["DC_USERNAME"],
643 os.environ["DC_PASSWORD"]))
645 def _find_user(self, name):
646 search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
647 userlist = self.samdb.search(base=self.samdb.domain_dn(),
648 scope=ldb.SCOPE_SUBTREE,
649 expression=search_filter)