2 # -*- coding: utf-8 -*-
3 from __future__ import print_function
4 """Test communication of credentials etc, between an RODC and a RWDC.
6 How does it work when the password is changed on the RWDC?
17 sys.path.insert(0, "bin/python")
21 from samba.tests.subunitrun import SubunitOptions, TestProgram
22 import samba.getopt as options
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
27 from samba import gensec, dsdb
28 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
29 from samba.dcerpc import security, samr
31 import password_lockout_base
def passwd_encode(pw):
    """Return pw as base64 text of its double-quoted UTF-16-LE encoding.

    The value is wrapped in literal double quotes, encoded as UTF-16
    little-endian, base64-encoded, and the resulting base64 bytes are
    decoded as UTF-8 so that a text string is returned.
    """
    quoted = '"%s"' % pw
    utf16_bytes = quoted.encode('utf-16-le')
    encoded = base64.b64encode(utf16_bytes)
    return encoded.decode('utf8')
38 class RodcRwdcTestException(Exception):
42 def make_creds(username, password, kerberos_state=None):
43 # use the global CREDS as a template
45 c.set_username(username)
46 c.set_password(password)
47 c.set_domain(CREDS.get_domain())
48 c.set_realm(CREDS.get_realm())
49 c.set_workstation(CREDS.get_workstation())
51 if kerberos_state is None:
52 kerberos_state = CREDS.get_kerberos_state()
53 c.set_kerberos_state(kerberos_state)
56 if kerberos_state == MUST_USE_KERBEROS:
57 print("we seem to be using kerberos for %s %s" % (username, password))
58 elif kerberos_state == DONT_USE_KERBEROS:
59 print("NOT using kerberos for %s %s" % (username, password))
61 print("kerberos state is %s" % kerberos_state)
63 c.set_gensec_features(c.get_gensec_features() |
68 def set_auto_replication(dc, allow):
69 credstring = '-U%s%%%s' % (CREDS.get_username(),
72 on_or_off = '-' if allow else '+'
74 for opt in ['DISABLE_INBOUND_REPL',
75 'DISABLE_OUTBOUND_REPL']:
76 cmd = ['bin/samba-tool',
79 "--dsa-option=%s%s" % (on_or_off, opt)]
81 p = subprocess.Popen(cmd,
82 stderr=subprocess.PIPE,
83 stdout=subprocess.PIPE)
84 stdout, stderr = p.communicate()
86 if 'LDAP_REFERRAL' not in stderr:
87 raise RodcRwdcTestException()
88 print("ignoring +%s REFERRAL error; assuming %s is RODC" %
92 def preload_rodc_user(user_dn):
93 credstring = '-U%s%%%s' % (CREDS.get_username(),
96 set_auto_replication(RWDC, True)
97 cmd = ['bin/samba-tool',
104 subprocess.check_call(cmd)
105 set_auto_replication(RWDC, False)
def get_server_ref_from_samdb(samdb):
    """Return the first serverReference value of samdb's server object.

    The object named by samdb.get_serverName() is read with a base-scope
    search that requests only the serverReference attribute.
    """
    server_dn = samdb.get_serverName()
    wanted = ['serverReference']
    matches = samdb.search(server_dn, scope=ldb.SCOPE_BASE, attrs=wanted)
    first = matches[0]
    return first['serverReference'][0]
118 class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
# Source of unique numbers: each counter() call returns the next integer,
# starting from 1.  Iterators expose their advance method as .next on
# Python 2 but as .__next__ on Python 3, so use whichever one exists
# instead of hard-coding the Python 2 spelling (which raises
# AttributeError on Python 3 as soon as the class body is executed).
_count_from_one = itertools.count(1)
counter = getattr(_count_from_one, '__next__', None) or _count_from_one.next
del _count_from_one
def _check_account_initial(self, dn):
    """Force a replication, then run the inherited initial check for dn.

    Returns whatever the base class implementation returns.
    """
    self.force_replication()
    base_check = super(RodcRwdcCachedTests, self)._check_account_initial
    return base_check(dn)
125 def _check_account(self, dn,
127 badPasswordTime=None,
130 lastLogonTimestamp=None,
132 userAccountControl=None,
133 msDSUserAccountControlComputed=None,
134 effective_bad_password_count=None,
136 badPwdCountOnly=False):
137 # Wait for the RWDC to get any delayed messages
138 # e.g. SendToSam or KRB5 bad passwords via winbindd
139 if (self.kerberos and isinstance(badPasswordTime, tuple) or
143 return super(RodcRwdcCachedTests,
144 self)._check_account(dn, badPwdCount, badPasswordTime,
145 logonCount, lastLogon,
146 lastLogonTimestamp, lockoutTime,
148 msDSUserAccountControlComputed,
149 effective_bad_password_count, msg,
152 def force_replication(self, base=None):
156 # XXX feels like a horrendous way to do it.
157 credstring = '-U%s%%%s' % (CREDS.get_username(),
158 CREDS.get_password())
159 cmd = ['bin/samba-tool',
165 p = subprocess.Popen(cmd,
166 stderr=subprocess.PIPE,
167 stdout=subprocess.PIPE)
168 stdout, stderr = p.communicate()
170 print("failed with code %s" % p.returncode)
176 raise RodcRwdcTestException()
178 def _change_password(self, user_dn, old_password, new_password):
179 self.rwdc_db.modify_ldif(
181 "changetype: modify\n"
182 "delete: userPassword\n"
184 "add: userPassword\n"
185 "userPassword: %s\n" % (user_dn, old_password, new_password))
188 super(RodcRwdcCachedTests, self).tearDown()
189 set_auto_replication(RWDC, True)
192 self.kerberos = False # To be set later
194 self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
195 session_info=system_session(LP), lp=LP)
197 self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
198 session_info=system_session(LP), lp=LP)
200 # Define variables for BasePasswordTestCase
202 self.global_creds = CREDS
204 self.host_url = 'ldap://%s' % RWDC
205 self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
206 credentials=self.global_creds, lp=self.lp)
208 super(RodcRwdcCachedTests, self).setUp()
209 self.host_url = 'ldap://%s' % RODC
211 self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
212 self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
213 self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
215 self.base_dn = self.rwdc_db.domain_dn()
217 root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
218 attrs=['dsServiceName'])
219 self.service = root[0]['dsServiceName'][0]
220 self.tag = uuid.uuid4().hex
222 self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
223 self.rwdc_db.set_dsheuristics("000000001")
225 set_auto_replication(RWDC, False)
227 # make sure DCs are synchronized before the test
228 self.force_replication()
230 def delete_ldb_connections(self):
231 super(RodcRwdcCachedTests, self).delete_ldb_connections()
def test_cache_and_flush_password(self):
    # Checks three states of the user's unicodePwd attribute as seen
    # through ldb_system: absent before preloading, present after
    # preload_rodc_user(), and absent again once the password has been
    # changed and a replication has been forced.
    username = self.lockout1krb5_creds.get_username()
    userpass = self.lockout1krb5_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    # No URL is given, unlike the ldap:// connections self.rodc_db and
    # self.rwdc_db.
    # NOTE(review): presumably this opens the local database of the DC
    # the test runs on -- confirm.
    ldb_system = SamDB(session_info=system_session(self.lp),
                       credentials=self.global_creds, lp=self.lp)

    # assertIn/assertNotIn are used instead of assertTrue/assertFalse on
    # an "in" expression so that a failure reports the attribute name and
    # the message that was searched.
    res = ldb_system.search(userdn, attrs=['unicodePwd'])
    self.assertNotIn('unicodePwd', res[0])

    preload_rodc_user(userdn)

    res = ldb_system.search(userdn, attrs=['unicodePwd'])
    self.assertIn('unicodePwd', res[0])

    newpass = userpass + '!'

    # Forcing replication should blank out password (when changed)
    self._change_password(userdn, userpass, newpass)
    self.force_replication()

    res = ldb_system.search(userdn, attrs=['unicodePwd'])
    self.assertNotIn('unicodePwd', res[0])
260 def test_login_lockout_krb5(self):
261 username = self.lockout1krb5_creds.get_username()
262 userpass = self.lockout1krb5_creds.get_password()
263 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
265 preload_rodc_user(userdn)
269 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
271 res = self.rodc_db.search(self.rodc_dn,
272 scope=ldb.SCOPE_BASE,
273 attrs=['msDS-RevealOnDemandGroup'])
275 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
278 m.dn = ldb.Dn(self.rwdc_db, group)
279 m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
280 self.rwdc_db.modify(m)
283 m.dn = ldb.Dn(self.ldb, self.base_dn)
285 self.account_lockout_duration = 10
286 account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7))
288 m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks),
289 ldb.FLAG_MOD_REPLACE,
292 self.lockout_observation_window = 10
293 lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7))
295 m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks),
296 ldb.FLAG_MOD_REPLACE,
297 "lockOutObservationWindow")
299 self.rwdc_db.modify(m)
300 self.force_replication()
302 self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
304 def test_login_lockout_ntlm(self):
305 username = self.lockout1ntlm_creds.get_username()
306 userpass = self.lockout1ntlm_creds.get_password()
307 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
309 preload_rodc_user(userdn)
311 self.kerberos = False
313 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
315 res = self.rodc_db.search(self.rodc_dn,
316 scope=ldb.SCOPE_BASE,
317 attrs=['msDS-RevealOnDemandGroup'])
319 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
322 m.dn = ldb.Dn(self.rwdc_db, group)
323 m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
324 self.rwdc_db.modify(m)
326 self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
328 def test_login_lockout_not_revealed(self):
329 '''Test that SendToSam is restricted by preloaded users/groups'''
331 username = self.lockout1ntlm_creds.get_username()
332 userpass = self.lockout1ntlm_creds.get_password()
333 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
335 # Preload but do not add to revealed group
336 preload_rodc_user(userdn)
338 self.kerberos = False
340 creds = self.lockout1ntlm_creds
342 # Open a second LDB connection with the user credentials. Use the
343 # command line credentials for informations like the domain, the realm
344 # and the workstation.
345 creds_lockout = self.insta_creds(creds)
348 creds_lockout.set_password("thatsAcomplPASS1x")
350 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
355 lastLogonTimestamp = 0
356 logoncount_relation = ''
357 lastlogon_relation = ''
359 res = self._check_account(userdn,
361 badPasswordTime=("greater", badPasswordTime),
362 logonCount=logonCount,
364 lastLogonTimestamp=lastLogonTimestamp,
365 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
366 msDSUserAccountControlComputed=0,
367 msg='lastlogontimestamp with wrong password')
368 badPasswordTime = int(res[0]["badPasswordTime"][0])
370 # BadPwdCount on RODC increases alongside RWDC
371 res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
372 self.assertTrue('badPwdCount' in res[0])
373 self.assertEqual(int(res[0]['badPwdCount'][0]), 1)
375 # Correct old password
376 creds_lockout.set_password(userpass)
378 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
380 # Wait for potential SendToSam...
383 # BadPwdCount on RODC decreases, but not the RWDC
384 res = self._check_account(userdn,
386 badPasswordTime=badPasswordTime,
387 logonCount=(logoncount_relation, logonCount),
388 lastLogon=('greater', lastLogon),
389 lastLogonTimestamp=lastLogonTimestamp,
390 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
391 msDSUserAccountControlComputed=0,
392 msg='badPwdCount not reset on RWDC')
394 res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
395 self.assertTrue('badPwdCount' in res[0])
396 self.assertEqual(int(res[0]['badPwdCount'][0]), 0)
398 def _test_login_lockout_rodc_rwdc(self, creds, userdn):
399 username = creds.get_username()
400 userpass = creds.get_password()
402 # Open a second LDB connection with the user credentials. Use the
403 # command line credentials for informations like the domain, the realm
404 # and the workstation.
405 creds_lockout = self.insta_creds(creds)
408 creds_lockout.set_password("thatsAcomplPASS1x")
410 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
415 lastLogonTimestamp = 0
416 logoncount_relation = ''
417 lastlogon_relation = ''
419 res = self._check_account(userdn,
421 badPasswordTime=("greater", badPasswordTime),
422 logonCount=logonCount,
424 lastLogonTimestamp=lastLogonTimestamp,
425 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
426 msDSUserAccountControlComputed=0,
427 msg='lastlogontimestamp with wrong password')
428 badPasswordTime = int(res[0]["badPasswordTime"][0])
430 # Correct old password
431 creds_lockout.set_password(userpass)
433 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
435 # lastLogonTimestamp should not change
436 # lastLogon increases if badPwdCount is non-zero (!)
437 res = self._check_account(userdn,
439 badPasswordTime=badPasswordTime,
440 logonCount=(logoncount_relation, logonCount),
441 lastLogon=('greater', lastLogon),
442 lastLogonTimestamp=lastLogonTimestamp,
443 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
444 msDSUserAccountControlComputed=0,
445 msg='LLTimestamp is updated to lastlogon')
447 logonCount = int(res[0]["logonCount"][0])
448 lastLogon = int(res[0]["lastLogon"][0])
451 creds_lockout.set_password("thatsAcomplPASS1x")
453 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
455 res = self._check_account(userdn,
457 badPasswordTime=("greater", badPasswordTime),
458 logonCount=logonCount,
460 lastLogonTimestamp=lastLogonTimestamp,
461 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
462 msDSUserAccountControlComputed=0)
463 badPasswordTime = int(res[0]["badPasswordTime"][0])
466 creds_lockout.set_password("thatsAcomplPASS1x")
469 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
472 except LdbError as e1:
474 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
476 res = self._check_account(userdn,
478 badPasswordTime=("greater", badPasswordTime),
479 logonCount=logonCount,
481 lastLogonTimestamp=lastLogonTimestamp,
482 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
483 msDSUserAccountControlComputed=0)
484 badPasswordTime = int(res[0]["badPasswordTime"][0])
486 print("two failed password change")
489 creds_lockout.set_password("thatsAcomplPASS1x")
492 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
495 except LdbError as e2:
497 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
499 res = self._check_account(userdn,
501 badPasswordTime=("greater", badPasswordTime),
502 logonCount=logonCount,
504 lastLogonTimestamp=lastLogonTimestamp,
505 lockoutTime=("greater", badPasswordTime),
506 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
507 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
508 badPasswordTime = int(res[0]["badPasswordTime"][0])
509 lockoutTime = int(res[0]["lockoutTime"][0])
512 creds_lockout.set_password("thatsAcomplPASS1x")
514 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
516 except LdbError as e3:
518 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
520 res = self._check_account(userdn,
522 badPasswordTime=badPasswordTime,
523 logonCount=logonCount,
525 lastLogonTimestamp=lastLogonTimestamp,
526 lockoutTime=lockoutTime,
527 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
528 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
531 creds_lockout.set_password("thatsAcomplPASS1x")
533 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
535 except LdbError as e4:
537 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
539 res = self._check_account(userdn,
541 badPasswordTime=badPasswordTime,
542 logonCount=logonCount,
544 lastLogonTimestamp=lastLogonTimestamp,
545 lockoutTime=lockoutTime,
546 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
547 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
549 # The correct password, but we are locked out
550 creds_lockout.set_password(userpass)
552 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
554 except LdbError as e5:
556 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
558 res = self._check_account(userdn,
560 badPasswordTime=badPasswordTime,
561 logonCount=logonCount,
563 lastLogonTimestamp=lastLogonTimestamp,
564 lockoutTime=lockoutTime,
565 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
566 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
568 # wait for the lockout to end
569 time.sleep(self.account_lockout_duration + 1)
570 print(self.account_lockout_duration + 1)
572 res = self._check_account(userdn,
573 badPwdCount=3, effective_bad_password_count=0,
574 badPasswordTime=badPasswordTime,
575 logonCount=logonCount,
576 lockoutTime=lockoutTime,
578 lastLogonTimestamp=lastLogonTimestamp,
579 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
580 msDSUserAccountControlComputed=0)
582 # The correct password after letting the timeout expire
584 creds_lockout.set_password(userpass)
586 creds_lockout2 = self.insta_creds(creds_lockout)
588 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp)
591 res = self._check_account(userdn,
593 badPasswordTime=badPasswordTime,
594 logonCount=(logoncount_relation, logonCount),
595 lastLogon=(lastlogon_relation, lastLogon),
596 lastLogonTimestamp=lastLogonTimestamp,
597 lockoutTime=lockoutTime,
598 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
599 msDSUserAccountControlComputed=0,
600 msg="lastLogon is way off")
603 creds_lockout.set_password("thatsAcomplPASS1x")
605 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
607 except LdbError as e6:
609 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
611 res = self._check_account(userdn,
613 badPasswordTime=("greater", badPasswordTime),
614 logonCount=logonCount,
615 lockoutTime=lockoutTime,
617 lastLogonTimestamp=lastLogonTimestamp,
618 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
619 msDSUserAccountControlComputed=0)
620 badPasswordTime = int(res[0]["badPasswordTime"][0])
623 creds_lockout.set_password("thatsAcomplPASS1x")
625 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
627 except LdbError as e7:
629 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
631 res = self._check_account(userdn,
633 badPasswordTime=("greater", badPasswordTime),
634 logonCount=logonCount,
635 lockoutTime=lockoutTime,
637 lastLogonTimestamp=lastLogonTimestamp,
638 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
639 msDSUserAccountControlComputed=0)
640 badPasswordTime = int(res[0]["badPasswordTime"][0])
642 time.sleep(self.lockout_observation_window + 1)
644 res = self._check_account(userdn,
645 badPwdCount=2, effective_bad_password_count=0,
646 badPasswordTime=badPasswordTime,
647 logonCount=logonCount,
648 lockoutTime=lockoutTime,
650 lastLogonTimestamp=lastLogonTimestamp,
651 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
652 msDSUserAccountControlComputed=0)
655 creds_lockout.set_password("thatsAcomplPASS1x")
657 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
659 except LdbError as e8:
661 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
663 res = self._check_account(userdn,
665 badPasswordTime=("greater", badPasswordTime),
666 logonCount=logonCount,
667 lockoutTime=lockoutTime,
669 lastLogonTimestamp=lastLogonTimestamp,
670 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
671 msDSUserAccountControlComputed=0)
672 badPasswordTime = int(res[0]["badPasswordTime"][0])
674 # The correct password without letting the timeout expire
675 creds_lockout.set_password(userpass)
676 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
678 res = self._check_account(userdn,
680 badPasswordTime=badPasswordTime,
681 logonCount=(logoncount_relation, logonCount),
682 lockoutTime=lockoutTime,
683 lastLogon=("greater", lastLogon),
684 lastLogonTimestamp=lastLogonTimestamp,
685 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
686 msDSUserAccountControlComputed=0)
689 class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
# Source of unique numbers: each counter() call returns the next integer,
# starting from 1.  Iterators expose their advance method as .next on
# Python 2 but as .__next__ on Python 3, so use whichever one exists
# instead of hard-coding the Python 2 spelling (which raises
# AttributeError on Python 3 as soon as the class body is executed).
_count_from_one = itertools.count(1)
counter = getattr(_count_from_one, '__next__', None) or _count_from_one.next
del _count_from_one
692 def force_replication(self, base=None):
696 # XXX feels like a horrendous way to do it.
697 credstring = '-U%s%%%s' % (CREDS.get_username(),
698 CREDS.get_password())
699 cmd = ['bin/samba-tool',
705 p = subprocess.Popen(cmd,
706 stderr=subprocess.PIPE,
707 stdout=subprocess.PIPE)
708 stdout, stderr = p.communicate()
710 print("failed with code %s" % p.returncode)
716 raise RodcRwdcTestException()
def _check_account_initial(self, dn):
    """Force a replication, then run the inherited initial check for dn.

    Returns whatever the base class implementation returns.
    """
    self.force_replication()
    base_check = super(RodcRwdcTests, self)._check_account_initial
    return base_check(dn)
723 super(RodcRwdcTests, self).tearDown()
724 self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
725 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
726 set_auto_replication(RWDC, True)
729 self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
730 session_info=system_session(LP), lp=LP)
732 self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
733 session_info=system_session(LP), lp=LP)
735 # Define variables for BasePasswordTestCase
737 self.global_creds = CREDS
739 self.host_url = 'ldap://%s' % RWDC
740 self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
741 credentials=self.global_creds, lp=self.lp)
743 super(RodcRwdcTests, self).setUp()
745 self.host_url = 'ldap://%s' % RODC
746 self.ldb = SamDB(url='ldap://%s' % RODC, session_info=system_session(self.lp),
747 credentials=self.global_creds, lp=self.lp)
749 self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
750 self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
751 self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
753 self.base_dn = self.rwdc_db.domain_dn()
755 root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
756 attrs=['dsServiceName'])
757 self.service = root[0]['dsServiceName'][0]
758 self.tag = uuid.uuid4().hex
760 self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
761 self.rwdc_db.set_dsheuristics("000000001")
763 set_auto_replication(RWDC, False)
765 # make sure DCs are synchronized before the test
766 self.force_replication()
767 self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
768 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
770 def delete_ldb_connections(self):
771 super(RodcRwdcTests, self).delete_ldb_connections()
775 def assertReferral(self, fn, *args, **kwargs):
778 self.fail("failed to raise ldap referral")
779 except ldb.LdbError as e9:
780 (code, msg) = e9.args
781 self.assertEqual(code, ldb.ERR_REFERRAL,
782 "expected referral, got %s %s" % (code, msg))
def _test_rodc_dsheuristics(self):
    """Check that setting dsheuristics through the RODC gives a referral.

    Two writes are attempted via self.rodc_db and each is passed to
    assertReferral: first the fixed value "000000001", then the value
    that was read from the RODC at the start.
    """
    current = self.rodc_db.get_dsheuristics()
    for value in ("000000001", current):
        self.assertReferral(self.rodc_db.set_dsheuristics, value)
def TEST_rodc_heuristics_kerberos(self):
    # Kerberos variant: switch the shared CREDS to MUST_USE_KERBEROS and
    # run the common dsheuristics referral check.
    # NOTE(review): the upper-case TEST_ prefix presumably keeps this out
    # of default unittest discovery -- confirm that is intended.
    kerberos_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_rodc_dsheuristics()
def TEST_rodc_heuristics_ntlm(self):
    # Non-Kerberos variant: switch the shared CREDS to DONT_USE_KERBEROS
    # and run the common dsheuristics referral check.
    # NOTE(review): the upper-case TEST_ prefix presumably keeps this out
    # of default unittest discovery -- confirm that is intended.
    kerberos_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_rodc_dsheuristics()
797 def _test_add(self, objects, cross_ncs=False):
801 base = str(self.rwdc_db.get_config_basedn())
802 controls = ["search_options:1:2"]
803 cn = dn.split(',', 1)[0]
804 expression = '(%s)' % cn
811 res = self.rodc_db.search(base,
812 expression=expression,
813 scope=ldb.SCOPE_SUBTREE,
816 self.assertEqual(len(res), 0)
817 except ldb.LdbError as e:
818 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
823 except ldb.LdbError as e:
824 (ecode, emsg) = e.args
825 self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
829 self.force_replication(base=base)
831 self.force_replication()
834 res = self.rodc_db.search(base,
835 expression=expression,
836 scope=ldb.SCOPE_SUBTREE,
839 self.assertEqual(len(res), 1)
840 except ldb.LdbError as e:
841 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
842 "replication seems to have failed")
844 def _test_add_replicated_objects(self, mode):
845 tag = "%s%s" % (self.tag, mode)
848 'dn': "ou=%s1,%s" % (tag, self.base_dn),
849 "objectclass": "organizationalUnit"
852 'dn': "cn=%s2,%s" % (tag, self.base_dn),
853 "objectclass": "user"
856 'dn': "cn=%s3,%s" % (tag, self.base_dn),
857 "objectclass": "group"
860 self.rwdc_db.delete("ou=%s1,%s" % (tag, self.base_dn))
861 self.rwdc_db.delete("cn=%s2,%s" % (tag, self.base_dn))
862 self.rwdc_db.delete("cn=%s3,%s" % (tag, self.base_dn))
def test_add_replicated_objects_kerberos(self):
    # Kerberos variant of the replicated-objects test.  The mode string
    # is appended to self.tag by the helper when it builds object names.
    mode = 'kerberos'
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_add_replicated_objects(mode)
def test_add_replicated_objects_ntlm(self):
    # Non-Kerberos variant of the replicated-objects test.  The mode
    # string is appended to self.tag by the helper when it builds object
    # names.
    mode = 'ntlm'
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_add_replicated_objects(mode)
872 def _test_add_replicated_connections(self, mode):
873 tag = "%s%s" % (self.tag, mode)
876 'dn': "cn=%sfoofoofoo,%s" % (tag, self.service),
877 "objectclass": "NTDSConnection",
878 'enabledConnection': 'TRUE',
879 'fromServer': self.base_dn,
883 self.rwdc_db.delete("cn=%sfoofoofoo,%s" % (tag, self.service))
def test_add_replicated_connections_kerberos(self):
    # Kerberos variant of the replicated-connections test.  The mode
    # string is appended to self.tag by the helper when it builds the
    # connection object's name.
    mode = 'kerberos'
    CREDS.set_kerberos_state(MUST_USE_KERBEROS)
    self._test_add_replicated_connections(mode)
def test_add_replicated_connections_ntlm(self):
    # Non-Kerberos variant of the replicated-connections test.  The mode
    # string is appended to self.tag by the helper when it builds the
    # connection object's name.
    mode = 'ntlm'
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_add_replicated_connections(mode)
893 def _test_modify_replicated_attributes(self):
894 dn = 'CN=Guest,CN=Users,' + self.base_dn
896 for attr in ['carLicense', 'middleName']:
898 m.dn = ldb.Dn(self.rwdc_db, dn)
899 m[attr] = ldb.MessageElement(value,
900 ldb.FLAG_MOD_REPLACE,
903 self.rwdc_db.modify(m)
904 except ldb.LdbError as e:
905 self.fail("Failed to modify %s %s on RWDC %s with %s" %
908 self.force_replication()
911 res = self.rodc_db.search(dn,
912 scope=ldb.SCOPE_SUBTREE,
914 results = [x[attr][0] for x in res]
915 self.assertEqual(results, [value])
916 except ldb.LdbError as e:
917 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
918 "replication seems to have failed")
def test_modify_replicated_attributes_kerberos(self):
    # Kerberos variant: switch the shared CREDS to MUST_USE_KERBEROS,
    # then run the common attribute-modification test.
    kerberos_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_modify_replicated_attributes()
def test_modify_replicated_attributes_ntlm(self):
    # Non-Kerberos variant: switch the shared CREDS to DONT_USE_KERBEROS,
    # then run the common attribute-modification test.
    kerberos_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_modify_replicated_attributes()
928 def _test_add_modify_delete(self):
929 dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
930 values = ["%s%s" % (i, self.tag) for i in range(3)]
935 "objectclass": "user",
939 self.force_replication()
940 for value in values[1:]:
943 m.dn = ldb.Dn(self.rwdc_db, dn)
944 m[attr] = ldb.MessageElement(value,
945 ldb.FLAG_MOD_REPLACE,
948 self.rwdc_db.modify(m)
949 except ldb.LdbError as e:
950 self.fail("Failed to modify %s %s on RWDC %s with %s" %
953 self.force_replication()
956 res = self.rodc_db.search(dn,
957 scope=ldb.SCOPE_SUBTREE,
959 results = [x[attr][0] for x in res]
960 self.assertEqual(results, [value])
961 except ldb.LdbError as e:
962 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
963 "replication seems to have failed")
965 self.rwdc_db.delete(dn)
966 self.force_replication()
968 res = self.rodc_db.search(dn,
969 scope=ldb.SCOPE_SUBTREE,
972 self.fail("Failed to delete %s" % (dn))
973 except ldb.LdbError as e:
974 self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
975 "Failed to delete %s" % (dn))
def test_add_modify_delete_kerberos(self):
    # Kerberos variant: switch the shared CREDS to MUST_USE_KERBEROS,
    # then run the common add/modify/delete test.
    kerberos_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_add_modify_delete()
def test_add_modify_delete_ntlm(self):
    # Non-Kerberos variant: switch the shared CREDS to DONT_USE_KERBEROS,
    # then run the common add/modify/delete test.
    kerberos_state = DONT_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_add_modify_delete()
986 username = "u%sX%s" % (self.tag[:12], self.counter())
987 password = 'password#1'
988 dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
991 "objectclass": "user",
992 'sAMAccountName': username,
996 except ldb.LdbError as e:
997 self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))
999 self.rwdc_db.modify_ldif("dn: %s\n"
1000 "changetype: modify\n"
1001 "delete: userPassword\n"
1002 "add: userPassword\n"
1003 "userPassword: %s\n" % (dn, password))
1004 self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
1005 return (dn, username, password)
1007 def _change_password(self, user_dn, old_password, new_password):
1008 self.rwdc_db.modify_ldif(
1010 "changetype: modify\n"
1011 "delete: userPassword\n"
1012 "userPassword: %s\n"
1013 "add: userPassword\n"
1014 "userPassword: %s\n" % (user_dn, old_password, new_password))
1016 def try_ldap_logon(self, server, creds, errno=None):
1018 tmpdb = SamDB('ldap://%s' % server, credentials=creds,
1019 session_info=system_session(LP), lp=LP)
1020 if errno is not None:
1021 self.fail("logon failed to fail with ldb error %s" % errno)
1022 except ldb.LdbError as e10:
1023 (code, msg) = e10.args
1026 self.fail("logon incorrectly raised ldb error (code=%s)" %
1029 self.fail("logon failed to raise correct ldb error"
1030 "Expected: %s Got: %s" %
def zero_min_password_age(self):
    """Set minPwdAge to '0' via self.rwdc_db unless it is already zero.

    The current value is read with get_minPwdAge() and converted with
    int(); set_minPwdAge('0') is called only when that integer is
    non-zero.
    """
    current_age = int(self.rwdc_db.get_minPwdAge())
    if current_age == 0:
        return
    self.rwdc_db.set_minPwdAge('0')
1039 def _test_ldap_change_password(self, errno=None):
1040 self.zero_min_password_age()
1042 dn, username, password = self._new_user()
1043 creds1 = make_creds(username, password)
1045 # With NTLM, this should fail on RODC before replication,
1046 # because the user isn't known.
1047 self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1048 self.force_replication()
1050 # Now the user is replicated to RODC, so logon should work
1051 self.try_ldap_logon(RODC, creds1)
1053 passwords = ['password#%s' % i for i in range(1, 6)]
1054 for prev, password in zip(passwords[:-1], passwords[1:]):
1055 self._change_password(dn, prev, password)
1057 # The password has changed enough times to make the old
1058 # password invalid (though with kerberos that doesn't matter).
1059 # For NTLM, the old creds should always fail
1060 self.try_ldap_logon(RODC, creds1, errno)
1061 self.try_ldap_logon(RWDC, creds1, errno)
1063 creds2 = make_creds(username, password)
1065 # new creds work straight away with NTLM, because although it
1066 # doesn't have the password, it knows the user and forwards
1068 self.try_ldap_logon(RODC, creds2)
1069 self.try_ldap_logon(RWDC, creds2)
1071 self.force_replication()
1073 # After another replication check RODC still works and fails,
1074 # as appropriate to various creds
1075 self.try_ldap_logon(RODC, creds2)
1076 self.try_ldap_logon(RODC, creds1, errno)
1079 password = 'password#6'
1080 self._change_password(dn, prev, password)
1081 creds3 = make_creds(username, password)
1083 # previous password should still work.
1084 self.try_ldap_logon(RWDC, creds2)
1085 self.try_ldap_logon(RODC, creds2)
1087 # new password should still work.
1088 self.try_ldap_logon(RWDC, creds3)
1089 self.try_ldap_logon(RODC, creds3)
1091 # old password should still fail (but not on kerberos).
1092 self.try_ldap_logon(RWDC, creds1, errno)
1093 self.try_ldap_logon(RODC, creds1, errno)
def test_ldap_change_password_kerberos(self):
    # Kerberos variant: no error code is passed, so the helper's errno
    # parameter keeps its default of None.
    kerberos_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_ldap_change_password()
def test_ldap_change_password_ntlm(self):
    # Non-Kerberos variant: ERR_INVALID_CREDENTIALS is handed to the
    # helper as the error it passes to try_ldap_logon for the outdated
    # credentials.
    expected_error = ldb.ERR_INVALID_CREDENTIALS
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_ldap_change_password(expected_error)
1103 def _test_ldap_change_password_reveal_on_demand(self, errno=None):
1104 self.zero_min_password_age()
1106 res = self.rodc_db.search(self.rodc_dn,
1107 scope=ldb.SCOPE_BASE,
1108 attrs=['msDS-RevealOnDemandGroup'])
1110 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
1112 user_dn, username, password = self._new_user()
1113 creds1 = make_creds(username, password)
1116 m.dn = ldb.Dn(self.rwdc_db, group)
1117 m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
1118 self.rwdc_db.modify(m)
1120 # Against Windows, this will just forward if no account exists on the KDC
1121 # Therefore, this does not error on Windows.
1122 self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1124 self.force_replication()
1127 self.try_ldap_logon(RODC, creds1)
1128 preload_rodc_user(user_dn)
1130 # Now the user AND password are replicated to RODC, so logon should work (not proxy case)
1131 self.try_ldap_logon(RODC, creds1)
1133 passwords = ['password#%s' % i for i in range(1, 6)]
1134 for prev, password in zip(passwords[:-1], passwords[1:]):
1135 self._change_password(user_dn, prev, password)
1137 # The password has changed enough times to make the old
1138 # password invalid, but the RODC shouldn't know that.
1139 self.try_ldap_logon(RODC, creds1)
1140 self.try_ldap_logon(RWDC, creds1, errno)
1142 creds2 = make_creds(username, password)
1143 self.try_ldap_logon(RWDC, creds2)
1144 # We can forward WRONG_PASSWORD over NTLM.
1145 # This SHOULD succeed.
1146 self.try_ldap_logon(RODC, creds2)
def test_change_password_reveal_on_demand_ntlm(self):
    # Non-Kerberos variant: ERR_INVALID_CREDENTIALS is handed to the
    # helper as the error it passes to try_ldap_logon when the outdated
    # credentials are tried against the RWDC.
    expected_error = ldb.ERR_INVALID_CREDENTIALS
    CREDS.set_kerberos_state(DONT_USE_KERBEROS)
    self._test_ldap_change_password_reveal_on_demand(expected_error)
def test_change_password_reveal_on_demand_kerberos(self):
    # Kerberos variant: no error code is passed, so the helper's errno
    # parameter keeps its default of None.
    kerberos_state = MUST_USE_KERBEROS
    CREDS.set_kerberos_state(kerberos_state)
    self._test_ldap_change_password_reveal_on_demand()
def test_login_lockout_krb5(self):
    """Prime the RODC for the krb5 lockout account, then run the lockout test.

    The account is preloaded onto the RODC, one logon with a deliberately
    wrong password is attempted (and must be rejected with
    ERR_INVALID_CREDENTIALS), one logon with the right password follows,
    and finally the account's credentials are handed to
    self._test_login_lockout().
    """
    username = self.lockout1krb5_creds.get_username()
    userpass = self.lockout1krb5_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    preload_rodc_user(userdn)

    use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
    # NOTE(review): username/userpass keywords assumed from the
    # surrounding locals -- confirm against insta_creds().
    fail_creds = self.insta_creds(self.template_creds,
                                  username=username,
                                  userpass=userpass + "X",
                                  kerberos_state=use_kerberos)

    try:
        SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
    except LdbError as e11:
        (num, _) = e11.args
        self.assertEqual(num, ERR_INVALID_CREDENTIALS)
    else:
        # A wrong password that is accepted must not go unnoticed.
        self.fail("logon with a wrong password unexpectedly succeeded")

    # Succeed to reset everything to 0
    success_creds = self.insta_creds(self.template_creds,
                                     username=username,
                                     userpass=userpass,
                                     kerberos_state=use_kerberos)

    # Bound to a local (not named `ldb`, which is also a module name) so
    # the connection object stays referenced for the rest of the test.
    reset_conn = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)

    self._test_login_lockout(self.lockout1krb5_creds)
def test_login_lockout_ntlm(self):
    """Prime the RODC for the NTLM lockout account, then run the lockout test.

    The account is preloaded onto the RODC, one logon with a deliberately
    wrong password is attempted (and must be rejected with
    ERR_INVALID_CREDENTIALS), one logon with the account's own
    credentials follows, and finally those credentials are handed to
    self._test_login_lockout().
    """
    username = self.lockout1ntlm_creds.get_username()
    userpass = self.lockout1ntlm_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    preload_rodc_user(userdn)

    use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
    # NOTE(review): username keyword assumed from the surrounding
    # locals -- confirm against insta_creds().
    fail_creds = self.insta_creds(self.template_creds,
                                  username=username,
                                  userpass=userpass + "X",
                                  kerberos_state=use_kerberos)

    try:
        SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
    except LdbError as e12:
        (num, _) = e12.args
        self.assertEqual(num, ERR_INVALID_CREDENTIALS)
    else:
        # A wrong password that is accepted must not go unnoticed.
        self.fail("logon with a wrong password unexpectedly succeeded")

    # Succeed to reset everything to 0
    # Bound to a local (not named `ldb`, which is also a module name) so
    # the connection object stays referenced for the rest of the test.
    reset_conn = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)

    self._test_login_lockout(self.lockout1ntlm_creds)
def test_multiple_logon_krb5(self):
    """Prime the RODC for the krb5 account, then run the multiple-logon test.

    The account is preloaded onto the RODC, one logon with a deliberately
    wrong password is attempted (and must be rejected with
    ERR_INVALID_CREDENTIALS), one logon with the right password follows,
    and finally the account's credentials are handed to
    self._test_multiple_logon().
    """
    username = self.lockout1krb5_creds.get_username()
    userpass = self.lockout1krb5_creds.get_password()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

    preload_rodc_user(userdn)

    use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
    # NOTE(review): username/userpass keywords assumed from the
    # surrounding locals -- confirm against insta_creds().
    fail_creds = self.insta_creds(self.template_creds,
                                  username=username,
                                  userpass=userpass + "X",
                                  kerberos_state=use_kerberos)

    try:
        SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
    except LdbError as e13:
        (num, _) = e13.args
        self.assertEqual(num, ERR_INVALID_CREDENTIALS)
    else:
        # A wrong password that is accepted must not go unnoticed.
        self.fail("logon with a wrong password unexpectedly succeeded")

    # Succeed to reset everything to 0
    success_creds = self.insta_creds(self.template_creds,
                                     username=username,
                                     userpass=userpass,
                                     kerberos_state=use_kerberos)

    # Bound to a local (not named `ldb`, which is also a module name) so
    # the connection object stays referenced for the rest of the test.
    reset_conn = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)

    self._test_multiple_logon(self.lockout1krb5_creds)
def test_multiple_logon_ntlm(self):
    """Prime the RODC for the NTLM account, then run the multiple-logon test.

    The account is preloaded onto the RODC, one logon with a deliberately
    wrong password is attempted (and must be rejected with
    ERR_INVALID_CREDENTIALS), one logon with the account's own
    credentials follows, and finally those credentials are handed to
    self._test_multiple_logon().
    """
    username = self.lockout1ntlm_creds.get_username()
    userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
    userpass = self.lockout1ntlm_creds.get_password()

    preload_rodc_user(userdn)

    use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
    # NOTE(review): username keyword assumed from the surrounding
    # locals -- confirm against insta_creds().
    fail_creds = self.insta_creds(self.template_creds,
                                  username=username,
                                  userpass=userpass + "X",
                                  kerberos_state=use_kerberos)

    try:
        SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
    except LdbError as e14:
        (num, _) = e14.args
        self.assertEqual(num, ERR_INVALID_CREDENTIALS)
    else:
        # A wrong password that is accepted must not go unnoticed.
        self.fail("logon with a wrong password unexpectedly succeeded")

    # Succeed to reset everything to 0
    # Bound to a local (not named `ldb`, which is also a module name) so
    # the connection object stays referenced for the rest of the test.
    reset_conn = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)

    self._test_multiple_logon(self.lockout1ntlm_creds)
def main():
    """Parse the command line and run the test suite against the two DCs.

    Expects exactly two positional arguments, <rodc host> and <rwdc host>,
    which are stored in the module globals RODC and RWDC. LP and CREDS are
    likewise set as module globals from the Samba and credentials option
    groups. On a wrong argument count the usage line is printed and the
    process exits with status 1.
    """
    global RODC, RWDC, CREDS, LP
    parser = optparse.OptionParser(
        "rodc_rwdc.py [options] <rodc host> <rwdc host>")

    sambaopts = options.SambaOptions(parser)
    versionopts = options.VersionOptions(parser)
    credopts = options.CredentialsOptions(parser)
    subunitopts = SubunitOptions(parser)

    parser.add_option_group(sambaopts)
    parser.add_option_group(versionopts)
    parser.add_option_group(credopts)
    parser.add_option_group(subunitopts)

    _opts, args = parser.parse_args()

    LP = sambaopts.get_loadparm()
    CREDS = credopts.get_credentials(LP)
    CREDS.set_gensec_features(CREDS.get_gensec_features() |
                              gensec.FEATURE_SEAL)

    # Exactly two hosts are required; anything else is a usage error.
    try:
        RODC, RWDC = args
    except ValueError:
        parser.print_usage()
        sys.exit(1)

    set_auto_replication(RWDC, True)
    try:
        TestProgram(module=__name__, opts=subunitopts)
    finally:
        # unittest-style test programs normally finish by raising
        # SystemExit, so the second call has to live in a finally block
        # to run at all.
        set_auto_replication(RWDC, True)


if __name__ == "__main__":
    main()