userAccountControl=None,
msDSUserAccountControlComputed=None,
effective_bad_password_count=None,
- msg=None):
+ msg=None,
+ badPwdCountOnly=False):
print '-=' * 36
if msg is not None:
print "\033[01;32m %s \033[00m\n" % msg
res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs)
self.assertTrue(len(res) == 1)
self._check_attribute(res, "badPwdCount", badPwdCount)
- self._check_attribute(res, "badPasswordTime", badPasswordTime)
- self._check_attribute(res, "logonCount", logonCount)
- self._check_attribute(res, "lastLogon", lastLogon)
- self._check_attribute(res, "lastLogonTimestamp", lastLogonTimestamp)
self._check_attribute(res, "lockoutTime", lockoutTime)
- self._check_attribute(res, "userAccountControl", userAccountControl)
- self._check_attribute(res, "msDS-User-Account-Control-Computed",
- msDSUserAccountControlComputed)
+ self._check_attribute(res, "badPasswordTime", badPasswordTime)
+ if not badPwdCountOnly:
+ self._check_attribute(res, "logonCount", logonCount)
+ self._check_attribute(res, "lastLogon", lastLogon)
+ self._check_attribute(res, "lastLogonTimestamp", lastLogonTimestamp)
+ self._check_attribute(res, "userAccountControl", userAccountControl)
+ self._check_attribute(res, "msDS-User-Account-Control-Computed",
+ msDSUserAccountControlComputed)
- lastLogon = int(res[0]["lastLogon"][0])
- logonCount = int(res[0]["logonCount"][0])
+ lastLogon = int(res[0]["lastLogon"][0])
+ logonCount = int(res[0]["logonCount"][0])
samr_user = self._open_samr_user(res)
uinfo3 = self.samr.QueryUserInfo(samr_user, 3)
self.samr.Close(samr_user)
expected_acb_info = 0
- if userAccountControl & dsdb.UF_NORMAL_ACCOUNT:
- expected_acb_info |= samr.ACB_NORMAL
- if userAccountControl & dsdb.UF_ACCOUNTDISABLE:
- expected_acb_info |= samr.ACB_DISABLED
- if userAccountControl & dsdb.UF_PASSWD_NOTREQD:
- expected_acb_info |= samr.ACB_PWNOTREQ
- if msDSUserAccountControlComputed & dsdb.UF_LOCKOUT:
- expected_acb_info |= samr.ACB_AUTOLOCK
- if msDSUserAccountControlComputed & dsdb.UF_PASSWORD_EXPIRED:
- expected_acb_info |= samr.ACB_PW_EXPIRED
+ if not badPwdCountOnly:
+ if userAccountControl & dsdb.UF_NORMAL_ACCOUNT:
+ expected_acb_info |= samr.ACB_NORMAL
+ if userAccountControl & dsdb.UF_ACCOUNTDISABLE:
+ expected_acb_info |= samr.ACB_DISABLED
+ if userAccountControl & dsdb.UF_PASSWD_NOTREQD:
+ expected_acb_info |= samr.ACB_PWNOTREQ
+ if msDSUserAccountControlComputed & dsdb.UF_LOCKOUT:
+ expected_acb_info |= samr.ACB_AUTOLOCK
+ if msDSUserAccountControlComputed & dsdb.UF_PASSWORD_EXPIRED:
+ expected_acb_info |= samr.ACB_PW_EXPIRED
+
+ self.assertEquals(uinfo3.acct_flags, expected_acb_info)
+ self.assertEquals(uinfo3.last_logon, lastLogon)
+ self.assertEquals(uinfo3.logon_count, logonCount)
expected_bad_password_count = 0
if badPwdCount is not None:
if effective_bad_password_count is None:
effective_bad_password_count = expected_bad_password_count
- self.assertEquals(uinfo3.acct_flags, expected_acb_info)
self.assertEquals(uinfo3.bad_password_count, expected_bad_password_count)
- self.assertEquals(uinfo3.last_logon, lastLogon)
- self.assertEquals(uinfo3.logon_count, logonCount)
- self.assertEquals(uinfo5.acct_flags, expected_acb_info)
- self.assertEquals(uinfo5.bad_password_count, effective_bad_password_count)
- self.assertEquals(uinfo5.last_logon, lastLogon)
- self.assertEquals(uinfo5.logon_count, logonCount)
+ if not badPwdCountOnly:
+ self.assertEquals(uinfo5.acct_flags, expected_acb_info)
+ self.assertEquals(uinfo5.bad_password_count, effective_bad_password_count)
+ self.assertEquals(uinfo5.last_logon, lastLogon)
+ self.assertEquals(uinfo5.logon_count, logonCount)
+
+ self.assertEquals(uinfo16.acct_flags, expected_acb_info)
- self.assertEquals(uinfo16.acct_flags, expected_acb_info)
+ self.assertEquals(uinfo21.acct_flags, expected_acb_info)
+ self.assertEquals(uinfo21.bad_password_count, effective_bad_password_count)
+ self.assertEquals(uinfo21.last_logon, lastLogon)
+ self.assertEquals(uinfo21.logon_count, logonCount)
- self.assertEquals(uinfo21.acct_flags, expected_acb_info)
- self.assertEquals(uinfo21.bad_password_count, effective_bad_password_count)
- self.assertEquals(uinfo21.last_logon, lastLogon)
- self.assertEquals(uinfo21.logon_count, logonCount)
# check LDAP again and make sure the samr.QueryUserInfo
# doesn't have any impact.
return res[0]['serverReference'][0]
+class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
+ counter = itertools.count(1).next
+
+ def _check_account_initial(self, dn):
+ self.force_replication()
+ return super(RodcRwdcCachedTests, self)._check_account_initial(dn)
+
+ def _check_account(self, dn,
+ badPwdCount=None,
+ badPasswordTime=None,
+ logonCount=None,
+ lastLogon=None,
+ lastLogonTimestamp=None,
+ lockoutTime=None,
+ userAccountControl=None,
+ msDSUserAccountControlComputed=None,
+ effective_bad_password_count=None,
+ msg=None,
+ badPwdCountOnly=False):
+ # Wait for the RWDC to get any delayed messages
+ # e.g. SendToSam or KRB5 bad passwords via winbindd
+ if (self.kerberos and isinstance(badPasswordTime, tuple) or
+ badPwdCount == 0):
+ time.sleep(5)
+
+ return super(RodcRwdcCachedTests,
+ self)._check_account(dn, badPwdCount, badPasswordTime,
+ logonCount, lastLogon,
+ lastLogonTimestamp, lockoutTime,
+ userAccountControl,
+ msDSUserAccountControlComputed,
+ effective_bad_password_count, msg,
+ True)
+
+ def force_replication(self, base=None):
+ if base is None:
+ base = self.base_dn
+
+ # XXX feels like a horrendous way to do it.
+ credstring = '-U%s%%%s' % (CREDS.get_username(),
+ CREDS.get_password())
+ cmd = ['bin/samba-tool',
+ 'drs', 'replicate',
+ RODC, RWDC, base,
+ credstring,
+ '--sync-forced']
+
+ p = subprocess.Popen(cmd,
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ if p.returncode:
+ print "failed with code %s" % p.returncode
+ print ' '.join(cmd)
+ print "stdout"
+ print stdout
+ print "stderr"
+ print stderr
+ raise RodcRwdcTestException()
+
+ def tearDown(self):
+ super(RodcRwdcCachedTests, self).tearDown()
+ set_auto_replication(RWDC, True)
+
+ def setUp(self):
+ self.kerberos = False # To be set later
+
+ self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
+ session_info=system_session(LP), lp=LP)
+
+ self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
+ session_info=system_session(LP), lp=LP)
+
+ # Define variables for BasePasswordTestCase
+ self.lp = LP
+ self.global_creds = CREDS
+ self.host = RWDC
+ self.host_url = 'ldap://%s' % RWDC
+ self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
+ credentials=self.global_creds, lp=self.lp)
+
+ super(RodcRwdcCachedTests, self).setUp()
+ self.host_url = 'ldap://%s' % RODC
+
+ self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
+ self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
+ self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
+
+ self.base_dn = self.rwdc_db.domain_dn()
+
+ root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
+ attrs=['dsServiceName'])
+ self.service = root[0]['dsServiceName'][0]
+ self.tag = uuid.uuid4().hex
+
+ self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
+ self.rwdc_db.set_dsheuristics("000000001")
+
+ set_auto_replication(RWDC, False)
+
+ # make sure DCs are synchronized before the test
+ self.force_replication()
+
+ def test_login_lockout_krb5(self):
+ username = self.lockout1krb5_creds.get_username()
+ userpass = self.lockout1krb5_creds.get_password()
+ userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
+
+ preload_rodc_user(userdn)
+
+ self.kerberos = True
+
+ self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
+
+ res = self.rodc_db.search(self.rodc_dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=['msDS-RevealOnDemandGroup'])
+
+ group = res[0]['msDS-RevealOnDemandGroup'][0]
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.rwdc_db, group)
+ m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
+ self.rwdc_db.modify(m)
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb, self.base_dn)
+
+ self.account_lockout_duration = 10
+ account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7))
+
+ m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks),
+ ldb.FLAG_MOD_REPLACE,
+ "lockoutDuration")
+
+ self.lockout_observation_window = 10
+ lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7))
+
+ m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks),
+ ldb.FLAG_MOD_REPLACE,
+ "lockOutObservationWindow")
+
+ self.rwdc_db.modify(m)
+ self.force_replication()
+
+ self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
+
+ def test_login_lockout_ntlm(self):
+ username = self.lockout1ntlm_creds.get_username()
+ userpass = self.lockout1ntlm_creds.get_password()
+ userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
+
+ preload_rodc_user(userdn)
+
+ self.kerberos = False
+
+ self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
+
+ res = self.rodc_db.search(self.rodc_dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=['msDS-RevealOnDemandGroup'])
+
+ group = res[0]['msDS-RevealOnDemandGroup'][0]
+
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.rwdc_db, group)
+ m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
+ self.rwdc_db.modify(m)
+
+ self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
+
+ def _test_login_lockout_rodc_rwdc(self, creds, userdn):
+ username = creds.get_username()
+ userpass = creds.get_password()
+
+ # Open a second LDB connection with the user credentials. Use the
+ # command line credentials for informations like the domain, the realm
+ # and the workstation.
+ creds_lockout = self.insta_creds(creds)
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+
+ self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
+
+ badPasswordTime = 0
+ logonCount = 0
+ lastLogon = 0
+ lastLogonTimestamp=0
+ logoncount_relation = ''
+ lastlogon_relation = ''
+
+ res = self._check_account(userdn,
+ badPwdCount=1,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0,
+ msg='lastlogontimestamp with wrong password')
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ # Correct old password
+ creds_lockout.set_password(userpass)
+
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+
+ # lastLogonTimestamp should not change
+ # lastLogon increases if badPwdCount is non-zero (!)
+ res = self._check_account(userdn,
+ badPwdCount=0,
+ badPasswordTime=badPasswordTime,
+ logonCount=(logoncount_relation, logonCount),
+ lastLogon=('greater', lastLogon),
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0,
+ msg='LLTimestamp is updated to lastlogon')
+
+ logonCount = int(res[0]["logonCount"][0])
+ lastLogon = int(res[0]["lastLogon"][0])
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+
+ self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
+
+ res = self._check_account(userdn,
+ badPwdCount=1,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=2,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ print "two failed password change"
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=3,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ lockoutTime=("greater", badPasswordTime),
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+ lockoutTime = int(res[0]["lockoutTime"][0])
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=3,
+ badPasswordTime=badPasswordTime,
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ lockoutTime=lockoutTime,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=3,
+ badPasswordTime=badPasswordTime,
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ lockoutTime=lockoutTime,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
+
+ # The correct password, but we are locked out
+ creds_lockout.set_password(userpass)
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=3,
+ badPasswordTime=badPasswordTime,
+ logonCount=logonCount,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ lockoutTime=lockoutTime,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
+
+ # wait for the lockout to end
+ time.sleep(self.account_lockout_duration + 1)
+ print self.account_lockout_duration + 1
+
+ res = self._check_account(userdn,
+ badPwdCount=3, effective_bad_password_count=0,
+ badPasswordTime=badPasswordTime,
+ logonCount=logonCount,
+ lockoutTime=lockoutTime,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+
+ # The correct password after letting the timeout expire
+
+ creds_lockout.set_password(userpass)
+
+ creds_lockout2 = self.insta_creds(creds_lockout)
+
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp)
+ time.sleep(3)
+
+ res = self._check_account(userdn,
+ badPwdCount=0,
+ badPasswordTime=badPasswordTime,
+ logonCount=(logoncount_relation, logonCount),
+ lastLogon=(lastlogon_relation, lastLogon),
+ lastLogonTimestamp=lastLogonTimestamp,
+ lockoutTime=lockoutTime,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0,
+ msg="lastLogon is way off")
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=1,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lockoutTime=lockoutTime,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+
+ res = self._check_account(userdn,
+ badPwdCount=2,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lockoutTime=lockoutTime,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ time.sleep(self.lockout_observation_window + 1)
+
+ res = self._check_account(userdn,
+ badPwdCount=2, effective_bad_password_count=0,
+ badPasswordTime=badPasswordTime,
+ logonCount=logonCount,
+ lockoutTime=lockoutTime,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+
+ # The wrong password
+ creds_lockout.set_password("thatsAcomplPASS1x")
+ try:
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ res = self._check_account(userdn,
+ badPwdCount=1,
+ badPasswordTime=("greater", badPasswordTime),
+ logonCount=logonCount,
+ lockoutTime=lockoutTime,
+ lastLogon=lastLogon,
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
+ badPasswordTime = int(res[0]["badPasswordTime"][0])
+
+ # The correct password without letting the timeout expire
+ creds_lockout.set_password(userpass)
+ ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
+
+ res = self._check_account(userdn,
+ badPwdCount=0,
+ badPasswordTime=badPasswordTime,
+ logonCount=(logoncount_relation, logonCount),
+ lockoutTime=lockoutTime,
+ lastLogon=("greater", lastLogon),
+ lastLogonTimestamp=lastLogonTimestamp,
+ userAccountControl=
+ dsdb.UF_NORMAL_ACCOUNT,
+ msDSUserAccountControlComputed=0)
class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
counter = itertools.count(1).next