2 # -*- coding: utf-8 -*-
3 from __future__ import print_function
4 """Test communication of credentials etc, between an RODC and a RWDC.
6 How does it work when the password is changed on the RWDC?
17 sys.path.insert(0, "bin/python")
21 from samba.tests.subunitrun import SubunitOptions, TestProgram
22 import samba.getopt as options
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
27 from samba import gensec, dsdb
28 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
29 from samba.dcerpc import security, samr
31 import password_lockout_base
34 def passwd_encode(pw):
35 return base64.b64encode(('"%s"' % pw).encode('utf-16-le')).decode('utf8')
38 class RodcRwdcTestException(Exception):
42 def make_creds(username, password, kerberos_state=None):
43 # use the global CREDS as a template
45 c.set_username(username)
46 c.set_password(password)
47 c.set_domain(CREDS.get_domain())
48 c.set_realm(CREDS.get_realm())
49 c.set_workstation(CREDS.get_workstation())
51 if kerberos_state is None:
52 kerberos_state = CREDS.get_kerberos_state()
53 c.set_kerberos_state(kerberos_state)
56 if kerberos_state == MUST_USE_KERBEROS:
57 print("we seem to be using kerberos for %s %s" % (username, password))
58 elif kerberos_state == DONT_USE_KERBEROS:
59 print("NOT using kerberos for %s %s" % (username, password))
61 print("kerberos state is %s" % kerberos_state)
63 c.set_gensec_features(c.get_gensec_features() |
68 def set_auto_replication(dc, allow):
69 credstring = '-U%s%%%s' % (CREDS.get_username(),
72 on_or_off = '-' if allow else '+'
74 for opt in ['DISABLE_INBOUND_REPL',
75 'DISABLE_OUTBOUND_REPL']:
76 cmd = ['bin/samba-tool',
79 "--dsa-option=%s%s" % (on_or_off, opt)]
81 p = subprocess.Popen(cmd,
82 stderr=subprocess.PIPE,
83 stdout=subprocess.PIPE)
84 stdout, stderr = p.communicate()
86 if 'LDAP_REFERRAL' not in stderr:
87 raise RodcRwdcTestException()
88 print("ignoring +%s REFERRAL error; assuming %s is RODC" %
92 def preload_rodc_user(user_dn):
93 credstring = '-U%s%%%s' % (CREDS.get_username(),
96 set_auto_replication(RWDC, True)
97 cmd = ['bin/samba-tool',
104 subprocess.check_call(cmd)
105 set_auto_replication(RWDC, False)
108 def get_server_ref_from_samdb(samdb):
109 server_name = samdb.get_serverName()
110 res = samdb.search(server_name,
111 scope=ldb.SCOPE_BASE,
112 attrs=['serverReference'])
114 return res[0]['serverReference'][0]
117 class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
118 counter = itertools.count(1).next
120 def _check_account_initial(self, dn):
121 self.force_replication()
122 return super(RodcRwdcCachedTests, self)._check_account_initial(dn)
124 def _check_account(self, dn,
126 badPasswordTime=None,
129 lastLogonTimestamp=None,
131 userAccountControl=None,
132 msDSUserAccountControlComputed=None,
133 effective_bad_password_count=None,
135 badPwdCountOnly=False):
136 # Wait for the RWDC to get any delayed messages
137 # e.g. SendToSam or KRB5 bad passwords via winbindd
138 if (self.kerberos and isinstance(badPasswordTime, tuple) or
142 return super(RodcRwdcCachedTests,
143 self)._check_account(dn, badPwdCount, badPasswordTime,
144 logonCount, lastLogon,
145 lastLogonTimestamp, lockoutTime,
147 msDSUserAccountControlComputed,
148 effective_bad_password_count, msg,
151 def force_replication(self, base=None):
155 # XXX feels like a horrendous way to do it.
156 credstring = '-U%s%%%s' % (CREDS.get_username(),
157 CREDS.get_password())
158 cmd = ['bin/samba-tool',
164 p = subprocess.Popen(cmd,
165 stderr=subprocess.PIPE,
166 stdout=subprocess.PIPE)
167 stdout, stderr = p.communicate()
169 print("failed with code %s" % p.returncode)
175 raise RodcRwdcTestException()
177 def _change_password(self, user_dn, old_password, new_password):
178 self.rwdc_db.modify_ldif(
180 "changetype: modify\n"
181 "delete: userPassword\n"
183 "add: userPassword\n"
184 "userPassword: %s\n" % (user_dn, old_password, new_password))
187 super(RodcRwdcCachedTests, self).tearDown()
188 set_auto_replication(RWDC, True)
191 self.kerberos = False # To be set later
193 self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
194 session_info=system_session(LP), lp=LP)
196 self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
197 session_info=system_session(LP), lp=LP)
199 # Define variables for BasePasswordTestCase
201 self.global_creds = CREDS
203 self.host_url = 'ldap://%s' % RWDC
204 self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
205 credentials=self.global_creds, lp=self.lp)
207 super(RodcRwdcCachedTests, self).setUp()
208 self.host_url = 'ldap://%s' % RODC
210 self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
211 self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
212 self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
214 self.base_dn = self.rwdc_db.domain_dn()
216 root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
217 attrs=['dsServiceName'])
218 self.service = root[0]['dsServiceName'][0]
219 self.tag = uuid.uuid4().hex
221 self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
222 self.rwdc_db.set_dsheuristics("000000001")
224 set_auto_replication(RWDC, False)
226 # make sure DCs are synchronized before the test
227 self.force_replication()
229 def delete_ldb_connections(self):
230 super(RodcRwdcCachedTests, self).delete_ldb_connections()
234 def test_cache_and_flush_password(self):
235 username = self.lockout1krb5_creds.get_username()
236 userpass = self.lockout1krb5_creds.get_password()
237 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
239 ldb_system = SamDB(session_info=system_session(self.lp),
240 credentials=self.global_creds, lp=self.lp)
242 res = ldb_system.search(userdn, attrs=['unicodePwd'])
243 self.assertFalse('unicodePwd' in res[0])
245 preload_rodc_user(userdn)
247 res = ldb_system.search(userdn, attrs=['unicodePwd'])
248 self.assertTrue('unicodePwd' in res[0])
250 newpass = userpass + '!'
252 # Forcing replication should blank out password (when changed)
253 self._change_password(userdn, userpass, newpass)
254 self.force_replication()
256 res = ldb_system.search(userdn, attrs=['unicodePwd'])
257 self.assertFalse('unicodePwd' in res[0])
259 def test_login_lockout_krb5(self):
260 username = self.lockout1krb5_creds.get_username()
261 userpass = self.lockout1krb5_creds.get_password()
262 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
264 preload_rodc_user(userdn)
268 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
270 res = self.rodc_db.search(self.rodc_dn,
271 scope=ldb.SCOPE_BASE,
272 attrs=['msDS-RevealOnDemandGroup'])
274 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
277 m.dn = ldb.Dn(self.rwdc_db, group)
278 m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
279 self.rwdc_db.modify(m)
282 m.dn = ldb.Dn(self.ldb, self.base_dn)
284 self.account_lockout_duration = 10
285 account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7))
287 m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks),
288 ldb.FLAG_MOD_REPLACE,
291 self.lockout_observation_window = 10
292 lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7))
294 m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks),
295 ldb.FLAG_MOD_REPLACE,
296 "lockOutObservationWindow")
298 self.rwdc_db.modify(m)
299 self.force_replication()
301 self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
303 def test_login_lockout_ntlm(self):
304 username = self.lockout1ntlm_creds.get_username()
305 userpass = self.lockout1ntlm_creds.get_password()
306 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
308 preload_rodc_user(userdn)
310 self.kerberos = False
312 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
314 res = self.rodc_db.search(self.rodc_dn,
315 scope=ldb.SCOPE_BASE,
316 attrs=['msDS-RevealOnDemandGroup'])
318 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
321 m.dn = ldb.Dn(self.rwdc_db, group)
322 m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
323 self.rwdc_db.modify(m)
325 self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
327 def test_login_lockout_not_revealed(self):
328 '''Test that SendToSam is restricted by preloaded users/groups'''
330 username = self.lockout1ntlm_creds.get_username()
331 userpass = self.lockout1ntlm_creds.get_password()
332 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
334 # Preload but do not add to revealed group
335 preload_rodc_user(userdn)
337 self.kerberos = False
339 creds = self.lockout1ntlm_creds
341 # Open a second LDB connection with the user credentials. Use the
342 # command line credentials for informations like the domain, the realm
343 # and the workstation.
344 creds_lockout = self.insta_creds(creds)
347 creds_lockout.set_password("thatsAcomplPASS1x")
349 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
354 lastLogonTimestamp = 0
355 logoncount_relation = ''
356 lastlogon_relation = ''
358 res = self._check_account(userdn,
360 badPasswordTime=("greater", badPasswordTime),
361 logonCount=logonCount,
363 lastLogonTimestamp=lastLogonTimestamp,
364 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
365 msDSUserAccountControlComputed=0,
366 msg='lastlogontimestamp with wrong password')
367 badPasswordTime = int(res[0]["badPasswordTime"][0])
369 # BadPwdCount on RODC increases alongside RWDC
370 res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
371 self.assertTrue('badPwdCount' in res[0])
372 self.assertEqual(int(res[0]['badPwdCount'][0]), 1)
374 # Correct old password
375 creds_lockout.set_password(userpass)
377 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
379 # Wait for potential SendToSam...
382 # BadPwdCount on RODC decreases, but not the RWDC
383 res = self._check_account(userdn,
385 badPasswordTime=badPasswordTime,
386 logonCount=(logoncount_relation, logonCount),
387 lastLogon=('greater', lastLogon),
388 lastLogonTimestamp=lastLogonTimestamp,
389 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
390 msDSUserAccountControlComputed=0,
391 msg='badPwdCount not reset on RWDC')
393 res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
394 self.assertTrue('badPwdCount' in res[0])
395 self.assertEqual(int(res[0]['badPwdCount'][0]), 0)
397 def _test_login_lockout_rodc_rwdc(self, creds, userdn):
398 username = creds.get_username()
399 userpass = creds.get_password()
401 # Open a second LDB connection with the user credentials. Use the
402 # command line credentials for informations like the domain, the realm
403 # and the workstation.
404 creds_lockout = self.insta_creds(creds)
407 creds_lockout.set_password("thatsAcomplPASS1x")
409 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
414 lastLogonTimestamp = 0
415 logoncount_relation = ''
416 lastlogon_relation = ''
418 res = self._check_account(userdn,
420 badPasswordTime=("greater", badPasswordTime),
421 logonCount=logonCount,
423 lastLogonTimestamp=lastLogonTimestamp,
424 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
425 msDSUserAccountControlComputed=0,
426 msg='lastlogontimestamp with wrong password')
427 badPasswordTime = int(res[0]["badPasswordTime"][0])
429 # Correct old password
430 creds_lockout.set_password(userpass)
432 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
434 # lastLogonTimestamp should not change
435 # lastLogon increases if badPwdCount is non-zero (!)
436 res = self._check_account(userdn,
438 badPasswordTime=badPasswordTime,
439 logonCount=(logoncount_relation, logonCount),
440 lastLogon=('greater', lastLogon),
441 lastLogonTimestamp=lastLogonTimestamp,
442 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
443 msDSUserAccountControlComputed=0,
444 msg='LLTimestamp is updated to lastlogon')
446 logonCount = int(res[0]["logonCount"][0])
447 lastLogon = int(res[0]["lastLogon"][0])
450 creds_lockout.set_password("thatsAcomplPASS1x")
452 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
454 res = self._check_account(userdn,
456 badPasswordTime=("greater", badPasswordTime),
457 logonCount=logonCount,
459 lastLogonTimestamp=lastLogonTimestamp,
460 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
461 msDSUserAccountControlComputed=0)
462 badPasswordTime = int(res[0]["badPasswordTime"][0])
465 creds_lockout.set_password("thatsAcomplPASS1x")
468 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
471 except LdbError as e1:
473 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
475 res = self._check_account(userdn,
477 badPasswordTime=("greater", badPasswordTime),
478 logonCount=logonCount,
480 lastLogonTimestamp=lastLogonTimestamp,
481 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
482 msDSUserAccountControlComputed=0)
483 badPasswordTime = int(res[0]["badPasswordTime"][0])
485 print("two failed password change")
488 creds_lockout.set_password("thatsAcomplPASS1x")
491 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
494 except LdbError as e2:
496 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
498 res = self._check_account(userdn,
500 badPasswordTime=("greater", badPasswordTime),
501 logonCount=logonCount,
503 lastLogonTimestamp=lastLogonTimestamp,
504 lockoutTime=("greater", badPasswordTime),
505 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
506 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
507 badPasswordTime = int(res[0]["badPasswordTime"][0])
508 lockoutTime = int(res[0]["lockoutTime"][0])
511 creds_lockout.set_password("thatsAcomplPASS1x")
513 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
515 except LdbError as e3:
517 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
519 res = self._check_account(userdn,
521 badPasswordTime=badPasswordTime,
522 logonCount=logonCount,
524 lastLogonTimestamp=lastLogonTimestamp,
525 lockoutTime=lockoutTime,
526 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
527 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
530 creds_lockout.set_password("thatsAcomplPASS1x")
532 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
534 except LdbError as e4:
536 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
538 res = self._check_account(userdn,
540 badPasswordTime=badPasswordTime,
541 logonCount=logonCount,
543 lastLogonTimestamp=lastLogonTimestamp,
544 lockoutTime=lockoutTime,
545 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
546 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
548 # The correct password, but we are locked out
549 creds_lockout.set_password(userpass)
551 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
553 except LdbError as e5:
555 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
557 res = self._check_account(userdn,
559 badPasswordTime=badPasswordTime,
560 logonCount=logonCount,
562 lastLogonTimestamp=lastLogonTimestamp,
563 lockoutTime=lockoutTime,
564 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
565 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
567 # wait for the lockout to end
568 time.sleep(self.account_lockout_duration + 1)
569 print(self.account_lockout_duration + 1)
571 res = self._check_account(userdn,
572 badPwdCount=3, effective_bad_password_count=0,
573 badPasswordTime=badPasswordTime,
574 logonCount=logonCount,
575 lockoutTime=lockoutTime,
577 lastLogonTimestamp=lastLogonTimestamp,
578 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
579 msDSUserAccountControlComputed=0)
581 # The correct password after letting the timeout expire
583 creds_lockout.set_password(userpass)
585 creds_lockout2 = self.insta_creds(creds_lockout)
587 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp)
590 res = self._check_account(userdn,
592 badPasswordTime=badPasswordTime,
593 logonCount=(logoncount_relation, logonCount),
594 lastLogon=(lastlogon_relation, lastLogon),
595 lastLogonTimestamp=lastLogonTimestamp,
596 lockoutTime=lockoutTime,
597 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
598 msDSUserAccountControlComputed=0,
599 msg="lastLogon is way off")
602 creds_lockout.set_password("thatsAcomplPASS1x")
604 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
606 except LdbError as e6:
608 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
610 res = self._check_account(userdn,
612 badPasswordTime=("greater", badPasswordTime),
613 logonCount=logonCount,
614 lockoutTime=lockoutTime,
616 lastLogonTimestamp=lastLogonTimestamp,
617 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
618 msDSUserAccountControlComputed=0)
619 badPasswordTime = int(res[0]["badPasswordTime"][0])
622 creds_lockout.set_password("thatsAcomplPASS1x")
624 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
626 except LdbError as e7:
628 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
630 res = self._check_account(userdn,
632 badPasswordTime=("greater", badPasswordTime),
633 logonCount=logonCount,
634 lockoutTime=lockoutTime,
636 lastLogonTimestamp=lastLogonTimestamp,
637 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
638 msDSUserAccountControlComputed=0)
639 badPasswordTime = int(res[0]["badPasswordTime"][0])
641 time.sleep(self.lockout_observation_window + 1)
643 res = self._check_account(userdn,
644 badPwdCount=2, effective_bad_password_count=0,
645 badPasswordTime=badPasswordTime,
646 logonCount=logonCount,
647 lockoutTime=lockoutTime,
649 lastLogonTimestamp=lastLogonTimestamp,
650 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
651 msDSUserAccountControlComputed=0)
654 creds_lockout.set_password("thatsAcomplPASS1x")
656 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
658 except LdbError as e8:
660 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
662 res = self._check_account(userdn,
664 badPasswordTime=("greater", badPasswordTime),
665 logonCount=logonCount,
666 lockoutTime=lockoutTime,
668 lastLogonTimestamp=lastLogonTimestamp,
669 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
670 msDSUserAccountControlComputed=0)
671 badPasswordTime = int(res[0]["badPasswordTime"][0])
673 # The correct password without letting the timeout expire
674 creds_lockout.set_password(userpass)
675 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
677 res = self._check_account(userdn,
679 badPasswordTime=badPasswordTime,
680 logonCount=(logoncount_relation, logonCount),
681 lockoutTime=lockoutTime,
682 lastLogon=("greater", lastLogon),
683 lastLogonTimestamp=lastLogonTimestamp,
684 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
685 msDSUserAccountControlComputed=0)
688 class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
689 counter = itertools.count(1).next
691 def force_replication(self, base=None):
695 # XXX feels like a horrendous way to do it.
696 credstring = '-U%s%%%s' % (CREDS.get_username(),
697 CREDS.get_password())
698 cmd = ['bin/samba-tool',
704 p = subprocess.Popen(cmd,
705 stderr=subprocess.PIPE,
706 stdout=subprocess.PIPE)
707 stdout, stderr = p.communicate()
709 print("failed with code %s" % p.returncode)
715 raise RodcRwdcTestException()
717 def _check_account_initial(self, dn):
718 self.force_replication()
719 return super(RodcRwdcTests, self)._check_account_initial(dn)
722 super(RodcRwdcTests, self).tearDown()
723 self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
724 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
725 set_auto_replication(RWDC, True)
728 self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
729 session_info=system_session(LP), lp=LP)
731 self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
732 session_info=system_session(LP), lp=LP)
734 # Define variables for BasePasswordTestCase
736 self.global_creds = CREDS
738 self.host_url = 'ldap://%s' % RWDC
739 self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
740 credentials=self.global_creds, lp=self.lp)
742 super(RodcRwdcTests, self).setUp()
744 self.host_url = 'ldap://%s' % RODC
745 self.ldb = SamDB(url='ldap://%s' % RODC, session_info=system_session(self.lp),
746 credentials=self.global_creds, lp=self.lp)
748 self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
749 self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
750 self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
752 self.base_dn = self.rwdc_db.domain_dn()
754 root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
755 attrs=['dsServiceName'])
756 self.service = root[0]['dsServiceName'][0]
757 self.tag = uuid.uuid4().hex
759 self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
760 self.rwdc_db.set_dsheuristics("000000001")
762 set_auto_replication(RWDC, False)
764 # make sure DCs are synchronized before the test
765 self.force_replication()
766 self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
767 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
769 def delete_ldb_connections(self):
770 super(RodcRwdcTests, self).delete_ldb_connections()
774 def assertReferral(self, fn, *args, **kwargs):
777 self.fail("failed to raise ldap referral")
778 except ldb.LdbError as e9:
779 (code, msg) = e9.args
780 self.assertEqual(code, ldb.ERR_REFERRAL,
781 "expected referral, got %s %s" % (code, msg))
783 def _test_rodc_dsheuristics(self):
784 d = self.rodc_db.get_dsheuristics()
785 self.assertReferral(self.rodc_db.set_dsheuristics, "000000001")
786 self.assertReferral(self.rodc_db.set_dsheuristics, d)
788 def TEST_rodc_heuristics_kerberos(self):
789 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
790 self._test_rodc_dsheuristics()
792 def TEST_rodc_heuristics_ntlm(self):
793 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
794 self._test_rodc_dsheuristics()
796 def _test_add(self, objects, cross_ncs=False):
800 base = str(self.rwdc_db.get_config_basedn())
801 controls = ["search_options:1:2"]
802 cn = dn.split(',', 1)[0]
803 expression = '(%s)' % cn
810 res = self.rodc_db.search(base,
811 expression=expression,
812 scope=ldb.SCOPE_SUBTREE,
815 self.assertEqual(len(res), 0)
816 except ldb.LdbError as e:
817 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
822 except ldb.LdbError as e:
823 (ecode, emsg) = e.args
824 self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
828 self.force_replication(base=base)
830 self.force_replication()
833 res = self.rodc_db.search(base,
834 expression=expression,
835 scope=ldb.SCOPE_SUBTREE,
838 self.assertEqual(len(res), 1)
839 except ldb.LdbError as e:
840 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
841 "replication seems to have failed")
843 def _test_add_replicated_objects(self, mode):
844 tag = "%s%s" % (self.tag, mode)
847 'dn': "ou=%s1,%s" % (tag, self.base_dn),
848 "objectclass": "organizationalUnit"
851 'dn': "cn=%s2,%s" % (tag, self.base_dn),
852 "objectclass": "user"
855 'dn': "cn=%s3,%s" % (tag, self.base_dn),
856 "objectclass": "group"
859 self.rwdc_db.delete("ou=%s1,%s" % (tag, self.base_dn))
860 self.rwdc_db.delete("cn=%s2,%s" % (tag, self.base_dn))
861 self.rwdc_db.delete("cn=%s3,%s" % (tag, self.base_dn))
863 def test_add_replicated_objects_kerberos(self):
864 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
865 self._test_add_replicated_objects('kerberos')
867 def test_add_replicated_objects_ntlm(self):
868 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
869 self._test_add_replicated_objects('ntlm')
871 def _test_add_replicated_connections(self, mode):
872 tag = "%s%s" % (self.tag, mode)
875 'dn': "cn=%sfoofoofoo,%s" % (tag, self.service),
876 "objectclass": "NTDSConnection",
877 'enabledConnection': 'TRUE',
878 'fromServer': self.base_dn,
882 self.rwdc_db.delete("cn=%sfoofoofoo,%s" % (tag, self.service))
884 def test_add_replicated_connections_kerberos(self):
885 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
886 self._test_add_replicated_connections('kerberos')
888 def test_add_replicated_connections_ntlm(self):
889 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
890 self._test_add_replicated_connections('ntlm')
892 def _test_modify_replicated_attributes(self):
893 dn = 'CN=Guest,CN=Users,' + self.base_dn
895 for attr in ['carLicense', 'middleName']:
897 m.dn = ldb.Dn(self.rwdc_db, dn)
898 m[attr] = ldb.MessageElement(value,
899 ldb.FLAG_MOD_REPLACE,
902 self.rwdc_db.modify(m)
903 except ldb.LdbError as e:
904 self.fail("Failed to modify %s %s on RWDC %s with %s" %
907 self.force_replication()
910 res = self.rodc_db.search(dn,
911 scope=ldb.SCOPE_SUBTREE,
913 results = [x[attr][0] for x in res]
914 self.assertEqual(results, [value])
915 except ldb.LdbError as e:
916 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
917 "replication seems to have failed")
919 def test_modify_replicated_attributes_kerberos(self):
920 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
921 self._test_modify_replicated_attributes()
923 def test_modify_replicated_attributes_ntlm(self):
924 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
925 self._test_modify_replicated_attributes()
927 def _test_add_modify_delete(self):
928 dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
929 values = ["%s%s" % (i, self.tag) for i in range(3)]
934 "objectclass": "user",
938 self.force_replication()
939 for value in values[1:]:
942 m.dn = ldb.Dn(self.rwdc_db, dn)
943 m[attr] = ldb.MessageElement(value,
944 ldb.FLAG_MOD_REPLACE,
947 self.rwdc_db.modify(m)
948 except ldb.LdbError as e:
949 self.fail("Failed to modify %s %s on RWDC %s with %s" %
952 self.force_replication()
955 res = self.rodc_db.search(dn,
956 scope=ldb.SCOPE_SUBTREE,
958 results = [x[attr][0] for x in res]
959 self.assertEqual(results, [value])
960 except ldb.LdbError as e:
961 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
962 "replication seems to have failed")
964 self.rwdc_db.delete(dn)
965 self.force_replication()
967 res = self.rodc_db.search(dn,
968 scope=ldb.SCOPE_SUBTREE,
971 self.fail("Failed to delete %s" % (dn))
972 except ldb.LdbError as e:
973 self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
974 "Failed to delete %s" % (dn))
976 def test_add_modify_delete_kerberos(self):
977 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
978 self._test_add_modify_delete()
980 def test_add_modify_delete_ntlm(self):
981 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
982 self._test_add_modify_delete()
985 username = "u%sX%s" % (self.tag[:12], self.counter())
986 password = 'password#1'
987 dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
990 "objectclass": "user",
991 'sAMAccountName': username,
995 except ldb.LdbError as e:
996 self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))
998 self.rwdc_db.modify_ldif("dn: %s\n"
999 "changetype: modify\n"
1000 "delete: userPassword\n"
1001 "add: userPassword\n"
1002 "userPassword: %s\n" % (dn, password))
1003 self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
1004 return (dn, username, password)
1006 def _change_password(self, user_dn, old_password, new_password):
1007 self.rwdc_db.modify_ldif(
1009 "changetype: modify\n"
1010 "delete: userPassword\n"
1011 "userPassword: %s\n"
1012 "add: userPassword\n"
1013 "userPassword: %s\n" % (user_dn, old_password, new_password))
1015 def try_ldap_logon(self, server, creds, errno=None):
1017 tmpdb = SamDB('ldap://%s' % server, credentials=creds,
1018 session_info=system_session(LP), lp=LP)
1019 if errno is not None:
1020 self.fail("logon failed to fail with ldb error %s" % errno)
1021 except ldb.LdbError as e10:
1022 (code, msg) = e10.args
1025 self.fail("logon incorrectly raised ldb error (code=%s)" %
1028 self.fail("logon failed to raise correct ldb error"
1029 "Expected: %s Got: %s" %
1032 def zero_min_password_age(self):
1033 min_pwd_age = int(self.rwdc_db.get_minPwdAge())
1034 if min_pwd_age != 0:
1035 self.rwdc_db.set_minPwdAge('0')
1037 def _test_ldap_change_password(self, errno=None):
1038 self.zero_min_password_age()
1040 dn, username, password = self._new_user()
1041 creds1 = make_creds(username, password)
1043 # With NTLM, this should fail on RODC before replication,
1044 # because the user isn't known.
1045 self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1046 self.force_replication()
1048 # Now the user is replicated to RODC, so logon should work
1049 self.try_ldap_logon(RODC, creds1)
1051 passwords = ['password#%s' % i for i in range(1, 6)]
1052 for prev, password in zip(passwords[:-1], passwords[1:]):
1053 self._change_password(dn, prev, password)
1055 # The password has changed enough times to make the old
1056 # password invalid (though with kerberos that doesn't matter).
1057 # For NTLM, the old creds should always fail
1058 self.try_ldap_logon(RODC, creds1, errno)
1059 self.try_ldap_logon(RWDC, creds1, errno)
1061 creds2 = make_creds(username, password)
1063 # new creds work straight away with NTLM, because although it
1064 # doesn't have the password, it knows the user and forwards
1066 self.try_ldap_logon(RODC, creds2)
1067 self.try_ldap_logon(RWDC, creds2)
1069 self.force_replication()
1071 # After another replication check RODC still works and fails,
1072 # as appropriate to various creds
1073 self.try_ldap_logon(RODC, creds2)
1074 self.try_ldap_logon(RODC, creds1, errno)
1077 password = 'password#6'
1078 self._change_password(dn, prev, password)
1079 creds3 = make_creds(username, password)
1081 # previous password should still work.
1082 self.try_ldap_logon(RWDC, creds2)
1083 self.try_ldap_logon(RODC, creds2)
1085 # new password should still work.
1086 self.try_ldap_logon(RWDC, creds3)
1087 self.try_ldap_logon(RODC, creds3)
1089 # old password should still fail (but not on kerberos).
1090 self.try_ldap_logon(RWDC, creds1, errno)
1091 self.try_ldap_logon(RODC, creds1, errno)
1093 def test_ldap_change_password_kerberos(self):
1094 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
1095 self._test_ldap_change_password()
1097 def test_ldap_change_password_ntlm(self):
1098 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
1099 self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS)
1101 def _test_ldap_change_password_reveal_on_demand(self, errno=None):
1102 self.zero_min_password_age()
1104 res = self.rodc_db.search(self.rodc_dn,
1105 scope=ldb.SCOPE_BASE,
1106 attrs=['msDS-RevealOnDemandGroup'])
1108 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
1110 user_dn, username, password = self._new_user()
1111 creds1 = make_creds(username, password)
1114 m.dn = ldb.Dn(self.rwdc_db, group)
1115 m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
1116 self.rwdc_db.modify(m)
1118 # Against Windows, this will just forward if no account exists on the KDC
1119 # Therefore, this does not error on Windows.
1120 self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1122 self.force_replication()
1125 self.try_ldap_logon(RODC, creds1)
1126 preload_rodc_user(user_dn)
1128 # Now the user AND password are replicated to RODC, so logon should work (not proxy case)
1129 self.try_ldap_logon(RODC, creds1)
1131 passwords = ['password#%s' % i for i in range(1, 6)]
1132 for prev, password in zip(passwords[:-1], passwords[1:]):
1133 self._change_password(user_dn, prev, password)
1135 # The password has changed enough times to make the old
1136 # password invalid, but the RODC shouldn't know that.
1137 self.try_ldap_logon(RODC, creds1)
1138 self.try_ldap_logon(RWDC, creds1, errno)
1140 creds2 = make_creds(username, password)
1141 self.try_ldap_logon(RWDC, creds2)
1142 # We can forward WRONG_PASSWORD over NTLM.
1143 # This SHOULD succeed.
1144 self.try_ldap_logon(RODC, creds2)
1146 def test_change_password_reveal_on_demand_ntlm(self):
1147 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
1148 self._test_ldap_change_password_reveal_on_demand(ldb.ERR_INVALID_CREDENTIALS)
1150 def test_change_password_reveal_on_demand_kerberos(self):
1151 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
1152 self._test_ldap_change_password_reveal_on_demand()
1154 def test_login_lockout_krb5(self):
1155 username = self.lockout1krb5_creds.get_username()
1156 userpass = self.lockout1krb5_creds.get_password()
1157 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1159 preload_rodc_user(userdn)
1161 use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
1162 fail_creds = self.insta_creds(self.template_creds,
1164 userpass=userpass + "X",
1165 kerberos_state=use_kerberos)
1168 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1170 except LdbError as e11:
1171 (num, msg) = e11.args
1172 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1174 # Succeed to reset everything to 0
1175 success_creds = self.insta_creds(self.template_creds,
1178 kerberos_state=use_kerberos)
1180 ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
1182 self._test_login_lockout(self.lockout1krb5_creds)
1184 def test_login_lockout_ntlm(self):
1185 username = self.lockout1ntlm_creds.get_username()
1186 userpass = self.lockout1ntlm_creds.get_password()
1187 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1189 preload_rodc_user(userdn)
1191 use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
1192 fail_creds = self.insta_creds(self.template_creds,
1194 userpass=userpass + "X",
1195 kerberos_state=use_kerberos)
1198 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1200 except LdbError as e12:
1201 (num, msg) = e12.args
1202 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1204 # Succeed to reset everything to 0
1205 ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
1207 self._test_login_lockout(self.lockout1ntlm_creds)
1209 def test_multiple_logon_krb5(self):
1210 username = self.lockout1krb5_creds.get_username()
1211 userpass = self.lockout1krb5_creds.get_password()
1212 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1214 preload_rodc_user(userdn)
1216 use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
1217 fail_creds = self.insta_creds(self.template_creds,
1219 userpass=userpass + "X",
1220 kerberos_state=use_kerberos)
1223 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1225 except LdbError as e13:
1226 (num, msg) = e13.args
1227 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1229 # Succeed to reset everything to 0
1230 success_creds = self.insta_creds(self.template_creds,
1233 kerberos_state=use_kerberos)
1235 ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
1237 self._test_multiple_logon(self.lockout1krb5_creds)
1239 def test_multiple_logon_ntlm(self):
1240 username = self.lockout1ntlm_creds.get_username()
1241 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1242 userpass = self.lockout1ntlm_creds.get_password()
1244 preload_rodc_user(userdn)
1246 use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
1247 fail_creds = self.insta_creds(self.template_creds,
1249 userpass=userpass + "X",
1250 kerberos_state=use_kerberos)
1253 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1255 except LdbError as e14:
1256 (num, msg) = e14.args
1257 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1259 # Succeed to reset everything to 0
1260 ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
1262 self._test_multiple_logon(self.lockout1ntlm_creds)
1266 global RODC, RWDC, CREDS, LP
1267 parser = optparse.OptionParser(
1268 "rodc_rwdc.py [options] <rodc host> <rwdc host>")
1270 sambaopts = options.SambaOptions(parser)
1271 versionopts = options.VersionOptions(parser)
1272 credopts = options.CredentialsOptions(parser)
1273 subunitopts = SubunitOptions(parser)
1275 parser.add_option_group(sambaopts)
1276 parser.add_option_group(versionopts)
1277 parser.add_option_group(credopts)
1278 parser.add_option_group(subunitopts)
1280 opts, args = parser.parse_args()
1282 LP = sambaopts.get_loadparm()
1283 CREDS = credopts.get_credentials(LP)
1284 CREDS.set_gensec_features(CREDS.get_gensec_features() |
1285 gensec.FEATURE_SEAL)
1290 parser.print_usage()
1293 set_auto_replication(RWDC, True)
1295 TestProgram(module=__name__, opts=subunitopts)
1297 set_auto_replication(RWDC, True)