PEP8: fix E303: too many blank lines (2)
[samba.git] / python / samba / tests / samba_tool / user.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import time
20 import base64
21 import ldb
22 from samba.tests.samba_tool.base import SambaToolCmdTest
23 from samba import (
24         credentials,
25         nttime2unix,
26         dsdb
27         )
28 from samba.ndr import ndr_unpack
29 from samba.dcerpc import drsblobs
30
31
32 class UserCmdTestCase(SambaToolCmdTest):
33     """Tests for samba-tool user subcommands"""
34     users = []
35     samdb = None
36
37     def setUp(self):
38         super(UserCmdTestCase, self).setUp()
39         self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
40                                    "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
41         self.users = []
42         self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
43         self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
44         self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"}))
45         self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"}))
46         self.users.append(self._randomPosixUser({"name": "posixuser1"}))
47         self.users.append(self._randomPosixUser({"name": "posixuser2"}))
48         self.users.append(self._randomPosixUser({"name": "posixuser3"}))
49         self.users.append(self._randomPosixUser({"name": "posixuser4"}))
50
51         # setup the 8 users and ensure they are correct
52         for user in self.users:
53             (result, out, err) = user["createUserFn"](user)
54
55             self.assertCmdSuccess(result, out, err)
56             self.assertEquals(err, "", "Shouldn't be any error messages")
57             self.assertIn("User '%s' created successfully" % user["name"], out)
58
59             user["checkUserFn"](user)
60
61     def tearDown(self):
62         super(UserCmdTestCase, self).tearDown()
63         # clean up all the left over users, just in case
64         for user in self.users:
65             if self._find_user(user["name"]):
66                 self.runsubcmd("user", "delete", user["name"])
67
68     def test_newuser(self):
69         # try to add all the users again, this should fail
70         for user in self.users:
71             (result, out, err) = self._create_user(user)
72             self.assertCmdFail(result, "Ensure that create user fails")
73             self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)
74
75         # try to delete all the 4 users we just added
76         for user in self.users:
77             (result, out, err) = self.runsubcmd("user", "delete", user["name"])
78             self.assertCmdSuccess(result, out, err, "Can we delete users")
79             found = self._find_user(user["name"])
80             self.assertIsNone(found)
81
82         # test adding users with --use-username-as-cn
83         for user in self.users:
84             (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
85                                                  "--use-username-as-cn",
86                                                  "--surname=%s" % user["surname"],
87                                                  "--given-name=%s" % user["given-name"],
88                                                  "--job-title=%s" % user["job-title"],
89                                                  "--department=%s" % user["department"],
90                                                  "--description=%s" % user["description"],
91                                                  "--company=%s" % user["company"],
92                                                  "-H", "ldap://%s" % os.environ["DC_SERVER"],
93                                                  "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
94
95             self.assertCmdSuccess(result, out, err)
96             self.assertEquals(err, "", "Shouldn't be any error messages")
97             self.assertIn("User '%s' created successfully" % user["name"], out)
98
99             found = self._find_user(user["name"])
100
101             self.assertEquals("%s" % found.get("cn"), "%(name)s" % user)
102             self.assertEquals("%s" % found.get("name"), "%(name)s" % user)
103
104     def _verify_supplementalCredentials(self, ldif,
105                                         min_packages=3,
106                                         max_packages=6):
107         msgs = self.samdb.parse_ldif(ldif)
108         (changetype, obj) = next(msgs)
109
110         self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
111         sc_blob = obj["supplementalCredentials"][0]
112         sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
113
114         self.assertGreaterEqual(sc.sub.num_packages,
115                                 min_packages, "min_packages check")
116         self.assertLessEqual(sc.sub.num_packages,
117                              max_packages, "max_packages check")
118
119         if max_packages == 0:
120             return
121
122         def find_package(packages, name, start_idx=0):
123             for i in range(start_idx, len(packages)):
124                 if packages[i].name == name:
125                     return (i, packages[i])
126             return (None, None)
127
128         # The ordering is this
129         #
130         # Primary:Kerberos-Newer-Keys (optional)
131         # Primary:Kerberos
132         # Primary:WDigest
133         # Primary:CLEARTEXT (optional)
134         # Primary:SambaGPG (optional)
135         #
136         # And the 'Packages' package is insert before the last
137         # other package.
138
139         nidx = 0
140         (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
141         self.assertIsNotNone(pp, "Packages required")
142         self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
143                          "Packages needs to be at num_packages - 1")
144
145         (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
146                                     start_idx=nidx)
147         if knidx is not None:
148             self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
149             nidx = nidx + 1
150             if nidx == pidx:
151                 nidx = nidx + 1
152
153         (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
154                                   start_idx=nidx)
155         self.assertIsNotNone(pp, "Primary:Kerberos required")
156         self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
157         nidx = nidx + 1
158         if nidx == pidx:
159             nidx = nidx + 1
160
161         (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
162                                   start_idx=nidx)
163         self.assertIsNotNone(pp, "Primary:WDigest required")
164         self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
165         nidx = nidx + 1
166         if nidx == pidx:
167             nidx = nidx + 1
168
169         (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
170                                   start_idx=nidx)
171         if cidx is not None:
172             self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
173             nidx = nidx + 1
174             if nidx == pidx:
175                 nidx = nidx + 1
176
177         (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
178                                   start_idx=nidx)
179         if gidx is not None:
180             self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
181             nidx = nidx + 1
182             if nidx == pidx:
183                 nidx = nidx + 1
184
185         self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
186
187     def test_setpassword(self):
188         for user in self.users:
189             newpasswd = self.randomPass()
190             (result, out, err) = self.runsubcmd("user", "setpassword",
191                                                 user["name"],
192                                                 "--newpassword=%s" % newpasswd,
193                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
194                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
195             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
196             self.assertEquals(err, "", "setpassword with url")
197             self.assertMatch(out, "Changed password OK", "setpassword with url")
198
199         attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
200         (result, out, err) = self.runsubcmd("user", "syncpasswords",
201                                             "--cache-ldb-initialize",
202                                             "--attributes=%s" % attributes,
203                                             "--decrypt-samba-gpg")
204         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
205         self.assertEqual(err, "", "getpassword without url")
206         cache_attrs = {
207             "objectClass": {"value": "userSyncPasswords"},
208             "samdbUrl": {},
209             "dirsyncFilter": {},
210             "dirsyncAttribute": {},
211             "dirsyncControl": {"value": "dirsync:1:0:0"},
212             "passwordAttribute": {},
213             "decryptSambaGPG": {},
214             "currentTime": {},
215         }
216         for a in cache_attrs.keys():
217             v = cache_attrs[a].get("value", "")
218             self.assertMatch(out, "%s: %s" % (a, v),
219                              "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
220
221         (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
222         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
223         self.assertEqual(err, "", "syncpasswords --no-wait")
224         self.assertMatch(out, "dirsync_loop(): results 0",
225                          "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
226         for user in self.users:
227             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
228                              "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
229
230         for user in self.users:
231             newpasswd = self.randomPass()
232             creds = credentials.Credentials()
233             creds.set_anonymous()
234             creds.set_password(newpasswd)
235             nthash = creds.get_nt_hash()
236             unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
237             virtualClearTextUTF8 = base64.b64encode(newpasswd).decode('utf8')
238             virtualClearTextUTF16 = base64.b64encode(unicode(newpasswd, 'utf-8').encode('utf-16-le')).decode('utf8')
239
240             (result, out, err) = self.runsubcmd("user", "setpassword",
241                                                 user["name"],
242                                                 "--newpassword=%s" % newpasswd)
243             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
244             self.assertEquals(err, "", "setpassword without url")
245             self.assertMatch(out, "Changed password OK", "setpassword without url")
246
247             (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
248             self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
249             self.assertEqual(err, "", "syncpasswords --no-wait")
250             self.assertMatch(out, "dirsync_loop(): results 0",
251                              "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
252             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
253                              "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
254             self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
255                              "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
256             self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
257                              "getpassword unicodePwd: out[%s]" % out)
258             self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
259                              "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
260             self.assertMatch(out, "supplementalCredentials:: ",
261                              "getpassword supplementalCredentials: out[%s]" % out)
262             if "virtualSambaGPG:: " in out:
263                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
264                                  "getpassword virtualClearTextUTF8: out[%s]" % out)
265                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
266                                  "getpassword virtualClearTextUTF16: out[%s]" % out)
267                 self.assertMatch(out, "virtualSSHA: ",
268                                  "getpassword virtualSSHA: out[%s]" % out)
269
270             (result, out, err) = self.runsubcmd("user", "getpassword",
271                                                 user["name"],
272                                                 "--attributes=%s" % attributes,
273                                                 "--decrypt-samba-gpg")
274             self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
275             self.assertEqual(err, "", "getpassword without url")
276             self.assertMatch(out, "Got password OK", "getpassword without url")
277             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
278                              "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
279             self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
280                              "getpassword unicodePwd: out[%s]" % out)
281             self.assertMatch(out, "supplementalCredentials:: ",
282                              "getpassword supplementalCredentials: out[%s]" % out)
283             self._verify_supplementalCredentials(out.replace("\nGot password OK\n", ""))
284             if "virtualSambaGPG:: " in out:
285                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
286                                  "getpassword virtualClearTextUTF8: out[%s]" % out)
287                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
288                                  "getpassword virtualClearTextUTF16: out[%s]" % out)
289                 self.assertMatch(out, "virtualSSHA: ",
290                                  "getpassword virtualSSHA: out[%s]" % out)
291
292         for user in self.users:
293             newpasswd = self.randomPass()
294             (result, out, err) = self.runsubcmd("user", "setpassword",
295                                                 user["name"],
296                                                 "--newpassword=%s" % newpasswd,
297                                                 "--must-change-at-next-login",
298                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
299                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
300             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
301             self.assertEquals(err, "", "setpassword with forced change")
302             self.assertMatch(out, "Changed password OK", "setpassword with forced change")
303
304     def test_setexpiry(self):
305         for user in self.users:
306             twodays = time.time() + (2 * 24 * 60 * 60)
307
308             (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
309                                                 "--days=2",
310                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
311                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
312             self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
313             self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
314
315             found = self._find_user(user["name"])
316
317             expires = nttime2unix(int("%s" % found.get("accountExpires")))
318             self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
319
320         # TODO: renable this after the filter case is sorted out
321         if "filters are broken, bail now":
322             return
323
324         # now run the expiration based on a filter
325         fourdays = time.time() + (4 * 24 * 60 * 60)
326         (result, out, err) = self.runsubcmd("user", "setexpiry",
327                                             "--filter", "(&(objectClass=user)(company=comp2))",
328                                             "--days=4",
329                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
330                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
331         self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
332
333         for user in self.users:
334             found = self._find_user(user["name"])
335             if ("%s" % found.get("company")) == "comp2":
336                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
337                 self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time")
338             else:
339                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
340                 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
341
342     def test_list(self):
343         (result, out, err) = self.runsubcmd("user", "list",
344                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
345                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
346                                                           os.environ["DC_PASSWORD"]))
347         self.assertCmdSuccess(result, out, err, "Error running list")
348
349         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
350                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
351
352         userlist = self.samdb.search(base=self.samdb.domain_dn(),
353                                      scope=ldb.SCOPE_SUBTREE,
354                                      expression=search_filter,
355                                      attrs=["samaccountname"])
356
357         self.assertTrue(len(userlist) > 0, "no users found in samdb")
358
359         for userobj in userlist:
360             name = userobj.get("samaccountname", idx=0)
361             found = self.assertMatch(out, name,
362                                      "user '%s' not found" % name)
363
364     def test_show(self):
365         for user in self.users:
366             (result, out, err) = self.runsubcmd(
367                 "user", "show", user["name"],
368                 "--attributes=sAMAccountName,company",
369                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
370                 "-U%s%%%s" % (os.environ["DC_USERNAME"],
371                               os.environ["DC_PASSWORD"]))
372             self.assertCmdSuccess(result, out, err, "Error running show")
373
374             expected_out = """dn: CN=%s %s,CN=Users,%s
375 company: %s
376 sAMAccountName: %s
377
378 """ % (user["given-name"], user["surname"], self.samdb.domain_dn(),
379                 user["company"], user["name"])
380
381             self.assertEqual(out, expected_out,
382                              "Unexpected show output for user '%s'" %
383                              user["name"])
384
385     def test_move(self):
386         full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest"))
387         (result, out, err) = self.runsubcmd("ou", "create", full_ou_dn)
388         self.assertCmdSuccess(result, out, err)
389         self.assertEquals(err, "", "There shouldn't be any error message")
390         self.assertIn('Created ou "%s"' % full_ou_dn, out)
391
392         for user in self.users:
393             (result, out, err) = self.runsubcmd(
394                 "user", "move", user["name"], full_ou_dn)
395             self.assertCmdSuccess(result, out, err, "Error running move")
396             self.assertIn('Moved user "%s" into "%s"' %
397                           (user["name"], full_ou_dn), out)
398
399         # Should fail as users objects are in OU
400         (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
401         self.assertCmdFail(result)
402         self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
403                        "(it has %d children)!") % len(self.users), err)
404
405         for user in self.users:
406             new_dn = "CN=Users,%s" % self.samdb.domain_dn()
407             (result, out, err) = self.runsubcmd(
408                 "user", "move", user["name"], new_dn)
409             self.assertCmdSuccess(result, out, err, "Error running move")
410             self.assertIn('Moved user "%s" into "%s"' %
411                           (user["name"], new_dn), out)
412
413         (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
414         self.assertCmdSuccess(result, out, err,
415                               "Failed to delete ou '%s'" % full_ou_dn)
416
417     def test_getpwent(self):
418         try:
419             import pwd
420         except ImportError:
421             self.skipTest("Skipping getpwent test, no 'pwd' module available")
422             return
423
424         # get the current user's data for the test
425         uid = os.geteuid()
426         try:
427             u = pwd.getpwuid(uid)
428         except KeyError:
429             self.skipTest("Skipping getpwent test, current EUID not found in NSS")
430             return
431
432
433 # samba-tool user create command didn't support users with empty gecos if none is
434 # specified on the command line and the user hasn't one in the passwd file it
435 # will fail, so let's add some contents
436
437         gecos = u[4]
438         if (gecos is None or len(gecos) == 0):
439             gecos = "Foo GECOS"
440         user = self._randomPosixUser({
441                         "name": u[0],
442                         "uid": u[0],
443                         "uidNumber": u[2],
444                         "gidNumber": u[3],
445                         "gecos": gecos,
446                         "loginShell": u[6],
447                         })
448         # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
449         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
450                                             "--surname=%s" % user["surname"],
451                                             "--given-name=%s" % user["given-name"],
452                                             "--job-title=%s" % user["job-title"],
453                                             "--department=%s" % user["department"],
454                                             "--description=%s" % user["description"],
455                                             "--company=%s" % user["company"],
456                                             "--gecos=%s" % user["gecos"],
457                                             "--rfc2307-from-nss",
458                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
459                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
460
461         self.assertCmdSuccess(result, out, err)
462         self.assertEquals(err, "", "Shouldn't be any error messages")
463         self.assertIn("User '%s' created successfully" % user["name"], out)
464
465         self._check_posix_user(user)
466         self.runsubcmd("user", "delete", user["name"])
467
468         # Check if overriding the attributes from NSS with explicit values works
469         #
470         # get a user with all random posix attributes
471         user = self._randomPosixUser({"name": u[0]})
472         # create a user with posix attributes from nss but override all of them with the
473         # random ones just obtained
474         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
475                                             "--surname=%s" % user["surname"],
476                                             "--given-name=%s" % user["given-name"],
477                                             "--job-title=%s" % user["job-title"],
478                                             "--department=%s" % user["department"],
479                                             "--description=%s" % user["description"],
480                                             "--company=%s" % user["company"],
481                                             "--rfc2307-from-nss",
482                                             "--gecos=%s" % user["gecos"],
483                                             "--login-shell=%s" % user["loginShell"],
484                                             "--uid=%s" % user["uid"],
485                                             "--uid-number=%s" % user["uidNumber"],
486                                             "--gid-number=%s" % user["gidNumber"],
487                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
488                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
489
490         self.assertCmdSuccess(result, out, err)
491         self.assertEquals(err, "", "Shouldn't be any error messages")
492         self.assertIn("User '%s' created successfully" % user["name"], out)
493
494         self._check_posix_user(user)
495         self.runsubcmd("user", "delete", user["name"])
496
497     def _randomUser(self, base={}):
498         """create a user with random attribute values, you can specify base attributes"""
499         user = {
500             "name": self.randomName(),
501             "password": self.randomPass(),
502             "surname": self.randomName(),
503             "given-name": self.randomName(),
504             "job-title": self.randomName(),
505             "department": self.randomName(),
506             "company": self.randomName(),
507             "description": self.randomName(count=100),
508             "createUserFn": self._create_user,
509             "checkUserFn": self._check_user,
510         }
511         user.update(base)
512         return user
513
514     def _randomPosixUser(self, base={}):
515         """create a user with random attribute values and additional RFC2307
516         attributes, you can specify base attributes"""
517         user = self._randomUser({})
518         user.update(base)
519         posixAttributes = {
520             "uid": self.randomName(),
521             "loginShell": self.randomName(),
522             "gecos": self.randomName(),
523             "uidNumber": self.randomXid(),
524             "gidNumber": self.randomXid(),
525             "createUserFn": self._create_posix_user,
526             "checkUserFn": self._check_posix_user,
527         }
528         user.update(posixAttributes)
529         user.update(base)
530         return user
531
532     def _check_user(self, user):
533         """ check if a user from SamDB has the same attributes as its template """
534         found = self._find_user(user["name"])
535
536         self.assertEquals("%s" % found.get("name"), "%(given-name)s %(surname)s" % user)
537         self.assertEquals("%s" % found.get("title"), user["job-title"])
538         self.assertEquals("%s" % found.get("company"), user["company"])
539         self.assertEquals("%s" % found.get("description"), user["description"])
540         self.assertEquals("%s" % found.get("department"), user["department"])
541
542     def _check_posix_user(self, user):
543         """ check if a posix_user from SamDB has the same attributes as its template """
544         found = self._find_user(user["name"])
545
546         self.assertEquals("%s" % found.get("loginShell"), user["loginShell"])
547         self.assertEquals("%s" % found.get("gecos"), user["gecos"])
548         self.assertEquals("%s" % found.get("uidNumber"), "%s" % user["uidNumber"])
549         self.assertEquals("%s" % found.get("gidNumber"), "%s" % user["gidNumber"])
550         self.assertEquals("%s" % found.get("uid"), user["uid"])
551         self._check_user(user)
552
553     def _create_user(self, user):
554         return self.runsubcmd("user", "create", user["name"], user["password"],
555                               "--surname=%s" % user["surname"],
556                               "--given-name=%s" % user["given-name"],
557                               "--job-title=%s" % user["job-title"],
558                               "--department=%s" % user["department"],
559                               "--description=%s" % user["description"],
560                               "--company=%s" % user["company"],
561                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
562                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
563
564     def _create_posix_user(self, user):
565         """ create a new user with RFC2307 attributes """
566         return self.runsubcmd("user", "create", user["name"], user["password"],
567                               "--surname=%s" % user["surname"],
568                               "--given-name=%s" % user["given-name"],
569                               "--job-title=%s" % user["job-title"],
570                               "--department=%s" % user["department"],
571                               "--description=%s" % user["description"],
572                               "--company=%s" % user["company"],
573                               "--gecos=%s" % user["gecos"],
574                               "--login-shell=%s" % user["loginShell"],
575                               "--uid=%s" % user["uid"],
576                               "--uid-number=%s" % user["uidNumber"],
577                               "--gid-number=%s" % user["gidNumber"],
578                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
579                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
580
581     def _find_user(self, name):
582         search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
583         userlist = self.samdb.search(base=self.samdb.domain_dn(),
584                                      scope=ldb.SCOPE_SUBTREE,
585                                      expression=search_filter, attrs=[])
586         if userlist:
587             return userlist[0]
588         else:
589             return None