1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 from samba.tests.samba_tool.base import SambaToolCmdTest
28 from samba.ndr import ndr_unpack
29 from samba.dcerpc import drsblobs
30 from samba.compat import get_bytes
31 from samba.compat import get_string
32 from samba.tests import env_loadparm
35 class UserCmdTestCase(SambaToolCmdTest):
36 """Tests for samba-tool user subcommands"""
41 super(UserCmdTestCase, self).setUp()
42 self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
43 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
45 self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
46 self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
47 self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"}))
48 self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"}))
49 self.users.append(self._randomPosixUser({"name": "posixuser1"}))
50 self.users.append(self._randomPosixUser({"name": "posixuser2"}))
51 self.users.append(self._randomPosixUser({"name": "posixuser3"}))
52 self.users.append(self._randomPosixUser({"name": "posixuser4"}))
54 # setup the 8 users and ensure they are correct
55 for user in self.users:
56 (result, out, err) = user["createUserFn"](user)
58 self.assertCmdSuccess(result, out, err)
59 self.assertEquals(err, "", "Shouldn't be any error messages")
60 self.assertIn("User '%s' created successfully" % user["name"], out)
62 user["checkUserFn"](user)
65 super(UserCmdTestCase, self).tearDown()
66 # clean up all the left over users, just in case
67 for user in self.users:
68 if self._find_user(user["name"]):
69 self.runsubcmd("user", "delete", user["name"])
71 # second run of this test
72 # the cache is still there and '--cache-ldb-initialize'
74 cachedb = lp.private_path("user-syncpasswords-cache.ldb")
75 if os.path.exists(cachedb):
78 def test_newuser(self):
79 # try to add all the users again, this should fail
80 for user in self.users:
81 (result, out, err) = self._create_user(user)
82 self.assertCmdFail(result, "Ensure that create user fails")
83 self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)
85 # try to delete all the 4 users we just added
86 for user in self.users:
87 (result, out, err) = self.runsubcmd("user", "delete", user["name"])
88 self.assertCmdSuccess(result, out, err, "Can we delete users")
89 found = self._find_user(user["name"])
90 self.assertIsNone(found)
92 # test adding users with --use-username-as-cn
93 for user in self.users:
94 (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
95 "--use-username-as-cn",
96 "--surname=%s" % user["surname"],
97 "--given-name=%s" % user["given-name"],
98 "--job-title=%s" % user["job-title"],
99 "--department=%s" % user["department"],
100 "--description=%s" % user["description"],
101 "--company=%s" % user["company"],
102 "-H", "ldap://%s" % os.environ["DC_SERVER"],
103 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
105 self.assertCmdSuccess(result, out, err)
106 self.assertEquals(err, "", "Shouldn't be any error messages")
107 self.assertIn("User '%s' created successfully" % user["name"], out)
109 found = self._find_user(user["name"])
111 self.assertEquals("%s" % found.get("cn"), "%(name)s" % user)
112 self.assertEquals("%s" % found.get("name"), "%(name)s" % user)
114 def _verify_supplementalCredentials(self, ldif,
117 msgs = self.samdb.parse_ldif(ldif)
118 (changetype, obj) = next(msgs)
120 self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
121 sc_blob = obj["supplementalCredentials"][0]
122 sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
124 self.assertGreaterEqual(sc.sub.num_packages,
125 min_packages, "min_packages check")
126 self.assertLessEqual(sc.sub.num_packages,
127 max_packages, "max_packages check")
129 if max_packages == 0:
132 def find_package(packages, name, start_idx=0):
133 for i in range(start_idx, len(packages)):
134 if packages[i].name == name:
135 return (i, packages[i])
138 # The ordering is this
140 # Primary:Kerberos-Newer-Keys (optional)
143 # Primary:CLEARTEXT (optional)
144 # Primary:SambaGPG (optional)
146 # And the 'Packages' package is insert before the last
150 (pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
151 self.assertIsNotNone(pp, "Packages required")
152 self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
153 "Packages needs to be at num_packages - 1")
155 (knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
157 if knidx is not None:
158 self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
163 (kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
165 self.assertIsNotNone(pp, "Primary:Kerberos required")
166 self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
171 (widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
173 self.assertIsNotNone(pp, "Primary:WDigest required")
174 self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
179 (cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
182 self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
187 (gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
190 self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
195 self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
197 def test_setpassword(self):
198 for user in self.users:
199 newpasswd = self.random_password(16)
200 (result, out, err) = self.runsubcmd("user", "setpassword",
202 "--newpassword=%s" % newpasswd,
203 "-H", "ldap://%s" % os.environ["DC_SERVER"],
204 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
205 self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
206 self.assertEquals(err, "", "setpassword with url")
207 self.assertMatch(out, "Changed password OK", "setpassword with url")
209 attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
210 (result, out, err) = self.runsubcmd("user", "syncpasswords",
211 "--cache-ldb-initialize",
212 "--attributes=%s" % attributes,
213 "--decrypt-samba-gpg")
214 self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
215 self.assertEqual(err, "", "getpassword without url")
217 "objectClass": {"value": "userSyncPasswords"},
220 "dirsyncAttribute": {},
221 "dirsyncControl": {"value": "dirsync:1:0:0"},
222 "passwordAttribute": {},
223 "decryptSambaGPG": {},
226 for a in cache_attrs.keys():
227 v = cache_attrs[a].get("value", "")
228 self.assertMatch(out, "%s: %s" % (a, v),
229 "syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
231 (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
232 self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
233 self.assertEqual(err, "", "syncpasswords --no-wait")
234 self.assertMatch(out, "dirsync_loop(): results 0",
235 "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
236 for user in self.users:
237 self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
238 "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
240 for user in self.users:
241 newpasswd = self.random_password(16)
242 creds = credentials.Credentials()
243 creds.set_anonymous()
244 creds.set_password(newpasswd)
245 nthash = creds.get_nt_hash()
246 unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
247 virtualClearTextUTF8 = base64.b64encode(get_bytes(newpasswd)).decode('utf8')
248 virtualClearTextUTF16 = base64.b64encode(get_string(newpasswd).encode('utf-16-le')).decode('utf8')
250 (result, out, err) = self.runsubcmd("user", "setpassword",
252 "--newpassword=%s" % newpasswd)
253 self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
254 self.assertEquals(err, "", "setpassword without url")
255 self.assertMatch(out, "Changed password OK", "setpassword without url")
257 (result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
258 self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
259 self.assertEqual(err, "", "syncpasswords --no-wait")
260 self.assertMatch(out, "dirsync_loop(): results 0",
261 "syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
262 self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
263 "syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
264 self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
265 "getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
266 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
267 "getpassword unicodePwd: out[%s]" % out)
268 self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
269 "getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
270 self.assertMatch(out, "supplementalCredentials:: ",
271 "getpassword supplementalCredentials: out[%s]" % out)
272 if "virtualSambaGPG:: " in out:
273 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
274 "getpassword virtualClearTextUTF8: out[%s]" % out)
275 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
276 "getpassword virtualClearTextUTF16: out[%s]" % out)
277 self.assertMatch(out, "virtualSSHA: ",
278 "getpassword virtualSSHA: out[%s]" % out)
280 (result, out, err) = self.runsubcmd("user", "getpassword",
282 "--attributes=%s" % attributes,
283 "--decrypt-samba-gpg")
284 self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
285 self.assertEqual(err, "", "getpassword without url")
286 self.assertMatch(out, "Got password OK", "getpassword without url")
287 self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
288 "getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
289 self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
290 "getpassword unicodePwd: out[%s]" % out)
291 self.assertMatch(out, "supplementalCredentials:: ",
292 "getpassword supplementalCredentials: out[%s]" % out)
293 self._verify_supplementalCredentials(out.replace("\nGot password OK\n", ""))
294 if "virtualSambaGPG:: " in out:
295 self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
296 "getpassword virtualClearTextUTF8: out[%s]" % out)
297 self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
298 "getpassword virtualClearTextUTF16: out[%s]" % out)
299 self.assertMatch(out, "virtualSSHA: ",
300 "getpassword virtualSSHA: out[%s]" % out)
302 for user in self.users:
303 newpasswd = self.random_password(16)
304 (result, out, err) = self.runsubcmd("user", "setpassword",
306 "--newpassword=%s" % newpasswd,
307 "--must-change-at-next-login",
308 "-H", "ldap://%s" % os.environ["DC_SERVER"],
309 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
310 self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
311 self.assertEquals(err, "", "setpassword with forced change")
312 self.assertMatch(out, "Changed password OK", "setpassword with forced change")
314 def test_setexpiry(self):
315 for user in self.users:
316 twodays = time.time() + (2 * 24 * 60 * 60)
318 (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
320 "-H", "ldap://%s" % os.environ["DC_SERVER"],
321 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
322 self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
323 self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
325 found = self._find_user(user["name"])
327 expires = nttime2unix(int("%s" % found.get("accountExpires")))
328 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
330 # TODO: renable this after the filter case is sorted out
331 if "filters are broken, bail now":
334 # now run the expiration based on a filter
335 fourdays = time.time() + (4 * 24 * 60 * 60)
336 (result, out, err) = self.runsubcmd("user", "setexpiry",
337 "--filter", "(&(objectClass=user)(company=comp2))",
339 "-H", "ldap://%s" % os.environ["DC_SERVER"],
340 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
341 self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
343 for user in self.users:
344 found = self._find_user(user["name"])
345 if ("%s" % found.get("company")) == "comp2":
346 expires = nttime2unix(int("%s" % found.get("accountExpires")))
347 self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time")
349 expires = nttime2unix(int("%s" % found.get("accountExpires")))
350 self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
353 (result, out, err) = self.runsubcmd("user", "list",
354 "-H", "ldap://%s" % os.environ["DC_SERVER"],
355 "-U%s%%%s" % (os.environ["DC_USERNAME"],
356 os.environ["DC_PASSWORD"]))
357 self.assertCmdSuccess(result, out, err, "Error running list")
359 search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
360 (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
362 userlist = self.samdb.search(base=self.samdb.domain_dn(),
363 scope=ldb.SCOPE_SUBTREE,
364 expression=search_filter,
365 attrs=["samaccountname"])
367 self.assertTrue(len(userlist) > 0, "no users found in samdb")
369 for userobj in userlist:
370 name = str(userobj.get("samaccountname", idx=0))
371 found = self.assertMatch(out, name,
372 "user '%s' not found" % name)
375 for user in self.users:
376 (result, out, err) = self.runsubcmd(
377 "user", "show", user["name"],
378 "--attributes=sAMAccountName,company",
379 "-H", "ldap://%s" % os.environ["DC_SERVER"],
380 "-U%s%%%s" % (os.environ["DC_USERNAME"],
381 os.environ["DC_PASSWORD"]))
382 self.assertCmdSuccess(result, out, err, "Error running show")
384 expected_out = """dn: CN=%s %s,CN=Users,%s
388 """ % (user["given-name"], user["surname"], self.samdb.domain_dn(),
389 user["company"], user["name"])
391 self.assertEqual(out, expected_out,
392 "Unexpected show output for user '%s'" %
396 full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest"))
397 (result, out, err) = self.runsubcmd("ou", "create", full_ou_dn)
398 self.assertCmdSuccess(result, out, err)
399 self.assertEquals(err, "", "There shouldn't be any error message")
400 self.assertIn('Created ou "%s"' % full_ou_dn, out)
402 for user in self.users:
403 (result, out, err) = self.runsubcmd(
404 "user", "move", user["name"], full_ou_dn)
405 self.assertCmdSuccess(result, out, err, "Error running move")
406 self.assertIn('Moved user "%s" into "%s"' %
407 (user["name"], full_ou_dn), out)
409 # Should fail as users objects are in OU
410 (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
411 self.assertCmdFail(result)
412 self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
413 "(it has %d children)!") % len(self.users), err)
415 for user in self.users:
416 new_dn = "CN=Users,%s" % self.samdb.domain_dn()
417 (result, out, err) = self.runsubcmd(
418 "user", "move", user["name"], new_dn)
419 self.assertCmdSuccess(result, out, err, "Error running move")
420 self.assertIn('Moved user "%s" into "%s"' %
421 (user["name"], new_dn), out)
423 (result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
424 self.assertCmdSuccess(result, out, err,
425 "Failed to delete ou '%s'" % full_ou_dn)
427 def test_getpwent(self):
431 self.skipTest("Skipping getpwent test, no 'pwd' module available")
434 # get the current user's data for the test
437 u = pwd.getpwuid(uid)
439 self.skipTest("Skipping getpwent test, current EUID not found in NSS")
443 # samba-tool user create command didn't support users with empty gecos if none is
444 # specified on the command line and the user hasn't one in the passwd file it
445 # will fail, so let's add some contents
448 if (gecos is None or len(gecos) == 0):
450 user = self._randomPosixUser({
458 # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
459 (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
460 "--surname=%s" % user["surname"],
461 "--given-name=%s" % user["given-name"],
462 "--job-title=%s" % user["job-title"],
463 "--department=%s" % user["department"],
464 "--description=%s" % user["description"],
465 "--company=%s" % user["company"],
466 "--gecos=%s" % user["gecos"],
467 "--rfc2307-from-nss",
468 "-H", "ldap://%s" % os.environ["DC_SERVER"],
469 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
471 self.assertCmdSuccess(result, out, err)
472 self.assertEquals(err, "", "Shouldn't be any error messages")
473 self.assertIn("User '%s' created successfully" % user["name"], out)
475 self._check_posix_user(user)
476 self.runsubcmd("user", "delete", user["name"])
478 # Check if overriding the attributes from NSS with explicit values works
480 # get a user with all random posix attributes
481 user = self._randomPosixUser({"name": u[0]})
482 # create a user with posix attributes from nss but override all of them with the
483 # random ones just obtained
484 (result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
485 "--surname=%s" % user["surname"],
486 "--given-name=%s" % user["given-name"],
487 "--job-title=%s" % user["job-title"],
488 "--department=%s" % user["department"],
489 "--description=%s" % user["description"],
490 "--company=%s" % user["company"],
491 "--rfc2307-from-nss",
492 "--gecos=%s" % user["gecos"],
493 "--login-shell=%s" % user["loginShell"],
494 "--uid=%s" % user["uid"],
495 "--uid-number=%s" % user["uidNumber"],
496 "--gid-number=%s" % user["gidNumber"],
497 "-H", "ldap://%s" % os.environ["DC_SERVER"],
498 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
500 self.assertCmdSuccess(result, out, err)
501 self.assertEquals(err, "", "Shouldn't be any error messages")
502 self.assertIn("User '%s' created successfully" % user["name"], out)
504 self._check_posix_user(user)
505 self.runsubcmd("user", "delete", user["name"])
507 def _randomUser(self, base={}):
508 """create a user with random attribute values, you can specify base attributes"""
510 "name": self.randomName(),
511 "password": self.random_password(16),
512 "surname": self.randomName(),
513 "given-name": self.randomName(),
514 "job-title": self.randomName(),
515 "department": self.randomName(),
516 "company": self.randomName(),
517 "description": self.randomName(count=100),
518 "createUserFn": self._create_user,
519 "checkUserFn": self._check_user,
524 def _randomPosixUser(self, base={}):
525 """create a user with random attribute values and additional RFC2307
526 attributes, you can specify base attributes"""
527 user = self._randomUser({})
530 "uid": self.randomName(),
531 "loginShell": self.randomName(),
532 "gecos": self.randomName(),
533 "uidNumber": self.randomXid(),
534 "gidNumber": self.randomXid(),
535 "createUserFn": self._create_posix_user,
536 "checkUserFn": self._check_posix_user,
538 user.update(posixAttributes)
542 def _check_user(self, user):
543 """ check if a user from SamDB has the same attributes as its template """
544 found = self._find_user(user["name"])
546 self.assertEquals("%s" % found.get("name"), "%(given-name)s %(surname)s" % user)
547 self.assertEquals("%s" % found.get("title"), user["job-title"])
548 self.assertEquals("%s" % found.get("company"), user["company"])
549 self.assertEquals("%s" % found.get("description"), user["description"])
550 self.assertEquals("%s" % found.get("department"), user["department"])
552 def _check_posix_user(self, user):
553 """ check if a posix_user from SamDB has the same attributes as its template """
554 found = self._find_user(user["name"])
556 self.assertEquals("%s" % found.get("loginShell"), user["loginShell"])
557 self.assertEquals("%s" % found.get("gecos"), user["gecos"])
558 self.assertEquals("%s" % found.get("uidNumber"), "%s" % user["uidNumber"])
559 self.assertEquals("%s" % found.get("gidNumber"), "%s" % user["gidNumber"])
560 self.assertEquals("%s" % found.get("uid"), user["uid"])
561 self._check_user(user)
563 def _create_user(self, user):
564 return self.runsubcmd("user", "create", user["name"], user["password"],
565 "--surname=%s" % user["surname"],
566 "--given-name=%s" % user["given-name"],
567 "--job-title=%s" % user["job-title"],
568 "--department=%s" % user["department"],
569 "--description=%s" % user["description"],
570 "--company=%s" % user["company"],
571 "-H", "ldap://%s" % os.environ["DC_SERVER"],
572 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
574 def _create_posix_user(self, user):
575 """ create a new user with RFC2307 attributes """
576 return self.runsubcmd("user", "create", user["name"], user["password"],
577 "--surname=%s" % user["surname"],
578 "--given-name=%s" % user["given-name"],
579 "--job-title=%s" % user["job-title"],
580 "--department=%s" % user["department"],
581 "--description=%s" % user["description"],
582 "--company=%s" % user["company"],
583 "--gecos=%s" % user["gecos"],
584 "--login-shell=%s" % user["loginShell"],
585 "--uid=%s" % user["uid"],
586 "--uid-number=%s" % user["uidNumber"],
587 "--gid-number=%s" % user["gidNumber"],
588 "-H", "ldap://%s" % os.environ["DC_SERVER"],
589 "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
591 def _find_user(self, name):
592 search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
593 userlist = self.samdb.search(base=self.samdb.domain_dn(),
594 scope=ldb.SCOPE_SUBTREE,
595 expression=search_filter, attrs=[])