PEP8: fix E127: continuation line over-indented for visual indent
[samba.git] / python / samba / tests / samba_tool / user.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import time
20 import base64
21 import ldb
22 from samba.tests.samba_tool.base import SambaToolCmdTest
23 from samba import (
24         credentials,
25         nttime2unix,
26         dsdb
27         )
28 from samba.ndr import ndr_unpack
29 from samba.dcerpc import drsblobs
30
31 class UserCmdTestCase(SambaToolCmdTest):
32     """Tests for samba-tool user subcommands"""
33     users = []
34     samdb = None
35
36     def setUp(self):
37         super(UserCmdTestCase, self).setUp()
38         self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
39             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
40         self.users = []
41         self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
42         self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
43         self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"}))
44         self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"}))
45         self.users.append(self._randomPosixUser({"name": "posixuser1"}))
46         self.users.append(self._randomPosixUser({"name": "posixuser2"}))
47         self.users.append(self._randomPosixUser({"name": "posixuser3"}))
48         self.users.append(self._randomPosixUser({"name": "posixuser4"}))
49
50         # setup the 8 users and ensure they are correct
51         for user in self.users:
52             (result, out, err) = user["createUserFn"](user)
53
54             self.assertCmdSuccess(result, out, err)
55             self.assertEquals(err,"","Shouldn't be any error messages")
56             self.assertIn("User '%s' created successfully" % user["name"], out)
57
58             user["checkUserFn"](user)
59
60
61     def tearDown(self):
62         super(UserCmdTestCase, self).tearDown()
63         # clean up all the left over users, just in case
64         for user in self.users:
65             if self._find_user(user["name"]):
66                 self.runsubcmd("user", "delete", user["name"])
67
68
69     def test_newuser(self):
70         # try to add all the users again, this should fail
71         for user in self.users:
72             (result, out, err) = self._create_user(user)
73             self.assertCmdFail(result, "Ensure that create user fails")
74             self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)
75
76         # try to delete all the 4 users we just added
77         for user in self.users:
78             (result, out, err) = self.runsubcmd("user", "delete", user["name"])
79             self.assertCmdSuccess(result, out, err, "Can we delete users")
80             found = self._find_user(user["name"])
81             self.assertIsNone(found)
82
83         # test adding users with --use-username-as-cn
84         for user in self.users:
85             (result, out, err) =  self.runsubcmd("user", "create", user["name"], user["password"],
86                                                  "--use-username-as-cn",
87                                                  "--surname=%s" % user["surname"],
88                                                  "--given-name=%s" % user["given-name"],
89                                                  "--job-title=%s" % user["job-title"],
90                                                  "--department=%s" % user["department"],
91                                                  "--description=%s" % user["description"],
92                                                  "--company=%s" % user["company"],
93                                                  "-H", "ldap://%s" % os.environ["DC_SERVER"],
94                                                  "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
95
96             self.assertCmdSuccess(result, out, err)
97             self.assertEquals(err,"","Shouldn't be any error messages")
98             self.assertIn("User '%s' created successfully" % user["name"], out)
99
100             found = self._find_user(user["name"])
101
102             self.assertEquals("%s" % found.get("cn"), "%(name)s" % user)
103             self.assertEquals("%s" % found.get("name"), "%(name)s" % user)
104
105     def _verify_supplementalCredentials(self, ldif,
106                                         min_packages=3,
107                                         max_packages=6):
108         msgs = self.samdb.parse_ldif(ldif)
109         (changetype, obj) = next(msgs)
110
111         self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
112         sc_blob = obj["supplementalCredentials"][0]
113         sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
114
115         self.assertGreaterEqual(sc.sub.num_packages,
116                                 min_packages, "min_packages check")
117         self.assertLessEqual(sc.sub.num_packages,
118                              max_packages, "max_packages check")
119
120         if max_packages == 0:
121             return
122
123         def find_package(packages, name, start_idx=0):
124             for i in range(start_idx, len(packages)):
125                 if packages[i].name == name:
126                     return (i, packages[i])
127             return (None, None)
128
129         # The ordering is this
130         #
131         # Primary:Kerberos-Newer-Keys (optional)
132         # Primary:Kerberos
133         # Primary:WDigest
134         # Primary:CLEARTEXT (optional)
135         # Primary:SambaGPG (optional)
136         #
137         # And the 'Packages' package is insert before the last
138         # other package.
139
140         nidx = 0
141         (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
142         self.assertIsNotNone(pp, "Packages required")
143         self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
144                          "Packages needs to be at num_packages - 1")
145
146         (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
147                                     start_idx=nidx)
148         if knidx is not None:
149             self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
150             nidx = nidx + 1
151             if nidx == pidx:
152                 nidx = nidx + 1
153
154         (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
155                                   start_idx=nidx)
156         self.assertIsNotNone(pp, "Primary:Kerberos required")
157         self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
158         nidx = nidx + 1
159         if nidx == pidx:
160             nidx = nidx + 1
161
162         (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
163                                   start_idx=nidx)
164         self.assertIsNotNone(pp, "Primary:WDigest required")
165         self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
166         nidx = nidx + 1
167         if nidx == pidx:
168             nidx = nidx + 1
169
170         (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
171                                   start_idx=nidx)
172         if cidx is not None:
173             self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
174             nidx = nidx + 1
175             if nidx == pidx:
176                 nidx = nidx + 1
177
178         (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
179                                   start_idx=nidx)
180         if gidx is not None:
181             self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
182             nidx = nidx + 1
183             if nidx == pidx:
184                 nidx = nidx + 1
185
186         self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
187
188     def test_setpassword(self):
189         for user in self.users:
190             newpasswd = self.randomPass()
191             (result, out, err) = self.runsubcmd("user", "setpassword",
192                                                 user["name"],
193                                                 "--newpassword=%s" % newpasswd,
194                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
195                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
196             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
197             self.assertEquals(err,"","setpassword with url")
198             self.assertMatch(out, "Changed password OK", "setpassword with url")
199
200         attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
201         (result, out, err) = self.runsubcmd("user", "syncpasswords",
202                                             "--cache-ldb-initialize",
203                                             "--attributes=%s" % attributes,
204                                             "--decrypt-samba-gpg")
205         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
206         self.assertEqual(err,"","getpassword without url")
207         cache_attrs = {
208             "objectClass": { "value": "userSyncPasswords" },
209             "samdbUrl": { },
210             "dirsyncFilter": { },
211             "dirsyncAttribute": { },
212             "dirsyncControl": { "value": "dirsync:1:0:0"},
213             "passwordAttribute": { },
214             "decryptSambaGPG": { },
215             "currentTime": { },
216         }
217         for a in cache_attrs.keys():
218             v = cache_attrs[a].get("value", "")
219             self.assertMatch(out, "%s: %s" % (a, v),
220                 "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
221
222         (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
223         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
224         self.assertEqual(err,"","syncpasswords --no-wait")
225         self.assertMatch(out, "dirsync_loop(): results 0",
226             "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
227         for user in self.users:
228             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
229                 "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
230
231         for user in self.users:
232             newpasswd = self.randomPass()
233             creds = credentials.Credentials()
234             creds.set_anonymous()
235             creds.set_password(newpasswd)
236             nthash = creds.get_nt_hash()
237             unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
238             virtualClearTextUTF8 = base64.b64encode(newpasswd).decode('utf8')
239             virtualClearTextUTF16 = base64.b64encode(unicode(newpasswd, 'utf-8').encode('utf-16-le')).decode('utf8')
240
241             (result, out, err) = self.runsubcmd("user", "setpassword",
242                                                 user["name"],
243                                                 "--newpassword=%s" % newpasswd)
244             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
245             self.assertEquals(err,"","setpassword without url")
246             self.assertMatch(out, "Changed password OK", "setpassword without url")
247
248             (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
249             self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
250             self.assertEqual(err,"","syncpasswords --no-wait")
251             self.assertMatch(out, "dirsync_loop(): results 0",
252                 "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
253             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
254                 "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
255             self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
256                     "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
257             self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
258                     "getpassword unicodePwd: out[%s]" % out)
259             self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
260                     "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
261             self.assertMatch(out, "supplementalCredentials:: ",
262                     "getpassword supplementalCredentials: out[%s]" % out)
263             if "virtualSambaGPG:: " in out:
264                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
265                     "getpassword virtualClearTextUTF8: out[%s]" % out)
266                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
267                     "getpassword virtualClearTextUTF16: out[%s]" % out)
268                 self.assertMatch(out, "virtualSSHA: ",
269                     "getpassword virtualSSHA: out[%s]" % out)
270
271             (result, out, err) = self.runsubcmd("user", "getpassword",
272                                                 user["name"],
273                                                 "--attributes=%s" % attributes,
274                                                 "--decrypt-samba-gpg")
275             self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
276             self.assertEqual(err,"","getpassword without url")
277             self.assertMatch(out, "Got password OK", "getpassword without url")
278             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
279                     "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
280             self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
281                     "getpassword unicodePwd: out[%s]" % out)
282             self.assertMatch(out, "supplementalCredentials:: ",
283                     "getpassword supplementalCredentials: out[%s]" % out)
284             self._verify_supplementalCredentials(out.replace("\nGot password OK\n", ""))
285             if "virtualSambaGPG:: " in out:
286                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
287                     "getpassword virtualClearTextUTF8: out[%s]" % out)
288                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
289                     "getpassword virtualClearTextUTF16: out[%s]" % out)
290                 self.assertMatch(out, "virtualSSHA: ",
291                     "getpassword virtualSSHA: out[%s]" % out)
292
293         for user in self.users:
294             newpasswd = self.randomPass()
295             (result, out, err) = self.runsubcmd("user", "setpassword",
296                                                 user["name"],
297                                                 "--newpassword=%s" % newpasswd,
298                                                 "--must-change-at-next-login",
299                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
300                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
301             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
302             self.assertEquals(err,"","setpassword with forced change")
303             self.assertMatch(out, "Changed password OK", "setpassword with forced change")
304
305
306
307
308     def test_setexpiry(self):
309         for user in self.users:
310             twodays = time.time() + (2 * 24 * 60 * 60)
311
312             (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
313                                                 "--days=2",
314                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
315                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
316             self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
317             self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
318
319             found = self._find_user(user["name"])
320
321             expires = nttime2unix(int("%s" % found.get("accountExpires")))
322             self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
323
324         # TODO: renable this after the filter case is sorted out
325         if "filters are broken, bail now":
326             return
327
328         # now run the expiration based on a filter
329         fourdays = time.time() + (4 * 24 * 60 * 60)
330         (result, out, err) = self.runsubcmd("user", "setexpiry",
331                                             "--filter", "(&(objectClass=user)(company=comp2))",
332                                             "--days=4",
333                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
334                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
335         self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
336
337         for user in self.users:
338             found = self._find_user(user["name"])
339             if ("%s" % found.get("company")) == "comp2":
340                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
341                 self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time")
342             else:
343                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
344                 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
345
346
347     def test_list(self):
348         (result, out, err) = self.runsubcmd("user", "list",
349                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
350                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
351                                                           os.environ["DC_PASSWORD"]))
352         self.assertCmdSuccess(result, out, err, "Error running list")
353
354         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
355                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
356
357         userlist = self.samdb.search(base=self.samdb.domain_dn(),
358                                      scope=ldb.SCOPE_SUBTREE,
359                                      expression=search_filter,
360                                      attrs=["samaccountname"])
361
362         self.assertTrue(len(userlist) > 0, "no users found in samdb")
363
364         for userobj in userlist:
365             name = userobj.get("samaccountname", idx=0)
366             found = self.assertMatch(out, name,
367                                      "user '%s' not found" % name)
368
369     def test_show(self):
370         for user in self.users:
371             (result, out, err) = self.runsubcmd(
372                 "user", "show", user["name"],
373                 "--attributes=sAMAccountName,company",
374                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
375                 "-U%s%%%s" % (os.environ["DC_USERNAME"],
376                 os.environ["DC_PASSWORD"]))
377             self.assertCmdSuccess(result, out, err, "Error running show")
378
379             expected_out = """dn: CN=%s %s,CN=Users,%s
380 company: %s
381 sAMAccountName: %s
382
383 """ % (user["given-name"], user["surname"], self.samdb.domain_dn(),
384                 user["company"], user["name"])
385
386             self.assertEqual(out, expected_out,
387                              "Unexpected show output for user '%s'" %
388                              user["name"])
389
390     def test_move(self):
391         full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest"))
392         (result, out, err) =  self.runsubcmd("ou", "create", full_ou_dn)
393         self.assertCmdSuccess(result, out, err)
394         self.assertEquals(err, "", "There shouldn't be any error message")
395         self.assertIn('Created ou "%s"' % full_ou_dn, out)
396
397         for user in self.users:
398             (result, out, err) = self.runsubcmd(
399                 "user", "move", user["name"], full_ou_dn)
400             self.assertCmdSuccess(result, out, err, "Error running move")
401             self.assertIn('Moved user "%s" into "%s"' %
402                           (user["name"], full_ou_dn), out)
403
404         # Should fail as users objects are in OU
405         (result, out, err) =  self.runsubcmd("ou", "delete", full_ou_dn)
406         self.assertCmdFail(result)
407         self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
408                        "(it has %d children)!") % len(self.users), err)
409
410         for user in self.users:
411             new_dn = "CN=Users,%s" % self.samdb.domain_dn()
412             (result, out, err) = self.runsubcmd(
413                 "user", "move", user["name"], new_dn)
414             self.assertCmdSuccess(result, out, err, "Error running move")
415             self.assertIn('Moved user "%s" into "%s"' %
416                           (user["name"], new_dn), out)
417
418         (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
419         self.assertCmdSuccess(result, out, err,
420                               "Failed to delete ou '%s'" % full_ou_dn)
421
422     def test_getpwent(self):
423         try:
424             import pwd
425         except ImportError:
426             self.skipTest("Skipping getpwent test, no 'pwd' module available")
427             return
428
429         # get the current user's data for the test
430         uid = os.geteuid()
431         try:
432             u = pwd.getpwuid(uid)
433         except KeyError:
434             self.skipTest("Skipping getpwent test, current EUID not found in NSS")
435             return
436
437
438 # samba-tool user create command didn't support users with empty gecos if none is
439 # specified on the command line and the user hasn't one in the passwd file it
440 # will fail, so let's add some contents
441
442         gecos = u[4]
443         if (gecos is None or len(gecos) == 0):
444             gecos = "Foo GECOS"
445         user = self._randomPosixUser({
446                         "name": u[0],
447                         "uid": u[0],
448                         "uidNumber": u[2],
449                         "gidNumber": u[3],
450                         "gecos": gecos,
451                         "loginShell": u[6],
452                         })
453         # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
454         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
455                                             "--surname=%s" % user["surname"],
456                                             "--given-name=%s" % user["given-name"],
457                                             "--job-title=%s" % user["job-title"],
458                                             "--department=%s" % user["department"],
459                                             "--description=%s" % user["description"],
460                                             "--company=%s" % user["company"],
461                                             "--gecos=%s" % user["gecos"],
462                                             "--rfc2307-from-nss",
463                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
464                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
465
466         self.assertCmdSuccess(result, out, err)
467         self.assertEquals(err,"","Shouldn't be any error messages")
468         self.assertIn("User '%s' created successfully" % user["name"], out)
469
470         self._check_posix_user(user)
471         self.runsubcmd("user", "delete", user["name"])
472
473         # Check if overriding the attributes from NSS with explicit values works
474         #
475         # get a user with all random posix attributes
476         user = self._randomPosixUser({"name": u[0]})
477         # create a user with posix attributes from nss but override all of them with the
478         # random ones just obtained
479         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
480                                             "--surname=%s" % user["surname"],
481                                             "--given-name=%s" % user["given-name"],
482                                             "--job-title=%s" % user["job-title"],
483                                             "--department=%s" % user["department"],
484                                             "--description=%s" % user["description"],
485                                             "--company=%s" % user["company"],
486                                             "--rfc2307-from-nss",
487                                             "--gecos=%s" % user["gecos"],
488                                             "--login-shell=%s" % user["loginShell"],
489                                             "--uid=%s" % user["uid"],
490                                             "--uid-number=%s" % user["uidNumber"],
491                                             "--gid-number=%s" % user["gidNumber"],
492                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
493                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
494
495         self.assertCmdSuccess(result, out, err)
496         self.assertEquals(err,"","Shouldn't be any error messages")
497         self.assertIn("User '%s' created successfully" % user["name"], out)
498
499         self._check_posix_user(user)
500         self.runsubcmd("user", "delete", user["name"])
501
502     def _randomUser(self, base={}):
503         """create a user with random attribute values, you can specify base attributes"""
504         user = {
505             "name": self.randomName(),
506             "password": self.randomPass(),
507             "surname": self.randomName(),
508             "given-name": self.randomName(),
509             "job-title": self.randomName(),
510             "department": self.randomName(),
511             "company": self.randomName(),
512             "description": self.randomName(count=100),
513             "createUserFn": self._create_user,
514             "checkUserFn": self._check_user,
515         }
516         user.update(base)
517         return user
518
519     def _randomPosixUser(self, base={}):
520         """create a user with random attribute values and additional RFC2307
521         attributes, you can specify base attributes"""
522         user = self._randomUser({})
523         user.update(base)
524         posixAttributes = {
525             "uid": self.randomName(),
526             "loginShell": self.randomName(),
527             "gecos": self.randomName(),
528             "uidNumber": self.randomXid(),
529             "gidNumber": self.randomXid(),
530             "createUserFn": self._create_posix_user,
531             "checkUserFn": self._check_posix_user,
532         }
533         user.update(posixAttributes)
534         user.update(base)
535         return user
536
537     def _check_user(self, user):
538         """ check if a user from SamDB has the same attributes as its template """
539         found = self._find_user(user["name"])
540
541         self.assertEquals("%s" % found.get("name"), "%(given-name)s %(surname)s" % user)
542         self.assertEquals("%s" % found.get("title"), user["job-title"])
543         self.assertEquals("%s" % found.get("company"), user["company"])
544         self.assertEquals("%s" % found.get("description"), user["description"])
545         self.assertEquals("%s" % found.get("department"), user["department"])
546
547     def _check_posix_user(self, user):
548         """ check if a posix_user from SamDB has the same attributes as its template """
549         found = self._find_user(user["name"])
550
551         self.assertEquals("%s" % found.get("loginShell"), user["loginShell"])
552         self.assertEquals("%s" % found.get("gecos"), user["gecos"])
553         self.assertEquals("%s" % found.get("uidNumber"), "%s" % user["uidNumber"])
554         self.assertEquals("%s" % found.get("gidNumber"), "%s" % user["gidNumber"])
555         self.assertEquals("%s" % found.get("uid"), user["uid"])
556         self._check_user(user)
557
558     def _create_user(self, user):
559         return self.runsubcmd("user", "create", user["name"], user["password"],
560                               "--surname=%s" % user["surname"],
561                               "--given-name=%s" % user["given-name"],
562                               "--job-title=%s" % user["job-title"],
563                               "--department=%s" % user["department"],
564                               "--description=%s" % user["description"],
565                               "--company=%s" % user["company"],
566                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
567                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
568     def _create_posix_user(self, user):
569         """ create a new user with RFC2307 attributes """
570         return self.runsubcmd("user", "create", user["name"], user["password"],
571                               "--surname=%s" % user["surname"],
572                               "--given-name=%s" % user["given-name"],
573                               "--job-title=%s" % user["job-title"],
574                               "--department=%s" % user["department"],
575                               "--description=%s" % user["description"],
576                               "--company=%s" % user["company"],
577                               "--gecos=%s" % user["gecos"],
578                               "--login-shell=%s" % user["loginShell"],
579                               "--uid=%s" % user["uid"],
580                               "--uid-number=%s" % user["uidNumber"],
581                               "--gid-number=%s" % user["gidNumber"],
582                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
583                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
584
585     def _find_user(self, name):
586         search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
587         userlist = self.samdb.search(base=self.samdb.domain_dn(),
588                                   scope=ldb.SCOPE_SUBTREE,
589                                   expression=search_filter, attrs=[])
590         if userlist:
591             return userlist[0]
592         else:
593             return None