2 # -*- coding: utf-8 -*-
3 from __future__ import print_function
4 """Test communication of credentials etc, between an RODC and a RWDC.
6 How does it work when the password is changed on the RWDC?
17 sys.path.insert(0, "bin/python")
21 from samba.tests.subunitrun import SubunitOptions, TestProgram
22 import samba.getopt as options
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
27 from samba import gensec, dsdb
28 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
29 from samba.dcerpc import security, samr
32 import password_lockout_base
def adjust_cmd_for_py_version(parts):
    """Prefix a command line with the interpreter named by $PYTHON.

    When the PYTHON environment variable is set to a non-empty value, that
    value is inserted at the front of *parts* (the list is modified in
    place); otherwise *parts* is left untouched.

    :param parts: command and arguments as a list of strings
    :return: the same list object, so the call can be used inline as in
        ``cmd = adjust_cmd_for_py_version([...])``
    """
    interpreter = os.environ.get("PYTHON")
    if interpreter:
        parts.insert(0, interpreter)
    # Callers assign the result and pass it on to subprocess, so the list
    # has to be handed back instead of implicitly returning None.
    return parts
def passwd_encode(pw):
    """Return *pw* in double quotes, UTF-16-LE encoded, as base64 text."""
    quoted = '"%s"' % pw
    raw = quoted.encode('utf-16-le')
    return base64.b64encode(raw).decode('utf8')
43 class RodcRwdcTestException(Exception):
47 def make_creds(username, password, kerberos_state=None):
48 # use the global CREDS as a template
50 c.set_username(username)
51 c.set_password(password)
52 c.set_domain(CREDS.get_domain())
53 c.set_realm(CREDS.get_realm())
54 c.set_workstation(CREDS.get_workstation())
56 if kerberos_state is None:
57 kerberos_state = CREDS.get_kerberos_state()
58 c.set_kerberos_state(kerberos_state)
61 if kerberos_state == MUST_USE_KERBEROS:
62 print("we seem to be using kerberos for %s %s" % (username, password))
63 elif kerberos_state == DONT_USE_KERBEROS:
64 print("NOT using kerberos for %s %s" % (username, password))
66 print("kerberos state is %s" % kerberos_state)
68 c.set_gensec_features(c.get_gensec_features() |
73 def set_auto_replication(dc, allow):
74 credstring = '-U%s%%%s' % (CREDS.get_username(),
77 on_or_off = '-' if allow else '+'
79 for opt in ['DISABLE_INBOUND_REPL',
80 'DISABLE_OUTBOUND_REPL']:
81 cmd = adjust_cmd_for_py_version(['bin/samba-tool',
84 "--dsa-option=%s%s" % (on_or_off, opt)])
86 p = subprocess.Popen(cmd,
87 stderr=subprocess.PIPE,
88 stdout=subprocess.PIPE)
89 stdout, stderr = p.communicate()
91 if b'LDAP_REFERRAL' not in stderr:
92 raise RodcRwdcTestException()
93 print("ignoring +%s REFERRAL error; assuming %s is RODC" %
97 def preload_rodc_user(user_dn):
98 credstring = '-U%s%%%s' % (CREDS.get_username(),
101 set_auto_replication(RWDC, True)
102 cmd = adjust_cmd_for_py_version(['bin/samba-tool',
109 subprocess.check_call(cmd)
110 set_auto_replication(RWDC, False)
def get_server_ref_from_samdb(samdb):
    """Return the first serverReference value for the server behind *samdb*.

    Runs a base-scope search on the name returned by
    ``samdb.get_serverName()``, asking only for ``serverReference``, and
    returns the first value of that attribute from the first result.
    """
    matches = samdb.search(samdb.get_serverName(),
                           scope=ldb.SCOPE_BASE,
                           attrs=['serverReference'])
    first = matches[0]
    return first['serverReference'][0]
122 class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
def _check_account_initial(self, dn):
    """Force a replication, then run the parent class's initial check.

    :param dn: passed through unchanged to the parent implementation
    :return: whatever the parent's ``_check_account_initial`` returns
    """
    self.force_replication()
    parent = super(RodcRwdcCachedTests, self)
    return parent._check_account_initial(dn)
128 def _check_account(self, dn,
130 badPasswordTime=None,
133 lastLogonTimestamp=None,
135 userAccountControl=None,
136 msDSUserAccountControlComputed=None,
137 effective_bad_password_count=None,
139 badPwdCountOnly=False):
140 # Wait for the RWDC to get any delayed messages
141 # e.g. SendToSam or KRB5 bad passwords via winbindd
142 if (self.kerberos and isinstance(badPasswordTime, tuple) or
146 return super(RodcRwdcCachedTests,
147 self)._check_account(dn, badPwdCount, badPasswordTime,
148 logonCount, lastLogon,
149 lastLogonTimestamp, lockoutTime,
151 msDSUserAccountControlComputed,
152 effective_bad_password_count, msg,
155 def force_replication(self, base=None):
159 # XXX feels like a horrendous way to do it.
160 credstring = '-U%s%%%s' % (CREDS.get_username(),
161 CREDS.get_password())
162 cmd = adjust_cmd_for_py_version(['bin/samba-tool',
168 p = subprocess.Popen(cmd,
169 stderr=subprocess.PIPE,
170 stdout=subprocess.PIPE)
171 stdout, stderr = p.communicate()
173 print("failed with code %s" % p.returncode)
179 raise RodcRwdcTestException()
181 def _change_password(self, user_dn, old_password, new_password):
182 self.rwdc_db.modify_ldif(
184 "changetype: modify\n"
185 "delete: userPassword\n"
187 "add: userPassword\n"
188 "userPassword: %s\n" % (user_dn, old_password, new_password))
191 super(RodcRwdcCachedTests, self).tearDown()
192 set_auto_replication(RWDC, True)
195 self.kerberos = False # To be set later
197 self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
198 session_info=system_session(LP), lp=LP)
200 self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
201 session_info=system_session(LP), lp=LP)
203 # Define variables for BasePasswordTestCase
205 self.global_creds = CREDS
207 self.host_url = 'ldap://%s' % RWDC
208 self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
209 credentials=self.global_creds, lp=self.lp)
211 super(RodcRwdcCachedTests, self).setUp()
212 self.host_url = 'ldap://%s' % RODC
214 self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
215 self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
216 self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
218 self.base_dn = self.rwdc_db.domain_dn()
220 root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
221 attrs=['dsServiceName'])
222 self.service = root[0]['dsServiceName'][0]
223 self.tag = uuid.uuid4().hex
225 self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
226 self.rwdc_db.set_dsheuristics("000000001")
228 set_auto_replication(RWDC, False)
230 # make sure DCs are synchronized before the test
231 self.force_replication()
233 def delete_ldb_connections(self):
234 super(RodcRwdcCachedTests, self).delete_ldb_connections()
def test_cache_and_flush_password(self):
    """Check unicodePwd visibility around a preload and a password change.

    Using a SamDB opened without an explicit URL, unicodePwd must be
    absent for the krb5 lockout user at first, present after
    ``preload_rodc_user``, and absent again once the password has been
    changed and a replication forced.
    """
    account = self.lockout1krb5_creds.get_username()
    old_pw = self.lockout1krb5_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (account, self.base_dn)

    ldb_system = SamDB(session_info=system_session(self.lp),
                       credentials=self.global_creds, lp=self.lp)

    def password_is_present():
        found = ldb_system.search(userdn, attrs=['unicodePwd'])
        return 'unicodePwd' in found[0]

    self.assertFalse(password_is_present())

    preload_rodc_user(userdn)
    self.assertTrue(password_is_present())

    # Forcing replication should blank out password (when changed)
    self._change_password(userdn, old_pw, old_pw + '!')
    self.force_replication()
    self.assertFalse(password_is_present())
263 def test_login_lockout_krb5(self):
264 username = self.lockout1krb5_creds.get_username()
265 userpass = self.lockout1krb5_creds.get_password()
266 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
268 preload_rodc_user(userdn)
272 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
274 res = self.rodc_db.search(self.rodc_dn,
275 scope=ldb.SCOPE_BASE,
276 attrs=['msDS-RevealOnDemandGroup'])
278 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
281 m.dn = ldb.Dn(self.rwdc_db, group)
282 m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
283 self.rwdc_db.modify(m)
286 m.dn = ldb.Dn(self.ldb, self.base_dn)
288 self.account_lockout_duration = 10
289 account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7))
291 m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks),
292 ldb.FLAG_MOD_REPLACE,
295 self.lockout_observation_window = 10
296 lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7))
298 m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks),
299 ldb.FLAG_MOD_REPLACE,
300 "lockOutObservationWindow")
302 self.rwdc_db.modify(m)
303 self.force_replication()
305 self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
307 def test_login_lockout_ntlm(self):
308 username = self.lockout1ntlm_creds.get_username()
309 userpass = self.lockout1ntlm_creds.get_password()
310 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
312 preload_rodc_user(userdn)
314 self.kerberos = False
316 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
318 res = self.rodc_db.search(self.rodc_dn,
319 scope=ldb.SCOPE_BASE,
320 attrs=['msDS-RevealOnDemandGroup'])
322 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
325 m.dn = ldb.Dn(self.rwdc_db, group)
326 m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
327 self.rwdc_db.modify(m)
329 self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
331 def test_login_lockout_not_revealed(self):
332 '''Test that SendToSam is restricted by preloaded users/groups'''
334 username = self.lockout1ntlm_creds.get_username()
335 userpass = self.lockout1ntlm_creds.get_password()
336 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
338 # Preload but do not add to revealed group
339 preload_rodc_user(userdn)
341 self.kerberos = False
343 creds = self.lockout1ntlm_creds
345 # Open a second LDB connection with the user credentials. Use the
346 # command line credentials for information such as the domain, the realm
347 # and the workstation.
348 creds_lockout = self.insta_creds(creds)
351 creds_lockout.set_password("thatsAcomplPASS1x")
353 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
358 lastLogonTimestamp = 0
359 logoncount_relation = ''
360 lastlogon_relation = ''
362 res = self._check_account(userdn,
364 badPasswordTime=("greater", badPasswordTime),
365 logonCount=logonCount,
367 lastLogonTimestamp=lastLogonTimestamp,
368 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
369 msDSUserAccountControlComputed=0,
370 msg='lastlogontimestamp with wrong password')
371 badPasswordTime = int(res[0]["badPasswordTime"][0])
373 # BadPwdCount on RODC increases alongside RWDC
374 res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
375 self.assertTrue('badPwdCount' in res[0])
376 self.assertEqual(int(res[0]['badPwdCount'][0]), 1)
378 # Correct old password
379 creds_lockout.set_password(userpass)
381 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
383 # Wait for potential SendToSam...
386 # BadPwdCount on RODC decreases, but not the RWDC
387 res = self._check_account(userdn,
389 badPasswordTime=badPasswordTime,
390 logonCount=(logoncount_relation, logonCount),
391 lastLogon=('greater', lastLogon),
392 lastLogonTimestamp=lastLogonTimestamp,
393 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
394 msDSUserAccountControlComputed=0,
395 msg='badPwdCount not reset on RWDC')
397 res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
398 self.assertTrue('badPwdCount' in res[0])
399 self.assertEqual(int(res[0]['badPwdCount'][0]), 0)
401 def _test_login_lockout_rodc_rwdc(self, creds, userdn):
402 username = creds.get_username()
403 userpass = creds.get_password()
405 # Open a second LDB connection with the user credentials. Use the
406 # command line credentials for information such as the domain, the realm
407 # and the workstation.
408 creds_lockout = self.insta_creds(creds)
411 creds_lockout.set_password("thatsAcomplPASS1x")
413 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
418 lastLogonTimestamp = 0
419 logoncount_relation = ''
420 lastlogon_relation = ''
422 res = self._check_account(userdn,
424 badPasswordTime=("greater", badPasswordTime),
425 logonCount=logonCount,
427 lastLogonTimestamp=lastLogonTimestamp,
428 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
429 msDSUserAccountControlComputed=0,
430 msg='lastlogontimestamp with wrong password')
431 badPasswordTime = int(res[0]["badPasswordTime"][0])
433 # Correct old password
434 creds_lockout.set_password(userpass)
436 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
438 # lastLogonTimestamp should not change
439 # lastLogon increases if badPwdCount is non-zero (!)
440 res = self._check_account(userdn,
442 badPasswordTime=badPasswordTime,
443 logonCount=(logoncount_relation, logonCount),
444 lastLogon=('greater', lastLogon),
445 lastLogonTimestamp=lastLogonTimestamp,
446 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
447 msDSUserAccountControlComputed=0,
448 msg='LLTimestamp is updated to lastlogon')
450 logonCount = int(res[0]["logonCount"][0])
451 lastLogon = int(res[0]["lastLogon"][0])
454 creds_lockout.set_password("thatsAcomplPASS1x")
456 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
458 res = self._check_account(userdn,
460 badPasswordTime=("greater", badPasswordTime),
461 logonCount=logonCount,
463 lastLogonTimestamp=lastLogonTimestamp,
464 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
465 msDSUserAccountControlComputed=0)
466 badPasswordTime = int(res[0]["badPasswordTime"][0])
469 creds_lockout.set_password("thatsAcomplPASS1x")
472 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
475 except LdbError as e1:
477 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
479 res = self._check_account(userdn,
481 badPasswordTime=("greater", badPasswordTime),
482 logonCount=logonCount,
484 lastLogonTimestamp=lastLogonTimestamp,
485 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
486 msDSUserAccountControlComputed=0)
487 badPasswordTime = int(res[0]["badPasswordTime"][0])
489 print("two failed password change")
492 creds_lockout.set_password("thatsAcomplPASS1x")
495 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
498 except LdbError as e2:
500 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
502 res = self._check_account(userdn,
504 badPasswordTime=("greater", badPasswordTime),
505 logonCount=logonCount,
507 lastLogonTimestamp=lastLogonTimestamp,
508 lockoutTime=("greater", badPasswordTime),
509 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
510 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
511 badPasswordTime = int(res[0]["badPasswordTime"][0])
512 lockoutTime = int(res[0]["lockoutTime"][0])
515 creds_lockout.set_password("thatsAcomplPASS1x")
517 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
519 except LdbError as e3:
521 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
523 res = self._check_account(userdn,
525 badPasswordTime=badPasswordTime,
526 logonCount=logonCount,
528 lastLogonTimestamp=lastLogonTimestamp,
529 lockoutTime=lockoutTime,
530 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
531 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
534 creds_lockout.set_password("thatsAcomplPASS1x")
536 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
538 except LdbError as e4:
540 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
542 res = self._check_account(userdn,
544 badPasswordTime=badPasswordTime,
545 logonCount=logonCount,
547 lastLogonTimestamp=lastLogonTimestamp,
548 lockoutTime=lockoutTime,
549 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
550 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
552 # The correct password, but we are locked out
553 creds_lockout.set_password(userpass)
555 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
557 except LdbError as e5:
559 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
561 res = self._check_account(userdn,
563 badPasswordTime=badPasswordTime,
564 logonCount=logonCount,
566 lastLogonTimestamp=lastLogonTimestamp,
567 lockoutTime=lockoutTime,
568 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
569 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
571 # wait for the lockout to end
572 time.sleep(self.account_lockout_duration + 1)
573 print(self.account_lockout_duration + 1)
575 res = self._check_account(userdn,
576 badPwdCount=3, effective_bad_password_count=0,
577 badPasswordTime=badPasswordTime,
578 logonCount=logonCount,
579 lockoutTime=lockoutTime,
581 lastLogonTimestamp=lastLogonTimestamp,
582 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
583 msDSUserAccountControlComputed=0)
585 # The correct password after letting the timeout expire
587 creds_lockout.set_password(userpass)
589 creds_lockout2 = self.insta_creds(creds_lockout)
591 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp)
594 res = self._check_account(userdn,
596 badPasswordTime=badPasswordTime,
597 logonCount=(logoncount_relation, logonCount),
598 lastLogon=(lastlogon_relation, lastLogon),
599 lastLogonTimestamp=lastLogonTimestamp,
600 lockoutTime=lockoutTime,
601 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
602 msDSUserAccountControlComputed=0,
603 msg="lastLogon is way off")
606 creds_lockout.set_password("thatsAcomplPASS1x")
608 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
610 except LdbError as e6:
612 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
614 res = self._check_account(userdn,
616 badPasswordTime=("greater", badPasswordTime),
617 logonCount=logonCount,
618 lockoutTime=lockoutTime,
620 lastLogonTimestamp=lastLogonTimestamp,
621 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
622 msDSUserAccountControlComputed=0)
623 badPasswordTime = int(res[0]["badPasswordTime"][0])
626 creds_lockout.set_password("thatsAcomplPASS1x")
628 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
630 except LdbError as e7:
632 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
634 res = self._check_account(userdn,
636 badPasswordTime=("greater", badPasswordTime),
637 logonCount=logonCount,
638 lockoutTime=lockoutTime,
640 lastLogonTimestamp=lastLogonTimestamp,
641 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
642 msDSUserAccountControlComputed=0)
643 badPasswordTime = int(res[0]["badPasswordTime"][0])
645 time.sleep(self.lockout_observation_window + 1)
647 res = self._check_account(userdn,
648 badPwdCount=2, effective_bad_password_count=0,
649 badPasswordTime=badPasswordTime,
650 logonCount=logonCount,
651 lockoutTime=lockoutTime,
653 lastLogonTimestamp=lastLogonTimestamp,
654 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
655 msDSUserAccountControlComputed=0)
658 creds_lockout.set_password("thatsAcomplPASS1x")
660 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
662 except LdbError as e8:
664 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
666 res = self._check_account(userdn,
668 badPasswordTime=("greater", badPasswordTime),
669 logonCount=logonCount,
670 lockoutTime=lockoutTime,
672 lastLogonTimestamp=lastLogonTimestamp,
673 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
674 msDSUserAccountControlComputed=0)
675 badPasswordTime = int(res[0]["badPasswordTime"][0])
677 # The correct password without letting the timeout expire
678 creds_lockout.set_password(userpass)
679 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
681 res = self._check_account(userdn,
683 badPasswordTime=badPasswordTime,
684 logonCount=(logoncount_relation, logonCount),
685 lockoutTime=lockoutTime,
686 lastLogon=("greater", lastLogon),
687 lastLogonTimestamp=lastLogonTimestamp,
688 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
689 msDSUserAccountControlComputed=0)
692 class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
693 counter = itertools.count(1, 1)
695 def force_replication(self, base=None):
699 # XXX feels like a horrendous way to do it.
700 credstring = '-U%s%%%s' % (CREDS.get_username(),
701 CREDS.get_password())
702 cmd = adjust_cmd_for_py_version(['bin/samba-tool',
708 p = subprocess.Popen(cmd,
709 stderr=subprocess.PIPE,
710 stdout=subprocess.PIPE)
711 stdout, stderr = p.communicate()
713 print("failed with code %s" % p.returncode)
719 raise RodcRwdcTestException()
def _check_account_initial(self, dn):
    """Force a replication, then run the parent class's initial check.

    :param dn: passed through unchanged to the parent implementation
    :return: whatever the parent's ``_check_account_initial`` returns
    """
    self.force_replication()
    parent = super(RodcRwdcTests, self)
    return parent._check_account_initial(dn)
726 super(RodcRwdcTests, self).tearDown()
727 self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
728 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
729 set_auto_replication(RWDC, True)
732 self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
733 session_info=system_session(LP), lp=LP)
735 self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
736 session_info=system_session(LP), lp=LP)
738 # Define variables for BasePasswordTestCase
740 self.global_creds = CREDS
742 self.host_url = 'ldap://%s' % RWDC
743 self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
744 credentials=self.global_creds, lp=self.lp)
746 super(RodcRwdcTests, self).setUp()
748 self.host_url = 'ldap://%s' % RODC
749 self.ldb = SamDB(url='ldap://%s' % RODC, session_info=system_session(self.lp),
750 credentials=self.global_creds, lp=self.lp)
752 self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
753 self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
754 self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
756 self.base_dn = self.rwdc_db.domain_dn()
758 root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
759 attrs=['dsServiceName'])
760 self.service = root[0]['dsServiceName'][0]
761 self.tag = uuid.uuid4().hex
763 self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
764 self.rwdc_db.set_dsheuristics("000000001")
766 set_auto_replication(RWDC, False)
768 # make sure DCs are synchronized before the test
769 self.force_replication()
770 self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
771 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
773 def delete_ldb_connections(self):
774 super(RodcRwdcTests, self).delete_ldb_connections()
778 def assertReferral(self, fn, *args, **kwargs):
781 self.fail("failed to raise ldap referral")
782 except ldb.LdbError as e9:
783 (code, msg) = e9.args
784 self.assertEqual(code, ldb.ERR_REFERRAL,
785 "expected referral, got %s %s" % (code, msg))
def _test_rodc_dsheuristics(self):
    """Assert that setting dSHeuristics via the RODC connection refers.

    Two writes are attempted through ``assertReferral``: first the literal
    "000000001", then the value that was read back beforehand.
    """
    current = self.rodc_db.get_dsheuristics()
    for candidate in ("000000001", current):
        self.assertReferral(self.rodc_db.set_dsheuristics, candidate)
def TEST_rodc_heuristics_kerberos(self):
    """Run the RODC dSHeuristics check with CREDS set to require Kerberos.

    NOTE(review): the upper-case ``TEST`` prefix presumably keeps the
    default unittest loader from collecting this method -- confirm that it
    is meant to stay disabled.
    """
    kerberos_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_rodc_dsheuristics()
def TEST_rodc_heuristics_ntlm(self):
    """Run the RODC dSHeuristics check with CREDS set to avoid Kerberos.

    NOTE(review): the upper-case ``TEST`` prefix presumably keeps the
    default unittest loader from collecting this method -- confirm that it
    is meant to stay disabled.
    """
    kerberos_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_rodc_dsheuristics()
800 def _test_add(self, objects, cross_ncs=False):
804 base = str(self.rwdc_db.get_config_basedn())
805 controls = ["search_options:1:2"]
806 cn = dn.split(',', 1)[0]
807 expression = '(%s)' % cn
814 res = self.rodc_db.search(base,
815 expression=expression,
816 scope=ldb.SCOPE_SUBTREE,
819 self.assertEqual(len(res), 0)
820 except ldb.LdbError as e:
821 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
826 except ldb.LdbError as e:
827 (ecode, emsg) = e.args
828 self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
832 self.force_replication(base=base)
834 self.force_replication()
837 res = self.rodc_db.search(base,
838 expression=expression,
839 scope=ldb.SCOPE_SUBTREE,
842 self.assertEqual(len(res), 1)
843 except ldb.LdbError as e:
844 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
845 "replication seems to have failed")
847 def _test_add_replicated_objects(self, mode):
848 tag = "%s%s" % (self.tag, mode)
851 'dn': "ou=%s1,%s" % (tag, self.base_dn),
852 "objectclass": "organizationalUnit"
855 'dn': "cn=%s2,%s" % (tag, self.base_dn),
856 "objectclass": "user"
859 'dn': "cn=%s3,%s" % (tag, self.base_dn),
860 "objectclass": "group"
863 self.rwdc_db.delete("ou=%s1,%s" % (tag, self.base_dn))
864 self.rwdc_db.delete("cn=%s2,%s" % (tag, self.base_dn))
865 self.rwdc_db.delete("cn=%s3,%s" % (tag, self.base_dn))
def test_add_replicated_objects_kerberos(self):
    """Run the replicated-objects add test with Kerberos required.

    The global CREDS are switched to MUST_USE_KERBEROS before the shared
    helper is invoked in 'kerberos' mode.
    """
    kerberos_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_add_replicated_objects(mode='kerberos')
def test_add_replicated_objects_ntlm(self):
    """Run the replicated-objects add test with Kerberos disabled.

    The global CREDS are switched to DONT_USE_KERBEROS before the shared
    helper is invoked in 'ntlm' mode.
    """
    kerberos_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_add_replicated_objects(mode='ntlm')
875 def _test_add_replicated_connections(self, mode):
876 tag = "%s%s" % (self.tag, mode)
879 'dn': "cn=%sfoofoofoo,%s" % (tag, self.service),
880 "objectclass": "NTDSConnection",
881 'enabledConnection': 'TRUE',
882 'fromServer': self.base_dn,
886 self.rwdc_db.delete("cn=%sfoofoofoo,%s" % (tag, self.service))
def test_add_replicated_connections_kerberos(self):
    """Run the replicated-connections add test with Kerberos required.

    The global CREDS are switched to MUST_USE_KERBEROS before the shared
    helper is invoked in 'kerberos' mode.
    """
    kerberos_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_add_replicated_connections(mode='kerberos')
def test_add_replicated_connections_ntlm(self):
    """Run the replicated-connections add test with Kerberos disabled.

    The global CREDS are switched to DONT_USE_KERBEROS before the shared
    helper is invoked in 'ntlm' mode.
    """
    kerberos_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_add_replicated_connections(mode='ntlm')
896 def _test_modify_replicated_attributes(self):
897 dn = 'CN=Guest,CN=Users,' + self.base_dn
899 for attr in ['carLicense', 'middleName']:
901 m.dn = ldb.Dn(self.rwdc_db, dn)
902 m[attr] = ldb.MessageElement(value,
903 ldb.FLAG_MOD_REPLACE,
906 self.rwdc_db.modify(m)
907 except ldb.LdbError as e:
908 self.fail("Failed to modify %s %s on RWDC %s with %s" %
911 self.force_replication()
914 res = self.rodc_db.search(dn,
915 scope=ldb.SCOPE_SUBTREE,
917 results = [str(x[attr][0]) for x in res]
918 self.assertEqual(results, [value])
919 except ldb.LdbError as e:
920 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
921 "replication seems to have failed")
def test_modify_replicated_attributes_kerberos(self):
    """Run the replicated-attribute modify test with Kerberos required.

    The global CREDS are switched to MUST_USE_KERBEROS before the shared
    helper runs.
    """
    kerberos_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_modify_replicated_attributes()
def test_modify_replicated_attributes_ntlm(self):
    """Run the replicated-attribute modify test with Kerberos disabled.

    The global CREDS are switched to DONT_USE_KERBEROS before the shared
    helper runs.
    """
    kerberos_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_modify_replicated_attributes()
931 def _test_add_modify_delete(self):
932 dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
933 values = ["%s%s" % (i, self.tag) for i in range(3)]
938 "objectclass": "user",
942 self.force_replication()
943 for value in values[1:]:
946 m.dn = ldb.Dn(self.rwdc_db, dn)
947 m[attr] = ldb.MessageElement(value,
948 ldb.FLAG_MOD_REPLACE,
951 self.rwdc_db.modify(m)
952 except ldb.LdbError as e:
953 self.fail("Failed to modify %s %s on RWDC %s with %s" %
956 self.force_replication()
959 res = self.rodc_db.search(dn,
960 scope=ldb.SCOPE_SUBTREE,
962 results = [str(x[attr][0]) for x in res]
963 self.assertEqual(results, [value])
964 except ldb.LdbError as e:
965 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
966 "replication seems to have failed")
968 self.rwdc_db.delete(dn)
969 self.force_replication()
971 res = self.rodc_db.search(dn,
972 scope=ldb.SCOPE_SUBTREE,
975 self.fail("Failed to delete %s" % (dn))
976 except ldb.LdbError as e:
977 self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
978 "Failed to delete %s" % (dn))
def test_add_modify_delete_kerberos(self):
    """Run the add/modify/delete test with Kerberos required.

    The global CREDS are switched to MUST_USE_KERBEROS before the shared
    helper runs.
    """
    kerberos_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_add_modify_delete()
def test_add_modify_delete_ntlm(self):
    """Run the add/modify/delete test with Kerberos disabled.

    The global CREDS are switched to DONT_USE_KERBEROS before the shared
    helper runs.
    """
    kerberos_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_add_modify_delete()
989 username = "u%sX%s" % (self.tag[:12], next(self.counter))
990 password = 'password#1'
991 dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
994 "objectclass": "user",
995 'sAMAccountName': username,
999 except ldb.LdbError as e:
1000 self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))
1002 self.rwdc_db.modify_ldif("dn: %s\n"
1003 "changetype: modify\n"
1004 "delete: userPassword\n"
1005 "add: userPassword\n"
1006 "userPassword: %s\n" % (dn, password))
1007 self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
1008 return (dn, username, password)
1010 def _change_password(self, user_dn, old_password, new_password):
1011 self.rwdc_db.modify_ldif(
1013 "changetype: modify\n"
1014 "delete: userPassword\n"
1015 "userPassword: %s\n"
1016 "add: userPassword\n"
1017 "userPassword: %s\n" % (user_dn, old_password, new_password))
1019 def try_ldap_logon(self, server, creds, errno=None):
1021 tmpdb = SamDB('ldap://%s' % server, credentials=creds,
1022 session_info=system_session(LP), lp=LP)
1023 if errno is not None:
1024 self.fail("logon failed to fail with ldb error %s" % errno)
1025 except ldb.LdbError as e10:
1026 (code, msg) = e10.args
1029 self.fail("logon incorrectly raised ldb error (code=%s)" %
1032 self.fail("logon failed to raise correct ldb error"
1033 "Expected: %s Got: %s" %
def zero_min_password_age(self):
    """Set minPwdAge to '0' on the RWDC unless it already parses as 0."""
    if int(self.rwdc_db.get_minPwdAge()) == 0:
        return
    self.rwdc_db.set_minPwdAge('0')
1041 def _test_ldap_change_password(self, errno=None):
1042 self.zero_min_password_age()
1044 dn, username, password = self._new_user()
1045 creds1 = make_creds(username, password)
1047 # With NTLM, this should fail on RODC before replication,
1048 # because the user isn't known.
1049 self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1050 self.force_replication()
1052 # Now the user is replicated to RODC, so logon should work
1053 self.try_ldap_logon(RODC, creds1)
1055 passwords = ['password#%s' % i for i in range(1, 6)]
1056 for prev, password in zip(passwords[:-1], passwords[1:]):
1057 self._change_password(dn, prev, password)
1059 # The password has changed enough times to make the old
1060 # password invalid (though with kerberos that doesn't matter).
1061 # For NTLM, the old creds should always fail
1062 self.try_ldap_logon(RODC, creds1, errno)
1063 self.try_ldap_logon(RWDC, creds1, errno)
1065 creds2 = make_creds(username, password)
1067 # new creds work straight away with NTLM, because although it
1068 # doesn't have the password, it knows the user and forwards
1070 self.try_ldap_logon(RODC, creds2)
1071 self.try_ldap_logon(RWDC, creds2)
1073 self.force_replication()
1075 # After another replication check RODC still works and fails,
1076 # as appropriate to various creds
1077 self.try_ldap_logon(RODC, creds2)
1078 self.try_ldap_logon(RODC, creds1, errno)
1081 password = 'password#6'
1082 self._change_password(dn, prev, password)
1083 creds3 = make_creds(username, password)
1085 # previous password should still work.
1086 self.try_ldap_logon(RWDC, creds2)
1087 self.try_ldap_logon(RODC, creds2)
1089 # new password should still work.
1090 self.try_ldap_logon(RWDC, creds3)
1091 self.try_ldap_logon(RODC, creds3)
1093 # old password should still fail (but not on kerberos).
1094 self.try_ldap_logon(RWDC, creds1, errno)
1095 self.try_ldap_logon(RODC, creds1, errno)
def test_ldap_change_password_kerberos(self):
    """Run the LDAP password-change test with Kerberos required.

    The shared helper is called without an errno, i.e. with its default
    of None.
    """
    kerberos_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_ldap_change_password()
def test_ldap_change_password_ntlm(self):
    """Run the LDAP password-change test with Kerberos disabled.

    The shared helper is given ERR_INVALID_CREDENTIALS as the error code
    it passes on for logons with the outdated credentials.
    """
    kerberos_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_ldap_change_password(errno=ldb.ERR_INVALID_CREDENTIALS)
1105 def _test_ldap_change_password_reveal_on_demand(self, errno=None):
1106 self.zero_min_password_age()
1108 res = self.rodc_db.search(self.rodc_dn,
1109 scope=ldb.SCOPE_BASE,
1110 attrs=['msDS-RevealOnDemandGroup'])
1112 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
1114 user_dn, username, password = self._new_user()
1115 creds1 = make_creds(username, password)
1118 m.dn = ldb.Dn(self.rwdc_db, group)
1119 m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
1120 self.rwdc_db.modify(m)
1122 # Against Windows, this will just forward if no account exists on the KDC
1123 # Therefore, this does not error on Windows.
1124 self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1126 self.force_replication()
1129 self.try_ldap_logon(RODC, creds1)
1130 preload_rodc_user(user_dn)
1132 # Now the user AND password are replicated to RODC, so logon should work (not proxy case)
1133 self.try_ldap_logon(RODC, creds1)
1135 passwords = ['password#%s' % i for i in range(1, 6)]
1136 for prev, password in zip(passwords[:-1], passwords[1:]):
1137 self._change_password(user_dn, prev, password)
1139 # The password has changed enough times to make the old
1140 # password invalid, but the RODC shouldn't know that.
1141 self.try_ldap_logon(RODC, creds1)
1142 self.try_ldap_logon(RWDC, creds1, errno)
1144 creds2 = make_creds(username, password)
1145 self.try_ldap_logon(RWDC, creds2)
1146 # We can forward WRONG_PASSWORD over NTLM.
1147 # This SHOULD succeed.
1148 self.try_ldap_logon(RODC, creds2)
def test_change_password_reveal_on_demand_ntlm(self):
    """Run the reveal-on-demand password-change test without Kerberos.

    The shared helper is given ERR_INVALID_CREDENTIALS as the error code
    it passes on for the RWDC logon with the outdated credentials.
    """
    kerberos_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_ldap_change_password_reveal_on_demand(
        errno=ldb.ERR_INVALID_CREDENTIALS)
def test_change_password_reveal_on_demand_kerberos(self):
    """Run the reveal-on-demand password-change test with Kerberos required.

    The shared helper is called without an errno, i.e. with its default
    of None.
    """
    kerberos_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_ldap_change_password_reveal_on_demand()
def test_login_lockout_krb5(self):
    # Lockout test for the Kerberos lockout user.
    #
    # The user is preloaded onto the RODC, one logon with a deliberately
    # wrong password must be rejected with ERR_INVALID_CREDENTIALS, one
    # logon with the right password follows, and then the shared lockout
    # scenario runs.
    username = self.lockout1krb5_creds.get_username()
    userpass = self.lockout1krb5_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    preload_rodc_user(userdn)

    use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
    fail_creds = self.insta_creds(self.template_creds,
                                  username=username,
                                  userpass=userpass + "X",
                                  kerberos_state=use_kerberos)

    # The bad password must be refused; a successful connect here is a
    # test failure, which assertRaises reports for us.
    with self.assertRaises(LdbError) as caught:
        SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
    self.assertEqual(caught.exception.args[0], ERR_INVALID_CREDENTIALS)

    # Succeed to reset everything to 0
    success_creds = self.insta_creds(self.template_creds,
                                     username=username,
                                     userpass=userpass,
                                     kerberos_state=use_kerberos)

    # Only the connection attempt matters; the handle is not kept (and
    # must not be bound to `ldb`, which would shadow the ldb module).
    SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)

    self._test_login_lockout(self.lockout1krb5_creds)
def test_login_lockout_ntlm(self):
    # Lockout test for the NTLM lockout user.
    #
    # The user is preloaded onto the RODC, one logon with a deliberately
    # wrong password must be rejected with ERR_INVALID_CREDENTIALS, one
    # logon with the user's real credentials follows, and then the shared
    # lockout scenario runs.
    username = self.lockout1ntlm_creds.get_username()
    userpass = self.lockout1ntlm_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    preload_rodc_user(userdn)

    use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
    fail_creds = self.insta_creds(self.template_creds,
                                  username=username,
                                  userpass=userpass + "X",
                                  kerberos_state=use_kerberos)

    # The bad password must be refused; a successful connect here is a
    # test failure, which assertRaises reports for us.
    with self.assertRaises(LdbError) as caught:
        SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
    self.assertEqual(caught.exception.args[0], ERR_INVALID_CREDENTIALS)

    # Succeed to reset everything to 0
    # (the handle is not kept; binding it to `ldb` would shadow the
    # ldb module)
    SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)

    self._test_login_lockout(self.lockout1ntlm_creds)
def test_multiple_logon_krb5(self):
    # Multiple-logon test for the Kerberos lockout user.
    #
    # The user is preloaded onto the RODC, one logon with a deliberately
    # wrong password must be rejected with ERR_INVALID_CREDENTIALS, one
    # logon with the right password follows, and then the shared
    # multiple-logon scenario runs.
    username = self.lockout1krb5_creds.get_username()
    userpass = self.lockout1krb5_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    preload_rodc_user(userdn)

    use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
    fail_creds = self.insta_creds(self.template_creds,
                                  username=username,
                                  userpass=userpass + "X",
                                  kerberos_state=use_kerberos)

    # The bad password must be refused; a successful connect here is a
    # test failure, which assertRaises reports for us.
    with self.assertRaises(LdbError) as caught:
        SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
    self.assertEqual(caught.exception.args[0], ERR_INVALID_CREDENTIALS)

    # Succeed to reset everything to 0
    success_creds = self.insta_creds(self.template_creds,
                                     username=username,
                                     userpass=userpass,
                                     kerberos_state=use_kerberos)

    # Only the connection attempt matters; the handle is not kept (and
    # must not be bound to `ldb`, which would shadow the ldb module).
    SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)

    self._test_multiple_logon(self.lockout1krb5_creds)
def test_multiple_logon_ntlm(self):
    # Multiple-logon test for the NTLM lockout user.
    #
    # The user is preloaded onto the RODC, one logon with a deliberately
    # wrong password must be rejected with ERR_INVALID_CREDENTIALS, one
    # logon with the user's real credentials follows, and then the shared
    # multiple-logon scenario runs.
    username = self.lockout1ntlm_creds.get_username()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
    userpass = self.lockout1ntlm_creds.get_password()

    preload_rodc_user(userdn)

    use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
    fail_creds = self.insta_creds(self.template_creds,
                                  username=username,
                                  userpass=userpass + "X",
                                  kerberos_state=use_kerberos)

    # The bad password must be refused; a successful connect here is a
    # test failure, which assertRaises reports for us.
    with self.assertRaises(LdbError) as caught:
        SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
    self.assertEqual(caught.exception.args[0], ERR_INVALID_CREDENTIALS)

    # Succeed to reset everything to 0
    # (the handle is not kept; binding it to `ldb` would shadow the
    # ldb module)
    SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)

    self._test_multiple_logon(self.lockout1ntlm_creds)
# Entry-point logic: parse the command line, publish the RODC/RWDC host
# names plus the loadparm context and template credentials as module
# globals, then run the tests.
global RODC, RWDC, CREDS, LP
parser = optparse.OptionParser(
    "rodc_rwdc.py [options] <rodc host> <rwdc host>")

sambaopts = options.SambaOptions(parser)
versionopts = options.VersionOptions(parser)
credopts = options.CredentialsOptions(parser)
subunitopts = SubunitOptions(parser)

parser.add_option_group(sambaopts)
parser.add_option_group(versionopts)
parser.add_option_group(credopts)
parser.add_option_group(subunitopts)

opts, args = parser.parse_args()

LP = sambaopts.get_loadparm()
CREDS = credopts.get_credentials(LP)
CREDS.set_gensec_features(CREDS.get_gensec_features() |
                          gensec.FEATURE_SEAL)

# Exactly two positional arguments are required, in the order given in
# the usage string; anything else prints the usage and exits non-zero.
try:
    RODC, RWDC = args
except ValueError:
    parser.print_usage()
    sys.exit(1)

set_auto_replication(RWDC, True)
try:
    TestProgram(module=__name__, opts=subunitopts)
finally:
    # Run the same replication setting again even when the test program
    # raises or exits.
    set_auto_replication(RWDC, True)