s3:utils: let smbstatus report anonymous signing/encryption explicitly
[samba.git] / python / samba / tests / samba_tool / user.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import time
20 import base64
21 import ldb
22 from samba.tests.samba_tool.base import SambaToolCmdTest
23 from samba import (
24         credentials,
25         nttime2unix,
26         dsdb,
27         werror,
28         )
29 from samba.ndr import ndr_unpack
30 from samba.dcerpc import drsblobs
31 from samba.common import get_bytes
32 from samba.common import get_string
33 from samba.tests import env_loadparm
34
35
36 class UserCmdTestCase(SambaToolCmdTest):
37     """Tests for samba-tool user subcommands"""
38     users = []
39     samdb = None
40
41     def setUp(self):
42         super().setUp()
43         self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
44                                    "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
45
46         # Modify the default template homedir
47         lp = self.get_loadparm()
48         self.template_homedir = lp.get('template homedir')
49         lp.set('template homedir', '/home/test/%D/%U')
50
51         self.users = []
52         self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
53         self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
54         self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"}))
55         self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"}))
56         self.users.append(self._randomPosixUser({"name": "posixuser1"}))
57         self.users.append(self._randomPosixUser({"name": "posixuser2"}))
58         self.users.append(self._randomPosixUser({"name": "posixuser3"}))
59         self.users.append(self._randomPosixUser({"name": "posixuser4"}))
60         self.users.append(self._randomUnixUser({"name": "unixuser1"}))
61         self.users.append(self._randomUnixUser({"name": "unixuser2"}))
62         self.users.append(self._randomUnixUser({"name": "unixuser3"}))
63         self.users.append(self._randomUnixUser({"name": "unixuser4"}))
64
65         # Make sure users don't exist
66         for user in self.users:
67             if self._find_user(user["name"]):
68                 self.runsubcmd("user", "delete", user["name"])
69
70         # setup the 12 users and ensure they are correct
71         for user in self.users:
72             (result, out, err) = user["createUserFn"](user)
73
74             self.assertCmdSuccess(result, out, err)
75             self.assertEqual(err, "", "Shouldn't be any error messages")
76             if 'unix' in user["name"]:
77                 self.assertIn("Modified User '%s' successfully" % user["name"],
78                               out)
79             else:
80                 self.assertIn("User '%s' added successfully" % user["name"],
81                               out)
82
83             user["checkUserFn"](user)
84
85     def tearDown(self):
86         super().tearDown()
87         # clean up all the left over users, just in case
88         for user in self.users:
89             if self._find_user(user["name"]):
90                 self.runsubcmd("user", "delete", user["name"])
91         lp = env_loadparm()
92         # second run of this test
93         # the cache is still there and '--cache-ldb-initialize'
94         # will fail
95         cachedb = lp.private_path("user-syncpasswords-cache.ldb")
96         if os.path.exists(cachedb):
97             os.remove(cachedb)
98         lp.set('template homedir', self.template_homedir)
99
100     def test_newuser(self):
101         # try to add all the users again, this should fail
102         for user in self.users:
103             (result, out, err) = self._create_user(user)
104             self.assertCmdFail(result, "Ensure that create user fails")
105             self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)
106
107         # try to delete all the 4 users we just added
108         for user in self.users:
109             (result, out, err) = self.runsubcmd("user", "delete", user["name"])
110             self.assertCmdSuccess(result, out, err, "Can we delete users")
111             found = self._find_user(user["name"])
112             self.assertIsNone(found)
113
114         # test adding users with --use-username-as-cn
115         for user in self.users:
116             (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
117                                                 "--use-username-as-cn",
118                                                 "--surname=%s" % user["surname"],
119                                                 "--given-name=%s" % user["given-name"],
120                                                 "--job-title=%s" % user["job-title"],
121                                                 "--department=%s" % user["department"],
122                                                 "--description=%s" % user["description"],
123                                                 "--company=%s" % user["company"],
124                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
125                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
126
127             self.assertCmdSuccess(result, out, err)
128             self.assertEqual(err, "", "Shouldn't be any error messages")
129             self.assertIn("User '%s' added successfully" % user["name"], out)
130
131             found = self._find_user(user["name"])
132
133             self.assertEqual("%s" % found.get("cn"), "%(name)s" % user)
134             self.assertEqual("%s" % found.get("name"), "%(name)s" % user)
135
136     def test_newuser_weak_password(self):
137         # Ensure that when we try to create a user over LDAP (thus no
138         # transactions) and the password is too weak, we do not get a
139         # half-created account.
140
141         def cleanup_user(username):
142             try:
143                 self.samdb.deleteuser(username)
144             except Exception as err:
145                 estr = err.args[0]
146                 if 'Unable to find user' not in estr:
147                     raise
148
149         server = os.environ['DC_SERVER']
150         dc_username = os.environ['DC_USERNAME']
151         dc_password = os.environ['DC_PASSWORD']
152
153         username = self.randomName()
154         password = 'a'
155
156         self.addCleanup(cleanup_user, username)
157
158         # Try to add the user and ensure it fails.
159         result, out, err = self.runsubcmd('user', 'add',
160                                           username, password,
161                                           '-H', f'ldap://{server}',
162                                           f'-U{dc_username}%{dc_password}')
163         self.assertCmdFail(result)
164         self.assertIn('Failed to add user', err)
165         self.assertIn('LDAP_CONSTRAINT_VIOLATION', err)
166         self.assertIn(f'{werror.WERR_PASSWORD_RESTRICTION:08X}', err)
167
168         # Now search for the user, and make sure we don't find anything.
169         res = self.samdb.search(self.samdb.domain_dn(),
170                                 expression=f'(sAMAccountName={username})',
171                                 scope=ldb.SCOPE_SUBTREE)
172         self.assertEqual(0, len(res), 'expected not to find the user')
173
174     def _verify_supplementalCredentials(self, ldif,
175                                         min_packages=3,
176                                         max_packages=6):
177         msgs = self.samdb.parse_ldif(ldif)
178         (changetype, obj) = next(msgs)
179
180         self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
181         sc_blob = obj["supplementalCredentials"][0]
182         sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
183
184         self.assertGreaterEqual(sc.sub.num_packages,
185                                 min_packages, "min_packages check")
186         self.assertLessEqual(sc.sub.num_packages,
187                              max_packages, "max_packages check")
188
189         if max_packages == 0:
190             return
191
192         def find_package(packages, name, start_idx=0):
193             for i in range(start_idx, len(packages)):
194                 if packages[i].name == name:
195                     return (i, packages[i])
196             return (None, None)
197
198         # The ordering is this
199         #
200         # Primary:Kerberos-Newer-Keys (optional)
201         # Primary:Kerberos
202         # Primary:WDigest
203         # Primary:CLEARTEXT (optional)
204         # Primary:SambaGPG (optional)
205         #
206         # And the 'Packages' package is insert before the last
207         # other package.
208
209         nidx = 0
210         (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
211         self.assertIsNotNone(pp, "Packages required")
212         self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
213                          "Packages needs to be at num_packages - 1")
214
215         (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
216                                     start_idx=nidx)
217         if knidx is not None:
218             self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
219             nidx = nidx + 1
220             if nidx == pidx:
221                 nidx = nidx + 1
222
223         (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
224                                   start_idx=nidx)
225         self.assertIsNotNone(pp, "Primary:Kerberos required")
226         self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
227         nidx = nidx + 1
228         if nidx == pidx:
229             nidx = nidx + 1
230
231         (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
232                                   start_idx=nidx)
233         self.assertIsNotNone(pp, "Primary:WDigest required")
234         self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
235         nidx = nidx + 1
236         if nidx == pidx:
237             nidx = nidx + 1
238
239         (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
240                                   start_idx=nidx)
241         if cidx is not None:
242             self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
243             nidx = nidx + 1
244             if nidx == pidx:
245                 nidx = nidx + 1
246
247         (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
248                                   start_idx=nidx)
249         if gidx is not None:
250             self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
251             nidx = nidx + 1
252             if nidx == pidx:
253                 nidx = nidx + 1
254
255         self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
256
257     def test_setpassword(self):
258         expect_nt_hash = bool(int(os.environ.get("EXPECT_NT_HASH", "1")))
259
260         for user in self.users:
261             newpasswd = self.random_password(16)
262             (result, out, err) = self.runsubcmd("user", "setpassword",
263                                                 user["name"],
264                                                 "--newpassword=%s" % newpasswd,
265                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
266                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
267             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
268             self.assertEqual(err, "", "setpassword with url")
269             self.assertMatch(out, "Changed password OK", "setpassword with url")
270
271         attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
272         (result, out, err) = self.runsubcmd("user", "syncpasswords",
273                                             "--cache-ldb-initialize",
274                                             "--attributes=%s" % attributes,
275                                             "--decrypt-samba-gpg")
276         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
277         self.assertEqual(err, "", "getpassword without url")
278         cache_attrs = {
279             "objectClass": {"value": "userSyncPasswords"},
280             "samdbUrl": {},
281             "dirsyncFilter": {},
282             "dirsyncAttribute": {},
283             "dirsyncControl": {"value": "dirsync:1:0:0"},
284             "passwordAttribute": {},
285             "decryptSambaGPG": {},
286             "currentTime": {},
287         }
288         for a in cache_attrs.keys():
289             v = cache_attrs[a].get("value", "")
290             self.assertMatch(out, "%s: %s" % (a, v),
291                              "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
292
293         (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
294         self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
295         self.assertEqual(err, "", "syncpasswords --no-wait")
296         self.assertMatch(out, "dirsync_loop(): results 0",
297                          "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
298         for user in self.users:
299             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
300                              "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
301
302         for user in self.users:
303             newpasswd = self.random_password(16)
304             creds = credentials.Credentials()
305             creds.set_anonymous()
306             creds.set_password(newpasswd)
307             unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
308             virtualClearTextUTF8 = base64.b64encode(get_bytes(newpasswd)).decode('utf8')
309             virtualClearTextUTF16 = base64.b64encode(get_string(newpasswd).encode('utf-16-le')).decode('utf8')
310
311             (result, out, err) = self.runsubcmd("user", "setpassword",
312                                                 user["name"],
313                                                 "--newpassword=%s" % newpasswd)
314             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
315             self.assertEqual(err, "", "setpassword without url")
316             self.assertMatch(out, "Changed password OK", "setpassword without url")
317
318             (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
319             self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
320             self.assertEqual(err, "", "syncpasswords --no-wait")
321             self.assertMatch(out, "dirsync_loop(): results 0",
322                              "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
323             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
324                              "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
325             self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
326                              "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
327             if expect_nt_hash or "virtualSambaGPG:: " in out:
328                 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
329                                  "getpassword unicodePwd: out[%s]" % out)
330             else:
331                 self.assertNotIn("unicodePwd:: %s" % unicodePwd, out)
332             self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
333                              "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
334             self.assertMatch(out, "supplementalCredentials:: ",
335                              "getpassword supplementalCredentials: out[%s]" % out)
336             if "virtualSambaGPG:: " in out:
337                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
338                                  "getpassword virtualClearTextUTF8: out[%s]" % out)
339                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
340                                  "getpassword virtualClearTextUTF16: out[%s]" % out)
341                 self.assertMatch(out, "virtualSSHA: ",
342                                  "getpassword virtualSSHA: out[%s]" % out)
343
344             (result, out, err) = self.runsubcmd("user", "getpassword",
345                                                 user["name"],
346                                                 "--attributes=%s" % attributes,
347                                                 "--decrypt-samba-gpg")
348             self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
349             self.assertEqual(err, "Any available password returned OK\n", "getpassword without url")
350             self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
351                              "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
352             if expect_nt_hash or "virtualSambaGPG:: " in out:
353                 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
354                                  "getpassword unicodePwd: out[%s]" % out)
355             else:
356                 self.assertNotIn("unicodePwd:: %s" % unicodePwd, out)
357             self.assertMatch(out, "supplementalCredentials:: ",
358                              "getpassword supplementalCredentials: out[%s]" % out)
359             self._verify_supplementalCredentials(out)
360             if "virtualSambaGPG:: " in out:
361                 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
362                                  "getpassword virtualClearTextUTF8: out[%s]" % out)
363                 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
364                                  "getpassword virtualClearTextUTF16: out[%s]" % out)
365                 self.assertMatch(out, "virtualSSHA: ",
366                                  "getpassword virtualSSHA: out[%s]" % out)
367
368         for user in self.users:
369             newpasswd = self.random_password(16)
370             (result, out, err) = self.runsubcmd("user", "setpassword",
371                                                 user["name"],
372                                                 "--newpassword=%s" % newpasswd,
373                                                 "--must-change-at-next-login",
374                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
375                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
376             self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
377             self.assertEqual(err, "", "setpassword with forced change")
378             self.assertMatch(out, "Changed password OK", "setpassword with forced change")
379
380     def test_setexpiry(self):
381         for user in self.users:
382             twodays = time.time() + (2 * 24 * 60 * 60)
383
384             (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
385                                                 "--days=2",
386                                                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
387                                                 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
388             self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
389             self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
390
391             found = self._find_user(user["name"])
392
393             expires = nttime2unix(int("%s" % found.get("accountExpires")))
394             self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
395
396         # TODO: re-enable this after the filter case is sorted out
397         if "filters are broken, bail now":
398             return
399
400         # now run the expiration based on a filter
401         fourdays = time.time() + (4 * 24 * 60 * 60)
402         (result, out, err) = self.runsubcmd("user", "setexpiry",
403                                             "--filter", "(&(objectClass=user)(company=comp2))",
404                                             "--days=4",
405                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
406                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
407         self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
408
409         for user in self.users:
410             found = self._find_user(user["name"])
411             if ("%s" % found.get("company")) == "comp2":
412                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
413                 self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time")
414             else:
415                 expires = nttime2unix(int("%s" % found.get("accountExpires")))
416                 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
417
418     def test_list(self):
419         (result, out, err) = self.runsubcmd("user", "list",
420                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
421                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
422                                                           os.environ["DC_PASSWORD"]))
423         self.assertCmdSuccess(result, out, err, "Error running list")
424
425         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
426                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
427
428         userlist = self.samdb.search(base=self.samdb.domain_dn(),
429                                      scope=ldb.SCOPE_SUBTREE,
430                                      expression=search_filter,
431                                      attrs=["samaccountname"])
432
433         self.assertTrue(len(userlist) > 0, "no users found in samdb")
434
435         for userobj in userlist:
436             name = str(userobj.get("samaccountname", idx=0))
437             self.assertMatch(out, name,
438                              "user '%s' not found" % name)
439
440     # Test: samba-tool user list --locked-only
441     # This test does not verify that the command lists the locked user, it just
442     # tests that it does not list unlocked users. The functional test, which
443     # lists locked users, is located in the 'samba4.ldap.password_lockout' test
444     # in source4/dsdb/tests/python/password_lockout.py
445     def test_list_locked(self):
446         (result, out, err) = self.runsubcmd("user", "list",
447                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
448                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
449                                                           os.environ["DC_PASSWORD"]),
450                                             "--locked-only")
451         self.assertCmdSuccess(result, out, err, "Error running list")
452
453         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
454                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
455
456         userlist = self.samdb.search(base=self.samdb.domain_dn(),
457                                      scope=ldb.SCOPE_SUBTREE,
458                                      expression=search_filter,
459                                      attrs=["samaccountname"])
460
461         for userobj in userlist:
462             name = str(userobj.get("samaccountname", idx=0))
463             self.assertNotIn(name, out,
464                              "user '%s' is incorrectly listed as locked" % name)
465
466     def test_list_base_dn(self):
467         base_dn = "CN=Users"
468         (result, out, err) = self.runsubcmd("user", "list", "-b", base_dn,
469                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
470                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
471                                                           os.environ["DC_PASSWORD"]))
472         self.assertCmdSuccess(result, out, err, "Error running list")
473
474         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
475                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
476
477         userlist = self.samdb.search(base=self.samdb.normalize_dn_in_domain(base_dn),
478                                      scope=ldb.SCOPE_SUBTREE,
479                                      expression=search_filter,
480                                      attrs=["samaccountname"])
481
482         self.assertTrue(len(userlist) > 0, "no users found in samdb")
483
484         for userobj in userlist:
485             name = str(userobj.get("samaccountname", idx=0))
486             self.assertMatch(out, name,
487                              "user '%s' not found" % name)
488
489     def test_list_full_dn(self):
490         (result, out, err) = self.runsubcmd("user", "list", "--full-dn",
491                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
492                                             "-U%s%%%s" % (os.environ["DC_USERNAME"],
493                                                           os.environ["DC_PASSWORD"]))
494         self.assertCmdSuccess(result, out, err, "Error running list")
495
496         search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
497                          (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
498
499         userlist = self.samdb.search(base=self.samdb.domain_dn(),
500                                      scope=ldb.SCOPE_SUBTREE,
501                                      expression=search_filter,
502                                      attrs=["dn"])
503
504         self.assertTrue(len(userlist) > 0, "no users found in samdb")
505
506         for userobj in userlist:
507             name = str(userobj.get("dn", idx=0))
508             self.assertMatch(out, name,
509                              "user '%s' not found" % name)
510
511     def test_list_hide_expired(self):
512         expire_username = "expireUser"
513         expire_user = self._randomUser({"name": expire_username})
514         self._create_user(expire_user)
515
516         (result, out, err) = self.runsubcmd(
517             "user",
518             "list",
519             "--hide-expired",
520             "-H",
521             "ldap://%s" % os.environ["DC_SERVER"],
522             "-U%s%%%s" % (os.environ["DC_USERNAME"],
523                           os.environ["DC_PASSWORD"]))
524         self.assertCmdSuccess(result, out, err, "Error running list")
525         self.assertTrue(expire_username in out,
526                         "user '%s' not found" % expire_username)
527
528         # user will be expired one second ago
529         self.samdb.setexpiry(
530             "(sAMAccountname=%s)" % expire_username,
531             -1,
532             False)
533
534         (result, out, err) = self.runsubcmd(
535             "user",
536             "list",
537             "--hide-expired",
538             "-H",
539             "ldap://%s" % os.environ["DC_SERVER"],
540             "-U%s%%%s" % (os.environ["DC_USERNAME"],
541                           os.environ["DC_PASSWORD"]))
542         self.assertCmdSuccess(result, out, err, "Error running list")
543         self.assertFalse(expire_username in out,
544                          "user '%s' found" % expire_username)
545
546         self.samdb.deleteuser(expire_username)
547
548     def test_list_hide_disabled(self):
549         disable_username = "disableUser"
550         disable_user = self._randomUser({"name": disable_username})
551         self._create_user(disable_user)
552
553         (result, out, err) = self.runsubcmd(
554             "user",
555             "list",
556             "--hide-disabled",
557             "-H",
558             "ldap://%s" % os.environ["DC_SERVER"],
559             "-U%s%%%s" % (os.environ["DC_USERNAME"],
560                           os.environ["DC_PASSWORD"]))
561         self.assertCmdSuccess(result, out, err, "Error running list")
562         self.assertTrue(disable_username in out,
563                         "user '%s' not found" % disable_username)
564
565         self.samdb.disable_account("(sAMAccountname=%s)" % disable_username)
566
567         (result, out, err) = self.runsubcmd(
568             "user",
569             "list",
570             "--hide-disabled",
571             "-H",
572             "ldap://%s" % os.environ["DC_SERVER"],
573             "-U%s%%%s" % (os.environ["DC_USERNAME"],
574                           os.environ["DC_PASSWORD"]))
575         self.assertCmdSuccess(result, out, err, "Error running list")
576         self.assertFalse(disable_username in out,
577                          "user '%s' found" % disable_username)
578
579         self.samdb.deleteuser(disable_username)
580
581     def test_show(self):
582         for user in self.users:
583             (result, out, err) = self.runsubcmd(
584                 "user", "show", user["name"],
585                 "--attributes=sAMAccountName,company",
586                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
587                 "-U%s%%%s" % (os.environ["DC_USERNAME"],
588                               os.environ["DC_PASSWORD"]))
589             self.assertCmdSuccess(result, out, err, "Error running show")
590
591             expected_out = """dn: CN=%s %s,CN=Users,%s
592 company: %s
593 sAMAccountName: %s
594
595 """ % (user["given-name"], user["surname"], self.samdb.domain_dn(),
596                 user["company"], user["name"])
597
598             self.assertEqual(out, expected_out,
599                              "Unexpected show output for user '%s'" %
600                              user["name"])
601
602             time_attrs = [
603                 "name", # test that invalid values are just ignored
604                 "whenCreated",
605                 "whenChanged",
606                 "accountExpires",
607                 "badPasswordTime",
608                 "lastLogoff",
609                 "lastLogon",
610                 "lastLogonTimestamp",
611                 "lockoutTime",
612                 "msDS-UserPasswordExpiryTimeComputed",
613                 "pwdLastSet",
614                 ]
615
616             attrs = []
617             for ta in time_attrs:
618                 attrs.append(ta)
619                 for fm in ["GeneralizedTime", "UnixTime", "TimeSpec"]:
620                     attrs.append("%s;format=%s" % (ta, fm))
621
622             (result, out, err) = self.runsubcmd(
623                 "user", "show", user["name"],
624                 "--attributes=%s" % ",".join(attrs),
625                 "-H", "ldap://%s" % os.environ["DC_SERVER"],
626                 "-U%s%%%s" % (os.environ["DC_USERNAME"],
627                               os.environ["DC_PASSWORD"]))
628             self.assertCmdSuccess(result, out, err,
629                                   "Error running show --attributes=%s"
630                                   % ",".join(attrs))
631
632             self.assertIn(";format=GeneralizedTime", out)
633             self.assertIn(";format=UnixTime", out)
634             self.assertIn(";format=TimeSpec", out)
635
636             self.assertIn("name: ", out)
637             self.assertNotIn("name;format=GeneralizedTime: ", out)
638             self.assertNotIn("name;format=UnixTime: ", out)
639             self.assertNotIn("name;format=TimeSpec: ", out)
640
641             self.assertIn("whenCreated: 20", out)
642             self.assertIn("whenCreated;format=GeneralizedTime: 20", out)
643             self.assertIn("whenCreated;format=UnixTime: 1", out)
644             self.assertIn("whenCreated;format=TimeSpec: 1", out)
645
646             self.assertIn("whenChanged: 20", out)
647             self.assertIn("whenChanged;format=GeneralizedTime: 20", out)
648             self.assertIn("whenChanged;format=UnixTime: 1", out)
649             self.assertIn("whenChanged;format=TimeSpec: 1", out)
650
651             self.assertIn("accountExpires: 9223372036854775807", out)
652             self.assertNotIn("accountExpires;format=GeneralizedTime: ", out)
653             self.assertNotIn("accountExpires;format=UnixTime: ", out)
654             self.assertNotIn("accountExpires;format=TimeSpec: ", out)
655
656             self.assertIn("badPasswordTime: 0", out)
657             self.assertNotIn("badPasswordTime;format=GeneralizedTime: ", out)
658             self.assertNotIn("badPasswordTime;format=UnixTime: ", out)
659             self.assertNotIn("badPasswordTime;format=TimeSpec: ", out)
660
661             self.assertIn("lastLogoff: 0", out)
662             self.assertNotIn("lastLogoff;format=GeneralizedTime: ", out)
663             self.assertNotIn("lastLogoff;format=UnixTime: ", out)
664             self.assertNotIn("lastLogoff;format=TimeSpec: ", out)
665
666             self.assertIn("lastLogon: 0", out)
667             self.assertNotIn("lastLogon;format=GeneralizedTime: ", out)
668             self.assertNotIn("lastLogon;format=UnixTime: ", out)
669             self.assertNotIn("lastLogon;format=TimeSpec: ", out)
670
671             # If a specified attribute is not available on a user object
672             # it's silently omitted.
673             self.assertNotIn("lastLogonTimestamp:", out)
674             self.assertNotIn("lockoutTime:", out)
675
676             self.assertIn("msDS-UserPasswordExpiryTimeComputed: 1", out)
677             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime: 20", out)
678             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime: 1", out)
679             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec: 1", out)
680
681             self.assertIn("pwdLastSet: 1", out)
682             self.assertIn("pwdLastSet;format=GeneralizedTime: 20", out)
683             self.assertIn("pwdLastSet;format=UnixTime: 1", out)
684             self.assertIn("pwdLastSet;format=TimeSpec: 1", out)
685
686             out_msgs = self.samdb.parse_ldif(out)
687             out_msg = next(out_msgs)[1]
688
689             self.assertIn("whenCreated", out_msg)
690             when_created_str = str(out_msg["whenCreated"][0])
691             self.assertIn("whenCreated;format=GeneralizedTime", out_msg)
692             self.assertEqual(str(out_msg["whenCreated;format=GeneralizedTime"][0]), when_created_str)
693             when_created_time = ldb.string_to_time(when_created_str)
694             self.assertIn("whenCreated;format=UnixTime", out_msg)
695             self.assertEqual(str(out_msg["whenCreated;format=UnixTime"][0]), str(when_created_time))
696             self.assertIn("whenCreated;format=TimeSpec", out_msg)
697             self.assertEqual(str(out_msg["whenCreated;format=TimeSpec"][0]),
698                              "%d.000000000" % (when_created_time))
699
700             self.assertIn("whenChanged", out_msg)
701             when_changed_str = str(out_msg["whenChanged"][0])
702             self.assertIn("whenChanged;format=GeneralizedTime", out_msg)
703             self.assertEqual(str(out_msg["whenChanged;format=GeneralizedTime"][0]), when_changed_str)
704             when_changed_time = ldb.string_to_time(when_changed_str)
705             self.assertIn("whenChanged;format=UnixTime", out_msg)
706             self.assertEqual(str(out_msg["whenChanged;format=UnixTime"][0]), str(when_changed_time))
707             self.assertIn("whenChanged;format=TimeSpec", out_msg)
708             self.assertEqual(str(out_msg["whenChanged;format=TimeSpec"][0]),
709                              "%d.000000000" % (when_changed_time))
710
711             self.assertIn("pwdLastSet;format=GeneralizedTime", out_msg)
712             pwd_last_set_str = str(out_msg["pwdLastSet;format=GeneralizedTime"][0])
713             pwd_last_set_time = ldb.string_to_time(pwd_last_set_str)
714             self.assertIn("pwdLastSet;format=UnixTime", out_msg)
715             self.assertEqual(str(out_msg["pwdLastSet;format=UnixTime"][0]), str(pwd_last_set_time))
716             self.assertIn("pwdLastSet;format=TimeSpec", out_msg)
717             self.assertIn("%d." % pwd_last_set_time, str(out_msg["pwdLastSet;format=TimeSpec"][0]))
718             self.assertNotIn(".000000000", str(out_msg["pwdLastSet;format=TimeSpec"][0]))
719
720             # assert that the pwd has been set in the minute after user creation
721             self.assertGreaterEqual(pwd_last_set_time, when_created_time)
722             self.assertLess(pwd_last_set_time, when_created_time + 60)
723
724             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime", out_msg)
725             pwd_expires_str = str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime"][0])
726             pwd_expires_time = ldb.string_to_time(pwd_expires_str)
727             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime", out_msg)
728             self.assertEqual(str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=UnixTime"][0]), str(pwd_expires_time))
729             self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec", out_msg)
730             self.assertIn("%d." % pwd_expires_time, str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0]))
731             self.assertNotIn(".000000000", str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0]))
732
733             # assert that the pwd expires after it was set
734             self.assertGreater(pwd_expires_time, pwd_last_set_time)
735
736     def test_move(self):
737         full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest_usr"))
738         self.addCleanup(self.samdb.delete, full_ou_dn, ["tree_delete:1"])
739
740         (result, out, err) = self.runsubcmd("ou", "add", full_ou_dn)
741         self.assertCmdSuccess(result, out, err)
742         self.assertEqual(err, "", "There shouldn't be any error message")
743         self.assertIn('Added ou "%s"' % full_ou_dn, out)
744
745         for user in self.users:
746             (result, out, err) = self.runsubcmd(
747                 "user", "move", user["name"], full_ou_dn)
748             self.assertCmdSuccess(result, out, err, "Error running move")
749             self.assertIn('Moved user "%s" into "%s"' %
750                           (user["name"], full_ou_dn), out)
751
752         # Should fail as users objects are in OU
753         (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
754         self.assertCmdFail(result)
755         self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
756                        "(it has %d children)!") % len(self.users), err)
757
758         for user in self.users:
759             new_dn = "CN=Users,%s" % self.samdb.domain_dn()
760             (result, out, err) = self.runsubcmd(
761                 "user", "move", user["name"], new_dn)
762             self.assertCmdSuccess(result, out, err, "Error running move")
763             self.assertIn('Moved user "%s" into "%s"' %
764                           (user["name"], new_dn), out)
765
766     def test_rename_surname_initials_givenname(self):
767         """rename the existing surname and given name and add missing
768         initials, then remove them, for all users"""
769         for user in self.users:
770             new_givenname = "new_given_name_of_" + user["name"]
771             new_initials = "A"
772             new_surname = "new_surname_of_" + user["name"]
773             found = self._find_user(user["name"])
774             old_cn = str(found.get("cn"))
775
776             # rename given name, initials and surname
777             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
778                                                 "--surname=%s" % new_surname,
779                                                 "--initials=%s" % new_initials,
780                                                 "--given-name=%s" % new_givenname)
781             self.assertCmdSuccess(result, out, err)
782             self.assertEqual(err, "", "Shouldn't be any error messages")
783             self.assertIn('successfully', out)
784
785             found = self._find_user(user["name"])
786             self.assertEqual("%s" % found.get("givenName"), new_givenname)
787             self.assertEqual("%s" % found.get("initials"), new_initials)
788             self.assertEqual("%s" % found.get("sn"), new_surname)
789             self.assertEqual("%s" % found.get("name"),
790                              "%s %s. %s" % (new_givenname, new_initials, new_surname))
791             self.assertEqual("%s" % found.get("cn"),
792                              "%s %s. %s" % (new_givenname, new_initials, new_surname))
793
794             # remove given name, initials and surname
795             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
796                                                 "--surname=",
797                                                 "--initials=",
798                                                 "--given-name=")
799             self.assertCmdSuccess(result, out, err)
800             self.assertEqual(err, "", "Shouldn't be any error messages")
801             self.assertIn('successfully', out)
802
803             found = self._find_user(user["name"])
804             self.assertEqual(found.get("givenName"), None)
805             self.assertEqual(found.get("initials"), None)
806             self.assertEqual(found.get("sn"), None)
807             self.assertEqual("%s" % found.get("cn"), user["name"])
808
809             # reset changes (initials are removed)
810             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
811                                                 "--surname=%(surname)s" % user,
812                                                 "--given-name=%(given-name)s" % user)
813             self.assertCmdSuccess(result, out, err)
814
815             if old_cn:
816                 (result, out, err) = self.runsubcmd("user", "rename", user["name"],
817                                                 "--force-new-cn=%s" % old_cn)
818
819     def test_rename_cn_samaccountname(self):
820         """rename and try to remove the cn and the samaccount of all users"""
821         for user in self.users:
822             new_cn = "new_cn_of_" + user["name"]
823             new_samaccountname = "new_samaccount_of_" + user["name"]
824             new_surname = "new_surname_of_" + user["name"]
825
826             # rename cn
827             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
828                                                 "--samaccountname=%s"
829                                                  % new_samaccountname,
830                                                 "--force-new-cn=%s" % new_cn)
831             self.assertCmdSuccess(result, out, err)
832             self.assertEqual(err, "", "Shouldn't be any error messages")
833             self.assertIn('successfully', out)
834
835             found = self._find_user(new_samaccountname)
836             self.assertEqual("%s" % found.get("cn"), new_cn)
837             self.assertEqual("%s" % found.get("sAMAccountName"),
838                              new_samaccountname)
839
840             # changing the surname has no effect to the cn
841             (result, out, err) = self.runsubcmd("user", "rename", new_samaccountname,
842                                                 "--surname=%s" % new_surname)
843             self.assertCmdSuccess(result, out, err)
844
845             found = self._find_user(new_samaccountname)
846             self.assertEqual("%s" % found.get("cn"), new_cn)
847
848             # trying to remove cn (throws an error)
849             (result, out, err) = self.runsubcmd("user", "rename",
850                                                 new_samaccountname,
851                                                 "--force-new-cn=")
852             self.assertCmdFail(result)
853             self.assertIn('Failed to rename user', err)
854             self.assertIn("delete protected attribute", err)
855
856             # trying to remove the samccountname (throws an error)
857             (result, out, err) = self.runsubcmd("user", "rename",
858                                                 new_samaccountname,
859                                                 "--samaccountname=")
860             self.assertCmdFail(result)
861             self.assertIn('Failed to rename user', err)
862             self.assertIn('delete protected attribute', err)
863
864             # reset changes (cn must be the name)
865             (result, out, err) = self.runsubcmd("user", "rename", new_samaccountname,
866                                                 "--samaccountname=%(name)s"
867                                                   % user,
868                                                 "--force-new-cn=%(name)s" % user)
869             self.assertCmdSuccess(result, out, err)
870
871     def test_rename_standard_cn(self):
872         """reset the cn of all users to the standard"""
873         for user in self.users:
874             new_cn = "new_cn_of_" + user["name"]
875             new_givenname = "new_given_name_of_" + user["name"]
876             new_initials = "A"
877             new_surname = "new_surname_of_" + user["name"]
878
879             # set different cn
880             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
881                                                 "--force-new-cn=%s" % new_cn)
882             self.assertCmdSuccess(result, out, err)
883
884             # remove given name, initials and surname
885             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
886                                                 "--surname=",
887                                                 "--initials=",
888                                                 "--given-name=")
889             self.assertCmdSuccess(result, out, err)
890
891             # reset the CN (no given name, initials or surname --> samaccountname)
892             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
893                                                 "--reset-cn")
894
895             self.assertCmdSuccess(result, out, err)
896             self.assertEqual(err, "", "Shouldn't be any error messages")
897             self.assertIn('successfully', out)
898
899             found = self._find_user(user["name"])
900             self.assertEqual("%s" % found.get("cn"), user["name"])
901
902             # set given name, initials and surname and set different cn
903             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
904                                                 "--force-new-cn=%s" % new_cn,
905                                                 "--surname=%s" % new_surname,
906                                                 "--initials=%s" % new_initials,
907                                                 "--given-name=%s" % new_givenname)
908             self.assertCmdSuccess(result, out, err)
909
910             # reset the CN (given name, initials or surname are given --> given name)
911             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
912                                                 "--reset-cn")
913
914             self.assertCmdSuccess(result, out, err)
915             self.assertEqual(err, "", "Shouldn't be any error messages")
916             self.assertIn('successfully', out)
917
918             found = self._find_user(user["name"])
919             self.assertEqual("%s" % found.get("cn"),
920                              "%s %s. %s" % (new_givenname, new_initials, new_surname))
921
922             # reset changes
923             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
924                                                 "--reset-cn",
925                                                 "--initials=",
926                                                 "--surname=%(surname)s" % user,
927                                                 "--given-name=%(given-name)s" % user)
928             self.assertCmdSuccess(result, out, err)
929
930     def test_rename_mailaddress_displayname(self):
931         for user in self.users:
932             new_mail = "new_mailaddress_of_" + user["name"]
933             new_displayname = "new displayname of " + user["name"]
934
935             # change mail and displayname
936             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
937                                                 "--mail-address=%s"
938                                                   % new_mail,
939                                                 "--display-name=%s"
940                                                   % new_displayname)
941             self.assertCmdSuccess(result, out, err)
942             self.assertEqual(err, "", "Shouldn't be any error messages")
943             self.assertIn('successfully', out)
944
945             found = self._find_user(user["name"])
946             self.assertEqual("%s" % found.get("mail"), new_mail)
947             self.assertEqual("%s" % found.get("displayName"), new_displayname)
948
949             # remove mail and displayname
950             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
951                                                 "--mail-address=",
952                                                 "--display-name=")
953             self.assertCmdSuccess(result, out, err)
954             self.assertEqual(err, "", "Shouldn't be any error messages")
955             self.assertIn('successfully', out)
956
957             found = self._find_user(user["name"])
958             self.assertEqual(found.get("mail"), None)
959             self.assertEqual(found.get("displayName"), None)
960
961     def test_rename_upn(self):
962         """rename upn of all users"""
963         for user in self.users:
964             found = self._find_user(user["name"])
965             old_upn = "%s" % found.get("userPrincipalName")
966             valid_suffix = old_upn.split('@')[1]   # samba.example.com
967
968             valid_new_upn = "new_%s@%s" % (user["name"], valid_suffix)
969             invalid_new_upn = "%s@invalid.suffix" + user["name"]
970
971             # trying to set invalid upn
972             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
973                                                 "--upn=%s"
974                                                   % invalid_new_upn)
975             self.assertCmdFail(result)
976             self.assertIn('is not a valid upn', err)
977
978             # set valid upn
979             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
980                                                 "--upn=%s"
981                                                   % valid_new_upn)
982             self.assertCmdSuccess(result, out, err)
983             self.assertEqual(err, "", "Shouldn't be any error messages")
984             self.assertIn('successfully', out)
985
986             found = self._find_user(user["name"])
987             self.assertEqual("%s" % found.get("userPrincipalName"), valid_new_upn)
988
989             # trying to remove upn
990             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
991                                                 "--upn=%s")
992             self.assertCmdFail(result)
993             self.assertIn('is not a valid upn', err)
994
995             # reset upn
996             (result, out, err) = self.runsubcmd("user", "rename", user["name"],
997                                                 "--upn=%s" % old_upn)
998             self.assertCmdSuccess(result, out, err)
999
1000     def test_getpwent(self):
1001         try:
1002             import pwd
1003         except ImportError:
1004             self.skipTest("Skipping getpwent test, no 'pwd' module available")
1005             return
1006
1007         # get the current user's data for the test
1008         uid = os.geteuid()
1009         try:
1010             u = pwd.getpwuid(uid)
1011         except KeyError:
1012             self.skipTest("Skipping getpwent test, current EUID not found in NSS")
1013             return
1014
1015
1016 # samba-tool user create command didn't support users with empty gecos if none is
1017 # specified on the command line and the user hasn't one in the passwd file it
1018 # will fail, so let's add some contents
1019
1020         gecos = u[4]
1021         if (gecos is None or len(gecos) == 0):
1022             gecos = "Foo GECOS"
1023         user = self._randomPosixUser({
1024                         "name": u[0],
1025                         "uid": u[0],
1026                         "uidNumber": u[2],
1027                         "gidNumber": u[3],
1028                         "gecos": gecos,
1029                         "loginShell": u[6],
1030                         })
1031
1032         # Remove user if it already exists
1033         if self._find_user(u[0]):
1034             self.runsubcmd("user", "delete", u[0])
1035         # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
1036         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
1037                                             "--surname=%s" % user["surname"],
1038                                             "--given-name=%s" % user["given-name"],
1039                                             "--job-title=%s" % user["job-title"],
1040                                             "--department=%s" % user["department"],
1041                                             "--description=%s" % user["description"],
1042                                             "--company=%s" % user["company"],
1043                                             "--gecos=%s" % user["gecos"],
1044                                             "--rfc2307-from-nss",
1045                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
1046                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1047
1048         self.assertCmdSuccess(result, out, err)
1049         self.assertEqual(err, "", "Shouldn't be any error messages")
1050         self.assertIn("User '%s' added successfully" % user["name"], out)
1051
1052         self._check_posix_user(user)
1053         self.runsubcmd("user", "delete", user["name"])
1054
1055         # Check if overriding the attributes from NSS with explicit values works
1056         #
1057         # get a user with all random posix attributes
1058         user = self._randomPosixUser({"name": u[0]})
1059
1060         # Remove user if it already exists
1061         if self._find_user(u[0]):
1062             self.runsubcmd("user", "delete", u[0])
1063         # create a user with posix attributes from nss but override all of them with the
1064         # random ones just obtained
1065         (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
1066                                             "--surname=%s" % user["surname"],
1067                                             "--given-name=%s" % user["given-name"],
1068                                             "--job-title=%s" % user["job-title"],
1069                                             "--department=%s" % user["department"],
1070                                             "--description=%s" % user["description"],
1071                                             "--company=%s" % user["company"],
1072                                             "--rfc2307-from-nss",
1073                                             "--gecos=%s" % user["gecos"],
1074                                             "--login-shell=%s" % user["loginShell"],
1075                                             "--uid=%s" % user["uid"],
1076                                             "--uid-number=%s" % user["uidNumber"],
1077                                             "--gid-number=%s" % user["gidNumber"],
1078                                             "-H", "ldap://%s" % os.environ["DC_SERVER"],
1079                                             "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1080
1081         self.assertCmdSuccess(result, out, err)
1082         self.assertEqual(err, "", "Shouldn't be any error messages")
1083         self.assertIn("User '%s' added successfully" % user["name"], out)
1084
1085         self._check_posix_user(user)
1086         self.runsubcmd("user", "delete", user["name"])
1087
1088     # Test: samba-tool user unlock
1089     # This test does not verify that the command unlocks the user, it just
1090     # tests the command itself. The unlock test, which unlocks locked users,
1091     # is located in the 'samba4.ldap.password_lockout' test in
1092     # source4/dsdb/tests/python/password_lockout.py
1093     def test_unlock(self):
1094
1095         # try to unlock a nonexistent user, this should fail
1096         nonexistentusername = "userdoesnotexist"
1097         (result, out, err) = self.runsubcmd(
1098             "user", "unlock", nonexistentusername)
1099         self.assertCmdFail(result, "Ensure that unlock nonexistent user fails")
1100         self.assertIn("Failed to unlock user '%s'" % nonexistentusername, err)
1101         self.assertIn("Unable to find user", err)
1102
1103         # try to unlock with insufficient permissions, this should fail
1104         unprivileged_username = "unprivilegedunlockuser"
1105         unlocktest_username = "usertounlock"
1106
1107         self.runsubcmd("user", "add", unprivileged_username, "Passw0rd")
1108         self.runsubcmd("user", "add", unlocktest_username, "Passw0rd")
1109
1110         (result, out, err) = self.runsubcmd(
1111             "user", "unlock", unlocktest_username,
1112             "-H", "ldap://%s" % os.environ["DC_SERVER"],
1113             "-U%s%%%s" % (unprivileged_username,
1114                           "Passw0rd"))
1115         self.assertCmdFail(result, "Fail with LDAP_INSUFFICIENT_ACCESS_RIGHTS")
1116         self.assertIn("Failed to unlock user '%s'" % unlocktest_username, err)
1117         self.assertIn("LDAP error 50 LDAP_INSUFFICIENT_ACCESS_RIGHTS", err)
1118
1119         self.runsubcmd("user", "delete", unprivileged_username)
1120         self.runsubcmd("user", "delete", unlocktest_username)
1121
1122         # run unlock against test users
1123         for user in self.users:
1124             (result, out, err) = self.runsubcmd(
1125                 "user", "unlock", user["name"])
1126             self.assertCmdSuccess(result, out, err, "Error running user unlock")
1127             self.assertEqual(err, "", "Shouldn't be any error messages")
1128
1129     def _randomUser(self, base=None):
1130         """create a user with random attribute values, you can specify base attributes"""
1131         if base is None:
1132             base = {}
1133         user = {
1134             "name": self.randomName(),
1135             "password": self.random_password(16),
1136             "surname": self.randomName(),
1137             "given-name": self.randomName(),
1138             "job-title": self.randomName(),
1139             "department": self.randomName(),
1140             "company": self.randomName(),
1141             "description": self.randomName(count=100),
1142             "createUserFn": self._create_user,
1143             "checkUserFn": self._check_user,
1144         }
1145         user.update(base)
1146         return user
1147
1148     def _randomPosixUser(self, base=None):
1149         """create a user with random attribute values and additional RFC2307
1150         attributes, you can specify base attributes"""
1151         if base is None:
1152             base = {}
1153         user = self._randomUser({})
1154         user.update(base)
1155         posixAttributes = {
1156             "uid": self.randomName(),
1157             "loginShell": self.randomName(),
1158             "gecos": self.randomName(),
1159             "uidNumber": self.randomXid(),
1160             "gidNumber": self.randomXid(),
1161             "createUserFn": self._create_posix_user,
1162             "checkUserFn": self._check_posix_user,
1163         }
1164         user.update(posixAttributes)
1165         user.update(base)
1166         return user
1167
1168     def _randomUnixUser(self, base=None):
1169         """create a user with random attribute values and additional RFC2307
1170         attributes, you can specify base attributes"""
1171         if base is None:
1172             base = {}
1173         user = self._randomUser({})
1174         user.update(base)
1175         posixAttributes = {
1176             "uidNumber": self.randomXid(),
1177             "gidNumber": self.randomXid(),
1178             "uid": self.randomName(),
1179             "loginShell": self.randomName(),
1180             "gecos": self.randomName(),
1181             "createUserFn": self._create_unix_user,
1182             "checkUserFn": self._check_unix_user,
1183         }
1184         user.update(posixAttributes)
1185         user.update(base)
1186         return user
1187
1188     def _check_user(self, user):
1189         """ check if a user from SamDB has the same attributes as its template """
1190         found = self._find_user(user["name"])
1191
1192         self.assertEqual("%s" % found.get("name"), "%(given-name)s %(surname)s" % user)
1193         self.assertEqual("%s" % found.get("title"), user["job-title"])
1194         self.assertEqual("%s" % found.get("company"), user["company"])
1195         self.assertEqual("%s" % found.get("description"), user["description"])
1196         self.assertEqual("%s" % found.get("department"), user["department"])
1197
1198     def _check_posix_user(self, user):
1199         """ check if a posix_user from SamDB has the same attributes as its template """
1200         found = self._find_user(user["name"])
1201
1202         self.assertEqual("%s" % found.get("loginShell"), user["loginShell"])
1203         self.assertEqual("%s" % found.get("gecos"), user["gecos"])
1204         self.assertEqual("%s" % found.get("uidNumber"), "%s" % user["uidNumber"])
1205         self.assertEqual("%s" % found.get("gidNumber"), "%s" % user["gidNumber"])
1206         self.assertEqual("%s" % found.get("uid"), user["uid"])
1207         self._check_user(user)
1208
1209     def _check_unix_user(self, user):
1210         """ check if a unix_user from SamDB has the same attributes as its
1211 template """
1212         found = self._find_user(user["name"])
1213
1214         self.assertEqual("%s" % found.get("loginShell"), user["loginShell"])
1215         self.assertEqual("%s" % found.get("gecos"), user["gecos"])
1216         self.assertEqual("%s" % found.get("uidNumber"), "%s" %
1217                           user["uidNumber"])
1218         self.assertEqual("%s" % found.get("gidNumber"), "%s" %
1219                           user["gidNumber"])
1220         self.assertEqual("%s" % found.get("uid"), user["uid"])
1221         self.assertIn('/home/test/', "%s" % found.get("unixHomeDirectory"))
1222         self._check_user(user)
1223
1224     def _create_user(self, user):
1225         return self.runsubcmd("user", "add", user["name"], user["password"],
1226                               "--surname=%s" % user["surname"],
1227                               "--given-name=%s" % user["given-name"],
1228                               "--job-title=%s" % user["job-title"],
1229                               "--department=%s" % user["department"],
1230                               "--description=%s" % user["description"],
1231                               "--company=%s" % user["company"],
1232                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
1233                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1234
1235     def _create_posix_user(self, user):
1236         """ create a new user with RFC2307 attributes """
1237         return self.runsubcmd("user", "create", user["name"], user["password"],
1238                               "--surname=%s" % user["surname"],
1239                               "--given-name=%s" % user["given-name"],
1240                               "--job-title=%s" % user["job-title"],
1241                               "--department=%s" % user["department"],
1242                               "--description=%s" % user["description"],
1243                               "--company=%s" % user["company"],
1244                               "--gecos=%s" % user["gecos"],
1245                               "--login-shell=%s" % user["loginShell"],
1246                               "--uid=%s" % user["uid"],
1247                               "--uid-number=%s" % user["uidNumber"],
1248                               "--gid-number=%s" % user["gidNumber"],
1249                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
1250                               "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
1251
1252     def _create_unix_user(self, user):
1253         """ Add RFC2307 attributes to a user"""
1254         self._create_user(user)
1255         return self.runsubcmd("user", "addunixattrs", user["name"],
1256                               "%s" % user["uidNumber"],
1257                               "--gid-number=%s" % user["gidNumber"],
1258                               "--gecos=%s" % user["gecos"],
1259                               "--login-shell=%s" % user["loginShell"],
1260                               "--uid=%s" % user["uid"],
1261                               "-H", "ldap://%s" % os.environ["DC_SERVER"],
1262                               "-U%s%%%s" % (os.environ["DC_USERNAME"],
1263                                             os.environ["DC_PASSWORD"]))
1264
1265     def _find_user(self, name):
1266         search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
1267         userlist = self.samdb.search(base=self.samdb.domain_dn(),
1268                                      scope=ldb.SCOPE_SUBTREE,
1269                                      expression=search_filter)
1270         if userlist:
1271             return userlist[0]
1272         else:
1273             return None