2 # -*- coding: utf-8 -*-
3 """Test communication of credentials etc, between an RODC and a RWDC.
5 How does it work when the password is changed on the RWDC?
16 sys.path.insert(0, "bin/python")
20 from samba.tests.subunitrun import SubunitOptions, TestProgram
21 import samba.getopt as options
23 from samba.auth import system_session
24 from samba.samdb import SamDB
25 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
26 from samba import gensec, dsdb
27 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
28 from samba.dcerpc import security, samr
30 import password_lockout_base
def passwd_encode(pw):
    """Return *pw* quoted, UTF-16-LE encoded and then base64 encoded.

    The password is wrapped in literal double quotes before encoding, so
    ``abc`` is encoded as the text ``"abc"``.

    :param pw: password text to encode
    :return: the base64 encoding of the quoted UTF-16-LE byte string
    """
    quoted = '"%s"' % pw
    return base64.b64encode(quoted.encode('utf-16-le'))
# Error type raised (without a message) by set_auto_replication() and
# force_replication() in this file.
36 class RodcRwdcTestException(Exception):
# Build a credentials object `c` for (username, password), copying the
# domain, realm and workstation from the global CREDS template.
#
# kerberos_state: kerberos setting to apply to `c`; when None the template's
#     current setting (CREDS.get_kerberos_state()) is used instead.
#
# The selected kerberos state is printed for debugging.  Note that the
# password is printed in clear text together with the username.
# NOTE(review): the statement that creates `c`, the remainder of the
#     set_gensec_features(...) call and any return statement are not visible
#     in this listing -- confirm against the full file.
40 def make_creds(username, password, kerberos_state=None):
41 # use the global CREDS as a template
43 c.set_username(username)
44 c.set_password(password)
45 c.set_domain(CREDS.get_domain())
46 c.set_realm(CREDS.get_realm())
47 c.set_workstation(CREDS.get_workstation())
49 if kerberos_state is None:
50 kerberos_state = CREDS.get_kerberos_state()
51 c.set_kerberos_state(kerberos_state)
54 if kerberos_state == MUST_USE_KERBEROS:
55 print "we seem to be using kerberos for %s %s" % (username, password)
56 elif kerberos_state == DONT_USE_KERBEROS:
57 print "NOT using kerberos for %s %s" % (username, password)
59 print "kerberos state is %s" % kerberos_state
61 c.set_gensec_features(c.get_gensec_features() |
# Enable or disable automatic replication for `dc` by running bin/samba-tool
# once for each of the DISABLE_INBOUND_REPL and DISABLE_OUTBOUND_REPL DSA
# options.  A "-Uuser%..." credential string is built from the global CREDS.
#
# dc:    server to operate on (how it reaches the command line is on lines
#        not visible in this listing).
# allow: truthy passes "--dsa-option=-DISABLE_*", falsy passes
#        "--dsa-option=+DISABLE_*".
#
# Raises RodcRwdcTestException when the captured stderr does not contain
# 'LDAP_REFERRAL'; when it does, a message is printed and the error ignored
# on the assumption that `dc` is an RODC.
# NOTE(review): the condition guarding the stderr check (presumably a
#     non-zero return code) is not visible here -- confirm.
66 def set_auto_replication(dc, allow):
67 credstring = '-U%s%%%s' % (CREDS.get_username(),
70 on_or_off = '-' if allow else '+'
72 for opt in ['DISABLE_INBOUND_REPL',
73 'DISABLE_OUTBOUND_REPL']:
74 cmd = ['bin/samba-tool',
77 "--dsa-option=%s%s" % (on_or_off, opt)]
79 p = subprocess.Popen(cmd,
80 stderr=subprocess.PIPE,
81 stdout=subprocess.PIPE)
82 stdout, stderr = p.communicate()
84 if 'LDAP_REFERRAL' not in stderr:
85 raise RodcRwdcTestException()
86 print ("ignoring +%s REFERRAL error; assuming %s is RODC" %
# Run a bin/samba-tool command for `user_dn` while automatic replication on
# RWDC is temporarily re-enabled, then disable automatic replication again.
#
# subprocess.check_call() raises CalledProcessError on a non-zero exit
# status; in that case the trailing set_auto_replication(RWDC, False) is not
# reached.
# NOTE(review): the samba-tool sub-command and its arguments are on lines not
#     visible in this listing (presumably an RODC preload) -- confirm.
90 def preload_rodc_user(user_dn):
91 credstring = '-U%s%%%s' % (CREDS.get_username(),
94 set_auto_replication(RWDC, True)
95 cmd = ['bin/samba-tool',
102 subprocess.check_call(cmd)
103 set_auto_replication(RWDC, False)
def get_server_ref_from_samdb(samdb):
    """Return the first ``serverReference`` value of *samdb*'s server object.

    The DN returned by ``samdb.get_serverName()`` is read with a base-scope
    search that requests only the ``serverReference`` attribute.

    :param samdb: database connection providing ``get_serverName()`` and
        ``search()``
    :return: the first ``serverReference`` value of the first result
    :raises IndexError: if the search returns no entries (indexing an
        empty result)
    """
    server_dn = samdb.get_serverName()
    res = samdb.search(server_dn,
                       scope=ldb.SCOPE_BASE,
                       attrs=['serverReference'])

    return res[0]['serverReference'][0]
# Test case exercising an RODC and an RWDC together; extends
# password_lockout_base.BasePasswordTestCase.
117 class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
# Counter shared by the whole class: each call returns the next integer,
# starting at 1 (iterator `.next` is the Python 2 spelling).
118 counter = itertools.count(1).next
# Run bin/samba-tool in a subprocess (capturing stdout and stderr) to force
# replication, using a "-Uuser%password" string built from the global CREDS.
#
# base: optional argument, None by default; its use is on lines not visible
#       in this listing.
#
# On the failure path the return code is printed and RodcRwdcTestException
# is raised.
# NOTE(review): the samba-tool arguments and the condition selecting the
#     failure path are not visible here -- confirm against the full file.
120 def force_replication(self, base=None):
124 # XXX feels like a horrendous way to do it.
125 credstring = '-U%s%%%s' % (CREDS.get_username(),
126 CREDS.get_password())
127 cmd = ['bin/samba-tool',
133 p = subprocess.Popen(cmd,
134 stderr=subprocess.PIPE,
135 stdout=subprocess.PIPE)
136 stdout, stderr = p.communicate()
138 print "failed with code %s" % p.returncode
144 raise RodcRwdcTestException()
def _check_account_initial(self, dn):
    """Call force_replication(), then run the inherited initial check.

    :param dn: account DN handed unchanged to the base-class
        ``_check_account_initial``
    :return: whatever the base-class implementation returns
    """
    self.force_replication()
    return super(RodcRwdcTests, self)._check_account_initial(dn)
# NOTE(review): the `def` line of this method is not visible in this listing;
#     from the super() call it is presumably tearDown().
# After the base-class tearDown: restore the dsHeuristics value saved in
# self.rwdc_dsheuristics on the RWDC, put the global CREDS back to
# DONT_USE_KERBEROS and re-enable automatic replication on RWDC.
151 super(RodcRwdcTests, self).tearDown()
152 self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
153 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
154 set_auto_replication(RWDC, True)
# NOTE(review): the `def` line of this method is not visible in this listing;
#     from the super() call it is presumably setUp().  The assignments of
#     self.lp, self.host and self.domain_sid used below are not visible
#     either -- confirm where they are set.
# Open LDAP connections to both DCs with the global CREDS.
157 self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
158 session_info=system_session(LP), lp=LP)
160 self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
161 session_info=system_session(LP), lp=LP)
163 # Define variables for BasePasswordTestCase
165 self.global_creds = CREDS
# The base-class setUp runs with host_url / ldb pointing at the RWDC ...
167 self.host_url = 'ldap://%s' % RWDC
168 self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
169 credentials=self.global_creds, lp=self.lp)
171 super(RodcRwdcTests, self).setUp()
# ... and afterwards both are re-pointed at the RODC.
173 self.host_url = 'ldap://%s' % RODC
174 self.ldb = SamDB(url='ldap://%s' % RODC, session_info=system_session(self.lp),
175 credentials=self.global_creds, lp=self.lp)
# SAMR connection plus server and domain handles opened with maximum allowed access.
177 self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
178 self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
179 self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
181 self.base_dn = self.rwdc_db.domain_dn()
# dsServiceName is read from the RODC with an empty base DN and base scope.
183 root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
184 attrs=['dsServiceName'])
185 self.service = root[0]['dsServiceName'][0]
# Random per-test tag used to build unique object names.
186 self.tag = uuid.uuid4().hex
# Remember the RWDC's dsHeuristics value before overriding it for the test.
188 self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
189 self.rwdc_db.set_dsheuristics("000000001")
191 set_auto_replication(RWDC, False)
193 # make sure DCs are synchronized before the test
194 self.force_replication()
195 self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
196 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
# Assert that an operation fails with an LDAP referral: the test fails with
# "failed to raise ldap referral" if no ldb.LdbError is raised, and the
# error code of a raised LdbError must equal ldb.ERR_REFERRAL.
#
# fn, *args, **kwargs: the callable and its arguments.
# NOTE(review): the `try:` line and the call itself (presumably
#     fn(*args, **kwargs)) are not visible in this listing -- confirm.
# `except ... as (code, msg)` is Python 2 tuple-unpacking syntax.
198 def assertReferral(self, fn, *args, **kwargs):
201 self.fail("failed to raise ldap referral")
202 except ldb.LdbError as (code, msg):
203 self.assertEqual(code, ldb.ERR_REFERRAL,
204 "expected referral, got %s %s" % (code, msg))
def _test_rodc_dsheuristics(self):
    """Check that setting dsHeuristics through the RODC yields a referral.

    Two writes are attempted via ``assertReferral``: first the value
    "000000001", then the value read back from the RODC beforehand.
    """
    current = self.rodc_db.get_dsheuristics()
    for value in ("000000001", current):
        self.assertReferral(self.rodc_db.set_dsheuristics, value)
def TEST_rodc_heuristics_kerberos(self):
    """Run the RODC dsHeuristics check with CREDS set to MUST_USE_KERBEROS.

    NOTE(review): the upper-case ``TEST_`` prefix does not match unittest's
    default ``test`` method prefix, so default discovery will not collect
    this method -- presumably deliberate; confirm.
    """
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_rodc_dsheuristics()
def TEST_rodc_heuristics_ntlm(self):
    """Run the RODC dsHeuristics check with CREDS set to DONT_USE_KERBEROS.

    NOTE(review): the upper-case ``TEST_`` prefix does not match unittest's
    default ``test`` method prefix, so default discovery will not collect
    this method -- presumably deliberate; confirm.
    """
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_rodc_dsheuristics()
# Add objects on the RWDC and check that they appear on the RODC after a
# forced replication.
#
# Visible steps: search the RODC (expecting 0 results, or tolerating
# ERR_NO_SUCH_OBJECT), fail the test if the add on the RWDC raises LdbError,
# call force_replication() (with or without base=<config base DN>), then
# search the RODC again expecting exactly 1 result.
#
# objects:   object descriptions to add; `dn` below is presumably taken from
#            each one on lines not visible in this listing.
# cross_ncs: presumably selects the config-base search / replication
#            variant -- TODO confirm, the branches are not visible.
# `except X, e` and `except X as (a, b)` are Python 2 syntax.
219 def _test_add(self, objects, cross_ncs=False):
223 base = str(self.rwdc_db.get_config_basedn())
224 controls = ["search_options:1:2"]
225 cn = dn.split(',', 1)[0]
226 expression = '(%s)' % cn
233 res = self.rodc_db.search(base,
234 expression=expression,
235 scope=ldb.SCOPE_SUBTREE,
238 self.assertEqual(len(res), 0)
239 except ldb.LdbError, e:
240 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
245 except ldb.LdbError as (ecode, emsg):
246 self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
250 self.force_replication(base=base)
252 self.force_replication()
255 res = self.rodc_db.search(base,
256 expression=expression,
257 scope=ldb.SCOPE_SUBTREE,
260 self.assertEqual(len(res), 1)
261 except ldb.LdbError, e:
262 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
263 "replication seems to have failed")
# Describe three objects directly under self.base_dn -- an
# organizationalUnit, a user and a group -- whose names combine self.tag,
# `mode` and a digit, and finally delete all three from the RWDC.
#
# mode: suffix appended to self.tag so the kerberos and ntlm runs use
#       different names.
# NOTE(review): the call that adds/checks the objects (presumably
#     self._test_add) is not visible in this listing -- confirm.
265 def _test_add_replicated_objects(self, mode):
266 tag = "%s%s" % (self.tag, mode)
269 'dn': "ou=%s1,%s" % (tag, self.base_dn),
270 "objectclass": "organizationalUnit"
273 'dn': "cn=%s2,%s" % (tag, self.base_dn),
274 "objectclass": "user"
277 'dn': "cn=%s3,%s" % (tag, self.base_dn),
278 "objectclass": "group"
281 self.rwdc_db.delete("ou=%s1,%s" % (tag, self.base_dn))
282 self.rwdc_db.delete("cn=%s2,%s" % (tag, self.base_dn))
283 self.rwdc_db.delete("cn=%s3,%s" % (tag, self.base_dn))
def test_add_replicated_objects_kerberos(self):
    """Run the add-replicated-objects scenario with CREDS set to MUST_USE_KERBEROS."""
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_add_replicated_objects('kerberos')
def test_add_replicated_objects_ntlm(self):
    """Run the add-replicated-objects scenario with CREDS set to DONT_USE_KERBEROS."""
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_add_replicated_objects('ntlm')
# Describe an NTDSConnection object named "<tag><mode>foofoofoo" beneath
# self.service (enabledConnection TRUE, fromServer set to self.base_dn) and
# finally delete it from the RWDC.
#
# mode: suffix appended to self.tag so the kerberos and ntlm runs use
#       different names.
# NOTE(review): the call that adds/checks the object (presumably
#     self._test_add) is not visible in this listing -- confirm.
293 def _test_add_replicated_connections(self, mode):
294 tag = "%s%s" % (self.tag, mode)
297 'dn': "cn=%sfoofoofoo,%s" % (tag, self.service),
298 "objectclass": "NTDSConnection",
299 'enabledConnection': 'TRUE',
300 'fromServer': self.base_dn,
304 self.rwdc_db.delete("cn=%sfoofoofoo,%s" % (tag, self.service))
def test_add_replicated_connections_kerberos(self):
    """Run the add-replicated-connections scenario with CREDS set to MUST_USE_KERBEROS."""
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_add_replicated_connections('kerberos')
def test_add_replicated_connections_ntlm(self):
    """Run the add-replicated-connections scenario with CREDS set to DONT_USE_KERBEROS."""
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_add_replicated_connections('ntlm')
# For each of the attributes carLicense and middleName on the Guest account
# (CN=Guest,CN=Users,<base_dn>): replace the attribute on the RWDC, call
# force_replication(), then read the attribute from the RODC and expect
# exactly the written value.  A failing modify fails the test; an LdbError
# from the RODC search must not be ERR_NO_SUCH_OBJECT.
# NOTE(review): where `value` and the message `m` are created is not visible
#     in this listing -- confirm against the full file.
314 def _test_modify_replicated_attributes(self):
315 dn = 'CN=Guest,CN=Users,' + self.base_dn
317 for attr in ['carLicense', 'middleName']:
319 m.dn = ldb.Dn(self.rwdc_db, dn)
320 m[attr] = ldb.MessageElement(value,
321 ldb.FLAG_MOD_REPLACE,
324 self.rwdc_db.modify(m)
325 except ldb.LdbError as e:
326 self.fail("Failed to modify %s %s on RWDC %s with %s" %
329 self.force_replication()
332 res = self.rodc_db.search(dn,
333 scope=ldb.SCOPE_SUBTREE,
335 results = [x[attr][0] for x in res]
336 self.assertEqual(results, [value])
337 except ldb.LdbError, e:
338 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
339 "replication seems to have failed")
def test_modify_replicated_attributes_kerberos(self):
    """Run the modify-replicated-attributes scenario with CREDS set to MUST_USE_KERBEROS."""
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_modify_replicated_attributes()
def test_modify_replicated_attributes_ntlm(self):
    """Run the modify-replicated-attributes scenario with CREDS set to DONT_USE_KERBEROS."""
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_modify_replicated_attributes()
# Life-cycle check for a user object "cn=<tag>_add_modify,<base_dn>":
#   1. the object is described (objectclass user) and replication is forced;
#   2. for each remaining value in `values`, an attribute is replaced on the
#      RWDC, replication is forced and the RODC must return that value;
#   3. the object is deleted on the RWDC, replication is forced and a search
#      on the RODC must raise LdbError with ERR_NO_SUCH_OBJECT (a search
#      that returns normally fails the test).
# NOTE(review): the statement that adds the object, the attribute name
#     `attr` and the creation of the message `m` are not visible in this
#     listing -- confirm against the full file.
349 def _test_add_modify_delete(self):
350 dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
351 values = ["%s%s" % (i, self.tag) for i in range(3)]
356 "objectclass": "user",
360 self.force_replication()
361 for value in values[1:]:
364 m.dn = ldb.Dn(self.rwdc_db, dn)
365 m[attr] = ldb.MessageElement(value,
366 ldb.FLAG_MOD_REPLACE,
369 self.rwdc_db.modify(m)
370 except ldb.LdbError as e:
371 self.fail("Failed to modify %s %s on RWDC %s with %s" %
374 self.force_replication()
377 res = self.rodc_db.search(dn,
378 scope=ldb.SCOPE_SUBTREE,
380 results = [x[attr][0] for x in res]
381 self.assertEqual(results, [value])
382 except ldb.LdbError, e:
383 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
384 "replication seems to have failed")
386 self.rwdc_db.delete(dn)
387 self.force_replication()
389 res = self.rodc_db.search(dn,
390 scope=ldb.SCOPE_SUBTREE,
393 self.fail("Failed to delete %s" % (dn))
394 except ldb.LdbError, e:
395 self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
396 "Failed to delete %s" % (dn))
def test_add_modify_delete_kerberos(self):
    """Run the add/modify/delete scenario with CREDS set to MUST_USE_KERBEROS."""
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_add_modify_delete()
def test_add_modify_delete_ntlm(self):
    """Run the add/modify/delete scenario with CREDS set to DONT_USE_KERBEROS."""
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_add_modify_delete()
# NOTE(review): the `def` line of this method is not visible in this listing;
#     callers unpack `self._new_user()` into (dn, username, password), which
#     matches the return below, so this is presumably _new_user().
# Creates a user under CN=Users on the RWDC whose name combines the first 12
# characters of self.tag with the next value of the class-wide counter, sets
# its userPassword to 'password#1' through an LDIF modify, enables the
# account and returns (dn, username, password).
407 username = "u%sX%s" % (self.tag[:12], self.counter())
408 password = 'password#1'
409 dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
412 "objectclass": "user",
413 'sAMAccountName': username,
417 except ldb.LdbError as e:
418 self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))
420 self.rwdc_db.modify_ldif("dn: %s\n"
421 "changetype: modify\n"
422 "delete: userPassword\n"
423 "add: userPassword\n"
424 "userPassword: %s\n" % (dn, password))
425 self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
426 return (dn, username, password)
# Change a user's password on the RWDC with a single LDIF modify that
# deletes userPassword and adds it again.
#
# user_dn, old_password, new_password are interpolated into the LDIF text in
# that order, without any escaping.
# NOTE(review): two lines of the LDIF template (presumably the "dn:" line
#     and the old userPassword value) are not visible in this listing.
428 def _change_password(self, user_dn, old_password, new_password):
429 self.rwdc_db.modify_ldif(
431 "changetype: modify\n"
432 "delete: userPassword\n"
434 "add: userPassword\n"
435 "userPassword: %s\n" % (user_dn, old_password, new_password))
# Attempt an LDAP connection to `server` with `creds` and check the outcome.
#
# errno: None means the logon is expected to succeed; otherwise it is the
#        ldb error code the logon is expected to fail with, and a successful
#        connection fails the test.
#
# NOTE(review): the conditions choosing between the two failure messages in
#     the except branch are not visible in this listing -- confirm.
# NOTE(review): the two adjacent string literals in the last self.fail()
#     are concatenated without a separator, giving
#     "...correct ldb errorExpected: ...".
437 def try_ldap_logon(self, server, creds, errno=None):
439 tmpdb = SamDB('ldap://%s' % server, credentials=creds,
440 session_info=system_session(LP), lp=LP)
441 if errno is not None:
442 self.fail("logon failed to fail with ldb error %s" % errno)
443 except ldb.LdbError as (code, msg):
446 self.fail("logon incorrectly raised ldb error (code=%s)" %
449 self.fail("logon failed to raise correct ldb error"
450 "Expected: %s Got: %s" %
# Read the RWDC's minPwdAge and set it to '0'.
# NOTE(review): one line between the read and the write is not visible in
#     this listing (presumably a check that skips the write when the value
#     is already 0) -- confirm.  The previous value is not restored in the
#     visible code.
454 def zero_min_password_age(self):
455 min_pwd_age = int(self.rwdc_db.get_minPwdAge())
457 self.rwdc_db.set_minPwdAge('0')
# Scenario: create a user, change its password repeatedly on the RWDC and
# check at each stage which credentials can log on to the RODC and the RWDC.
#
# errno: ldb error code expected when logging on with superseded
#        credentials; None (the default) means those logons are expected to
#        succeed.
#
# NOTE(review): several lines are not visible in this listing; in particular
#     `prev` is reused after the loop for the 'password#6' change, and the
#     line just before that change (presumably re-assigning `prev`) is
#     missing -- confirm against the full file.
459 def _test_ldap_change_password(self, errno=None):
460 self.zero_min_password_age()
462 dn, username, password = self._new_user()
463 creds1 = make_creds(username, password)
465 # With NTLM, this should fail on RODC before replication,
466 # because the user isn't known.
467 self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
468 self.force_replication()
470 # Now the user is replicated to RODC, so logon should work
471 self.try_ldap_logon(RODC, creds1)
473 passwords = ['password#%s' % i for i in range(1, 6)]
474 for prev, password in zip(passwords[:-1], passwords[1:]):
475 self._change_password(dn, prev, password)
477 # The password has changed enough times to make the old
478 # password invalid (though with kerberos that doesn't matter).
479 # For NTLM, the old creds should always fail
480 self.try_ldap_logon(RODC, creds1, errno)
481 self.try_ldap_logon(RWDC, creds1, errno)
483 creds2 = make_creds(username, password)
485 # new creds work straight away with NTLM, because although it
486 # doesn't have the password, it knows the user and forwards
488 self.try_ldap_logon(RODC, creds2)
489 self.try_ldap_logon(RWDC, creds2)
491 self.force_replication()
493 # After another replication check RODC still works and fails,
494 # as appropriate to various creds
495 self.try_ldap_logon(RODC, creds2)
496 self.try_ldap_logon(RODC, creds1, errno)
499 password = 'password#6'
500 self._change_password(dn, prev, password)
501 creds3 = make_creds(username, password)
503 # previous password should still work.
504 self.try_ldap_logon(RWDC, creds2)
505 self.try_ldap_logon(RODC, creds2)
507 # new password should still work.
508 self.try_ldap_logon(RWDC, creds3)
509 self.try_ldap_logon(RODC, creds3)
511 # old password should still fail (but not on kerberos).
512 self.try_ldap_logon(RWDC, creds1, errno)
513 self.try_ldap_logon(RODC, creds1, errno)
def test_ldap_change_password_kerberos(self):
    """Run the LDAP password-change scenario with CREDS set to MUST_USE_KERBEROS.

    No error code is passed, so the scenario's ``errno`` stays at its
    default of None.
    """
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_ldap_change_password()
def test_ldap_change_password_ntlm(self):
    """Run the LDAP password-change scenario with CREDS set to DONT_USE_KERBEROS.

    ``ldb.ERR_INVALID_CREDENTIALS`` is passed as the error code the scenario
    expects for logons with superseded credentials.
    """
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS)
# Scenario: like the plain password-change test, but the new user is first
# added (on the RWDC) as a member of the first group listed in the RODC's
# msDS-RevealOnDemandGroup attribute, and preload_rodc_user() is called for
# the user before the password is changed.
#
# errno: ldb error code expected when logging on to the RWDC with the
#        superseded credentials; None (the default) means that logon is
#        expected to succeed.
#
# NOTE(review): the creation of the message `m` is not visible in this
#     listing -- confirm against the full file.
523 def _test_ldap_change_password_reveal_on_demand(self, errno=None):
524 self.zero_min_password_age()
526 res = self.rodc_db.search(self.rodc_dn,
527 scope=ldb.SCOPE_BASE,
528 attrs=['msDS-RevealOnDemandGroup'])
530 group = res[0]['msDS-RevealOnDemandGroup'][0]
532 user_dn, username, password = self._new_user()
533 creds1 = make_creds(username, password)
536 m.dn = ldb.Dn(self.rwdc_db, group)
537 m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
538 self.rwdc_db.modify(m)
540 # Against Windows, this will just forward if no account exists on the KDC
541 # Therefore, this does not error on Windows.
542 self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
544 self.force_replication()
547 self.try_ldap_logon(RODC, creds1)
548 preload_rodc_user(user_dn)
550 # Now the user AND password are replicated to RODC, so logon should work (not proxy case)
551 self.try_ldap_logon(RODC, creds1)
553 passwords = ['password#%s' % i for i in range(1, 6)]
554 for prev, password in zip(passwords[:-1], passwords[1:]):
555 self._change_password(user_dn, prev, password)
557 # The password has changed enough times to make the old
558 # password invalid, but the RODC shouldn't know that.
559 self.try_ldap_logon(RODC, creds1)
560 self.try_ldap_logon(RWDC, creds1, errno)
562 creds2 = make_creds(username, password)
563 self.try_ldap_logon(RWDC, creds2)
564 # We can forward WRONG_PASSWORD over NTLM.
565 # This SHOULD succeed.
566 self.try_ldap_logon(RODC, creds2)
def test_change_password_reveal_on_demand_ntlm(self):
    """Run the reveal-on-demand password-change scenario with CREDS set to DONT_USE_KERBEROS.

    ``ldb.ERR_INVALID_CREDENTIALS`` is passed as the error code the scenario
    expects for the superseded credentials.
    """
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_ldap_change_password_reveal_on_demand(ldb.ERR_INVALID_CREDENTIALS)
def test_change_password_reveal_on_demand_kerberos(self):
    """Run the reveal-on-demand password-change scenario with CREDS set to MUST_USE_KERBEROS.

    No error code is passed, so the scenario's ``errno`` stays at its
    default of None.
    """
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_ldap_change_password_reveal_on_demand()
# Lockout test for the lockout1krb5 account: preload the user with
# preload_rodc_user(), attempt one connection to self.host_url with a wrong
# password (the real one plus "X") expecting ERR_INVALID_CREDENTIALS, log on
# once with fresh credentials built by insta_creds(), then run the inherited
# _test_login_lockout() with the account's credentials.
#
# The local name `ldb` shadows the ldb module inside this method, which is
# why LdbError and ERR_INVALID_CREDENTIALS are referenced by bare name.
# NOTE(review): the `try:` lines and some insta_creds() arguments are not
#     visible in this listing -- confirm against the full file.
577 def test_login_lockout_krb5(self):
578 username = self.lockout1krb5_creds.get_username()
579 userpass = self.lockout1krb5_creds.get_password()
580 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
582 preload_rodc_user(userdn)
584 use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
585 fail_creds = self.insta_creds(self.template_creds,
587 userpass=userpass+"X",
588 kerberos_state=use_kerberos)
591 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
593 except LdbError, (num, msg):
594 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
596 # Succeed to reset everything to 0
597 success_creds = self.insta_creds(self.template_creds,
600 kerberos_state=use_kerberos)
602 ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
604 self._test_login_lockout(self.lockout1krb5_creds)
# Lockout test for the lockout1ntlm account: preload the user with
# preload_rodc_user(), attempt one connection to self.host_url with a wrong
# password (the real one plus "X") expecting ERR_INVALID_CREDENTIALS, log on
# once with the account's own credentials, then run the inherited
# _test_login_lockout() with those credentials.
#
# The local name `ldb` shadows the ldb module inside this method, which is
# why LdbError and ERR_INVALID_CREDENTIALS are referenced by bare name.
# NOTE(review): the `try:` line and some insta_creds() arguments are not
#     visible in this listing -- confirm against the full file.
606 def test_login_lockout_ntlm(self):
607 username = self.lockout1ntlm_creds.get_username()
608 userpass = self.lockout1ntlm_creds.get_password()
609 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
611 preload_rodc_user(userdn)
613 use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
614 fail_creds = self.insta_creds(self.template_creds,
616 userpass=userpass+"X",
617 kerberos_state=use_kerberos)
620 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
622 except LdbError, (num, msg):
623 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
625 # Succeed to reset everything to 0
626 ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
628 self._test_login_lockout(self.lockout1ntlm_creds)
# Multiple-logon test for the lockout1krb5 account: preload the user with
# preload_rodc_user(), attempt one connection to self.host_url with a wrong
# password (the real one plus "X") expecting ERR_INVALID_CREDENTIALS, log on
# once with fresh credentials built by insta_creds(), then run the inherited
# _test_multiple_logon() with the account's credentials.
#
# The local name `ldb` shadows the ldb module inside this method, which is
# why LdbError and ERR_INVALID_CREDENTIALS are referenced by bare name.
# NOTE(review): the `try:` lines and some insta_creds() arguments are not
#     visible in this listing -- confirm against the full file.
630 def test_multiple_logon_krb5(self):
631 username = self.lockout1krb5_creds.get_username()
632 userpass = self.lockout1krb5_creds.get_password()
633 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
635 preload_rodc_user(userdn)
637 use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
638 fail_creds = self.insta_creds(self.template_creds,
640 userpass=userpass+"X",
641 kerberos_state=use_kerberos)
644 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
646 except LdbError, (num, msg):
647 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
649 # Succeed to reset everything to 0
650 success_creds = self.insta_creds(self.template_creds,
653 kerberos_state=use_kerberos)
655 ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
657 self._test_multiple_logon(self.lockout1krb5_creds)
# Multiple-logon test for the lockout1ntlm account: preload the user with
# preload_rodc_user(), attempt one connection to self.host_url with a wrong
# password (the real one plus "X") expecting ERR_INVALID_CREDENTIALS, log on
# once with the account's own credentials, then run the inherited
# _test_multiple_logon() with those credentials.
#
# The local name `ldb` shadows the ldb module inside this method, which is
# why LdbError and ERR_INVALID_CREDENTIALS are referenced by bare name.
# NOTE(review): the `try:` line and some insta_creds() arguments are not
#     visible in this listing -- confirm against the full file.
659 def test_multiple_logon_ntlm(self):
660 username = self.lockout1ntlm_creds.get_username()
661 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
662 userpass = self.lockout1ntlm_creds.get_password()
664 preload_rodc_user(userdn)
666 use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
667 fail_creds = self.insta_creds(self.template_creds,
669 userpass=userpass+"X",
670 kerberos_state=use_kerberos)
673 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
675 except LdbError, (num, msg):
676 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
678 # Succeed to reset everything to 0
679 ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
681 self._test_multiple_logon(self.lockout1ntlm_creds)
# NOTE(review): the `def` line of this entry-point function is not visible
#     in this listing, nor are the lines that assign RODC and RWDC
#     (presumably from the two positional arguments named in the usage
#     string) -- confirm against the full file.
# Parses the command line (samba, version, credentials and subunit option
# groups), fills the module globals LP and CREDS, enables automatic
# replication on RWDC, runs the tests through TestProgram and enables
# automatic replication on RWDC once more afterwards.
684 global RODC, RWDC, CREDS, LP
685 parser = optparse.OptionParser(
686 "rodc_rwdc.py [options] <rodc host> <rwdc host>")
688 sambaopts = options.SambaOptions(parser)
689 versionopts = options.VersionOptions(parser)
690 credopts = options.CredentialsOptions(parser)
691 subunitopts = SubunitOptions(parser)
693 parser.add_option_group(sambaopts)
694 parser.add_option_group(versionopts)
695 parser.add_option_group(credopts)
696 parser.add_option_group(subunitopts)
698 opts, args = parser.parse_args()
700 LP = sambaopts.get_loadparm()
701 CREDS = credopts.get_credentials(LP)
702 CREDS.set_gensec_features(CREDS.get_gensec_features() |
711 set_auto_replication(RWDC, True)
713 TestProgram(module=__name__, opts=subunitopts)
715 set_auto_replication(RWDC, True)