samba-tool: Add facility to add rfc2307 attributes to an already created user or...
[samba.git] / python / samba / tests / samba_tool / user.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import time
20 import base64
21 import ldb
22 from samba.tests.samba_tool.base import SambaToolCmdTest
23 from samba import (
24         credentials,
25         nttime2unix,
26         dsdb
27         )
28 from samba.ndr import ndr_unpack
29 from samba.dcerpc import drsblobs
30 from samba.compat import get_bytes
31 from samba.compat import get_string
32 from samba.tests import env_loadparm
33
34
35 class UserCmdTestCase(SambaToolCmdTest):
36     """Tests for samba-tool user subcommands"""
37     users = []
38     samdb = None
39
40     def setUp(self):
41         super(UserCmdTestCase, self).setUp()
42         self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
43                                    "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
44         self.users = []
45         self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
46         self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
47         self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"}))
48         self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"}))
49         self.users.append(self._randomPosixUser({"name": "posixuser1"}))
50         self.users.append(self._randomPosixUser({"name": "posixuser2"}))
51         self.users.append(self._randomPosixUser({"name": "posixuser3"}))
52         self.users.append(self._randomPosixUser({"name": "posixuser4"}))
53         self.users.append(self._randomUnixUser({"name": "unixuser1"}))
54         self.users.append(self._randomUnixUser({"name": "unixuser2"}))
55         self.users.append(self._randomUnixUser({"name": "unixuser3"}))
56         self.users.append(self._randomUnixUser({"name": "unixuser4"}))
57
58         # setup the 12 users and ensure they are correct
59         for user in self.users:
60             (result, out, err) = user["createUserFn"](user)
61
62             self.assertCmdSuccess(result, out, err)
63             self.assertEquals(err, "", "Shouldn't be any error messages")
64             if 'unix' in user["name"]:
65                 self.assertIn("Modified User '%s' successfully" % user["name"],
66                               out)
67             else:
68                 self.assertIn("User '%s' created successfully" % user["name"],
69                               out)
70
71             user["checkUserFn"](user)
72
73     def tearDown(self):
74         super(UserCmdTestCase, self).tearDown()
75         # clean up all the left over users, just in case
76         for user in self.users:
77             if self._find_user(user["name"]):
78                 self.runsubcmd("user", "delete", user["name"])
79         lp = env_loadparm()
80         # second run of this test
81         # the cache is still there and '--cache-ldb-initialize'
82         # will fail
83         cachedb = lp.private_path("user-syncpasswords-cache.ldb")
84         if os.path.exists(cachedb):
85             os.remove(cachedb)
86
87     def test_newuser(self):
88         # try to add all the users again, this should fail
89         for user in self.users:
90             (result, out, err) = self._create_user(user)
91             self.assertCmdFail(result, "Ensure that create user fails")
92             self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)
93
94         # try to delete all the 4 users we just added
95         for user in self.users:
96             (result, out, err) = self.runsubcmd("user", "delete", user["name"])
97             self.assertCmdSuccess(result, out, err, "Can we delete users")
98             found = self._find_user(user["name"])
99             self.assertIsNone(found)
100
101         # test adding users with --use-username-as-cn
102         for user in self.users:
103             (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
104                                                 "--use-username-as-cn",
105                                                 "--surname=%s" % user["surname"],
106                                                 "--given-name=%s" % user["given-name"],
107                                                 "--job-title=%s" % user["job-title"],
108                                                 "--department=%s" % user["department"],
109                                                 "--description=%s" % user["description"],
110                                                 "--company=%s" % user["company"],
111                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
112                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
113
114             self.assertCmdSuccess(result, out, err)
115             self.assertEquals(err, "", "Shouldn't be any error messages")
116             self.assertIn("User '%s' created successfully" % user["name"], out)
117
118             found = self._find_user(user["name"])
119
120             self.assertEquals("%s" % found.get("cn"), "%(name)s" % user)
121             self.assertEquals("%s" % found.get("name"), "%(name)s" % user)
122
123     def _verify_supplementalCredentials(self, ldif,
124                                         min_packages=3,
125                                         max_packages=6):
126         msgs = self.samdb.parse_ldif(ldif)
127         (changetype, obj) = next(msgs)
128
129         self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
130         sc_blob = obj["supplementalCredentials"][0]
131         sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
132
133         self.assertGreaterEqual(sc.sub.num_packages,
134                                 min_packages, "min_packages check")
135         self.assertLessEqual(sc.sub.num_packages,
136                              max_packages, "max_packages check")
137
138         if max_packages == 0:
139             return
140
141         def find_package(packages, name, start_idx=0):
142             for i in range(start_idx, len(packages)):
143                 if packages[i].name == name:
144                     return (i, packages[i])
145             return (None, None)
146
147         # The ordering is this
148         #
149         # Primary:Kerberos-Newer-Keys (optional)
150         # Primary:Kerberos
151         # Primary:WDigest
152         # Primary:CLEARTEXT (optional)
153         # Primary:SambaGPG (optional)
154         #
155         # And the 'Packages' package is insert before the last
156         # other package.
157
158         nidx = 0
159         (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
160         self.assertIsNotNone(pp, "Packages required")
161         self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
162                          "Packages needs to be at num_packages - 1")
163
164         (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
165                                     start_idx=nidx)
166         if knidx is not None:
167             self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
168             nidx = nidx + 1
169             if nidx == pidx:
170                 nidx = nidx + 1
171
172         (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
173                                   start_idx=nidx)
174         self.assertIsNotNone(pp, "Primary:Kerberos required")
175         self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
176         nidx = nidx + 1
177         if nidx == pidx:
178             nidx = nidx + 1
179
180         (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
181                                   start_idx=nidx)
182         self.assertIsNotNone(pp, "Primary:WDigest required")
183         self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
184         nidx = nidx + 1
185         if nidx == pidx:
186             nidx = nidx + 1
187
188         (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
189                                   start_idx=nidx)
190         if cidx is not None:
191             self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
192             nidx = nidx + 1
193             if nidx == pidx:
194                 nidx = nidx + 1
195
196         (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
197                                   start_idx=nidx)
198         if gidx is not None:
199             self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
200             nidx = nidx + 1
201             if nidx == pidx:
202                 nidx = nidx + 1
203
204         self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
205
206     def test_setpassword(self):
207         for user in self.users:
208             newpasswd = self.random_password(16)
209             (result, out, err) = self.runsubcmd("user", "setpassword",
210                                                 user["name"],
211                                                 "--newpassword=%s" % newpasswd,
212                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
213                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
214             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
215             self.assertEquals(err, "", "setpassword with url")
216             self.assertMatch(out, "Changed password OK", "setpassword with url")
217
218         attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
219         (result, out, err) = self.runsubcmd("user", "syncpasswords",
220                                             "--cache-ldb-initialize",
221                                             "--attributes=%s" % attributes,
222                                             "--decrypt-samba-gpg")
223         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
224         self.assertEqual(err, "", "getpassword without url")
225         cache_attrs = {
226             "objectClass": {"value": "userSyncPasswords"},
227             "samdbUrl": {},
228             "dirsyncFilter": {},
229             "dirsyncAttribute": {},
230             "dirsyncControl": {"value": "dirsync:1:0:0"},
231             "passwordAttribute": {},
232             "decryptSambaGPG": {},
233             "currentTime": {},
234         }
235         for a in cache_attrs.keys():
236             v = cache_attrs[a].get("value", "")
237             self.assertMatch(out, "%s: %s" % (a, v),
238                              "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
239
240         (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
241         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
242         self.assertEqual(err, "", "syncpasswords --no-wait")
243         self.assertMatch(out, "dirsync_loop(): results 0",
244                          "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
245         for user in self.users:
246             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
247                              "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
248
249         for user in self.users:
250             newpasswd = self.random_password(16)
251             creds = credentials.Credentials()
252             creds.set_anonymous()
253             creds.set_password(newpasswd)
254             nthash = creds.get_nt_hash()
255             unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
256             virtualClearTextUTF8 = base64.b64encode(get_bytes(newpasswd)).decode('utf8')
257             virtualClearTextUTF16 = base64.b64encode(get_string(newpasswd).encode('utf-16-le')).decode('utf8')
258
259             (result, out, err) = self.runsubcmd("user", "setpassword",
260                                                 user["name"],
261                                                 "--newpassword=%s" % newpasswd)
262             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
263             self.assertEquals(err, "", "setpassword without url")
264             self.assertMatch(out, "Changed password OK", "setpassword without url")
265
266             (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
267             self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
268             self.assertEqual(err, "", "syncpasswords --no-wait")
269             self.assertMatch(out, "dirsync_loop(): results 0",
270                              "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
271             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
272                              "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
273             self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
274                              "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
275             self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
276                              "getpassword unicodePwd: out[%s]" % out)
277             self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
278                              "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
279             self.assertMatch(out, "supplementalCredentials:: ",
280                              "getpassword supplementalCredentials: out[%s]" % out)
281             if "virtualSambaGPG:: " in out:
282                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
283                                  "getpassword virtualClearTextUTF8: out[%s]" % out)
284                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
285                                  "getpassword virtualClearTextUTF16: out[%s]" % out)
286                 self.assertMatch(out, "virtualSSHA: ",
287                                  "getpassword virtualSSHA: out[%s]" % out)
288
289             (result, out, err) = self.runsubcmd("user", "getpassword",
290                                                 user["name"],
291                                                 "--attributes=%s" % attributes,
292                                                 "--decrypt-samba-gpg")
293             self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
294             self.assertEqual(err, "", "getpassword without url")
295             self.assertMatch(out, "Got password OK", "getpassword without url")
296             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
297                              "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
298             self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
299                              "getpassword unicodePwd: out[%s]" % out)
300             self.assertMatch(out, "supplementalCredentials:: ",
301                              "getpassword supplementalCredentials: out[%s]" % out)
302             self._verify_supplementalCredentials(out.replace("\nGot password OK\n", ""))
303             if "virtualSambaGPG:: " in out:
304                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
305                                  "getpassword virtualClearTextUTF8: out[%s]" % out)
306                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
307                                  "getpassword virtualClearTextUTF16: out[%s]" % out)
308                 self.assertMatch(out, "virtualSSHA: ",
309                                  "getpassword virtualSSHA: out[%s]" % out)
310
311         for user in self.users:
312             newpasswd = self.random_password(16)
313             (result, out, err) = self.runsubcmd("user", "setpassword",
314                                                 user["name"],
315                                                 "--newpassword=%s" % newpasswd,
316                                                 "--must-change-at-next-login",
317                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
318                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
319             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
320             self.assertEquals(err, "", "setpassword with forced change")
321             self.assertMatch(out, "Changed password OK", "setpassword with forced change")
322
323     def test_setexpiry(self):
324         for user in self.users:
325             twodays = time.time() + (2 * 24 * 60 * 60)
326
327             (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
328                                                 "--days=2",
329                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
330                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
331             self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
332             self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
333
334             found = self._find_user(user["name"])
335
336             expires = nttime2unix(int("%s" % found.get("accountExpires")))
337             self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
338
339         # TODO: renable this after the filter case is sorted out
340         if "filters are broken, bail now":
341             return
342
343         # now run the expiration based on a filter
344         fourdays = time.time() + (4 * 24 * 60 * 60)
345         (result, out, err) = self.runsubcmd("user", "setexpiry",
346                                             "--filter", "(&(objectClass=user)(company=comp2))",
347                                             "--days=4",
348                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
349                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
350         self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
351
352         for user in self.users:
353             found = self._find_user(user["name"])
354             if ("%s" % found.get("company")) == "comp2":
355                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
356                 self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time")
357             else:
358                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
359                 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
360
361     def test_list(self):
362         (result, out, err) = self.runsubcmd("user", "list",
363                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
364                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
365                                                           os.environ["DC_PASSWORD"]))
366         self.assertCmdSuccess(result, out, err, "Error running list")
367
368         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
369                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
370
371         userlist = self.samdb.search(base=self.samdb.domain_dn(),
372                                      scope=ldb.SCOPE_SUBTREE,
373                                      expression=search_filter,
374                                      attrs=["samaccountname"])
375
376         self.assertTrue(len(userlist) > 0, "no users found in samdb")
377
378         for userobj in userlist:
379             name = str(userobj.get("samaccountname", idx=0))
380             found = self.assertMatch(out, name,
381                                      "user '%s' not found" % name)
382
383     def test_show(self):
384         for user in self.users:
385             (result, out, err) = self.runsubcmd(
386                 "user", "show", user["name"],
387                 "--attributes=sAMAccountName,company",
388                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
389                 "-U%s%%%s" % (os.environ["DC_USERNAME"],
390                               os.environ["DC_PASSWORD"]))
391             self.assertCmdSuccess(result, out, err, "Error running show")
392
393             expected_out = """dn: CN=%s %s,CN=Users,%s
394 company: %s
395 sAMAccountName: %s
396
397 """ % (user["given-name"], user["surname"], self.samdb.domain_dn(),
398                 user["company"], user["name"])
399
400             self.assertEqual(out, expected_out,
401                              "Unexpected show output for user '%s'" %
402                              user["name"])
403
404     def test_move(self):
405         full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest"))
406         (result, out, err) = self.runsubcmd("ou", "create", full_ou_dn)
407         self.assertCmdSuccess(result, out, err)
408         self.assertEquals(err, "", "There shouldn't be any error message")
409         self.assertIn('Created ou "%s"' % full_ou_dn, out)
410
411         for user in self.users:
412             (result, out, err) = self.runsubcmd(
413                 "user", "move", user["name"], full_ou_dn)
414             self.assertCmdSuccess(result, out, err, "Error running move")
415             self.assertIn('Moved user "%s" into "%s"' %
416                           (user["name"], full_ou_dn), out)
417
418         # Should fail as users objects are in OU
419         (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
420         self.assertCmdFail(result)
421         self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
422                        "(it has %d children)!") % len(self.users), err)
423
424         for user in self.users:
425             new_dn = "CN=Users,%s" % self.samdb.domain_dn()
426             (result, out, err) = self.runsubcmd(
427                 "user", "move", user["name"], new_dn)
428             self.assertCmdSuccess(result, out, err, "Error running move")
429             self.assertIn('Moved user "%s" into "%s"' %
430                           (user["name"], new_dn), out)
431
432         (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
433         self.assertCmdSuccess(result, out, err,
434                               "Failed to delete ou '%s'" % full_ou_dn)
435
436     def test_getpwent(self):
437         try:
438             import pwd
439         except ImportError:
440             self.skipTest("Skipping getpwent test, no 'pwd' module available")
441             return
442
443         # get the current user's data for the test
444         uid = os.geteuid()
445         try:
446             u = pwd.getpwuid(uid)
447         except KeyError:
448             self.skipTest("Skipping getpwent test, current EUID not found in NSS")
449             return
450
451
452 # samba-tool user create command didn't support users with empty gecos if none is
453 # specified on the command line and the user hasn't one in the passwd file it
454 # will fail, so let's add some contents
455
456         gecos = u[4]
457         if (gecos is None or len(gecos) == 0):
458             gecos = "Foo GECOS"
459         user = self._randomPosixUser({
460                         "name": u[0],
461                         "uid": u[0],
462                         "uidNumber": u[2],
463                         "gidNumber": u[3],
464                         "gecos": gecos,
465                         "loginShell": u[6],
466                         })
467         # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
468         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
469                                             "--surname=%s" % user["surname"],
470                                             "--given-name=%s" % user["given-name"],
471                                             "--job-title=%s" % user["job-title"],
472                                             "--department=%s" % user["department"],
473                                             "--description=%s" % user["description"],
474                                             "--company=%s" % user["company"],
475                                             "--gecos=%s" % user["gecos"],
476                                             "--rfc2307-from-nss",
477                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
478                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
479
480         self.assertCmdSuccess(result, out, err)
481         self.assertEquals(err, "", "Shouldn't be any error messages")
482         self.assertIn("User '%s' created successfully" % user["name"], out)
483
484         self._check_posix_user(user)
485         self.runsubcmd("user", "delete", user["name"])
486
487         # Check if overriding the attributes from NSS with explicit values works
488         #
489         # get a user with all random posix attributes
490         user = self._randomPosixUser({"name": u[0]})
491         # create a user with posix attributes from nss but override all of them with the
492         # random ones just obtained
493         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
494                                             "--surname=%s" % user["surname"],
495                                             "--given-name=%s" % user["given-name"],
496                                             "--job-title=%s" % user["job-title"],
497                                             "--department=%s" % user["department"],
498                                             "--description=%s" % user["description"],
499                                             "--company=%s" % user["company"],
500                                             "--rfc2307-from-nss",
501                                             "--gecos=%s" % user["gecos"],
502                                             "--login-shell=%s" % user["loginShell"],
503                                             "--uid=%s" % user["uid"],
504                                             "--uid-number=%s" % user["uidNumber"],
505                                             "--gid-number=%s" % user["gidNumber"],
506                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
507                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
508
509         self.assertCmdSuccess(result, out, err)
510         self.assertEquals(err, "", "Shouldn't be any error messages")
511         self.assertIn("User '%s' created successfully" % user["name"], out)
512
513         self._check_posix_user(user)
514         self.runsubcmd("user", "delete", user["name"])
515
516     def _randomUser(self, base={}):
517         """create a user with random attribute values, you can specify base attributes"""
518         user = {
519             "name": self.randomName(),
520             "password": self.random_password(16),
521             "surname": self.randomName(),
522             "given-name": self.randomName(),
523             "job-title": self.randomName(),
524             "department": self.randomName(),
525             "company": self.randomName(),
526             "description": self.randomName(count=100),
527             "createUserFn": self._create_user,
528             "checkUserFn": self._check_user,
529         }
530         user.update(base)
531         return user
532
533     def _randomPosixUser(self, base={}):
534         """create a user with random attribute values and additional RFC2307
535         attributes, you can specify base attributes"""
536         user = self._randomUser({})
537         user.update(base)
538         posixAttributes = {
539             "uid": self.randomName(),
540             "loginShell": self.randomName(),
541             "gecos": self.randomName(),
542             "uidNumber": self.randomXid(),
543             "gidNumber": self.randomXid(),
544             "createUserFn": self._create_posix_user,
545             "checkUserFn": self._check_posix_user,
546         }
547         user.update(posixAttributes)
548         user.update(base)
549         return user
550
551     def _randomUnixUser(self, base={}):
552         """create a user with random attribute values and additional RFC2307
553         attributes, you can specify base attributes"""
554         user = self._randomUser({})
555         user.update(base)
556         posixAttributes = {
557             "uidNumber": self.randomXid(),
558             "gidNumber": self.randomXid(),
559             "uid": self.randomName(),
560             "loginShell": self.randomName(),
561             "gecos": self.randomName(),
562             "createUserFn": self._create_unix_user,
563             "checkUserFn": self._check_unix_user,
564         }
565         user.update(posixAttributes)
566         user.update(base)
567         return user
568
569     def _check_user(self, user):
570         """ check if a user from SamDB has the same attributes as its template """
571         found = self._find_user(user["name"])
572
573         self.assertEquals("%s" % found.get("name"), "%(given-name)s %(surname)s" % user)
574         self.assertEquals("%s" % found.get("title"), user["job-title"])
575         self.assertEquals("%s" % found.get("company"), user["company"])
576         self.assertEquals("%s" % found.get("description"), user["description"])
577         self.assertEquals("%s" % found.get("department"), user["department"])
578
579     def _check_posix_user(self, user):
580         """ check if a posix_user from SamDB has the same attributes as its template """
581         found = self._find_user(user["name"])
582
583         self.assertEquals("%s" % found.get("loginShell"), user["loginShell"])
584         self.assertEquals("%s" % found.get("gecos"), user["gecos"])
585         self.assertEquals("%s" % found.get("uidNumber"), "%s" % user["uidNumber"])
586         self.assertEquals("%s" % found.get("gidNumber"), "%s" % user["gidNumber"])
587         self.assertEquals("%s" % found.get("uid"), user["uid"])
588         self._check_user(user)
589
590     def _check_unix_user(self, user):
591         """ check if a unix_user from SamDB has the same attributes as its
592 template """
593         found = self._find_user(user["name"])
594
595         self.assertEquals("%s" % found.get("loginShell"), user["loginShell"])
596         self.assertEquals("%s" % found.get("gecos"), user["gecos"])
597         self.assertEquals("%s" % found.get("uidNumber"), "%s" %
598                           user["uidNumber"])
599         self.assertEquals("%s" % found.get("gidNumber"), "%s" %
600                           user["gidNumber"])
601         self.assertEquals("%s" % found.get("uid"), user["uid"])
602         self._check_user(user)
603
604     def _create_user(self, user):
605         return self.runsubcmd("user", "create", user["name"], user["password"],
606                               "--surname=%s" % user["surname"],
607                               "--given-name=%s" % user["given-name"],
608                               "--job-title=%s" % user["job-title"],
609                               "--department=%s" % user["department"],
610                               "--description=%s" % user["description"],
611                               "--company=%s" % user["company"],
612                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
613                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
614
615     def _create_posix_user(self, user):
616         """ create a new user with RFC2307 attributes """
617         return self.runsubcmd("user", "create", user["name"], user["password"],
618                               "--surname=%s" % user["surname"],
619                               "--given-name=%s" % user["given-name"],
620                               "--job-title=%s" % user["job-title"],
621                               "--department=%s" % user["department"],
622                               "--description=%s" % user["description"],
623                               "--company=%s" % user["company"],
624                               "--gecos=%s" % user["gecos"],
625                               "--login-shell=%s" % user["loginShell"],
626                               "--uid=%s" % user["uid"],
627                               "--uid-number=%s" % user["uidNumber"],
628                               "--gid-number=%s" % user["gidNumber"],
629                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
630                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
631
632     def _create_unix_user(self, user):
633         """ Add RFC2307 attributes to a user"""
634         self._create_user(user)
635         return self.runsubcmd("user", "addunixattrs", user["name"],
636                               "%s" % user["uidNumber"],
637                               "--gid-number=%s" % user["gidNumber"],
638                               "--gecos=%s" % user["gecos"],
639                               "--login-shell=%s" % user["loginShell"],
640                               "--uid=%s" % user["uid"],
641                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
642                               "-U%s%%%s" % (os.environ["DC_USERNAME"],
643                                             os.environ["DC_PASSWORD"]))
644
645     def _find_user(self, name):
646         search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
647         userlist = self.samdb.search(base=self.samdb.domain_dn(),
648                                      scope=ldb.SCOPE_SUBTREE,
649                                      expression=search_filter)
650         if userlist:
651             return userlist[0]
652         else:
653             return None