2 # -*- coding: utf-8 -*-
3 from __future__ import print_function
4 """Test communication of credentials etc, between an RODC and a RWDC.
6 How does it work when the password is changed on the RWDC?
17 sys.path.insert(0, "bin/python")
21 from samba.tests.subunitrun import SubunitOptions, TestProgram
22 import samba.getopt as options
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
27 from samba import gensec, dsdb
28 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
29 from samba.dcerpc import security, samr
31 import password_lockout_base
33 def passwd_encode(pw):
34 return base64.b64encode(('"%s"' % pw).encode('utf-16-le')).decode('utf8')
37 class RodcRwdcTestException(Exception):
41 def make_creds(username, password, kerberos_state=None):
42 # use the global CREDS as a template
44 c.set_username(username)
45 c.set_password(password)
46 c.set_domain(CREDS.get_domain())
47 c.set_realm(CREDS.get_realm())
48 c.set_workstation(CREDS.get_workstation())
50 if kerberos_state is None:
51 kerberos_state = CREDS.get_kerberos_state()
52 c.set_kerberos_state(kerberos_state)
55 if kerberos_state == MUST_USE_KERBEROS:
56 print("we seem to be using kerberos for %s %s" % (username, password))
57 elif kerberos_state == DONT_USE_KERBEROS:
58 print("NOT using kerberos for %s %s" % (username, password))
60 print("kerberos state is %s" % kerberos_state)
62 c.set_gensec_features(c.get_gensec_features() |
67 def set_auto_replication(dc, allow):
68 credstring = '-U%s%%%s' % (CREDS.get_username(),
71 on_or_off = '-' if allow else '+'
73 for opt in ['DISABLE_INBOUND_REPL',
74 'DISABLE_OUTBOUND_REPL']:
75 cmd = ['bin/samba-tool',
78 "--dsa-option=%s%s" % (on_or_off, opt)]
80 p = subprocess.Popen(cmd,
81 stderr=subprocess.PIPE,
82 stdout=subprocess.PIPE)
83 stdout, stderr = p.communicate()
85 if 'LDAP_REFERRAL' not in stderr:
86 raise RodcRwdcTestException()
87 print("ignoring +%s REFERRAL error; assuming %s is RODC" %
91 def preload_rodc_user(user_dn):
92 credstring = '-U%s%%%s' % (CREDS.get_username(),
95 set_auto_replication(RWDC, True)
96 cmd = ['bin/samba-tool',
103 subprocess.check_call(cmd)
104 set_auto_replication(RWDC, False)
108 def get_server_ref_from_samdb(samdb):
109 server_name = samdb.get_serverName()
110 res = samdb.search(server_name,
111 scope=ldb.SCOPE_BASE,
112 attrs=['serverReference'])
114 return res[0]['serverReference'][0]
116 class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
117 counter = itertools.count(1).next
119 def _check_account_initial(self, dn):
120 self.force_replication()
121 return super(RodcRwdcCachedTests, self)._check_account_initial(dn)
123 def _check_account(self, dn,
125 badPasswordTime=None,
128 lastLogonTimestamp=None,
130 userAccountControl=None,
131 msDSUserAccountControlComputed=None,
132 effective_bad_password_count=None,
134 badPwdCountOnly=False):
135 # Wait for the RWDC to get any delayed messages
136 # e.g. SendToSam or KRB5 bad passwords via winbindd
137 if (self.kerberos and isinstance(badPasswordTime, tuple) or
141 return super(RodcRwdcCachedTests,
142 self)._check_account(dn, badPwdCount, badPasswordTime,
143 logonCount, lastLogon,
144 lastLogonTimestamp, lockoutTime,
146 msDSUserAccountControlComputed,
147 effective_bad_password_count, msg,
150 def force_replication(self, base=None):
154 # XXX feels like a horrendous way to do it.
155 credstring = '-U%s%%%s' % (CREDS.get_username(),
156 CREDS.get_password())
157 cmd = ['bin/samba-tool',
163 p = subprocess.Popen(cmd,
164 stderr=subprocess.PIPE,
165 stdout=subprocess.PIPE)
166 stdout, stderr = p.communicate()
168 print("failed with code %s" % p.returncode)
174 raise RodcRwdcTestException()
176 def _change_password(self, user_dn, old_password, new_password):
177 self.rwdc_db.modify_ldif(
179 "changetype: modify\n"
180 "delete: userPassword\n"
182 "add: userPassword\n"
183 "userPassword: %s\n" % (user_dn, old_password, new_password))
186 super(RodcRwdcCachedTests, self).tearDown()
187 set_auto_replication(RWDC, True)
190 self.kerberos = False # To be set later
192 self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
193 session_info=system_session(LP), lp=LP)
195 self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
196 session_info=system_session(LP), lp=LP)
198 # Define variables for BasePasswordTestCase
200 self.global_creds = CREDS
202 self.host_url = 'ldap://%s' % RWDC
203 self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
204 credentials=self.global_creds, lp=self.lp)
206 super(RodcRwdcCachedTests, self).setUp()
207 self.host_url = 'ldap://%s' % RODC
209 self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
210 self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
211 self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
213 self.base_dn = self.rwdc_db.domain_dn()
215 root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
216 attrs=['dsServiceName'])
217 self.service = root[0]['dsServiceName'][0]
218 self.tag = uuid.uuid4().hex
220 self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
221 self.rwdc_db.set_dsheuristics("000000001")
223 set_auto_replication(RWDC, False)
225 # make sure DCs are synchronized before the test
226 self.force_replication()
228 def delete_ldb_connections(self):
229 super(RodcRwdcCachedTests, self).delete_ldb_connections()
233 def test_cache_and_flush_password(self):
234 username = self.lockout1krb5_creds.get_username()
235 userpass = self.lockout1krb5_creds.get_password()
236 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
238 ldb_system = SamDB(session_info=system_session(self.lp),
239 credentials=self.global_creds, lp=self.lp)
241 res = ldb_system.search(userdn, attrs=['unicodePwd'])
242 self.assertFalse('unicodePwd' in res[0])
244 preload_rodc_user(userdn)
246 res = ldb_system.search(userdn, attrs=['unicodePwd'])
247 self.assertTrue('unicodePwd' in res[0])
249 newpass = userpass + '!'
251 # Forcing replication should blank out password (when changed)
252 self._change_password(userdn, userpass, newpass)
253 self.force_replication()
255 res = ldb_system.search(userdn, attrs=['unicodePwd'])
256 self.assertFalse('unicodePwd' in res[0])
258 def test_login_lockout_krb5(self):
259 username = self.lockout1krb5_creds.get_username()
260 userpass = self.lockout1krb5_creds.get_password()
261 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
263 preload_rodc_user(userdn)
267 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
269 res = self.rodc_db.search(self.rodc_dn,
270 scope=ldb.SCOPE_BASE,
271 attrs=['msDS-RevealOnDemandGroup'])
273 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
276 m.dn = ldb.Dn(self.rwdc_db, group)
277 m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
278 self.rwdc_db.modify(m)
281 m.dn = ldb.Dn(self.ldb, self.base_dn)
283 self.account_lockout_duration = 10
284 account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7))
286 m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks),
287 ldb.FLAG_MOD_REPLACE,
290 self.lockout_observation_window = 10
291 lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7))
293 m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks),
294 ldb.FLAG_MOD_REPLACE,
295 "lockOutObservationWindow")
297 self.rwdc_db.modify(m)
298 self.force_replication()
300 self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
302 def test_login_lockout_ntlm(self):
303 username = self.lockout1ntlm_creds.get_username()
304 userpass = self.lockout1ntlm_creds.get_password()
305 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
307 preload_rodc_user(userdn)
309 self.kerberos = False
311 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
313 res = self.rodc_db.search(self.rodc_dn,
314 scope=ldb.SCOPE_BASE,
315 attrs=['msDS-RevealOnDemandGroup'])
317 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
320 m.dn = ldb.Dn(self.rwdc_db, group)
321 m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
322 self.rwdc_db.modify(m)
324 self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
326 def test_login_lockout_not_revealed(self):
327 '''Test that SendToSam is restricted by preloaded users/groups'''
329 username = self.lockout1ntlm_creds.get_username()
330 userpass = self.lockout1ntlm_creds.get_password()
331 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
333 # Preload but do not add to revealed group
334 preload_rodc_user(userdn)
336 self.kerberos = False
338 creds = self.lockout1ntlm_creds
340 # Open a second LDB connection with the user credentials. Use the
341 # command line credentials for informations like the domain, the realm
342 # and the workstation.
343 creds_lockout = self.insta_creds(creds)
346 creds_lockout.set_password("thatsAcomplPASS1x")
348 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
353 lastLogonTimestamp = 0
354 logoncount_relation = ''
355 lastlogon_relation = ''
357 res = self._check_account(userdn,
359 badPasswordTime=("greater", badPasswordTime),
360 logonCount=logonCount,
362 lastLogonTimestamp=lastLogonTimestamp,
363 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
364 msDSUserAccountControlComputed=0,
365 msg='lastlogontimestamp with wrong password')
366 badPasswordTime = int(res[0]["badPasswordTime"][0])
368 # BadPwdCount on RODC increases alongside RWDC
369 res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
370 self.assertTrue('badPwdCount' in res[0])
371 self.assertEqual(int(res[0]['badPwdCount'][0]), 1)
373 # Correct old password
374 creds_lockout.set_password(userpass)
376 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
378 # Wait for potential SendToSam...
381 # BadPwdCount on RODC decreases, but not the RWDC
382 res = self._check_account(userdn,
384 badPasswordTime=badPasswordTime,
385 logonCount=(logoncount_relation, logonCount),
386 lastLogon=('greater', lastLogon),
387 lastLogonTimestamp=lastLogonTimestamp,
388 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
389 msDSUserAccountControlComputed=0,
390 msg='badPwdCount not reset on RWDC')
392 res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
393 self.assertTrue('badPwdCount' in res[0])
394 self.assertEqual(int(res[0]['badPwdCount'][0]), 0)
396 def _test_login_lockout_rodc_rwdc(self, creds, userdn):
397 username = creds.get_username()
398 userpass = creds.get_password()
400 # Open a second LDB connection with the user credentials. Use the
401 # command line credentials for informations like the domain, the realm
402 # and the workstation.
403 creds_lockout = self.insta_creds(creds)
406 creds_lockout.set_password("thatsAcomplPASS1x")
408 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
413 lastLogonTimestamp = 0
414 logoncount_relation = ''
415 lastlogon_relation = ''
417 res = self._check_account(userdn,
419 badPasswordTime=("greater", badPasswordTime),
420 logonCount=logonCount,
422 lastLogonTimestamp=lastLogonTimestamp,
423 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
424 msDSUserAccountControlComputed=0,
425 msg='lastlogontimestamp with wrong password')
426 badPasswordTime = int(res[0]["badPasswordTime"][0])
428 # Correct old password
429 creds_lockout.set_password(userpass)
431 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
433 # lastLogonTimestamp should not change
434 # lastLogon increases if badPwdCount is non-zero (!)
435 res = self._check_account(userdn,
437 badPasswordTime=badPasswordTime,
438 logonCount=(logoncount_relation, logonCount),
439 lastLogon=('greater', lastLogon),
440 lastLogonTimestamp=lastLogonTimestamp,
441 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
442 msDSUserAccountControlComputed=0,
443 msg='LLTimestamp is updated to lastlogon')
445 logonCount = int(res[0]["logonCount"][0])
446 lastLogon = int(res[0]["lastLogon"][0])
449 creds_lockout.set_password("thatsAcomplPASS1x")
451 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
453 res = self._check_account(userdn,
455 badPasswordTime=("greater", badPasswordTime),
456 logonCount=logonCount,
458 lastLogonTimestamp=lastLogonTimestamp,
459 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
460 msDSUserAccountControlComputed=0)
461 badPasswordTime = int(res[0]["badPasswordTime"][0])
464 creds_lockout.set_password("thatsAcomplPASS1x")
467 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
470 except LdbError as e1:
472 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
474 res = self._check_account(userdn,
476 badPasswordTime=("greater", badPasswordTime),
477 logonCount=logonCount,
479 lastLogonTimestamp=lastLogonTimestamp,
480 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
481 msDSUserAccountControlComputed=0)
482 badPasswordTime = int(res[0]["badPasswordTime"][0])
484 print("two failed password change")
487 creds_lockout.set_password("thatsAcomplPASS1x")
490 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
493 except LdbError as e2:
495 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
497 res = self._check_account(userdn,
499 badPasswordTime=("greater", badPasswordTime),
500 logonCount=logonCount,
502 lastLogonTimestamp=lastLogonTimestamp,
503 lockoutTime=("greater", badPasswordTime),
504 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
505 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
506 badPasswordTime = int(res[0]["badPasswordTime"][0])
507 lockoutTime = int(res[0]["lockoutTime"][0])
510 creds_lockout.set_password("thatsAcomplPASS1x")
512 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
514 except LdbError as e3:
516 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
518 res = self._check_account(userdn,
520 badPasswordTime=badPasswordTime,
521 logonCount=logonCount,
523 lastLogonTimestamp=lastLogonTimestamp,
524 lockoutTime=lockoutTime,
525 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
526 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
529 creds_lockout.set_password("thatsAcomplPASS1x")
531 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
533 except LdbError as e4:
535 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
537 res = self._check_account(userdn,
539 badPasswordTime=badPasswordTime,
540 logonCount=logonCount,
542 lastLogonTimestamp=lastLogonTimestamp,
543 lockoutTime=lockoutTime,
544 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
545 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
547 # The correct password, but we are locked out
548 creds_lockout.set_password(userpass)
550 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
552 except LdbError as e5:
554 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
556 res = self._check_account(userdn,
558 badPasswordTime=badPasswordTime,
559 logonCount=logonCount,
561 lastLogonTimestamp=lastLogonTimestamp,
562 lockoutTime=lockoutTime,
563 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
564 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
566 # wait for the lockout to end
567 time.sleep(self.account_lockout_duration + 1)
568 print(self.account_lockout_duration + 1)
570 res = self._check_account(userdn,
571 badPwdCount=3, effective_bad_password_count=0,
572 badPasswordTime=badPasswordTime,
573 logonCount=logonCount,
574 lockoutTime=lockoutTime,
576 lastLogonTimestamp=lastLogonTimestamp,
577 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
578 msDSUserAccountControlComputed=0)
580 # The correct password after letting the timeout expire
582 creds_lockout.set_password(userpass)
584 creds_lockout2 = self.insta_creds(creds_lockout)
586 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp)
589 res = self._check_account(userdn,
591 badPasswordTime=badPasswordTime,
592 logonCount=(logoncount_relation, logonCount),
593 lastLogon=(lastlogon_relation, lastLogon),
594 lastLogonTimestamp=lastLogonTimestamp,
595 lockoutTime=lockoutTime,
596 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
597 msDSUserAccountControlComputed=0,
598 msg="lastLogon is way off")
601 creds_lockout.set_password("thatsAcomplPASS1x")
603 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
605 except LdbError as e6:
607 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
609 res = self._check_account(userdn,
611 badPasswordTime=("greater", badPasswordTime),
612 logonCount=logonCount,
613 lockoutTime=lockoutTime,
615 lastLogonTimestamp=lastLogonTimestamp,
616 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
617 msDSUserAccountControlComputed=0)
618 badPasswordTime = int(res[0]["badPasswordTime"][0])
621 creds_lockout.set_password("thatsAcomplPASS1x")
623 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
625 except LdbError as e7:
627 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
629 res = self._check_account(userdn,
631 badPasswordTime=("greater", badPasswordTime),
632 logonCount=logonCount,
633 lockoutTime=lockoutTime,
635 lastLogonTimestamp=lastLogonTimestamp,
636 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
637 msDSUserAccountControlComputed=0)
638 badPasswordTime = int(res[0]["badPasswordTime"][0])
640 time.sleep(self.lockout_observation_window + 1)
642 res = self._check_account(userdn,
643 badPwdCount=2, effective_bad_password_count=0,
644 badPasswordTime=badPasswordTime,
645 logonCount=logonCount,
646 lockoutTime=lockoutTime,
648 lastLogonTimestamp=lastLogonTimestamp,
649 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
650 msDSUserAccountControlComputed=0)
653 creds_lockout.set_password("thatsAcomplPASS1x")
655 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
657 except LdbError as e8:
659 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
661 res = self._check_account(userdn,
663 badPasswordTime=("greater", badPasswordTime),
664 logonCount=logonCount,
665 lockoutTime=lockoutTime,
667 lastLogonTimestamp=lastLogonTimestamp,
668 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
669 msDSUserAccountControlComputed=0)
670 badPasswordTime = int(res[0]["badPasswordTime"][0])
672 # The correct password without letting the timeout expire
673 creds_lockout.set_password(userpass)
674 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
676 res = self._check_account(userdn,
678 badPasswordTime=badPasswordTime,
679 logonCount=(logoncount_relation, logonCount),
680 lockoutTime=lockoutTime,
681 lastLogon=("greater", lastLogon),
682 lastLogonTimestamp=lastLogonTimestamp,
683 userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
684 msDSUserAccountControlComputed=0)
686 class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
687 counter = itertools.count(1).next
689 def force_replication(self, base=None):
693 # XXX feels like a horrendous way to do it.
694 credstring = '-U%s%%%s' % (CREDS.get_username(),
695 CREDS.get_password())
696 cmd = ['bin/samba-tool',
702 p = subprocess.Popen(cmd,
703 stderr=subprocess.PIPE,
704 stdout=subprocess.PIPE)
705 stdout, stderr = p.communicate()
707 print("failed with code %s" % p.returncode)
713 raise RodcRwdcTestException()
715 def _check_account_initial(self, dn):
716 self.force_replication()
717 return super(RodcRwdcTests, self)._check_account_initial(dn)
720 super(RodcRwdcTests, self).tearDown()
721 self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
722 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
723 set_auto_replication(RWDC, True)
726 self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
727 session_info=system_session(LP), lp=LP)
729 self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
730 session_info=system_session(LP), lp=LP)
732 # Define variables for BasePasswordTestCase
734 self.global_creds = CREDS
736 self.host_url = 'ldap://%s' % RWDC
737 self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
738 credentials=self.global_creds, lp=self.lp)
740 super(RodcRwdcTests, self).setUp()
742 self.host_url = 'ldap://%s' % RODC
743 self.ldb = SamDB(url='ldap://%s' % RODC, session_info=system_session(self.lp),
744 credentials=self.global_creds, lp=self.lp)
746 self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
747 self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
748 self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
750 self.base_dn = self.rwdc_db.domain_dn()
752 root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
753 attrs=['dsServiceName'])
754 self.service = root[0]['dsServiceName'][0]
755 self.tag = uuid.uuid4().hex
757 self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
758 self.rwdc_db.set_dsheuristics("000000001")
760 set_auto_replication(RWDC, False)
762 # make sure DCs are synchronized before the test
763 self.force_replication()
764 self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
765 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
767 def delete_ldb_connections(self):
768 super(RodcRwdcTests, self).delete_ldb_connections()
772 def assertReferral(self, fn, *args, **kwargs):
775 self.fail("failed to raise ldap referral")
776 except ldb.LdbError as e9:
777 (code, msg) = e9.args
778 self.assertEqual(code, ldb.ERR_REFERRAL,
779 "expected referral, got %s %s" % (code, msg))
781 def _test_rodc_dsheuristics(self):
782 d = self.rodc_db.get_dsheuristics()
783 self.assertReferral(self.rodc_db.set_dsheuristics, "000000001")
784 self.assertReferral(self.rodc_db.set_dsheuristics, d)
786 def TEST_rodc_heuristics_kerberos(self):
787 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
788 self._test_rodc_dsheuristics()
790 def TEST_rodc_heuristics_ntlm(self):
791 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
792 self._test_rodc_dsheuristics()
794 def _test_add(self, objects, cross_ncs=False):
798 base = str(self.rwdc_db.get_config_basedn())
799 controls = ["search_options:1:2"]
800 cn = dn.split(',', 1)[0]
801 expression = '(%s)' % cn
808 res = self.rodc_db.search(base,
809 expression=expression,
810 scope=ldb.SCOPE_SUBTREE,
813 self.assertEqual(len(res), 0)
814 except ldb.LdbError as e:
815 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
820 except ldb.LdbError as e:
821 (ecode, emsg) = e.args
822 self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
826 self.force_replication(base=base)
828 self.force_replication()
831 res = self.rodc_db.search(base,
832 expression=expression,
833 scope=ldb.SCOPE_SUBTREE,
836 self.assertEqual(len(res), 1)
837 except ldb.LdbError as e:
838 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
839 "replication seems to have failed")
841 def _test_add_replicated_objects(self, mode):
842 tag = "%s%s" % (self.tag, mode)
845 'dn': "ou=%s1,%s" % (tag, self.base_dn),
846 "objectclass": "organizationalUnit"
849 'dn': "cn=%s2,%s" % (tag, self.base_dn),
850 "objectclass": "user"
853 'dn': "cn=%s3,%s" % (tag, self.base_dn),
854 "objectclass": "group"
857 self.rwdc_db.delete("ou=%s1,%s" % (tag, self.base_dn))
858 self.rwdc_db.delete("cn=%s2,%s" % (tag, self.base_dn))
859 self.rwdc_db.delete("cn=%s3,%s" % (tag, self.base_dn))
861 def test_add_replicated_objects_kerberos(self):
862 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
863 self._test_add_replicated_objects('kerberos')
865 def test_add_replicated_objects_ntlm(self):
866 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
867 self._test_add_replicated_objects('ntlm')
869 def _test_add_replicated_connections(self, mode):
870 tag = "%s%s" % (self.tag, mode)
873 'dn': "cn=%sfoofoofoo,%s" % (tag, self.service),
874 "objectclass": "NTDSConnection",
875 'enabledConnection': 'TRUE',
876 'fromServer': self.base_dn,
880 self.rwdc_db.delete("cn=%sfoofoofoo,%s" % (tag, self.service))
882 def test_add_replicated_connections_kerberos(self):
883 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
884 self._test_add_replicated_connections('kerberos')
886 def test_add_replicated_connections_ntlm(self):
887 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
888 self._test_add_replicated_connections('ntlm')
890 def _test_modify_replicated_attributes(self):
891 dn = 'CN=Guest,CN=Users,' + self.base_dn
893 for attr in ['carLicense', 'middleName']:
895 m.dn = ldb.Dn(self.rwdc_db, dn)
896 m[attr] = ldb.MessageElement(value,
897 ldb.FLAG_MOD_REPLACE,
900 self.rwdc_db.modify(m)
901 except ldb.LdbError as e:
902 self.fail("Failed to modify %s %s on RWDC %s with %s" %
905 self.force_replication()
908 res = self.rodc_db.search(dn,
909 scope=ldb.SCOPE_SUBTREE,
911 results = [x[attr][0] for x in res]
912 self.assertEqual(results, [value])
913 except ldb.LdbError as e:
914 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
915 "replication seems to have failed")
917 def test_modify_replicated_attributes_kerberos(self):
918 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
919 self._test_modify_replicated_attributes()
921 def test_modify_replicated_attributes_ntlm(self):
922 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
923 self._test_modify_replicated_attributes()
925 def _test_add_modify_delete(self):
926 dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
927 values = ["%s%s" % (i, self.tag) for i in range(3)]
932 "objectclass": "user",
936 self.force_replication()
937 for value in values[1:]:
940 m.dn = ldb.Dn(self.rwdc_db, dn)
941 m[attr] = ldb.MessageElement(value,
942 ldb.FLAG_MOD_REPLACE,
945 self.rwdc_db.modify(m)
946 except ldb.LdbError as e:
947 self.fail("Failed to modify %s %s on RWDC %s with %s" %
950 self.force_replication()
953 res = self.rodc_db.search(dn,
954 scope=ldb.SCOPE_SUBTREE,
956 results = [x[attr][0] for x in res]
957 self.assertEqual(results, [value])
958 except ldb.LdbError as e:
959 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
960 "replication seems to have failed")
962 self.rwdc_db.delete(dn)
963 self.force_replication()
965 res = self.rodc_db.search(dn,
966 scope=ldb.SCOPE_SUBTREE,
969 self.fail("Failed to delete %s" % (dn))
970 except ldb.LdbError as e:
971 self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
972 "Failed to delete %s" % (dn))
974 def test_add_modify_delete_kerberos(self):
975 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
976 self._test_add_modify_delete()
978 def test_add_modify_delete_ntlm(self):
979 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
980 self._test_add_modify_delete()
983 username = "u%sX%s" % (self.tag[:12], self.counter())
984 password = 'password#1'
985 dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
988 "objectclass": "user",
989 'sAMAccountName': username,
993 except ldb.LdbError as e:
994 self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))
996 self.rwdc_db.modify_ldif("dn: %s\n"
997 "changetype: modify\n"
998 "delete: userPassword\n"
999 "add: userPassword\n"
1000 "userPassword: %s\n" % (dn, password))
1001 self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
1002 return (dn, username, password)
1004 def _change_password(self, user_dn, old_password, new_password):
1005 self.rwdc_db.modify_ldif(
1007 "changetype: modify\n"
1008 "delete: userPassword\n"
1009 "userPassword: %s\n"
1010 "add: userPassword\n"
1011 "userPassword: %s\n" % (user_dn, old_password, new_password))
1013 def try_ldap_logon(self, server, creds, errno=None):
1015 tmpdb = SamDB('ldap://%s' % server, credentials=creds,
1016 session_info=system_session(LP), lp=LP)
1017 if errno is not None:
1018 self.fail("logon failed to fail with ldb error %s" % errno)
1019 except ldb.LdbError as e10:
1020 (code, msg) = e10.args
1023 self.fail("logon incorrectly raised ldb error (code=%s)" %
1026 self.fail("logon failed to raise correct ldb error"
1027 "Expected: %s Got: %s" %
1031 def zero_min_password_age(self):
1032 min_pwd_age = int(self.rwdc_db.get_minPwdAge())
1033 if min_pwd_age != 0:
1034 self.rwdc_db.set_minPwdAge('0')
1036 def _test_ldap_change_password(self, errno=None):
1037 self.zero_min_password_age()
1039 dn, username, password = self._new_user()
1040 creds1 = make_creds(username, password)
1042 # With NTLM, this should fail on RODC before replication,
1043 # because the user isn't known.
1044 self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1045 self.force_replication()
1047 # Now the user is replicated to RODC, so logon should work
1048 self.try_ldap_logon(RODC, creds1)
1050 passwords = ['password#%s' % i for i in range(1, 6)]
1051 for prev, password in zip(passwords[:-1], passwords[1:]):
1052 self._change_password(dn, prev, password)
1054 # The password has changed enough times to make the old
1055 # password invalid (though with kerberos that doesn't matter).
1056 # For NTLM, the old creds should always fail
1057 self.try_ldap_logon(RODC, creds1, errno)
1058 self.try_ldap_logon(RWDC, creds1, errno)
1060 creds2 = make_creds(username, password)
1062 # new creds work straight away with NTLM, because although it
1063 # doesn't have the password, it knows the user and forwards
1065 self.try_ldap_logon(RODC, creds2)
1066 self.try_ldap_logon(RWDC, creds2)
1068 self.force_replication()
1070 # After another replication check RODC still works and fails,
1071 # as appropriate to various creds
1072 self.try_ldap_logon(RODC, creds2)
1073 self.try_ldap_logon(RODC, creds1, errno)
1076 password = 'password#6'
1077 self._change_password(dn, prev, password)
1078 creds3 = make_creds(username, password)
1080 # previous password should still work.
1081 self.try_ldap_logon(RWDC, creds2)
1082 self.try_ldap_logon(RODC, creds2)
1084 # new password should still work.
1085 self.try_ldap_logon(RWDC, creds3)
1086 self.try_ldap_logon(RODC, creds3)
1088 # old password should still fail (but not on kerberos).
1089 self.try_ldap_logon(RWDC, creds1, errno)
1090 self.try_ldap_logon(RODC, creds1, errno)
1092 def test_ldap_change_password_kerberos(self):
1093 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
1094 self._test_ldap_change_password()
1096 def test_ldap_change_password_ntlm(self):
1097 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
1098 self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS)
1100 def _test_ldap_change_password_reveal_on_demand(self, errno=None):
1101 self.zero_min_password_age()
1103 res = self.rodc_db.search(self.rodc_dn,
1104 scope=ldb.SCOPE_BASE,
1105 attrs=['msDS-RevealOnDemandGroup'])
1107 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
1109 user_dn, username, password = self._new_user()
1110 creds1 = make_creds(username, password)
1113 m.dn = ldb.Dn(self.rwdc_db, group)
1114 m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
1115 self.rwdc_db.modify(m)
1117 # Against Windows, this will just forward if no account exists on the KDC
1118 # Therefore, this does not error on Windows.
1119 self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1121 self.force_replication()
1124 self.try_ldap_logon(RODC, creds1)
1125 preload_rodc_user(user_dn)
1127 # Now the user AND password are replicated to RODC, so logon should work (not proxy case)
1128 self.try_ldap_logon(RODC, creds1)
1130 passwords = ['password#%s' % i for i in range(1, 6)]
1131 for prev, password in zip(passwords[:-1], passwords[1:]):
1132 self._change_password(user_dn, prev, password)
1134 # The password has changed enough times to make the old
1135 # password invalid, but the RODC shouldn't know that.
1136 self.try_ldap_logon(RODC, creds1)
1137 self.try_ldap_logon(RWDC, creds1, errno)
1139 creds2 = make_creds(username, password)
1140 self.try_ldap_logon(RWDC, creds2)
1141 # We can forward WRONG_PASSWORD over NTLM.
1142 # This SHOULD succeed.
1143 self.try_ldap_logon(RODC, creds2)
1146 def test_change_password_reveal_on_demand_ntlm(self):
1147 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
1148 self._test_ldap_change_password_reveal_on_demand(ldb.ERR_INVALID_CREDENTIALS)
1150 def test_change_password_reveal_on_demand_kerberos(self):
1151 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
1152 self._test_ldap_change_password_reveal_on_demand()
1154 def test_login_lockout_krb5(self):
1155 username = self.lockout1krb5_creds.get_username()
1156 userpass = self.lockout1krb5_creds.get_password()
1157 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1159 preload_rodc_user(userdn)
1161 use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
1162 fail_creds = self.insta_creds(self.template_creds,
1164 userpass=userpass + "X",
1165 kerberos_state=use_kerberos)
1168 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1170 except LdbError as e11:
1171 (num, msg) = e11.args
1172 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1174 # Succeed to reset everything to 0
1175 success_creds = self.insta_creds(self.template_creds,
1178 kerberos_state=use_kerberos)
1180 ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
1182 self._test_login_lockout(self.lockout1krb5_creds)
1184 def test_login_lockout_ntlm(self):
1185 username = self.lockout1ntlm_creds.get_username()
1186 userpass = self.lockout1ntlm_creds.get_password()
1187 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1189 preload_rodc_user(userdn)
1191 use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
1192 fail_creds = self.insta_creds(self.template_creds,
1194 userpass=userpass + "X",
1195 kerberos_state=use_kerberos)
1198 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1200 except LdbError as e12:
1201 (num, msg) = e12.args
1202 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1204 # Succeed to reset everything to 0
1205 ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
1207 self._test_login_lockout(self.lockout1ntlm_creds)
1209 def test_multiple_logon_krb5(self):
1210 username = self.lockout1krb5_creds.get_username()
1211 userpass = self.lockout1krb5_creds.get_password()
1212 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1214 preload_rodc_user(userdn)
1216 use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
1217 fail_creds = self.insta_creds(self.template_creds,
1219 userpass=userpass + "X",
1220 kerberos_state=use_kerberos)
1223 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1225 except LdbError as e13:
1226 (num, msg) = e13.args
1227 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1229 # Succeed to reset everything to 0
1230 success_creds = self.insta_creds(self.template_creds,
1233 kerberos_state=use_kerberos)
1235 ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
1237 self._test_multiple_logon(self.lockout1krb5_creds)
1239 def test_multiple_logon_ntlm(self):
1240 username = self.lockout1ntlm_creds.get_username()
1241 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1242 userpass = self.lockout1ntlm_creds.get_password()
1244 preload_rodc_user(userdn)
1246 use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
1247 fail_creds = self.insta_creds(self.template_creds,
1249 userpass=userpass + "X",
1250 kerberos_state=use_kerberos)
1253 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1255 except LdbError as e14:
1256 (num, msg) = e14.args
1257 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1259 # Succeed to reset everything to 0
1260 ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
1262 self._test_multiple_logon(self.lockout1ntlm_creds)
1265 global RODC, RWDC, CREDS, LP
1266 parser = optparse.OptionParser(
1267 "rodc_rwdc.py [options] <rodc host> <rwdc host>")
1269 sambaopts = options.SambaOptions(parser)
1270 versionopts = options.VersionOptions(parser)
1271 credopts = options.CredentialsOptions(parser)
1272 subunitopts = SubunitOptions(parser)
1274 parser.add_option_group(sambaopts)
1275 parser.add_option_group(versionopts)
1276 parser.add_option_group(credopts)
1277 parser.add_option_group(subunitopts)
1279 opts, args = parser.parse_args()
1281 LP = sambaopts.get_loadparm()
1282 CREDS = credopts.get_credentials(LP)
1283 CREDS.set_gensec_features(CREDS.get_gensec_features() |
1284 gensec.FEATURE_SEAL)
1289 parser.print_usage()
1292 set_auto_replication(RWDC, True)
1294 TestProgram(module=__name__, opts=subunitopts)
1296 set_auto_replication(RWDC, True)