2 # -*- coding: utf-8 -*-
3 from __future__ import print_function
4 """Test communication of credentials etc, between an RODC and a RWDC.
6 How does it work when the password is changed on the RWDC?
17 sys.path.insert(0, "bin/python")
21 from samba.tests.subunitrun import SubunitOptions, TestProgram
22 import samba.getopt as options
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
27 from samba import gensec, dsdb
28 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
29 from samba.dcerpc import security, samr
31 import password_lockout_base
33 def passwd_encode(pw):
34 return base64.b64encode(('"%s"' % pw).encode('utf-16-le')).decode('utf8')
37 class RodcRwdcTestException(Exception):
41 def make_creds(username, password, kerberos_state=None):
42 # use the global CREDS as a template
44 c.set_username(username)
45 c.set_password(password)
46 c.set_domain(CREDS.get_domain())
47 c.set_realm(CREDS.get_realm())
48 c.set_workstation(CREDS.get_workstation())
50 if kerberos_state is None:
51 kerberos_state = CREDS.get_kerberos_state()
52 c.set_kerberos_state(kerberos_state)
55 if kerberos_state == MUST_USE_KERBEROS:
56 print("we seem to be using kerberos for %s %s" % (username, password))
57 elif kerberos_state == DONT_USE_KERBEROS:
58 print("NOT using kerberos for %s %s" % (username, password))
60 print("kerberos state is %s" % kerberos_state)
62 c.set_gensec_features(c.get_gensec_features() |
67 def set_auto_replication(dc, allow):
68 credstring = '-U%s%%%s' % (CREDS.get_username(),
71 on_or_off = '-' if allow else '+'
73 for opt in ['DISABLE_INBOUND_REPL',
74 'DISABLE_OUTBOUND_REPL']:
75 cmd = ['bin/samba-tool',
78 "--dsa-option=%s%s" % (on_or_off, opt)]
80 p = subprocess.Popen(cmd,
81 stderr=subprocess.PIPE,
82 stdout=subprocess.PIPE)
83 stdout, stderr = p.communicate()
85 if 'LDAP_REFERRAL' not in stderr:
86 raise RodcRwdcTestException()
87 print ("ignoring +%s REFERRAL error; assuming %s is RODC" %
91 def preload_rodc_user(user_dn):
92 credstring = '-U%s%%%s' % (CREDS.get_username(),
95 set_auto_replication(RWDC, True)
96 cmd = ['bin/samba-tool',
103 subprocess.check_call(cmd)
104 set_auto_replication(RWDC, False)
108 def get_server_ref_from_samdb(samdb):
109 server_name = samdb.get_serverName()
110 res = samdb.search(server_name,
111 scope=ldb.SCOPE_BASE,
112 attrs=['serverReference'])
114 return res[0]['serverReference'][0]
116 class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
117 counter = itertools.count(1).next
119 def _check_account_initial(self, dn):
120 self.force_replication()
121 return super(RodcRwdcCachedTests, self)._check_account_initial(dn)
123 def _check_account(self, dn,
125 badPasswordTime=None,
128 lastLogonTimestamp=None,
130 userAccountControl=None,
131 msDSUserAccountControlComputed=None,
132 effective_bad_password_count=None,
134 badPwdCountOnly=False):
135 # Wait for the RWDC to get any delayed messages
136 # e.g. SendToSam or KRB5 bad passwords via winbindd
137 if (self.kerberos and isinstance(badPasswordTime, tuple) or
141 return super(RodcRwdcCachedTests,
142 self)._check_account(dn, badPwdCount, badPasswordTime,
143 logonCount, lastLogon,
144 lastLogonTimestamp, lockoutTime,
146 msDSUserAccountControlComputed,
147 effective_bad_password_count, msg,
150 def force_replication(self, base=None):
154 # XXX feels like a horrendous way to do it.
155 credstring = '-U%s%%%s' % (CREDS.get_username(),
156 CREDS.get_password())
157 cmd = ['bin/samba-tool',
163 p = subprocess.Popen(cmd,
164 stderr=subprocess.PIPE,
165 stdout=subprocess.PIPE)
166 stdout, stderr = p.communicate()
168 print("failed with code %s" % p.returncode)
174 raise RodcRwdcTestException()
176 def _change_password(self, user_dn, old_password, new_password):
177 self.rwdc_db.modify_ldif(
179 "changetype: modify\n"
180 "delete: userPassword\n"
182 "add: userPassword\n"
183 "userPassword: %s\n" % (user_dn, old_password, new_password))
186 super(RodcRwdcCachedTests, self).tearDown()
187 set_auto_replication(RWDC, True)
190 self.kerberos = False # To be set later
192 self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
193 session_info=system_session(LP), lp=LP)
195 self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
196 session_info=system_session(LP), lp=LP)
198 # Define variables for BasePasswordTestCase
200 self.global_creds = CREDS
202 self.host_url = 'ldap://%s' % RWDC
203 self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
204 credentials=self.global_creds, lp=self.lp)
206 super(RodcRwdcCachedTests, self).setUp()
207 self.host_url = 'ldap://%s' % RODC
209 self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
210 self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
211 self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
213 self.base_dn = self.rwdc_db.domain_dn()
215 root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
216 attrs=['dsServiceName'])
217 self.service = root[0]['dsServiceName'][0]
218 self.tag = uuid.uuid4().hex
220 self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
221 self.rwdc_db.set_dsheuristics("000000001")
223 set_auto_replication(RWDC, False)
225 # make sure DCs are synchronized before the test
226 self.force_replication()
228 def delete_ldb_connections(self):
229 super(RodcRwdcCachedTests, self).delete_ldb_connections()
233 def test_cache_and_flush_password(self):
234 username = self.lockout1krb5_creds.get_username()
235 userpass = self.lockout1krb5_creds.get_password()
236 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
238 ldb_system = SamDB(session_info=system_session(self.lp),
239 credentials=self.global_creds, lp=self.lp)
241 res = ldb_system.search(userdn, attrs=['unicodePwd'])
242 self.assertFalse('unicodePwd' in res[0])
244 preload_rodc_user(userdn)
246 res = ldb_system.search(userdn, attrs=['unicodePwd'])
247 self.assertTrue('unicodePwd' in res[0])
249 newpass = userpass + '!'
251 # Forcing replication should blank out password (when changed)
252 self._change_password(userdn, userpass, newpass)
253 self.force_replication()
255 res = ldb_system.search(userdn, attrs=['unicodePwd'])
256 self.assertFalse('unicodePwd' in res[0])
258 def test_login_lockout_krb5(self):
259 username = self.lockout1krb5_creds.get_username()
260 userpass = self.lockout1krb5_creds.get_password()
261 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
263 preload_rodc_user(userdn)
267 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
269 res = self.rodc_db.search(self.rodc_dn,
270 scope=ldb.SCOPE_BASE,
271 attrs=['msDS-RevealOnDemandGroup'])
273 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
276 m.dn = ldb.Dn(self.rwdc_db, group)
277 m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
278 self.rwdc_db.modify(m)
281 m.dn = ldb.Dn(self.ldb, self.base_dn)
283 self.account_lockout_duration = 10
284 account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7))
286 m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks),
287 ldb.FLAG_MOD_REPLACE,
290 self.lockout_observation_window = 10
291 lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7))
293 m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks),
294 ldb.FLAG_MOD_REPLACE,
295 "lockOutObservationWindow")
297 self.rwdc_db.modify(m)
298 self.force_replication()
300 self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
302 def test_login_lockout_ntlm(self):
303 username = self.lockout1ntlm_creds.get_username()
304 userpass = self.lockout1ntlm_creds.get_password()
305 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
307 preload_rodc_user(userdn)
309 self.kerberos = False
311 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
313 res = self.rodc_db.search(self.rodc_dn,
314 scope=ldb.SCOPE_BASE,
315 attrs=['msDS-RevealOnDemandGroup'])
317 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
320 m.dn = ldb.Dn(self.rwdc_db, group)
321 m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
322 self.rwdc_db.modify(m)
324 self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
326 def test_login_lockout_not_revealed(self):
327 '''Test that SendToSam is restricted by preloaded users/groups'''
329 username = self.lockout1ntlm_creds.get_username()
330 userpass = self.lockout1ntlm_creds.get_password()
331 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
333 # Preload but do not add to revealed group
334 preload_rodc_user(userdn)
336 self.kerberos = False
338 creds = self.lockout1ntlm_creds
340 # Open a second LDB connection with the user credentials. Use the
341 # command line credentials for informations like the domain, the realm
342 # and the workstation.
343 creds_lockout = self.insta_creds(creds)
346 creds_lockout.set_password("thatsAcomplPASS1x")
348 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
354 logoncount_relation = ''
355 lastlogon_relation = ''
357 res = self._check_account(userdn,
359 badPasswordTime=("greater", badPasswordTime),
360 logonCount=logonCount,
362 lastLogonTimestamp=lastLogonTimestamp,
364 dsdb.UF_NORMAL_ACCOUNT,
365 msDSUserAccountControlComputed=0,
366 msg='lastlogontimestamp with wrong password')
367 badPasswordTime = int(res[0]["badPasswordTime"][0])
369 # BadPwdCount on RODC increases alongside RWDC
370 res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
371 self.assertTrue('badPwdCount' in res[0])
372 self.assertEqual(int(res[0]['badPwdCount'][0]), 1)
374 # Correct old password
375 creds_lockout.set_password(userpass)
377 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
379 # Wait for potential SendToSam...
382 # BadPwdCount on RODC decreases, but not the RWDC
383 res = self._check_account(userdn,
385 badPasswordTime=badPasswordTime,
386 logonCount=(logoncount_relation, logonCount),
387 lastLogon=('greater', lastLogon),
388 lastLogonTimestamp=lastLogonTimestamp,
390 dsdb.UF_NORMAL_ACCOUNT,
391 msDSUserAccountControlComputed=0,
392 msg='badPwdCount not reset on RWDC')
394 res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
395 self.assertTrue('badPwdCount' in res[0])
396 self.assertEqual(int(res[0]['badPwdCount'][0]), 0)
398 def _test_login_lockout_rodc_rwdc(self, creds, userdn):
399 username = creds.get_username()
400 userpass = creds.get_password()
402 # Open a second LDB connection with the user credentials. Use the
403 # command line credentials for informations like the domain, the realm
404 # and the workstation.
405 creds_lockout = self.insta_creds(creds)
408 creds_lockout.set_password("thatsAcomplPASS1x")
410 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
416 logoncount_relation = ''
417 lastlogon_relation = ''
419 res = self._check_account(userdn,
421 badPasswordTime=("greater", badPasswordTime),
422 logonCount=logonCount,
424 lastLogonTimestamp=lastLogonTimestamp,
426 dsdb.UF_NORMAL_ACCOUNT,
427 msDSUserAccountControlComputed=0,
428 msg='lastlogontimestamp with wrong password')
429 badPasswordTime = int(res[0]["badPasswordTime"][0])
431 # Correct old password
432 creds_lockout.set_password(userpass)
434 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
436 # lastLogonTimestamp should not change
437 # lastLogon increases if badPwdCount is non-zero (!)
438 res = self._check_account(userdn,
440 badPasswordTime=badPasswordTime,
441 logonCount=(logoncount_relation, logonCount),
442 lastLogon=('greater', lastLogon),
443 lastLogonTimestamp=lastLogonTimestamp,
445 dsdb.UF_NORMAL_ACCOUNT,
446 msDSUserAccountControlComputed=0,
447 msg='LLTimestamp is updated to lastlogon')
449 logonCount = int(res[0]["logonCount"][0])
450 lastLogon = int(res[0]["lastLogon"][0])
453 creds_lockout.set_password("thatsAcomplPASS1x")
455 self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
457 res = self._check_account(userdn,
459 badPasswordTime=("greater", badPasswordTime),
460 logonCount=logonCount,
462 lastLogonTimestamp=lastLogonTimestamp,
464 dsdb.UF_NORMAL_ACCOUNT,
465 msDSUserAccountControlComputed=0)
466 badPasswordTime = int(res[0]["badPasswordTime"][0])
469 creds_lockout.set_password("thatsAcomplPASS1x")
472 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
475 except LdbError as e1:
477 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
479 res = self._check_account(userdn,
481 badPasswordTime=("greater", badPasswordTime),
482 logonCount=logonCount,
484 lastLogonTimestamp=lastLogonTimestamp,
486 dsdb.UF_NORMAL_ACCOUNT,
487 msDSUserAccountControlComputed=0)
488 badPasswordTime = int(res[0]["badPasswordTime"][0])
490 print("two failed password change")
493 creds_lockout.set_password("thatsAcomplPASS1x")
496 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
499 except LdbError as e2:
501 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
503 res = self._check_account(userdn,
505 badPasswordTime=("greater", badPasswordTime),
506 logonCount=logonCount,
508 lastLogonTimestamp=lastLogonTimestamp,
509 lockoutTime=("greater", badPasswordTime),
511 dsdb.UF_NORMAL_ACCOUNT,
512 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
513 badPasswordTime = int(res[0]["badPasswordTime"][0])
514 lockoutTime = int(res[0]["lockoutTime"][0])
517 creds_lockout.set_password("thatsAcomplPASS1x")
519 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
521 except LdbError as e3:
523 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
525 res = self._check_account(userdn,
527 badPasswordTime=badPasswordTime,
528 logonCount=logonCount,
530 lastLogonTimestamp=lastLogonTimestamp,
531 lockoutTime=lockoutTime,
533 dsdb.UF_NORMAL_ACCOUNT,
534 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
537 creds_lockout.set_password("thatsAcomplPASS1x")
539 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
541 except LdbError as e4:
543 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
545 res = self._check_account(userdn,
547 badPasswordTime=badPasswordTime,
548 logonCount=logonCount,
550 lastLogonTimestamp=lastLogonTimestamp,
551 lockoutTime=lockoutTime,
553 dsdb.UF_NORMAL_ACCOUNT,
554 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
556 # The correct password, but we are locked out
557 creds_lockout.set_password(userpass)
559 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
561 except LdbError as e5:
563 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
565 res = self._check_account(userdn,
567 badPasswordTime=badPasswordTime,
568 logonCount=logonCount,
570 lastLogonTimestamp=lastLogonTimestamp,
571 lockoutTime=lockoutTime,
573 dsdb.UF_NORMAL_ACCOUNT,
574 msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
576 # wait for the lockout to end
577 time.sleep(self.account_lockout_duration + 1)
578 print(self.account_lockout_duration + 1)
580 res = self._check_account(userdn,
581 badPwdCount=3, effective_bad_password_count=0,
582 badPasswordTime=badPasswordTime,
583 logonCount=logonCount,
584 lockoutTime=lockoutTime,
586 lastLogonTimestamp=lastLogonTimestamp,
588 dsdb.UF_NORMAL_ACCOUNT,
589 msDSUserAccountControlComputed=0)
591 # The correct password after letting the timeout expire
593 creds_lockout.set_password(userpass)
595 creds_lockout2 = self.insta_creds(creds_lockout)
597 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp)
600 res = self._check_account(userdn,
602 badPasswordTime=badPasswordTime,
603 logonCount=(logoncount_relation, logonCount),
604 lastLogon=(lastlogon_relation, lastLogon),
605 lastLogonTimestamp=lastLogonTimestamp,
606 lockoutTime=lockoutTime,
608 dsdb.UF_NORMAL_ACCOUNT,
609 msDSUserAccountControlComputed=0,
610 msg="lastLogon is way off")
613 creds_lockout.set_password("thatsAcomplPASS1x")
615 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
617 except LdbError as e6:
619 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
621 res = self._check_account(userdn,
623 badPasswordTime=("greater", badPasswordTime),
624 logonCount=logonCount,
625 lockoutTime=lockoutTime,
627 lastLogonTimestamp=lastLogonTimestamp,
629 dsdb.UF_NORMAL_ACCOUNT,
630 msDSUserAccountControlComputed=0)
631 badPasswordTime = int(res[0]["badPasswordTime"][0])
634 creds_lockout.set_password("thatsAcomplPASS1x")
636 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
638 except LdbError as e7:
640 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
642 res = self._check_account(userdn,
644 badPasswordTime=("greater", badPasswordTime),
645 logonCount=logonCount,
646 lockoutTime=lockoutTime,
648 lastLogonTimestamp=lastLogonTimestamp,
650 dsdb.UF_NORMAL_ACCOUNT,
651 msDSUserAccountControlComputed=0)
652 badPasswordTime = int(res[0]["badPasswordTime"][0])
654 time.sleep(self.lockout_observation_window + 1)
656 res = self._check_account(userdn,
657 badPwdCount=2, effective_bad_password_count=0,
658 badPasswordTime=badPasswordTime,
659 logonCount=logonCount,
660 lockoutTime=lockoutTime,
662 lastLogonTimestamp=lastLogonTimestamp,
664 dsdb.UF_NORMAL_ACCOUNT,
665 msDSUserAccountControlComputed=0)
668 creds_lockout.set_password("thatsAcomplPASS1x")
670 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
672 except LdbError as e8:
674 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
676 res = self._check_account(userdn,
678 badPasswordTime=("greater", badPasswordTime),
679 logonCount=logonCount,
680 lockoutTime=lockoutTime,
682 lastLogonTimestamp=lastLogonTimestamp,
684 dsdb.UF_NORMAL_ACCOUNT,
685 msDSUserAccountControlComputed=0)
686 badPasswordTime = int(res[0]["badPasswordTime"][0])
688 # The correct password without letting the timeout expire
689 creds_lockout.set_password(userpass)
690 ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
692 res = self._check_account(userdn,
694 badPasswordTime=badPasswordTime,
695 logonCount=(logoncount_relation, logonCount),
696 lockoutTime=lockoutTime,
697 lastLogon=("greater", lastLogon),
698 lastLogonTimestamp=lastLogonTimestamp,
700 dsdb.UF_NORMAL_ACCOUNT,
701 msDSUserAccountControlComputed=0)
703 class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
704 counter = itertools.count(1).next
706 def force_replication(self, base=None):
710 # XXX feels like a horrendous way to do it.
711 credstring = '-U%s%%%s' % (CREDS.get_username(),
712 CREDS.get_password())
713 cmd = ['bin/samba-tool',
719 p = subprocess.Popen(cmd,
720 stderr=subprocess.PIPE,
721 stdout=subprocess.PIPE)
722 stdout, stderr = p.communicate()
724 print("failed with code %s" % p.returncode)
730 raise RodcRwdcTestException()
732 def _check_account_initial(self, dn):
733 self.force_replication()
734 return super(RodcRwdcTests, self)._check_account_initial(dn)
737 super(RodcRwdcTests, self).tearDown()
738 self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
739 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
740 set_auto_replication(RWDC, True)
743 self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
744 session_info=system_session(LP), lp=LP)
746 self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
747 session_info=system_session(LP), lp=LP)
749 # Define variables for BasePasswordTestCase
751 self.global_creds = CREDS
753 self.host_url = 'ldap://%s' % RWDC
754 self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
755 credentials=self.global_creds, lp=self.lp)
757 super(RodcRwdcTests, self).setUp()
759 self.host_url = 'ldap://%s' % RODC
760 self.ldb = SamDB(url='ldap://%s' % RODC, session_info=system_session(self.lp),
761 credentials=self.global_creds, lp=self.lp)
763 self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
764 self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
765 self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
767 self.base_dn = self.rwdc_db.domain_dn()
769 root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
770 attrs=['dsServiceName'])
771 self.service = root[0]['dsServiceName'][0]
772 self.tag = uuid.uuid4().hex
774 self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
775 self.rwdc_db.set_dsheuristics("000000001")
777 set_auto_replication(RWDC, False)
779 # make sure DCs are synchronized before the test
780 self.force_replication()
781 self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
782 self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
784 def delete_ldb_connections(self):
785 super(RodcRwdcTests, self).delete_ldb_connections()
789 def assertReferral(self, fn, *args, **kwargs):
792 self.fail("failed to raise ldap referral")
793 except ldb.LdbError as e9:
794 (code, msg) = e9.args
795 self.assertEqual(code, ldb.ERR_REFERRAL,
796 "expected referral, got %s %s" % (code, msg))
798 def _test_rodc_dsheuristics(self):
799 d = self.rodc_db.get_dsheuristics()
800 self.assertReferral(self.rodc_db.set_dsheuristics, "000000001")
801 self.assertReferral(self.rodc_db.set_dsheuristics, d)
803 def TEST_rodc_heuristics_kerberos(self):
804 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
805 self._test_rodc_dsheuristics()
807 def TEST_rodc_heuristics_ntlm(self):
808 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
809 self._test_rodc_dsheuristics()
811 def _test_add(self, objects, cross_ncs=False):
815 base = str(self.rwdc_db.get_config_basedn())
816 controls = ["search_options:1:2"]
817 cn = dn.split(',', 1)[0]
818 expression = '(%s)' % cn
825 res = self.rodc_db.search(base,
826 expression=expression,
827 scope=ldb.SCOPE_SUBTREE,
830 self.assertEqual(len(res), 0)
831 except ldb.LdbError as e:
832 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
837 except ldb.LdbError as e:
838 (ecode, emsg) = e.args
839 self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
843 self.force_replication(base=base)
845 self.force_replication()
848 res = self.rodc_db.search(base,
849 expression=expression,
850 scope=ldb.SCOPE_SUBTREE,
853 self.assertEqual(len(res), 1)
854 except ldb.LdbError as e:
855 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
856 "replication seems to have failed")
858 def _test_add_replicated_objects(self, mode):
859 tag = "%s%s" % (self.tag, mode)
862 'dn': "ou=%s1,%s" % (tag, self.base_dn),
863 "objectclass": "organizationalUnit"
866 'dn': "cn=%s2,%s" % (tag, self.base_dn),
867 "objectclass": "user"
870 'dn': "cn=%s3,%s" % (tag, self.base_dn),
871 "objectclass": "group"
874 self.rwdc_db.delete("ou=%s1,%s" % (tag, self.base_dn))
875 self.rwdc_db.delete("cn=%s2,%s" % (tag, self.base_dn))
876 self.rwdc_db.delete("cn=%s3,%s" % (tag, self.base_dn))
878 def test_add_replicated_objects_kerberos(self):
879 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
880 self._test_add_replicated_objects('kerberos')
882 def test_add_replicated_objects_ntlm(self):
883 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
884 self._test_add_replicated_objects('ntlm')
886 def _test_add_replicated_connections(self, mode):
887 tag = "%s%s" % (self.tag, mode)
890 'dn': "cn=%sfoofoofoo,%s" % (tag, self.service),
891 "objectclass": "NTDSConnection",
892 'enabledConnection': 'TRUE',
893 'fromServer': self.base_dn,
897 self.rwdc_db.delete("cn=%sfoofoofoo,%s" % (tag, self.service))
899 def test_add_replicated_connections_kerberos(self):
900 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
901 self._test_add_replicated_connections('kerberos')
903 def test_add_replicated_connections_ntlm(self):
904 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
905 self._test_add_replicated_connections('ntlm')
907 def _test_modify_replicated_attributes(self):
908 dn = 'CN=Guest,CN=Users,' + self.base_dn
910 for attr in ['carLicense', 'middleName']:
912 m.dn = ldb.Dn(self.rwdc_db, dn)
913 m[attr] = ldb.MessageElement(value,
914 ldb.FLAG_MOD_REPLACE,
917 self.rwdc_db.modify(m)
918 except ldb.LdbError as e:
919 self.fail("Failed to modify %s %s on RWDC %s with %s" %
922 self.force_replication()
925 res = self.rodc_db.search(dn,
926 scope=ldb.SCOPE_SUBTREE,
928 results = [x[attr][0] for x in res]
929 self.assertEqual(results, [value])
930 except ldb.LdbError as e:
931 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
932 "replication seems to have failed")
934 def test_modify_replicated_attributes_kerberos(self):
935 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
936 self._test_modify_replicated_attributes()
938 def test_modify_replicated_attributes_ntlm(self):
939 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
940 self._test_modify_replicated_attributes()
942 def _test_add_modify_delete(self):
943 dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
944 values = ["%s%s" % (i, self.tag) for i in range(3)]
949 "objectclass": "user",
953 self.force_replication()
954 for value in values[1:]:
957 m.dn = ldb.Dn(self.rwdc_db, dn)
958 m[attr] = ldb.MessageElement(value,
959 ldb.FLAG_MOD_REPLACE,
962 self.rwdc_db.modify(m)
963 except ldb.LdbError as e:
964 self.fail("Failed to modify %s %s on RWDC %s with %s" %
967 self.force_replication()
970 res = self.rodc_db.search(dn,
971 scope=ldb.SCOPE_SUBTREE,
973 results = [x[attr][0] for x in res]
974 self.assertEqual(results, [value])
975 except ldb.LdbError as e:
976 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
977 "replication seems to have failed")
979 self.rwdc_db.delete(dn)
980 self.force_replication()
982 res = self.rodc_db.search(dn,
983 scope=ldb.SCOPE_SUBTREE,
986 self.fail("Failed to delete %s" % (dn))
987 except ldb.LdbError as e:
988 self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
989 "Failed to delete %s" % (dn))
991 def test_add_modify_delete_kerberos(self):
992 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
993 self._test_add_modify_delete()
995 def test_add_modify_delete_ntlm(self):
996 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
997 self._test_add_modify_delete()
1000 username = "u%sX%s" % (self.tag[:12], self.counter())
1001 password = 'password#1'
1002 dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
1005 "objectclass": "user",
1006 'sAMAccountName': username,
1010 except ldb.LdbError as e:
1011 self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))
1013 self.rwdc_db.modify_ldif("dn: %s\n"
1014 "changetype: modify\n"
1015 "delete: userPassword\n"
1016 "add: userPassword\n"
1017 "userPassword: %s\n" % (dn, password))
1018 self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
1019 return (dn, username, password)
1021 def _change_password(self, user_dn, old_password, new_password):
1022 self.rwdc_db.modify_ldif(
1024 "changetype: modify\n"
1025 "delete: userPassword\n"
1026 "userPassword: %s\n"
1027 "add: userPassword\n"
1028 "userPassword: %s\n" % (user_dn, old_password, new_password))
1030 def try_ldap_logon(self, server, creds, errno=None):
1032 tmpdb = SamDB('ldap://%s' % server, credentials=creds,
1033 session_info=system_session(LP), lp=LP)
1034 if errno is not None:
1035 self.fail("logon failed to fail with ldb error %s" % errno)
1036 except ldb.LdbError as e10:
1037 (code, msg) = e10.args
1040 self.fail("logon incorrectly raised ldb error (code=%s)" %
1043 self.fail("logon failed to raise correct ldb error"
1044 "Expected: %s Got: %s" %
1048 def zero_min_password_age(self):
1049 min_pwd_age = int(self.rwdc_db.get_minPwdAge())
1050 if min_pwd_age != 0:
1051 self.rwdc_db.set_minPwdAge('0')
1053 def _test_ldap_change_password(self, errno=None):
1054 self.zero_min_password_age()
1056 dn, username, password = self._new_user()
1057 creds1 = make_creds(username, password)
1059 # With NTLM, this should fail on RODC before replication,
1060 # because the user isn't known.
1061 self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1062 self.force_replication()
1064 # Now the user is replicated to RODC, so logon should work
1065 self.try_ldap_logon(RODC, creds1)
1067 passwords = ['password#%s' % i for i in range(1, 6)]
1068 for prev, password in zip(passwords[:-1], passwords[1:]):
1069 self._change_password(dn, prev, password)
1071 # The password has changed enough times to make the old
1072 # password invalid (though with kerberos that doesn't matter).
1073 # For NTLM, the old creds should always fail
1074 self.try_ldap_logon(RODC, creds1, errno)
1075 self.try_ldap_logon(RWDC, creds1, errno)
1077 creds2 = make_creds(username, password)
1079 # new creds work straight away with NTLM, because although it
1080 # doesn't have the password, it knows the user and forwards
1082 self.try_ldap_logon(RODC, creds2)
1083 self.try_ldap_logon(RWDC, creds2)
1085 self.force_replication()
1087 # After another replication check RODC still works and fails,
1088 # as appropriate to various creds
1089 self.try_ldap_logon(RODC, creds2)
1090 self.try_ldap_logon(RODC, creds1, errno)
1093 password = 'password#6'
1094 self._change_password(dn, prev, password)
1095 creds3 = make_creds(username, password)
1097 # previous password should still work.
1098 self.try_ldap_logon(RWDC, creds2)
1099 self.try_ldap_logon(RODC, creds2)
1101 # new password should still work.
1102 self.try_ldap_logon(RWDC, creds3)
1103 self.try_ldap_logon(RODC, creds3)
1105 # old password should still fail (but not on kerberos).
1106 self.try_ldap_logon(RWDC, creds1, errno)
1107 self.try_ldap_logon(RODC, creds1, errno)
1109 def test_ldap_change_password_kerberos(self):
1110 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
1111 self._test_ldap_change_password()
1113 def test_ldap_change_password_ntlm(self):
1114 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
1115 self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS)
1117 def _test_ldap_change_password_reveal_on_demand(self, errno=None):
1118 self.zero_min_password_age()
1120 res = self.rodc_db.search(self.rodc_dn,
1121 scope=ldb.SCOPE_BASE,
1122 attrs=['msDS-RevealOnDemandGroup'])
1124 group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
1126 user_dn, username, password = self._new_user()
1127 creds1 = make_creds(username, password)
1130 m.dn = ldb.Dn(self.rwdc_db, group)
1131 m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
1132 self.rwdc_db.modify(m)
1134 # Against Windows, this will just forward if no account exists on the KDC
1135 # Therefore, this does not error on Windows.
1136 self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1138 self.force_replication()
1141 self.try_ldap_logon(RODC, creds1)
1142 preload_rodc_user(user_dn)
1144 # Now the user AND password are replicated to RODC, so logon should work (not proxy case)
1145 self.try_ldap_logon(RODC, creds1)
1147 passwords = ['password#%s' % i for i in range(1, 6)]
1148 for prev, password in zip(passwords[:-1], passwords[1:]):
1149 self._change_password(user_dn, prev, password)
1151 # The password has changed enough times to make the old
1152 # password invalid, but the RODC shouldn't know that.
1153 self.try_ldap_logon(RODC, creds1)
1154 self.try_ldap_logon(RWDC, creds1, errno)
1156 creds2 = make_creds(username, password)
1157 self.try_ldap_logon(RWDC, creds2)
1158 # We can forward WRONG_PASSWORD over NTLM.
1159 # This SHOULD succeed.
1160 self.try_ldap_logon(RODC, creds2)
1163 def test_change_password_reveal_on_demand_ntlm(self):
1164 CREDS.set_kerberos_state(DONT_USE_KERBEROS)
1165 self._test_ldap_change_password_reveal_on_demand(ldb.ERR_INVALID_CREDENTIALS)
1167 def test_change_password_reveal_on_demand_kerberos(self):
1168 CREDS.set_kerberos_state(MUST_USE_KERBEROS)
1169 self._test_ldap_change_password_reveal_on_demand()
1171 def test_login_lockout_krb5(self):
1172 username = self.lockout1krb5_creds.get_username()
1173 userpass = self.lockout1krb5_creds.get_password()
1174 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1176 preload_rodc_user(userdn)
1178 use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
1179 fail_creds = self.insta_creds(self.template_creds,
1181 userpass=userpass+"X",
1182 kerberos_state=use_kerberos)
1185 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1187 except LdbError as e11:
1188 (num, msg) = e11.args
1189 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1191 # Succeed to reset everything to 0
1192 success_creds = self.insta_creds(self.template_creds,
1195 kerberos_state=use_kerberos)
1197 ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
1199 self._test_login_lockout(self.lockout1krb5_creds)
1201 def test_login_lockout_ntlm(self):
1202 username = self.lockout1ntlm_creds.get_username()
1203 userpass = self.lockout1ntlm_creds.get_password()
1204 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1206 preload_rodc_user(userdn)
1208 use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
1209 fail_creds = self.insta_creds(self.template_creds,
1211 userpass=userpass+"X",
1212 kerberos_state=use_kerberos)
1215 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1217 except LdbError as e12:
1218 (num, msg) = e12.args
1219 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1221 # Succeed to reset everything to 0
1222 ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
1224 self._test_login_lockout(self.lockout1ntlm_creds)
1226 def test_multiple_logon_krb5(self):
1227 username = self.lockout1krb5_creds.get_username()
1228 userpass = self.lockout1krb5_creds.get_password()
1229 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1231 preload_rodc_user(userdn)
1233 use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
1234 fail_creds = self.insta_creds(self.template_creds,
1236 userpass=userpass+"X",
1237 kerberos_state=use_kerberos)
1240 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1242 except LdbError as e13:
1243 (num, msg) = e13.args
1244 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1246 # Succeed to reset everything to 0
1247 success_creds = self.insta_creds(self.template_creds,
1250 kerberos_state=use_kerberos)
1252 ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
1254 self._test_multiple_logon(self.lockout1krb5_creds)
1256 def test_multiple_logon_ntlm(self):
1257 username = self.lockout1ntlm_creds.get_username()
1258 userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1259 userpass = self.lockout1ntlm_creds.get_password()
1261 preload_rodc_user(userdn)
1263 use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
1264 fail_creds = self.insta_creds(self.template_creds,
1266 userpass=userpass+"X",
1267 kerberos_state=use_kerberos)
1270 ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1272 except LdbError as e14:
1273 (num, msg) = e14.args
1274 self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1276 # Succeed to reset everything to 0
1277 ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
1279 self._test_multiple_logon(self.lockout1ntlm_creds)
1282 global RODC, RWDC, CREDS, LP
1283 parser = optparse.OptionParser(
1284 "rodc_rwdc.py [options] <rodc host> <rwdc host>")
1286 sambaopts = options.SambaOptions(parser)
1287 versionopts = options.VersionOptions(parser)
1288 credopts = options.CredentialsOptions(parser)
1289 subunitopts = SubunitOptions(parser)
1291 parser.add_option_group(sambaopts)
1292 parser.add_option_group(versionopts)
1293 parser.add_option_group(credopts)
1294 parser.add_option_group(subunitopts)
1296 opts, args = parser.parse_args()
1298 LP = sambaopts.get_loadparm()
1299 CREDS = credopts.get_credentials(LP)
1300 CREDS.set_gensec_features(CREDS.get_gensec_features() |
1301 gensec.FEATURE_SEAL)
1306 parser.print_usage()
1309 set_auto_replication(RWDC, True)
1311 TestProgram(module=__name__, opts=subunitopts)
1313 set_auto_replication(RWDC, True)