ldb: Free memory when repacking database
[garming/samba-autobuild/.git] / python / samba / tests / samba_tool / user.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import time
20 import base64
21 import ldb
22 from samba.tests.samba_tool.base import SambaToolCmdTest
23 from samba import (
24         credentials,
25         nttime2unix,
26         dsdb
27         )
28 from samba.ndr import ndr_unpack
29 from samba.dcerpc import drsblobs
30 from samba.compat import get_bytes
31 from samba.compat import get_string
32 from samba.tests import env_loadparm
33
34
35 class UserCmdTestCase(SambaToolCmdTest):
36     """Tests for samba-tool user subcommands"""
37     users = []
38     samdb = None
39
40     def setUp(self):
41         super(UserCmdTestCase, self).setUp()
42         self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
43                                    "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
44         self.users = []
45         self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
46         self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
47         self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"}))
48         self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"}))
49         self.users.append(self._randomPosixUser({"name": "posixuser1"}))
50         self.users.append(self._randomPosixUser({"name": "posixuser2"}))
51         self.users.append(self._randomPosixUser({"name": "posixuser3"}))
52         self.users.append(self._randomPosixUser({"name": "posixuser4"}))
53
54         # setup the 8 users and ensure they are correct
55         for user in self.users:
56             (result, out, err) = user["createUserFn"](user)
57
58             self.assertCmdSuccess(result, out, err)
59             self.assertEquals(err, "", "Shouldn't be any error messages")
60             self.assertIn("User '%s' created successfully" % user["name"], out)
61
62             user["checkUserFn"](user)
63
64     def tearDown(self):
65         super(UserCmdTestCase, self).tearDown()
66         # clean up all the left over users, just in case
67         for user in self.users:
68             if self._find_user(user["name"]):
69                 self.runsubcmd("user", "delete", user["name"])
70         lp = env_loadparm()
71         # second run of this test
72         # the cache is still there and '--cache-ldb-initialize'
73         # will fail
74         cachedb = lp.private_path("user-syncpasswords-cache.ldb")
75         if os.path.exists(cachedb):
76             os.remove(cachedb)
77
78     def test_newuser(self):
79         # try to add all the users again, this should fail
80         for user in self.users:
81             (result, out, err) = self._create_user(user)
82             self.assertCmdFail(result, "Ensure that create user fails")
83             self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)
84
85         # try to delete all the 4 users we just added
86         for user in self.users:
87             (result, out, err) = self.runsubcmd("user", "delete", user["name"])
88             self.assertCmdSuccess(result, out, err, "Can we delete users")
89             found = self._find_user(user["name"])
90             self.assertIsNone(found)
91
92         # test adding users with --use-username-as-cn
93         for user in self.users:
94             (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
95                                                 "--use-username-as-cn",
96                                                 "--surname=%s" % user["surname"],
97                                                 "--given-name=%s" % user["given-name"],
98                                                 "--job-title=%s" % user["job-title"],
99                                                 "--department=%s" % user["department"],
100                                                 "--description=%s" % user["description"],
101                                                 "--company=%s" % user["company"],
102                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
103                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
104
105             self.assertCmdSuccess(result, out, err)
106             self.assertEquals(err, "", "Shouldn't be any error messages")
107             self.assertIn("User '%s' created successfully" % user["name"], out)
108
109             found = self._find_user(user["name"])
110
111             self.assertEquals("%s" % found.get("cn"), "%(name)s" % user)
112             self.assertEquals("%s" % found.get("name"), "%(name)s" % user)
113
114     def _verify_supplementalCredentials(self, ldif,
115                                         min_packages=3,
116                                         max_packages=6):
117         msgs = self.samdb.parse_ldif(ldif)
118         (changetype, obj) = next(msgs)
119
120         self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
121         sc_blob = obj["supplementalCredentials"][0]
122         sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
123
124         self.assertGreaterEqual(sc.sub.num_packages,
125                                 min_packages, "min_packages check")
126         self.assertLessEqual(sc.sub.num_packages,
127                              max_packages, "max_packages check")
128
129         if max_packages == 0:
130             return
131
132         def find_package(packages, name, start_idx=0):
133             for i in range(start_idx, len(packages)):
134                 if packages[i].name == name:
135                     return (i, packages[i])
136             return (None, None)
137
138         # The ordering is this
139         #
140         # Primary:Kerberos-Newer-Keys (optional)
141         # Primary:Kerberos
142         # Primary:WDigest
143         # Primary:CLEARTEXT (optional)
144         # Primary:SambaGPG (optional)
145         #
146         # And the 'Packages' package is insert before the last
147         # other package.
148
149         nidx = 0
150         (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
151         self.assertIsNotNone(pp, "Packages required")
152         self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
153                          "Packages needs to be at num_packages - 1")
154
155         (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
156                                     start_idx=nidx)
157         if knidx is not None:
158             self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
159             nidx = nidx + 1
160             if nidx == pidx:
161                 nidx = nidx + 1
162
163         (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
164                                   start_idx=nidx)
165         self.assertIsNotNone(pp, "Primary:Kerberos required")
166         self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
167         nidx = nidx + 1
168         if nidx == pidx:
169             nidx = nidx + 1
170
171         (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
172                                   start_idx=nidx)
173         self.assertIsNotNone(pp, "Primary:WDigest required")
174         self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
175         nidx = nidx + 1
176         if nidx == pidx:
177             nidx = nidx + 1
178
179         (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
180                                   start_idx=nidx)
181         if cidx is not None:
182             self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
183             nidx = nidx + 1
184             if nidx == pidx:
185                 nidx = nidx + 1
186
187         (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
188                                   start_idx=nidx)
189         if gidx is not None:
190             self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
191             nidx = nidx + 1
192             if nidx == pidx:
193                 nidx = nidx + 1
194
195         self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
196
197     def test_setpassword(self):
198         for user in self.users:
199             newpasswd = self.random_password(16)
200             (result, out, err) = self.runsubcmd("user", "setpassword",
201                                                 user["name"],
202                                                 "--newpassword=%s" % newpasswd,
203                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
204                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
205             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
206             self.assertEquals(err, "", "setpassword with url")
207             self.assertMatch(out, "Changed password OK", "setpassword with url")
208
209         attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
210         (result, out, err) = self.runsubcmd("user", "syncpasswords",
211                                             "--cache-ldb-initialize",
212                                             "--attributes=%s" % attributes,
213                                             "--decrypt-samba-gpg")
214         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
215         self.assertEqual(err, "", "getpassword without url")
216         cache_attrs = {
217             "objectClass": {"value": "userSyncPasswords"},
218             "samdbUrl": {},
219             "dirsyncFilter": {},
220             "dirsyncAttribute": {},
221             "dirsyncControl": {"value": "dirsync:1:0:0"},
222             "passwordAttribute": {},
223             "decryptSambaGPG": {},
224             "currentTime": {},
225         }
226         for a in cache_attrs.keys():
227             v = cache_attrs[a].get("value", "")
228             self.assertMatch(out, "%s: %s" % (a, v),
229                              "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
230
231         (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
232         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
233         self.assertEqual(err, "", "syncpasswords --no-wait")
234         self.assertMatch(out, "dirsync_loop(): results 0",
235                          "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
236         for user in self.users:
237             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
238                              "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
239
240         for user in self.users:
241             newpasswd = self.random_password(16)
242             creds = credentials.Credentials()
243             creds.set_anonymous()
244             creds.set_password(newpasswd)
245             nthash = creds.get_nt_hash()
246             unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
247             virtualClearTextUTF8 = base64.b64encode(get_bytes(newpasswd)).decode('utf8')
248             virtualClearTextUTF16 = base64.b64encode(get_string(newpasswd).encode('utf-16-le')).decode('utf8')
249
250             (result, out, err) = self.runsubcmd("user", "setpassword",
251                                                 user["name"],
252                                                 "--newpassword=%s" % newpasswd)
253             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
254             self.assertEquals(err, "", "setpassword without url")
255             self.assertMatch(out, "Changed password OK", "setpassword without url")
256
257             (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
258             self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
259             self.assertEqual(err, "", "syncpasswords --no-wait")
260             self.assertMatch(out, "dirsync_loop(): results 0",
261                              "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
262             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
263                              "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
264             self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
265                              "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
266             self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
267                              "getpassword unicodePwd: out[%s]" % out)
268             self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
269                              "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
270             self.assertMatch(out, "supplementalCredentials:: ",
271                              "getpassword supplementalCredentials: out[%s]" % out)
272             if "virtualSambaGPG:: " in out:
273                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
274                                  "getpassword virtualClearTextUTF8: out[%s]" % out)
275                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
276                                  "getpassword virtualClearTextUTF16: out[%s]" % out)
277                 self.assertMatch(out, "virtualSSHA: ",
278                                  "getpassword virtualSSHA: out[%s]" % out)
279
280             (result, out, err) = self.runsubcmd("user", "getpassword",
281                                                 user["name"],
282                                                 "--attributes=%s" % attributes,
283                                                 "--decrypt-samba-gpg")
284             self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
285             self.assertEqual(err, "", "getpassword without url")
286             self.assertMatch(out, "Got password OK", "getpassword without url")
287             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
288                              "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
289             self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
290                              "getpassword unicodePwd: out[%s]" % out)
291             self.assertMatch(out, "supplementalCredentials:: ",
292                              "getpassword supplementalCredentials: out[%s]" % out)
293             self._verify_supplementalCredentials(out.replace("\nGot password OK\n", ""))
294             if "virtualSambaGPG:: " in out:
295                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
296                                  "getpassword virtualClearTextUTF8: out[%s]" % out)
297                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
298                                  "getpassword virtualClearTextUTF16: out[%s]" % out)
299                 self.assertMatch(out, "virtualSSHA: ",
300                                  "getpassword virtualSSHA: out[%s]" % out)
301
302         for user in self.users:
303             newpasswd = self.random_password(16)
304             (result, out, err) = self.runsubcmd("user", "setpassword",
305                                                 user["name"],
306                                                 "--newpassword=%s" % newpasswd,
307                                                 "--must-change-at-next-login",
308                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
309                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
310             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
311             self.assertEquals(err, "", "setpassword with forced change")
312             self.assertMatch(out, "Changed password OK", "setpassword with forced change")
313
314     def test_setexpiry(self):
315         for user in self.users:
316             twodays = time.time() + (2 * 24 * 60 * 60)
317
318             (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
319                                                 "--days=2",
320                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
321                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
322             self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
323             self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
324
325             found = self._find_user(user["name"])
326
327             expires = nttime2unix(int("%s" % found.get("accountExpires")))
328             self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
329
330         # TODO: renable this after the filter case is sorted out
331         if "filters are broken, bail now":
332             return
333
334         # now run the expiration based on a filter
335         fourdays = time.time() + (4 * 24 * 60 * 60)
336         (result, out, err) = self.runsubcmd("user", "setexpiry",
337                                             "--filter", "(&(objectClass=user)(company=comp2))",
338                                             "--days=4",
339                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
340                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
341         self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
342
343         for user in self.users:
344             found = self._find_user(user["name"])
345             if ("%s" % found.get("company")) == "comp2":
346                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
347                 self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time")
348             else:
349                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
350                 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
351
352     def test_list(self):
353         (result, out, err) = self.runsubcmd("user", "list",
354                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
355                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
356                                                           os.environ["DC_PASSWORD"]))
357         self.assertCmdSuccess(result, out, err, "Error running list")
358
359         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
360                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
361
362         userlist = self.samdb.search(base=self.samdb.domain_dn(),
363                                      scope=ldb.SCOPE_SUBTREE,
364                                      expression=search_filter,
365                                      attrs=["samaccountname"])
366
367         self.assertTrue(len(userlist) > 0, "no users found in samdb")
368
369         for userobj in userlist:
370             name = str(userobj.get("samaccountname", idx=0))
371             found = self.assertMatch(out, name,
372                                      "user '%s' not found" % name)
373
374     def test_show(self):
375         for user in self.users:
376             (result, out, err) = self.runsubcmd(
377                 "user", "show", user["name"],
378                 "--attributes=sAMAccountName,company",
379                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
380                 "-U%s%%%s" % (os.environ["DC_USERNAME"],
381                               os.environ["DC_PASSWORD"]))
382             self.assertCmdSuccess(result, out, err, "Error running show")
383
384             expected_out = """dn: CN=%s %s,CN=Users,%s
385 company: %s
386 sAMAccountName: %s
387
388 """ % (user["given-name"], user["surname"], self.samdb.domain_dn(),
389                 user["company"], user["name"])
390
391             self.assertEqual(out, expected_out,
392                              "Unexpected show output for user '%s'" %
393                              user["name"])
394
395     def test_move(self):
396         full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest"))
397         (result, out, err) = self.runsubcmd("ou", "create", full_ou_dn)
398         self.assertCmdSuccess(result, out, err)
399         self.assertEquals(err, "", "There shouldn't be any error message")
400         self.assertIn('Created ou "%s"' % full_ou_dn, out)
401
402         for user in self.users:
403             (result, out, err) = self.runsubcmd(
404                 "user", "move", user["name"], full_ou_dn)
405             self.assertCmdSuccess(result, out, err, "Error running move")
406             self.assertIn('Moved user "%s" into "%s"' %
407                           (user["name"], full_ou_dn), out)
408
409         # Should fail as users objects are in OU
410         (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
411         self.assertCmdFail(result)
412         self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
413                        "(it has %d children)!") % len(self.users), err)
414
415         for user in self.users:
416             new_dn = "CN=Users,%s" % self.samdb.domain_dn()
417             (result, out, err) = self.runsubcmd(
418                 "user", "move", user["name"], new_dn)
419             self.assertCmdSuccess(result, out, err, "Error running move")
420             self.assertIn('Moved user "%s" into "%s"' %
421                           (user["name"], new_dn), out)
422
423         (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
424         self.assertCmdSuccess(result, out, err,
425                               "Failed to delete ou '%s'" % full_ou_dn)
426
427     def test_getpwent(self):
428         try:
429             import pwd
430         except ImportError:
431             self.skipTest("Skipping getpwent test, no 'pwd' module available")
432             return
433
434         # get the current user's data for the test
435         uid = os.geteuid()
436         try:
437             u = pwd.getpwuid(uid)
438         except KeyError:
439             self.skipTest("Skipping getpwent test, current EUID not found in NSS")
440             return
441
442
443 # samba-tool user create command didn't support users with empty gecos if none is
444 # specified on the command line and the user hasn't one in the passwd file it
445 # will fail, so let's add some contents
446
447         gecos = u[4]
448         if (gecos is None or len(gecos) == 0):
449             gecos = "Foo GECOS"
450         user = self._randomPosixUser({
451                         "name": u[0],
452                         "uid": u[0],
453                         "uidNumber": u[2],
454                         "gidNumber": u[3],
455                         "gecos": gecos,
456                         "loginShell": u[6],
457                         })
458         # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
459         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
460                                             "--surname=%s" % user["surname"],
461                                             "--given-name=%s" % user["given-name"],
462                                             "--job-title=%s" % user["job-title"],
463                                             "--department=%s" % user["department"],
464                                             "--description=%s" % user["description"],
465                                             "--company=%s" % user["company"],
466                                             "--gecos=%s" % user["gecos"],
467                                             "--rfc2307-from-nss",
468                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
469                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
470
471         self.assertCmdSuccess(result, out, err)
472         self.assertEquals(err, "", "Shouldn't be any error messages")
473         self.assertIn("User '%s' created successfully" % user["name"], out)
474
475         self._check_posix_user(user)
476         self.runsubcmd("user", "delete", user["name"])
477
478         # Check if overriding the attributes from NSS with explicit values works
479         #
480         # get a user with all random posix attributes
481         user = self._randomPosixUser({"name": u[0]})
482         # create a user with posix attributes from nss but override all of them with the
483         # random ones just obtained
484         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
485                                             "--surname=%s" % user["surname"],
486                                             "--given-name=%s" % user["given-name"],
487                                             "--job-title=%s" % user["job-title"],
488                                             "--department=%s" % user["department"],
489                                             "--description=%s" % user["description"],
490                                             "--company=%s" % user["company"],
491                                             "--rfc2307-from-nss",
492                                             "--gecos=%s" % user["gecos"],
493                                             "--login-shell=%s" % user["loginShell"],
494                                             "--uid=%s" % user["uid"],
495                                             "--uid-number=%s" % user["uidNumber"],
496                                             "--gid-number=%s" % user["gidNumber"],
497                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
498                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
499
500         self.assertCmdSuccess(result, out, err)
501         self.assertEquals(err, "", "Shouldn't be any error messages")
502         self.assertIn("User '%s' created successfully" % user["name"], out)
503
504         self._check_posix_user(user)
505         self.runsubcmd("user", "delete", user["name"])
506
507     def _randomUser(self, base={}):
508         """create a user with random attribute values, you can specify base attributes"""
509         user = {
510             "name": self.randomName(),
511             "password": self.random_password(16),
512             "surname": self.randomName(),
513             "given-name": self.randomName(),
514             "job-title": self.randomName(),
515             "department": self.randomName(),
516             "company": self.randomName(),
517             "description": self.randomName(count=100),
518             "createUserFn": self._create_user,
519             "checkUserFn": self._check_user,
520         }
521         user.update(base)
522         return user
523
524     def _randomPosixUser(self, base={}):
525         """create a user with random attribute values and additional RFC2307
526         attributes, you can specify base attributes"""
527         user = self._randomUser({})
528         user.update(base)
529         posixAttributes = {
530             "uid": self.randomName(),
531             "loginShell": self.randomName(),
532             "gecos": self.randomName(),
533             "uidNumber": self.randomXid(),
534             "gidNumber": self.randomXid(),
535             "createUserFn": self._create_posix_user,
536             "checkUserFn": self._check_posix_user,
537         }
538         user.update(posixAttributes)
539         user.update(base)
540         return user
541
542     def _check_user(self, user):
543         """ check if a user from SamDB has the same attributes as its template """
544         found = self._find_user(user["name"])
545
546         self.assertEquals("%s" % found.get("name"), "%(given-name)s %(surname)s" % user)
547         self.assertEquals("%s" % found.get("title"), user["job-title"])
548         self.assertEquals("%s" % found.get("company"), user["company"])
549         self.assertEquals("%s" % found.get("description"), user["description"])
550         self.assertEquals("%s" % found.get("department"), user["department"])
551
552     def _check_posix_user(self, user):
553         """ check if a posix_user from SamDB has the same attributes as its template """
554         found = self._find_user(user["name"])
555
556         self.assertEquals("%s" % found.get("loginShell"), user["loginShell"])
557         self.assertEquals("%s" % found.get("gecos"), user["gecos"])
558         self.assertEquals("%s" % found.get("uidNumber"), "%s" % user["uidNumber"])
559         self.assertEquals("%s" % found.get("gidNumber"), "%s" % user["gidNumber"])
560         self.assertEquals("%s" % found.get("uid"), user["uid"])
561         self._check_user(user)
562
563     def _create_user(self, user):
564         return self.runsubcmd("user", "create", user["name"], user["password"],
565                               "--surname=%s" % user["surname"],
566                               "--given-name=%s" % user["given-name"],
567                               "--job-title=%s" % user["job-title"],
568                               "--department=%s" % user["department"],
569                               "--description=%s" % user["description"],
570                               "--company=%s" % user["company"],
571                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
572                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
573
574     def _create_posix_user(self, user):
575         """ create a new user with RFC2307 attributes """
576         return self.runsubcmd("user", "create", user["name"], user["password"],
577                               "--surname=%s" % user["surname"],
578                               "--given-name=%s" % user["given-name"],
579                               "--job-title=%s" % user["job-title"],
580                               "--department=%s" % user["department"],
581                               "--description=%s" % user["description"],
582                               "--company=%s" % user["company"],
583                               "--gecos=%s" % user["gecos"],
584                               "--login-shell=%s" % user["loginShell"],
585                               "--uid=%s" % user["uid"],
586                               "--uid-number=%s" % user["uidNumber"],
587                               "--gid-number=%s" % user["gidNumber"],
588                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
589                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
590
591     def _find_user(self, name):
592         search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
593         userlist = self.samdb.search(base=self.samdb.domain_dn(),
594                                      scope=ldb.SCOPE_SUBTREE,
595                                      expression=search_filter)
596         if userlist:
597             return userlist[0]
598         else:
599             return None