python: tests: update all super calls to python 3 style in tests
[samba.git] / python / samba / tests / samba_tool / user.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import time
20 import base64
21 import ldb
22 from samba.tests.samba_tool.base import SambaToolCmdTest
23 from samba import (
24         credentials,
25         nttime2unix,
26         dsdb,
27         werror,
28         )
29 from samba.ndr import ndr_unpack
30 from samba.dcerpc import drsblobs
31 from samba.common import get_bytes
32 from samba.common import get_string
33 from samba.tests import env_loadparm
34
35
36 class UserCmdTestCase(SambaToolCmdTest):
37     """Tests for samba-tool user subcommands"""
38     users = []
39     samdb = None
40
41     def setUp(self):
42         super().setUp()
43         self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
44                                    "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
45
46         # Modify the default template homedir
47         lp = self.get_loadparm()
48         self.template_homedir = lp.get('template homedir')
49         lp.set('template homedir', '/home/test/%D/%U')
50
51         self.users = []
52         self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
53         self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
54         self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"}))
55         self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"}))
56         self.users.append(self._randomPosixUser({"name": "posixuser1"}))
57         self.users.append(self._randomPosixUser({"name": "posixuser2"}))
58         self.users.append(self._randomPosixUser({"name": "posixuser3"}))
59         self.users.append(self._randomPosixUser({"name": "posixuser4"}))
60         self.users.append(self._randomUnixUser({"name": "unixuser1"}))
61         self.users.append(self._randomUnixUser({"name": "unixuser2"}))
62         self.users.append(self._randomUnixUser({"name": "unixuser3"}))
63         self.users.append(self._randomUnixUser({"name": "unixuser4"}))
64
65         # Make sure users don't exist
66         for user in self.users:
67             if self._find_user(user["name"]):
68                 self.runsubcmd("user", "delete", user["name"])
69
70         # setup the 12 users and ensure they are correct
71         for user in self.users:
72             (result, out, err) = user["createUserFn"](user)
73
74             self.assertCmdSuccess(result, out, err)
75             self.assertEqual(err, "", "Shouldn't be any error messages")
76             if 'unix' in user["name"]:
77                 self.assertIn("Modified User '%s' successfully" % user["name"],
78                               out)
79             else:
80                 self.assertIn("User '%s' added successfully" % user["name"],
81                               out)
82
83             user["checkUserFn"](user)
84
85     def tearDown(self):
86         super().tearDown()
87         # clean up all the left over users, just in case
88         for user in self.users:
89             if self._find_user(user["name"]):
90                 self.runsubcmd("user", "delete", user["name"])
91         lp = env_loadparm()
92         # second run of this test
93         # the cache is still there and '--cache-ldb-initialize'
94         # will fail
95         cachedb = lp.private_path("user-syncpasswords-cache.ldb")
96         if os.path.exists(cachedb):
97             os.remove(cachedb)
98         lp.set('template homedir', self.template_homedir)
99
100     def test_newuser(self):
101         # try to add all the users again, this should fail
102         for user in self.users:
103             (result, out, err) = self._create_user(user)
104             self.assertCmdFail(result, "Ensure that create user fails")
105             self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)
106
107         # try to delete all the 4 users we just added
108         for user in self.users:
109             (result, out, err) = self.runsubcmd("user", "delete", user["name"])
110             self.assertCmdSuccess(result, out, err, "Can we delete users")
111             found = self._find_user(user["name"])
112             self.assertIsNone(found)
113
114         # test adding users with --use-username-as-cn
115         for user in self.users:
116             (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
117                                                 "--use-username-as-cn",
118                                                 "--surname=%s" % user["surname"],
119                                                 "--given-name=%s" % user["given-name"],
120                                                 "--job-title=%s" % user["job-title"],
121                                                 "--department=%s" % user["department"],
122                                                 "--description=%s" % user["description"],
123                                                 "--company=%s" % user["company"],
124                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
125                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
126
127             self.assertCmdSuccess(result, out, err)
128             self.assertEqual(err, "", "Shouldn't be any error messages")
129             self.assertIn("User '%s' added successfully" % user["name"], out)
130
131             found = self._find_user(user["name"])
132
133             self.assertEqual("%s" % found.get("cn"), "%(name)s" % user)
134             self.assertEqual("%s" % found.get("name"), "%(name)s" % user)
135
136     def test_newuser_weak_password(self):
137         # Ensure that when we try to create a user over LDAP (thus no
138         # transactions) and the password is too weak, we do not get a
139         # half-created account.
140
141         def cleanup_user(username):
142             try:
143                 self.samdb.deleteuser(username)
144             except Exception as err:
145                 estr = err.args[0]
146                 if 'Unable to find user' not in estr:
147                     raise
148
149         server = os.environ['DC_SERVER']
150         dc_username = os.environ['DC_USERNAME']
151         dc_password = os.environ['DC_PASSWORD']
152
153         username = self.randomName()
154         password = 'a'
155
156         self.addCleanup(cleanup_user, username)
157
158         # Try to add the user and ensure it fails.
159         result, out, err = self.runsubcmd('user', 'add',
160                                           username, password,
161                                           '-H', f'ldap://{server}',
162                                           f'-U{dc_username}%{dc_password}')
163         self.assertCmdFail(result)
164         self.assertIn('Failed to add user', err)
165         self.assertIn('LDAP_CONSTRAINT_VIOLATION', err)
166         self.assertIn(f'{werror.WERR_PASSWORD_RESTRICTION:08X}', err)
167
168         # Now search for the user, and make sure we don't find anything.
169         res = self.samdb.search(self.samdb.domain_dn(),
170                                 expression=f'(sAMAccountName={username})',
171                                 scope=ldb.SCOPE_SUBTREE)
172         self.assertEqual(0, len(res), 'expected not to find the user')
173
174     def _verify_supplementalCredentials(self, ldif,
175                                         min_packages=3,
176                                         max_packages=6):
177         msgs = self.samdb.parse_ldif(ldif)
178         (changetype, obj) = next(msgs)
179
180         self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
181         sc_blob = obj["supplementalCredentials"][0]
182         sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
183
184         self.assertGreaterEqual(sc.sub.num_packages,
185                                 min_packages, "min_packages check")
186         self.assertLessEqual(sc.sub.num_packages,
187                              max_packages, "max_packages check")
188
189         if max_packages == 0:
190             return
191
192         def find_package(packages, name, start_idx=0):
193             for i in range(start_idx, len(packages)):
194                 if packages[i].name == name:
195                     return (i, packages[i])
196             return (None, None)
197
198         # The ordering is this
199         #
200         # Primary:Kerberos-Newer-Keys (optional)
201         # Primary:Kerberos
202         # Primary:WDigest
203         # Primary:CLEARTEXT (optional)
204         # Primary:SambaGPG (optional)
205         #
206         # And the 'Packages' package is insert before the last
207         # other package.
208
209         nidx = 0
210         (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
211         self.assertIsNotNone(pp, "Packages required")
212         self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
213                          "Packages needs to be at num_packages - 1")
214
215         (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
216                                     start_idx=nidx)
217         if knidx is not None:
218             self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
219             nidx = nidx + 1
220             if nidx == pidx:
221                 nidx = nidx + 1
222
223         (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
224                                   start_idx=nidx)
225         self.assertIsNotNone(pp, "Primary:Kerberos required")
226         self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
227         nidx = nidx + 1
228         if nidx == pidx:
229             nidx = nidx + 1
230
231         (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
232                                   start_idx=nidx)
233         self.assertIsNotNone(pp, "Primary:WDigest required")
234         self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
235         nidx = nidx + 1
236         if nidx == pidx:
237             nidx = nidx + 1
238
239         (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
240                                   start_idx=nidx)
241         if cidx is not None:
242             self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
243             nidx = nidx + 1
244             if nidx == pidx:
245                 nidx = nidx + 1
246
247         (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
248                                   start_idx=nidx)
249         if gidx is not None:
250             self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
251             nidx = nidx + 1
252             if nidx == pidx:
253                 nidx = nidx + 1
254
255         self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
256
257     def test_setpassword(self):
258         expect_nt_hash = bool(int(os.environ.get("EXPECT_NT_HASH", "1")))
259
260         for user in self.users:
261             newpasswd = self.random_password(16)
262             (result, out, err) = self.runsubcmd("user", "setpassword",
263                                                 user["name"],
264                                                 "--newpassword=%s" % newpasswd,
265                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
266                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
267             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
268             self.assertEqual(err, "", "setpassword with url")
269             self.assertMatch(out, "Changed password OK", "setpassword with url")
270
271         attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
272         (result, out, err) = self.runsubcmd("user", "syncpasswords",
273                                             "--cache-ldb-initialize",
274                                             "--attributes=%s" % attributes,
275                                             "--decrypt-samba-gpg")
276         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
277         self.assertEqual(err, "", "getpassword without url")
278         cache_attrs = {
279             "objectClass": {"value": "userSyncPasswords"},
280             "samdbUrl": {},
281             "dirsyncFilter": {},
282             "dirsyncAttribute": {},
283             "dirsyncControl": {"value": "dirsync:1:0:0"},
284             "passwordAttribute": {},
285             "decryptSambaGPG": {},
286             "currentTime": {},
287         }
288         for a in cache_attrs.keys():
289             v = cache_attrs[a].get("value", "")
290             self.assertMatch(out, "%s: %s" % (a, v),
291                              "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
292
293         (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
294         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
295         self.assertEqual(err, "", "syncpasswords --no-wait")
296         self.assertMatch(out, "dirsync_loop(): results 0",
297                          "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
298         for user in self.users:
299             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
300                              "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
301
302         for user in self.users:
303             newpasswd = self.random_password(16)
304             creds = credentials.Credentials()
305             creds.set_anonymous()
306             creds.set_password(newpasswd)
307             unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
308             virtualClearTextUTF8 = base64.b64encode(get_bytes(newpasswd)).decode('utf8')
309             virtualClearTextUTF16 = base64.b64encode(get_string(newpasswd).encode('utf-16-le')).decode('utf8')
310
311             (result, out, err) = self.runsubcmd("user", "setpassword",
312                                                 user["name"],
313                                                 "--newpassword=%s" % newpasswd)
314             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
315             self.assertEqual(err, "", "setpassword without url")
316             self.assertMatch(out, "Changed password OK", "setpassword without url")
317
318             (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
319             self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
320             self.assertEqual(err, "", "syncpasswords --no-wait")
321             self.assertMatch(out, "dirsync_loop(): results 0",
322                              "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
323             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
324                              "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
325             self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
326                              "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
327             if expect_nt_hash:
328                 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
329                                  "getpassword unicodePwd: out[%s]" % out)
330             else:
331                 self.assertNotIn("unicodePwd:: %s" % unicodePwd, out)
332             self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
333                              "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
334             self.assertMatch(out, "supplementalCredentials:: ",
335                              "getpassword supplementalCredentials: out[%s]" % out)
336             if "virtualSambaGPG:: " in out:
337                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
338                                  "getpassword virtualClearTextUTF8: out[%s]" % out)
339                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
340                                  "getpassword virtualClearTextUTF16: out[%s]" % out)
341                 self.assertMatch(out, "virtualSSHA: ",
342                                  "getpassword virtualSSHA: out[%s]" % out)
343
344             (result, out, err) = self.runsubcmd("user", "getpassword",
345                                                 user["name"],
346                                                 "--attributes=%s" % attributes,
347                                                 "--decrypt-samba-gpg")
348             self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
349             self.assertEqual(err, "", "getpassword without url")
350             self.assertMatch(out, "Got password OK", "getpassword without url")
351             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
352                              "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
353             if expect_nt_hash:
354                 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
355                                  "getpassword unicodePwd: out[%s]" % out)
356             else:
357                 self.assertNotIn("unicodePwd:: %s" % unicodePwd, out)
358             self.assertMatch(out, "supplementalCredentials:: ",
359                              "getpassword supplementalCredentials: out[%s]" % out)
360             self._verify_supplementalCredentials(out.replace("\nGot password OK\n", ""))
361             if "virtualSambaGPG:: " in out:
362                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
363                                  "getpassword virtualClearTextUTF8: out[%s]" % out)
364                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
365                                  "getpassword virtualClearTextUTF16: out[%s]" % out)
366                 self.assertMatch(out, "virtualSSHA: ",
367                                  "getpassword virtualSSHA: out[%s]" % out)
368
369         for user in self.users:
370             newpasswd = self.random_password(16)
371             (result, out, err) = self.runsubcmd("user", "setpassword",
372                                                 user["name"],
373                                                 "--newpassword=%s" % newpasswd,
374                                                 "--must-change-at-next-login",
375                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
376                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
377             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
378             self.assertEqual(err, "", "setpassword with forced change")
379             self.assertMatch(out, "Changed password OK", "setpassword with forced change")
380
381     def test_setexpiry(self):
382         for user in self.users:
383             twodays = time.time() + (2 * 24 * 60 * 60)
384
385             (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
386                                                 "--days=2",
387                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
388                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
389             self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
390             self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
391
392             found = self._find_user(user["name"])
393
394             expires = nttime2unix(int("%s" % found.get("accountExpires")))
395             self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
396
397         # TODO: re-enable this after the filter case is sorted out
398         if "filters are broken, bail now":
399             return
400
401         # now run the expiration based on a filter
402         fourdays = time.time() + (4 * 24 * 60 * 60)
403         (result, out, err) = self.runsubcmd("user", "setexpiry",
404                                             "--filter", "(&(objectClass=user)(company=comp2))",
405                                             "--days=4",
406                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
407                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
408         self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
409
410         for user in self.users:
411             found = self._find_user(user["name"])
412             if ("%s" % found.get("company")) == "comp2":
413                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
414                 self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time")
415             else:
416                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
417                 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
418
419     def test_list(self):
420         (result, out, err) = self.runsubcmd("user", "list",
421                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
422                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
423                                                           os.environ["DC_PASSWORD"]))
424         self.assertCmdSuccess(result, out, err, "Error running list")
425
426         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
427                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
428
429         userlist = self.samdb.search(base=self.samdb.domain_dn(),
430                                      scope=ldb.SCOPE_SUBTREE,
431                                      expression=search_filter,
432                                      attrs=["samaccountname"])
433
434         self.assertTrue(len(userlist) > 0, "no users found in samdb")
435
436         for userobj in userlist:
437             name = str(userobj.get("samaccountname", idx=0))
438             self.assertMatch(out, name,
439                              "user '%s' not found" % name)
440
441
442     def test_list_base_dn(self):
443         base_dn = "CN=Users"
444         (result, out, err) = self.runsubcmd("user", "list", "-b", base_dn,
445                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
446                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
447                                                           os.environ["DC_PASSWORD"]))
448         self.assertCmdSuccess(result, out, err, "Error running list")
449
450         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
451                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
452
453         userlist = self.samdb.search(base=self.samdb.normalize_dn_in_domain(base_dn),
454                                      scope=ldb.SCOPE_SUBTREE,
455                                      expression=search_filter,
456                                      attrs=["samaccountname"])
457
458         self.assertTrue(len(userlist) > 0, "no users found in samdb")
459
460         for userobj in userlist:
461             name = str(userobj.get("samaccountname", idx=0))
462             self.assertMatch(out, name,
463                              "user '%s' not found" % name)
464
465     def test_list_full_dn(self):
466         (result, out, err) = self.runsubcmd("user", "list", "--full-dn",
467                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
468                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
469                                                           os.environ["DC_PASSWORD"]))
470         self.assertCmdSuccess(result, out, err, "Error running list")
471
472         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
473                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
474
475         userlist = self.samdb.search(base=self.samdb.domain_dn(),
476                                      scope=ldb.SCOPE_SUBTREE,
477                                      expression=search_filter,
478                                      attrs=["dn"])
479
480         self.assertTrue(len(userlist) > 0, "no users found in samdb")
481
482         for userobj in userlist:
483             name = str(userobj.get("dn", idx=0))
484             self.assertMatch(out, name,
485                              "user '%s' not found" % name)
486
487     def test_list_hide_expired(self):
488         expire_username = "expireUser"
489         expire_user = self._randomUser({"name": expire_username})
490         self._create_user(expire_user)
491
492         (result, out, err) = self.runsubcmd(
493             "user",
494             "list",
495             "--hide-expired",
496             "-H",
497             "ldap://%s" % os.environ["DC_SERVER"],
498             "-U%s%%%s" % (os.environ["DC_USERNAME"],
499                           os.environ["DC_PASSWORD"]))
500         self.assertCmdSuccess(result, out, err, "Error running list")
501         self.assertTrue(expire_username in out,
502                         "user '%s' not found" % expire_username)
503
504         # user will be expired one second ago
505         self.samdb.setexpiry(
506             "(sAMAccountname=%s)" % expire_username,
507             -1,
508             False)
509
510         (result, out, err) = self.runsubcmd(
511             "user",
512             "list",
513             "--hide-expired",
514             "-H",
515             "ldap://%s" % os.environ["DC_SERVER"],
516             "-U%s%%%s" % (os.environ["DC_USERNAME"],
517                           os.environ["DC_PASSWORD"]))
518         self.assertCmdSuccess(result, out, err, "Error running list")
519         self.assertFalse(expire_username in out,
520                          "user '%s' found" % expire_username)
521
522         self.samdb.deleteuser(expire_username)
523
524     def test_list_hide_disabled(self):
525         disable_username = "disableUser"
526         disable_user = self._randomUser({"name": disable_username})
527         self._create_user(disable_user)
528
529         (result, out, err) = self.runsubcmd(
530             "user",
531             "list",
532             "--hide-disabled",
533             "-H",
534             "ldap://%s" % os.environ["DC_SERVER"],
535             "-U%s%%%s" % (os.environ["DC_USERNAME"],
536                           os.environ["DC_PASSWORD"]))
537         self.assertCmdSuccess(result, out, err, "Error running list")
538         self.assertTrue(disable_username in out,
539                         "user '%s' not found" % disable_username)
540
541         self.samdb.disable_account("(sAMAccountname=%s)" % disable_username)
542
543         (result, out, err) = self.runsubcmd(
544             "user",
545             "list",
546             "--hide-disabled",
547             "-H",
548             "ldap://%s" % os.environ["DC_SERVER"],
549             "-U%s%%%s" % (os.environ["DC_USERNAME"],
550                           os.environ["DC_PASSWORD"]))
551         self.assertCmdSuccess(result, out, err, "Error running list")
552         self.assertFalse(disable_username in out,
553                          "user '%s' found" % disable_username)
554
555         self.samdb.deleteuser(disable_username)
556
557     def test_show(self):
558         for user in self.users:
559             (result, out, err) = self.runsubcmd(
560                 "user", "show", user["name"],
561                 "--attributes=sAMAccountName,company",
562                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
563                 "-U%s%%%s" % (os.environ["DC_USERNAME"],
564                               os.environ["DC_PASSWORD"]))
565             self.assertCmdSuccess(result, out, err, "Error running show")
566
567             expected_out = """dn: CN=%s %s,CN=Users,%s
568 company: %s
569 sAMAccountName: %s
570
571 """ % (user["given-name"], user["surname"], self.samdb.domain_dn(),
572                 user["company"], user["name"])
573
574             self.assertEqual(out, expected_out,
575                              "Unexpected show output for user '%s'" %
576                              user["name"])
577
578             time_attrs = [
579                 "name", # test that invalid values are just ignored
580                 "whenCreated",
581                 "whenChanged",
582                 "accountExpires",
583                 "badPasswordTime",
584                 "lastLogoff",
585                 "lastLogon",
586                 "lastLogonTimestamp",
587                 "lockoutTime",
588                 "msDS-UserPasswordExpiryTimeComputed",
589                 "pwdLastSet",
590                 ]
591
592             attrs = []
593             for ta in time_attrs:
594                 attrs.append(ta)
595                 for fm in ["GeneralizedTime", "UnixTime", "TimeSpec"]:
596                     attrs.append("%s;format=%s" % (ta, fm))
597
598             (result, out, err) = self.runsubcmd(
599                 "user", "show", user["name"],
600                 "--attributes=%s" % ",".join(attrs),
601                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
602                 "-U%s%%%s" % (os.environ["DC_USERNAME"],
603                               os.environ["DC_PASSWORD"]))
604             self.assertCmdSuccess(result, out, err, "Error running show")
605
606             self.assertIn(";format=GeneralizedTime", out)
607             self.assertIn(";format=UnixTime", out)
608             self.assertIn(";format=TimeSpec", out)
609
610             self.assertIn("name: ", out)
611             self.assertNotIn("name;format=GeneralizedTime: ", out)
612             self.assertNotIn("name;format=UnixTime: ", out)
613             self.assertNotIn("name;format=TimeSpec: ", out)
614
615             self.assertIn("whenCreated: 20", out)
616             self.assertIn("whenCreated;format=GeneralizedTime: 20", out)
617             self.assertIn("whenCreated;format=UnixTime: 1", out)
618             self.assertIn("whenCreated;format=TimeSpec: 1", out)
619
620             self.assertIn("whenChanged: 20", out)
621             self.assertIn("whenChanged;format=GeneralizedTime: 20", out)
622             self.assertIn("whenChanged;format=UnixTime: 1", out)
623             self.assertIn("whenChanged;format=TimeSpec: 1", out)
624
625             self.assertIn("accountExpires: 9223372036854775807", out)
626             self.assertNotIn("accountExpires;format=GeneralizedTime: ", out)
627             self.assertNotIn("accountExpires;format=UnixTime: ", out)
628             self.assertNotIn("accountExpires;format=TimeSpec: ", out)
629
630             self.assertIn("badPasswordTime: 0", out)
631             self.assertNotIn("badPasswordTime;format=GeneralizedTime: ", out)
632             self.assertNotIn("badPasswordTime;format=UnixTime: ", out)
633             self.assertNotIn("badPasswordTime;format=TimeSpec: ", out)
634
635             self.assertIn("lastLogoff: 0", out)
636             self.assertNotIn("lastLogoff;format=GeneralizedTime: ", out)
637             self.assertNotIn("lastLogoff;format=UnixTime: ", out)
638             self.assertNotIn("lastLogoff;format=TimeSpec: ", out)
639
640             self.assertIn("lastLogon: 0", out)
641             self.assertNotIn("lastLogon;format=GeneralizedTime: ", out)
642             self.assertNotIn("lastLogon;format=UnixTime: ", out)
643             self.assertNotIn("lastLogon;format=TimeSpec: ", out)
644
645             # If a specified attribute is not available on a user object
646             # it's silently omitted.
647             self.assertNotIn("lastLogonTimestamp:", out)
648             self.assertNotIn("lockoutTime:", out)
649
650             self.assertIn("msDS-UserPasswordExpiryTimeComputed: 1", out)
651             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime: 20", out)
652             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime: 1", out)
653             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec: 1", out)
654
655             self.assertIn("pwdLastSet: 1", out)
656             self.assertIn("pwdLastSet;format=GeneralizedTime: 20", out)
657             self.assertIn("pwdLastSet;format=UnixTime: 1", out)
658             self.assertIn("pwdLastSet;format=TimeSpec: 1", out)
659
660             out_msgs = self.samdb.parse_ldif(out)
661             out_msg = next(out_msgs)[1]
662
663             self.assertIn("whenCreated", out_msg)
664             when_created_str = str(out_msg["whenCreated"][0])
665             self.assertIn("whenCreated;format=GeneralizedTime", out_msg)
666             self.assertEqual(str(out_msg["whenCreated;format=GeneralizedTime"][0]), when_created_str)
667             when_created_time = ldb.string_to_time(when_created_str)
668             self.assertIn("whenCreated;format=UnixTime", out_msg)
669             self.assertEqual(str(out_msg["whenCreated;format=UnixTime"][0]), str(when_created_time))
670             self.assertIn("whenCreated;format=TimeSpec", out_msg)
671             self.assertEqual(str(out_msg["whenCreated;format=TimeSpec"][0]),
672                              "%d.000000000" % (when_created_time))
673
674             self.assertIn("whenChanged", out_msg)
675             when_changed_str = str(out_msg["whenChanged"][0])
676             self.assertIn("whenChanged;format=GeneralizedTime", out_msg)
677             self.assertEqual(str(out_msg["whenChanged;format=GeneralizedTime"][0]), when_changed_str)
678             when_changed_time = ldb.string_to_time(when_changed_str)
679             self.assertIn("whenChanged;format=UnixTime", out_msg)
680             self.assertEqual(str(out_msg["whenChanged;format=UnixTime"][0]), str(when_changed_time))
681             self.assertIn("whenChanged;format=TimeSpec", out_msg)
682             self.assertEqual(str(out_msg["whenChanged;format=TimeSpec"][0]),
683                              "%d.000000000" % (when_changed_time))
684
685             self.assertIn("pwdLastSet;format=GeneralizedTime", out_msg)
686             pwd_last_set_str = str(out_msg["pwdLastSet;format=GeneralizedTime"][0])
687             pwd_last_set_time = ldb.string_to_time(pwd_last_set_str)
688             self.assertIn("pwdLastSet;format=UnixTime", out_msg)
689             self.assertEqual(str(out_msg["pwdLastSet;format=UnixTime"][0]), str(pwd_last_set_time))
690             self.assertIn("pwdLastSet;format=TimeSpec", out_msg)
691             self.assertIn("%d." % pwd_last_set_time, str(out_msg["pwdLastSet;format=TimeSpec"][0]))
692             self.assertNotIn(".000000000", str(out_msg["pwdLastSet;format=TimeSpec"][0]))
693
694             # assert that the pwd has been set in the minute after user creation
695             self.assertGreaterEqual(pwd_last_set_time, when_created_time)
696             self.assertLess(pwd_last_set_time, when_created_time + 60)
697
698             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime", out_msg)
699             pwd_expires_str = str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime"][0])
700             pwd_expires_time = ldb.string_to_time(pwd_expires_str)
701             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime", out_msg)
702             self.assertEqual(str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=UnixTime"][0]), str(pwd_expires_time))
703             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec", out_msg)
704             self.assertIn("%d." % pwd_expires_time, str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0]))
705             self.assertNotIn(".000000000", str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0]))
706
707             # assert that the pwd expires after it was set
708             self.assertGreater(pwd_expires_time, pwd_last_set_time)
709
710     def test_move(self):
711         full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest_usr"))
712         self.addCleanup(self.samdb.delete, full_ou_dn, ["tree_delete:1"])
713
714         (result, out, err) = self.runsubcmd("ou", "add", full_ou_dn)
715         self.assertCmdSuccess(result, out, err)
716         self.assertEqual(err, "", "There shouldn't be any error message")
717         self.assertIn('Added ou "%s"' % full_ou_dn, out)
718
719         for user in self.users:
720             (result, out, err) = self.runsubcmd(
721                 "user", "move", user["name"], full_ou_dn)
722             self.assertCmdSuccess(result, out, err, "Error running move")
723             self.assertIn('Moved user "%s" into "%s"' %
724                           (user["name"], full_ou_dn), out)
725
726         # Should fail as users objects are in OU
727         (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
728         self.assertCmdFail(result)
729         self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
730                        "(it has %d children)!") % len(self.users), err)
731
732         for user in self.users:
733             new_dn = "CN=Users,%s" % self.samdb.domain_dn()
734             (result, out, err) = self.runsubcmd(
735                 "user", "move", user["name"], new_dn)
736             self.assertCmdSuccess(result, out, err, "Error running move")
737             self.assertIn('Moved user "%s" into "%s"' %
738                           (user["name"], new_dn), out)
739
740     def test_rename_surname_initials_givenname(self):
741         """rename the existing surname and given name and add missing
742         initials, then remove them, for all users"""
743         for user in self.users:
744             new_givenname = "new_given_name_of_" + user["name"]
745             new_initials = "A"
746             new_surname = "new_surname_of_" + user["name"]
747             found = self._find_user(user["name"])
748             old_cn = str(found.get("cn"))
749
750             # rename given name, initials and surname
751             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
752                                                 "--surname=%s" % new_surname,
753                                                 "--initials=%s" % new_initials,
754                                                 "--given-name=%s" % new_givenname)
755             self.assertCmdSuccess(result, out, err)
756             self.assertEqual(err, "", "Shouldn't be any error messages")
757             self.assertIn('successfully', out)
758
759             found = self._find_user(user["name"])
760             self.assertEqual("%s" % found.get("givenName"), new_givenname)
761             self.assertEqual("%s" % found.get("initials"), new_initials)
762             self.assertEqual("%s" % found.get("sn"), new_surname)
763             self.assertEqual("%s" % found.get("name"),
764                              "%s %s. %s" % (new_givenname, new_initials, new_surname))
765             self.assertEqual("%s" % found.get("cn"),
766                              "%s %s. %s" % (new_givenname, new_initials, new_surname))
767
768             # remove given name, initials and surname
769             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
770                                                 "--surname=",
771                                                 "--initials=",
772                                                 "--given-name=")
773             self.assertCmdSuccess(result, out, err)
774             self.assertEqual(err, "", "Shouldn't be any error messages")
775             self.assertIn('successfully', out)
776
777             found = self._find_user(user["name"])
778             self.assertEqual(found.get("givenName"), None)
779             self.assertEqual(found.get("initials"), None)
780             self.assertEqual(found.get("sn"), None)
781             self.assertEqual("%s" % found.get("cn"), user["name"])
782
783             # reset changes (initials are removed)
784             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
785                                                 "--surname=%(surname)s" % user,
786                                                 "--given-name=%(given-name)s" % user)
787             self.assertCmdSuccess(result, out, err)
788
789             if old_cn:
790                 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
791                                                 "--force-new-cn=%s" % old_cn)
792
793     def test_rename_cn_samaccountname(self):
794         """rename and try to remove the cn and the samaccount of all users"""
795         for user in self.users:
796             new_cn = "new_cn_of_" + user["name"]
797             new_samaccountname = "new_samaccount_of_" + user["name"]
798             new_surname = "new_surname_of_" + user["name"]
799
800             # rename cn
801             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
802                                                 "--samaccountname=%s"
803                                                  % new_samaccountname,
804                                                 "--force-new-cn=%s" % new_cn)
805             self.assertCmdSuccess(result, out, err)
806             self.assertEqual(err, "", "Shouldn't be any error messages")
807             self.assertIn('successfully', out)
808
809             found = self._find_user(new_samaccountname)
810             self.assertEqual("%s" % found.get("cn"), new_cn)
811             self.assertEqual("%s" % found.get("sAMAccountName"),
812                              new_samaccountname)
813
814             # changing the surname has no effect to the cn
815             (result, out, err) = self.runsubcmd("user", "rename", new_samaccountname,
816                                                 "--surname=%s" % new_surname)
817             self.assertCmdSuccess(result, out, err)
818
819             found = self._find_user(new_samaccountname)
820             self.assertEqual("%s" % found.get("cn"), new_cn)
821
822             # trying to remove cn (throws an error)
823             (result, out, err) = self.runsubcmd("user", "rename",
824                                                 new_samaccountname,
825                                                 "--force-new-cn=")
826             self.assertCmdFail(result)
827             self.assertIn('Failed to rename user', err)
828             self.assertIn("delete protected attribute", err)
829
830             # trying to remove the samccountname (throws an error)
831             (result, out, err) = self.runsubcmd("user", "rename",
832                                                 new_samaccountname,
833                                                 "--samaccountname=")
834             self.assertCmdFail(result)
835             self.assertIn('Failed to rename user', err)
836             self.assertIn('delete protected attribute', err)
837
838             # reset changes (cn must be the name)
839             (result, out, err) = self.runsubcmd("user", "rename", new_samaccountname,
840                                                 "--samaccountname=%(name)s"
841                                                   % user,
842                                                 "--force-new-cn=%(name)s" % user)
843             self.assertCmdSuccess(result, out, err)
844
845     def test_rename_standard_cn(self):
846         """reset the cn of all users to the standard"""
847         for user in self.users:
848             new_cn = "new_cn_of_" + user["name"]
849             new_givenname = "new_given_name_of_" + user["name"]
850             new_initials = "A"
851             new_surname = "new_surname_of_" + user["name"]
852
853             # set different cn
854             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
855                                                 "--force-new-cn=%s" % new_cn)
856             self.assertCmdSuccess(result, out, err)
857
858             # remove given name, initials and surname
859             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
860                                                 "--surname=",
861                                                 "--initials=",
862                                                 "--given-name=")
863             self.assertCmdSuccess(result, out, err)
864
865             # reset the CN (no given name, initials or surname --> samaccountname)
866             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
867                                                 "--reset-cn")
868
869             self.assertCmdSuccess(result, out, err)
870             self.assertEqual(err, "", "Shouldn't be any error messages")
871             self.assertIn('successfully', out)
872
873             found = self._find_user(user["name"])
874             self.assertEqual("%s" % found.get("cn"), user["name"])
875
876             # set given name, initials and surname and set different cn
877             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
878                                                 "--force-new-cn=%s" % new_cn,
879                                                 "--surname=%s" % new_surname,
880                                                 "--initials=%s" % new_initials,
881                                                 "--given-name=%s" % new_givenname)
882             self.assertCmdSuccess(result, out, err)
883
884             # reset the CN (given name, initials or surname are given --> given name)
885             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
886                                                 "--reset-cn")
887
888             self.assertCmdSuccess(result, out, err)
889             self.assertEqual(err, "", "Shouldn't be any error messages")
890             self.assertIn('successfully', out)
891
892             found = self._find_user(user["name"])
893             self.assertEqual("%s" % found.get("cn"),
894                              "%s %s. %s" % (new_givenname, new_initials, new_surname))
895
896             # reset changes
897             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
898                                                 "--reset-cn",
899                                                 "--initials=",
900                                                 "--surname=%(surname)s" % user,
901                                                 "--given-name=%(given-name)s" % user)
902             self.assertCmdSuccess(result, out, err)
903
904     def test_rename_mailaddress_displayname(self):
905         for user in self.users:
906             new_mail = "new_mailaddress_of_" + user["name"]
907             new_displayname = "new displayname of " + user["name"]
908
909             # change mail and displayname
910             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
911                                                 "--mail-address=%s"
912                                                   % new_mail,
913                                                 "--display-name=%s"
914                                                   % new_displayname)
915             self.assertCmdSuccess(result, out, err)
916             self.assertEqual(err, "", "Shouldn't be any error messages")
917             self.assertIn('successfully', out)
918
919             found = self._find_user(user["name"])
920             self.assertEqual("%s" % found.get("mail"), new_mail)
921             self.assertEqual("%s" % found.get("displayName"), new_displayname)
922
923             # remove mail and displayname
924             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
925                                                 "--mail-address=",
926                                                 "--display-name=")
927             self.assertCmdSuccess(result, out, err)
928             self.assertEqual(err, "", "Shouldn't be any error messages")
929             self.assertIn('successfully', out)
930
931             found = self._find_user(user["name"])
932             self.assertEqual(found.get("mail"), None)
933             self.assertEqual(found.get("displayName"), None)
934
935     def test_rename_upn(self):
936         """rename upn of all users"""
937         for user in self.users:
938             found = self._find_user(user["name"])
939             old_upn = "%s" % found.get("userPrincipalName")
940             valid_suffix = old_upn.split('@')[1]   # samba.example.com
941
942             valid_new_upn = "new_%s@%s" % (user["name"], valid_suffix)
943             invalid_new_upn = "%s@invalid.suffix" + user["name"]
944
945             # trying to set invalid upn
946             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
947                                                 "--upn=%s"
948                                                   % invalid_new_upn)
949             self.assertCmdFail(result)
950             self.assertIn('is not a valid upn', err)
951
952             # set valid upn
953             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
954                                                 "--upn=%s"
955                                                   % valid_new_upn)
956             self.assertCmdSuccess(result, out, err)
957             self.assertEqual(err, "", "Shouldn't be any error messages")
958             self.assertIn('successfully', out)
959
960             found = self._find_user(user["name"])
961             self.assertEqual("%s" % found.get("userPrincipalName"), valid_new_upn)
962
963             # trying to remove upn
964             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
965                                                 "--upn=%s")
966             self.assertCmdFail(result)
967             self.assertIn('is not a valid upn', err)
968
969             # reset upn
970             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
971                                                 "--upn=%s" % old_upn)
972             self.assertCmdSuccess(result, out, err)
973
974     def test_getpwent(self):
975         try:
976             import pwd
977         except ImportError:
978             self.skipTest("Skipping getpwent test, no 'pwd' module available")
979             return
980
981         # get the current user's data for the test
982         uid = os.geteuid()
983         try:
984             u = pwd.getpwuid(uid)
985         except KeyError:
986             self.skipTest("Skipping getpwent test, current EUID not found in NSS")
987             return
988
989
990 # samba-tool user create command didn't support users with empty gecos if none is
991 # specified on the command line and the user hasn't one in the passwd file it
992 # will fail, so let's add some contents
993
994         gecos = u[4]
995         if (gecos is None or len(gecos) == 0):
996             gecos = "Foo GECOS"
997         user = self._randomPosixUser({
998                         "name": u[0],
999                         "uid": u[0],
1000                         "uidNumber": u[2],
1001                         "gidNumber": u[3],
1002                         "gecos": gecos,
1003                         "loginShell": u[6],
1004                         })
1005
1006         # Remove user if it already exists
1007         if self._find_user(u[0]):
1008             self.runsubcmd("user", "delete", u[0])
1009         # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
1010         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
1011                                             "--surname=%s" % user["surname"],
1012                                             "--given-name=%s" % user["given-name"],
1013                                             "--job-title=%s" % user["job-title"],
1014                                             "--department=%s" % user["department"],
1015                                             "--description=%s" % user["description"],
1016                                             "--company=%s" % user["company"],
1017                                             "--gecos=%s" % user["gecos"],
1018                                             "--rfc2307-from-nss",
1019                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
1020                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1021
1022         self.assertCmdSuccess(result, out, err)
1023         self.assertEqual(err, "", "Shouldn't be any error messages")
1024         self.assertIn("User '%s' added successfully" % user["name"], out)
1025
1026         self._check_posix_user(user)
1027         self.runsubcmd("user", "delete", user["name"])
1028
1029         # Check if overriding the attributes from NSS with explicit values works
1030         #
1031         # get a user with all random posix attributes
1032         user = self._randomPosixUser({"name": u[0]})
1033
1034         # Remove user if it already exists
1035         if self._find_user(u[0]):
1036             self.runsubcmd("user", "delete", u[0])
1037         # create a user with posix attributes from nss but override all of them with the
1038         # random ones just obtained
1039         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
1040                                             "--surname=%s" % user["surname"],
1041                                             "--given-name=%s" % user["given-name"],
1042                                             "--job-title=%s" % user["job-title"],
1043                                             "--department=%s" % user["department"],
1044                                             "--description=%s" % user["description"],
1045                                             "--company=%s" % user["company"],
1046                                             "--rfc2307-from-nss",
1047                                             "--gecos=%s" % user["gecos"],
1048                                             "--login-shell=%s" % user["loginShell"],
1049                                             "--uid=%s" % user["uid"],
1050                                             "--uid-number=%s" % user["uidNumber"],
1051                                             "--gid-number=%s" % user["gidNumber"],
1052                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
1053                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1054
1055         self.assertCmdSuccess(result, out, err)
1056         self.assertEqual(err, "", "Shouldn't be any error messages")
1057         self.assertIn("User '%s' added successfully" % user["name"], out)
1058
1059         self._check_posix_user(user)
1060         self.runsubcmd("user", "delete", user["name"])
1061
1062     # Test: samba-tool user unlock
1063     # This test does not verify that the command unlocks the user, it just
1064     # tests the command itself. The unlock test, which unlocks locked users,
1065     # is located in the 'samba4.ldap.password_lockout' test in
1066     # source4/dsdb/tests/python/password_lockout.py
1067     def test_unlock(self):
1068
1069         # try to unlock a nonexistent user, this should fail
1070         nonexistentusername = "userdoesnotexist"
1071         (result, out, err) = self.runsubcmd(
1072             "user", "unlock", nonexistentusername)
1073         self.assertCmdFail(result, "Ensure that unlock nonexistent user fails")
1074         self.assertIn("Failed to unlock user '%s'" % nonexistentusername, err)
1075         self.assertIn("Unable to find user", err)
1076
1077         # try to unlock with insufficient permissions, this should fail
1078         unprivileged_username = "unprivilegedunlockuser"
1079         unlocktest_username = "usertounlock"
1080
1081         self.runsubcmd("user", "add", unprivileged_username, "Passw0rd")
1082         self.runsubcmd("user", "add", unlocktest_username, "Passw0rd")
1083
1084         (result, out, err) = self.runsubcmd(
1085             "user", "unlock", unlocktest_username,
1086             "-H", "ldap://%s" % os.environ["DC_SERVER"],
1087             "-U%s%%%s" % (unprivileged_username,
1088                           "Passw0rd"))
1089         self.assertCmdFail(result, "Fail with LDAP_INSUFFICIENT_ACCESS_RIGHTS")
1090         self.assertIn("Failed to unlock user '%s'" % unlocktest_username, err)
1091         self.assertIn("LDAP error 50 LDAP_INSUFFICIENT_ACCESS_RIGHTS", err)
1092
1093         self.runsubcmd("user", "delete", unprivileged_username)
1094         self.runsubcmd("user", "delete", unlocktest_username)
1095
1096         # run unlock against test users
1097         for user in self.users:
1098             (result, out, err) = self.runsubcmd(
1099                 "user", "unlock", user["name"])
1100             self.assertCmdSuccess(result, out, err, "Error running user unlock")
1101             self.assertEqual(err, "", "Shouldn't be any error messages")
1102
1103     def _randomUser(self, base=None):
1104         """create a user with random attribute values, you can specify base attributes"""
1105         if base is None:
1106             base = {}
1107         user = {
1108             "name": self.randomName(),
1109             "password": self.random_password(16),
1110             "surname": self.randomName(),
1111             "given-name": self.randomName(),
1112             "job-title": self.randomName(),
1113             "department": self.randomName(),
1114             "company": self.randomName(),
1115             "description": self.randomName(count=100),
1116             "createUserFn": self._create_user,
1117             "checkUserFn": self._check_user,
1118         }
1119         user.update(base)
1120         return user
1121
1122     def _randomPosixUser(self, base=None):
1123         """create a user with random attribute values and additional RFC2307
1124         attributes, you can specify base attributes"""
1125         if base is None:
1126             base = {}
1127         user = self._randomUser({})
1128         user.update(base)
1129         posixAttributes = {
1130             "uid": self.randomName(),
1131             "loginShell": self.randomName(),
1132             "gecos": self.randomName(),
1133             "uidNumber": self.randomXid(),
1134             "gidNumber": self.randomXid(),
1135             "createUserFn": self._create_posix_user,
1136             "checkUserFn": self._check_posix_user,
1137         }
1138         user.update(posixAttributes)
1139         user.update(base)
1140         return user
1141
1142     def _randomUnixUser(self, base=None):
1143         """create a user with random attribute values and additional RFC2307
1144         attributes, you can specify base attributes"""
1145         if base is None:
1146             base = {}
1147         user = self._randomUser({})
1148         user.update(base)
1149         posixAttributes = {
1150             "uidNumber": self.randomXid(),
1151             "gidNumber": self.randomXid(),
1152             "uid": self.randomName(),
1153             "loginShell": self.randomName(),
1154             "gecos": self.randomName(),
1155             "createUserFn": self._create_unix_user,
1156             "checkUserFn": self._check_unix_user,
1157         }
1158         user.update(posixAttributes)
1159         user.update(base)
1160         return user
1161
1162     def _check_user(self, user):
1163         """ check if a user from SamDB has the same attributes as its template """
1164         found = self._find_user(user["name"])
1165
1166         self.assertEqual("%s" % found.get("name"), "%(given-name)s %(surname)s" % user)
1167         self.assertEqual("%s" % found.get("title"), user["job-title"])
1168         self.assertEqual("%s" % found.get("company"), user["company"])
1169         self.assertEqual("%s" % found.get("description"), user["description"])
1170         self.assertEqual("%s" % found.get("department"), user["department"])
1171
1172     def _check_posix_user(self, user):
1173         """ check if a posix_user from SamDB has the same attributes as its template """
1174         found = self._find_user(user["name"])
1175
1176         self.assertEqual("%s" % found.get("loginShell"), user["loginShell"])
1177         self.assertEqual("%s" % found.get("gecos"), user["gecos"])
1178         self.assertEqual("%s" % found.get("uidNumber"), "%s" % user["uidNumber"])
1179         self.assertEqual("%s" % found.get("gidNumber"), "%s" % user["gidNumber"])
1180         self.assertEqual("%s" % found.get("uid"), user["uid"])
1181         self._check_user(user)
1182
1183     def _check_unix_user(self, user):
1184         """ check if a unix_user from SamDB has the same attributes as its
1185 template """
1186         found = self._find_user(user["name"])
1187
1188         self.assertEqual("%s" % found.get("loginShell"), user["loginShell"])
1189         self.assertEqual("%s" % found.get("gecos"), user["gecos"])
1190         self.assertEqual("%s" % found.get("uidNumber"), "%s" %
1191                           user["uidNumber"])
1192         self.assertEqual("%s" % found.get("gidNumber"), "%s" %
1193                           user["gidNumber"])
1194         self.assertEqual("%s" % found.get("uid"), user["uid"])
1195         self.assertIn('/home/test/', "%s" % found.get("unixHomeDirectory"))
1196         self._check_user(user)
1197
1198     def _create_user(self, user):
1199         return self.runsubcmd("user", "add", user["name"], user["password"],
1200                               "--surname=%s" % user["surname"],
1201                               "--given-name=%s" % user["given-name"],
1202                               "--job-title=%s" % user["job-title"],
1203                               "--department=%s" % user["department"],
1204                               "--description=%s" % user["description"],
1205                               "--company=%s" % user["company"],
1206                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
1207                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1208
1209     def _create_posix_user(self, user):
1210         """ create a new user with RFC2307 attributes """
1211         return self.runsubcmd("user", "create", user["name"], user["password"],
1212                               "--surname=%s" % user["surname"],
1213                               "--given-name=%s" % user["given-name"],
1214                               "--job-title=%s" % user["job-title"],
1215                               "--department=%s" % user["department"],
1216                               "--description=%s" % user["description"],
1217                               "--company=%s" % user["company"],
1218                               "--gecos=%s" % user["gecos"],
1219                               "--login-shell=%s" % user["loginShell"],
1220                               "--uid=%s" % user["uid"],
1221                               "--uid-number=%s" % user["uidNumber"],
1222                               "--gid-number=%s" % user["gidNumber"],
1223                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
1224                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1225
1226     def _create_unix_user(self, user):
1227         """ Add RFC2307 attributes to a user"""
1228         self._create_user(user)
1229         return self.runsubcmd("user", "addunixattrs", user["name"],
1230                               "%s" % user["uidNumber"],
1231                               "--gid-number=%s" % user["gidNumber"],
1232                               "--gecos=%s" % user["gecos"],
1233                               "--login-shell=%s" % user["loginShell"],
1234                               "--uid=%s" % user["uid"],
1235                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
1236                               "-U%s%%%s" % (os.environ["DC_USERNAME"],
1237                                             os.environ["DC_PASSWORD"]))
1238
1239     def _find_user(self, name):
1240         search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
1241         userlist = self.samdb.search(base=self.samdb.domain_dn(),
1242                                      scope=ldb.SCOPE_SUBTREE,
1243                                      expression=search_filter)
1244         if userlist:
1245             return userlist[0]
1246         else:
1247             return None