PEP8: fix E303: too many blank lines (2)
[garming/samba-autobuild/.git] / source4 / dsdb / tests / python / rodc_rwdc.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 from __future__ import print_function
4 """Test communication of credentials etc, between an RODC and a RWDC.
5
6 How does it work when the password is changed on the RWDC?
7 """
8
9 import optparse
10 import sys
11 import base64
12 import uuid
13 import subprocess
14 import itertools
15 import time
16
17 sys.path.insert(0, "bin/python")
18 import samba
19 import ldb
20
21 from samba.tests.subunitrun import SubunitOptions, TestProgram
22 import samba.getopt as options
23
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
27 from samba import gensec, dsdb
28 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
29 from samba.dcerpc import security, samr
30
31 import password_lockout_base
32
33
def passwd_encode(pw):
    """Return *pw* wrapped in double quotes, UTF-16-LE encoded, as base64 text.

    The base64 output is decoded as UTF-8 so that a text string is returned.
    """
    quoted = '"%s"' % pw
    utf16_bytes = quoted.encode('utf-16-le')
    encoded = base64.b64encode(utf16_bytes)
    return encoded.decode('utf8')
36
37
class RodcRwdcTestException(Exception):
    """Raised by helpers in this file when a samba-tool subprocess fails."""
40
41
def make_creds(username, password, kerberos_state=None):
    """Build a Credentials object for *username* / *password*.

    Domain, realm and workstation are copied from the global CREDS. When
    *kerberos_state* is None, the kerberos state of CREDS is used too. The
    selected kerberos state is reported on stdout, and gensec.FEATURE_SEAL
    is OR-ed into the gensec features before the object is returned.
    """
    # the global CREDS acts as the template for everything but user/password
    creds = Credentials()
    creds.set_username(username)
    creds.set_password(password)
    creds.set_domain(CREDS.get_domain())
    creds.set_realm(CREDS.get_realm())
    creds.set_workstation(CREDS.get_workstation())

    if kerberos_state is None:
        kerberos_state = CREDS.get_kerberos_state()
    creds.set_kerberos_state(kerberos_state)

    if kerberos_state == MUST_USE_KERBEROS:
        report = "we seem to be using kerberos for %s %s" % (username, password)
    elif kerberos_state == DONT_USE_KERBEROS:
        report = "NOT using kerberos for %s %s" % (username, password)
    else:
        report = "kerberos state is %s" % kerberos_state

    print('-' * 73)
    print(report)

    sealed_features = creds.get_gensec_features() | gensec.FEATURE_SEAL
    creds.set_gensec_features(sealed_features)
    return creds
66
67
def set_auto_replication(dc, allow):
    """Enable or disable automatic DRS replication on *dc*.

    Runs ``bin/samba-tool drs options`` once for each of the
    DISABLE_INBOUND_REPL and DISABLE_OUTBOUND_REPL DSA options,
    authenticating with the global CREDS.

    :param dc: server name handed to samba-tool
    :param allow: when true the DISABLE_* options are cleared ('-'),
        otherwise they are set ('+')
    :raises RodcRwdcTestException: if samba-tool exits non-zero and its
        stderr does not mention LDAP_REFERRAL. A referral failure is only
        reported and ignored, on the assumption that *dc* is an RODC.
    """
    credstring = '-U%s%%%s' % (CREDS.get_username(),
                               CREDS.get_password())

    on_or_off = '-' if allow else '+'

    for opt in ['DISABLE_INBOUND_REPL',
                'DISABLE_OUTBOUND_REPL']:
        cmd = ['bin/samba-tool',
               'drs', 'options',
               credstring, dc,
               "--dsa-option=%s%s" % (on_or_off, opt)]

        p = subprocess.Popen(cmd,
                             stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE)
        stdout, stderr = p.communicate()
        if not p.returncode:
            continue

        # communicate() returns bytes for PIPE streams on Python 3, so the
        # needle must be bytes as well (b'' is plain str on Python 2).
        if b'LDAP_REFERRAL' not in stderr:
            # The credential string is deliberately left out of the message.
            raise RodcRwdcTestException(
                "samba-tool drs options %s%s on %s failed with code %s: %r" %
                (on_or_off, opt, dc, p.returncode, stderr))

        # Report the sign that was actually sent, not a hard-coded '+'.
        print("ignoring %s%s REFERRAL error; assuming %s is RODC" %
              (on_or_off, opt, dc))
90
91
def preload_rodc_user(user_dn):
    """Run ``samba-tool rodc preload`` for *user_dn* against the RWDC.

    Automatic replication on the RWDC is switched on before the command and
    switched off again after it has succeeded. The command line is printed
    before it is run; a failing command raises
    subprocess.CalledProcessError.
    """
    auth_arg = '-U%s%%%s' % (CREDS.get_username(), CREDS.get_password())

    set_auto_replication(RWDC, True)

    argv = ['bin/samba-tool', 'rodc', 'preload']
    argv += [user_dn, auth_arg]
    argv += ['--server', RWDC]

    print(' '.join(argv))
    subprocess.check_call(argv)

    set_auto_replication(RWDC, False)
106
107
def get_server_ref_from_samdb(samdb):
    """Return the first serverReference value of *samdb*'s server object.

    The object is located with a base-scope search on the DN returned by
    ``samdb.get_serverName()``.
    """
    result = samdb.search(samdb.get_serverName(),
                          scope=ldb.SCOPE_BASE,
                          attrs=['serverReference'])
    server_entry = result[0]
    return server_entry['serverReference'][0]
115
116
117 class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
118     counter = itertools.count(1).next
119
120     def _check_account_initial(self, dn):
121         self.force_replication()
122         return super(RodcRwdcCachedTests, self)._check_account_initial(dn)
123
124     def _check_account(self, dn,
125                        badPwdCount=None,
126                        badPasswordTime=None,
127                        logonCount=None,
128                        lastLogon=None,
129                        lastLogonTimestamp=None,
130                        lockoutTime=None,
131                        userAccountControl=None,
132                        msDSUserAccountControlComputed=None,
133                        effective_bad_password_count=None,
134                        msg=None,
135                        badPwdCountOnly=False):
136         # Wait for the RWDC to get any delayed messages
137         # e.g. SendToSam or KRB5 bad passwords via winbindd
138         if (self.kerberos and isinstance(badPasswordTime, tuple) or
139             badPwdCount == 0):
140             time.sleep(5)
141
142         return super(RodcRwdcCachedTests,
143                      self)._check_account(dn, badPwdCount, badPasswordTime,
144                                           logonCount, lastLogon,
145                                           lastLogonTimestamp, lockoutTime,
146                                           userAccountControl,
147                                           msDSUserAccountControlComputed,
148                                           effective_bad_password_count, msg,
149                                           True)
150
151     def force_replication(self, base=None):
152         if base is None:
153             base = self.base_dn
154
155         # XXX feels like a horrendous way to do it.
156         credstring = '-U%s%%%s' % (CREDS.get_username(),
157                                    CREDS.get_password())
158         cmd = ['bin/samba-tool',
159                'drs', 'replicate',
160                RODC, RWDC, base,
161                credstring,
162                '--sync-forced']
163
164         p = subprocess.Popen(cmd,
165                              stderr=subprocess.PIPE,
166                              stdout=subprocess.PIPE)
167         stdout, stderr = p.communicate()
168         if p.returncode:
169             print("failed with code %s" % p.returncode)
170             print(' '.join(cmd))
171             print("stdout")
172             print(stdout)
173             print("stderr")
174             print(stderr)
175             raise RodcRwdcTestException()
176
177     def _change_password(self, user_dn, old_password, new_password):
178         self.rwdc_db.modify_ldif(
179             "dn: %s\n"
180             "changetype: modify\n"
181             "delete: userPassword\n"
182             "userPassword: %s\n"
183             "add: userPassword\n"
184             "userPassword: %s\n" % (user_dn, old_password, new_password))
185
186     def tearDown(self):
187         super(RodcRwdcCachedTests, self).tearDown()
188         set_auto_replication(RWDC, True)
189
190     def setUp(self):
191         self.kerberos = False  # To be set later
192
193         self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
194                              session_info=system_session(LP), lp=LP)
195
196         self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
197                              session_info=system_session(LP), lp=LP)
198
199         # Define variables for BasePasswordTestCase
200         self.lp = LP
201         self.global_creds = CREDS
202         self.host = RWDC
203         self.host_url = 'ldap://%s' % RWDC
204         self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
205                          credentials=self.global_creds, lp=self.lp)
206
207         super(RodcRwdcCachedTests, self).setUp()
208         self.host_url = 'ldap://%s' % RODC
209
210         self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
211         self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
212         self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
213
214         self.base_dn = self.rwdc_db.domain_dn()
215
216         root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
217                                    attrs=['dsServiceName'])
218         self.service = root[0]['dsServiceName'][0]
219         self.tag = uuid.uuid4().hex
220
221         self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
222         self.rwdc_db.set_dsheuristics("000000001")
223
224         set_auto_replication(RWDC, False)
225
226         # make sure DCs are synchronized before the test
227         self.force_replication()
228
229     def delete_ldb_connections(self):
230         super(RodcRwdcCachedTests, self).delete_ldb_connections()
231         del self.rwdc_db
232         del self.rodc_db
233
234     def test_cache_and_flush_password(self):
235         username = self.lockout1krb5_creds.get_username()
236         userpass = self.lockout1krb5_creds.get_password()
237         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
238
239         ldb_system = SamDB(session_info=system_session(self.lp),
240                            credentials=self.global_creds, lp=self.lp)
241
242         res = ldb_system.search(userdn, attrs=['unicodePwd'])
243         self.assertFalse('unicodePwd' in res[0])
244
245         preload_rodc_user(userdn)
246
247         res = ldb_system.search(userdn, attrs=['unicodePwd'])
248         self.assertTrue('unicodePwd' in res[0])
249
250         newpass = userpass + '!'
251
252         # Forcing replication should blank out password (when changed)
253         self._change_password(userdn, userpass, newpass)
254         self.force_replication()
255
256         res = ldb_system.search(userdn, attrs=['unicodePwd'])
257         self.assertFalse('unicodePwd' in res[0])
258
259     def test_login_lockout_krb5(self):
260         username = self.lockout1krb5_creds.get_username()
261         userpass = self.lockout1krb5_creds.get_password()
262         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
263
264         preload_rodc_user(userdn)
265
266         self.kerberos = True
267
268         self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
269
270         res = self.rodc_db.search(self.rodc_dn,
271                                   scope=ldb.SCOPE_BASE,
272                                   attrs=['msDS-RevealOnDemandGroup'])
273
274         group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
275
276         m = ldb.Message()
277         m.dn = ldb.Dn(self.rwdc_db, group)
278         m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
279         self.rwdc_db.modify(m)
280
281         m = ldb.Message()
282         m.dn = ldb.Dn(self.ldb, self.base_dn)
283
284         self.account_lockout_duration = 10
285         account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7))
286
287         m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks),
288                                                   ldb.FLAG_MOD_REPLACE,
289                                                   "lockoutDuration")
290
291         self.lockout_observation_window = 10
292         lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7))
293
294         m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks),
295                                                            ldb.FLAG_MOD_REPLACE,
296                                                            "lockOutObservationWindow")
297
298         self.rwdc_db.modify(m)
299         self.force_replication()
300
301         self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
302
303     def test_login_lockout_ntlm(self):
304         username = self.lockout1ntlm_creds.get_username()
305         userpass = self.lockout1ntlm_creds.get_password()
306         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
307
308         preload_rodc_user(userdn)
309
310         self.kerberos = False
311
312         self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
313
314         res = self.rodc_db.search(self.rodc_dn,
315                                   scope=ldb.SCOPE_BASE,
316                                   attrs=['msDS-RevealOnDemandGroup'])
317
318         group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
319
320         m = ldb.Message()
321         m.dn = ldb.Dn(self.rwdc_db, group)
322         m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
323         self.rwdc_db.modify(m)
324
325         self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
326
327     def test_login_lockout_not_revealed(self):
328         '''Test that SendToSam is restricted by preloaded users/groups'''
329
330         username = self.lockout1ntlm_creds.get_username()
331         userpass = self.lockout1ntlm_creds.get_password()
332         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
333
334         # Preload but do not add to revealed group
335         preload_rodc_user(userdn)
336
337         self.kerberos = False
338
339         creds = self.lockout1ntlm_creds
340
341         # Open a second LDB connection with the user credentials. Use the
342         # command line credentials for informations like the domain, the realm
343         # and the workstation.
344         creds_lockout = self.insta_creds(creds)
345
346         # The wrong password
347         creds_lockout.set_password("thatsAcomplPASS1x")
348
349         self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
350
351         badPasswordTime = 0
352         logonCount = 0
353         lastLogon = 0
354         lastLogonTimestamp = 0
355         logoncount_relation = ''
356         lastlogon_relation = ''
357
358         res = self._check_account(userdn,
359                                   badPwdCount=1,
360                                   badPasswordTime=("greater", badPasswordTime),
361                                   logonCount=logonCount,
362                                   lastLogon=lastLogon,
363                                   lastLogonTimestamp=lastLogonTimestamp,
364                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
365                                   msDSUserAccountControlComputed=0,
366                                   msg='lastlogontimestamp with wrong password')
367         badPasswordTime = int(res[0]["badPasswordTime"][0])
368
369         # BadPwdCount on RODC increases alongside RWDC
370         res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
371         self.assertTrue('badPwdCount' in res[0])
372         self.assertEqual(int(res[0]['badPwdCount'][0]), 1)
373
374         # Correct old password
375         creds_lockout.set_password(userpass)
376
377         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
378
379         # Wait for potential SendToSam...
380         time.sleep(5)
381
382         # BadPwdCount on RODC decreases, but not the RWDC
383         res = self._check_account(userdn,
384                                   badPwdCount=1,
385                                   badPasswordTime=badPasswordTime,
386                                   logonCount=(logoncount_relation, logonCount),
387                                   lastLogon=('greater', lastLogon),
388                                   lastLogonTimestamp=lastLogonTimestamp,
389                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
390                                   msDSUserAccountControlComputed=0,
391                                   msg='badPwdCount not reset on RWDC')
392
393         res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
394         self.assertTrue('badPwdCount' in res[0])
395         self.assertEqual(int(res[0]['badPwdCount'][0]), 0)
396
397     def _test_login_lockout_rodc_rwdc(self, creds, userdn):
398         username = creds.get_username()
399         userpass = creds.get_password()
400
401         # Open a second LDB connection with the user credentials. Use the
402         # command line credentials for informations like the domain, the realm
403         # and the workstation.
404         creds_lockout = self.insta_creds(creds)
405
406         # The wrong password
407         creds_lockout.set_password("thatsAcomplPASS1x")
408
409         self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
410
411         badPasswordTime = 0
412         logonCount = 0
413         lastLogon = 0
414         lastLogonTimestamp = 0
415         logoncount_relation = ''
416         lastlogon_relation = ''
417
418         res = self._check_account(userdn,
419                                   badPwdCount=1,
420                                   badPasswordTime=("greater", badPasswordTime),
421                                   logonCount=logonCount,
422                                   lastLogon=lastLogon,
423                                   lastLogonTimestamp=lastLogonTimestamp,
424                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
425                                   msDSUserAccountControlComputed=0,
426                                   msg='lastlogontimestamp with wrong password')
427         badPasswordTime = int(res[0]["badPasswordTime"][0])
428
429         # Correct old password
430         creds_lockout.set_password(userpass)
431
432         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
433
434         # lastLogonTimestamp should not change
435         # lastLogon increases if badPwdCount is non-zero (!)
436         res = self._check_account(userdn,
437                                   badPwdCount=0,
438                                   badPasswordTime=badPasswordTime,
439                                   logonCount=(logoncount_relation, logonCount),
440                                   lastLogon=('greater', lastLogon),
441                                   lastLogonTimestamp=lastLogonTimestamp,
442                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
443                                   msDSUserAccountControlComputed=0,
444                                   msg='LLTimestamp is updated to lastlogon')
445
446         logonCount = int(res[0]["logonCount"][0])
447         lastLogon = int(res[0]["lastLogon"][0])
448
449         # The wrong password
450         creds_lockout.set_password("thatsAcomplPASS1x")
451
452         self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
453
454         res = self._check_account(userdn,
455                                   badPwdCount=1,
456                                   badPasswordTime=("greater", badPasswordTime),
457                                   logonCount=logonCount,
458                                   lastLogon=lastLogon,
459                                   lastLogonTimestamp=lastLogonTimestamp,
460                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
461                                   msDSUserAccountControlComputed=0)
462         badPasswordTime = int(res[0]["badPasswordTime"][0])
463
464         # The wrong password
465         creds_lockout.set_password("thatsAcomplPASS1x")
466
467         try:
468             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
469             self.fail()
470
471         except LdbError as e1:
472             (num, msg) = e1.args
473             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
474
475         res = self._check_account(userdn,
476                                   badPwdCount=2,
477                                   badPasswordTime=("greater", badPasswordTime),
478                                   logonCount=logonCount,
479                                   lastLogon=lastLogon,
480                                   lastLogonTimestamp=lastLogonTimestamp,
481                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
482                                   msDSUserAccountControlComputed=0)
483         badPasswordTime = int(res[0]["badPasswordTime"][0])
484
485         print("two failed password change")
486
487         # The wrong password
488         creds_lockout.set_password("thatsAcomplPASS1x")
489
490         try:
491             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
492             self.fail()
493
494         except LdbError as e2:
495             (num, msg) = e2.args
496             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
497
498         res = self._check_account(userdn,
499                                   badPwdCount=3,
500                                   badPasswordTime=("greater", badPasswordTime),
501                                   logonCount=logonCount,
502                                   lastLogon=lastLogon,
503                                   lastLogonTimestamp=lastLogonTimestamp,
504                                   lockoutTime=("greater", badPasswordTime),
505                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
506                                   msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
507         badPasswordTime = int(res[0]["badPasswordTime"][0])
508         lockoutTime = int(res[0]["lockoutTime"][0])
509
510         # The wrong password
511         creds_lockout.set_password("thatsAcomplPASS1x")
512         try:
513             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
514             self.fail()
515         except LdbError as e3:
516             (num, msg) = e3.args
517             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
518
519         res = self._check_account(userdn,
520                                   badPwdCount=3,
521                                   badPasswordTime=badPasswordTime,
522                                   logonCount=logonCount,
523                                   lastLogon=lastLogon,
524                                   lastLogonTimestamp=lastLogonTimestamp,
525                                   lockoutTime=lockoutTime,
526                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
527                                   msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
528
529         # The wrong password
530         creds_lockout.set_password("thatsAcomplPASS1x")
531         try:
532             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
533             self.fail()
534         except LdbError as e4:
535             (num, msg) = e4.args
536             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
537
538         res = self._check_account(userdn,
539                                   badPwdCount=3,
540                                   badPasswordTime=badPasswordTime,
541                                   logonCount=logonCount,
542                                   lastLogon=lastLogon,
543                                   lastLogonTimestamp=lastLogonTimestamp,
544                                   lockoutTime=lockoutTime,
545                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
546                                   msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
547
548         # The correct password, but we are locked out
549         creds_lockout.set_password(userpass)
550         try:
551             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
552             self.fail()
553         except LdbError as e5:
554             (num, msg) = e5.args
555             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
556
557         res = self._check_account(userdn,
558                                   badPwdCount=3,
559                                   badPasswordTime=badPasswordTime,
560                                   logonCount=logonCount,
561                                   lastLogon=lastLogon,
562                                   lastLogonTimestamp=lastLogonTimestamp,
563                                   lockoutTime=lockoutTime,
564                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
565                                   msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
566
567         # wait for the lockout to end
568         time.sleep(self.account_lockout_duration + 1)
569         print(self.account_lockout_duration + 1)
570
571         res = self._check_account(userdn,
572                                   badPwdCount=3, effective_bad_password_count=0,
573                                   badPasswordTime=badPasswordTime,
574                                   logonCount=logonCount,
575                                   lockoutTime=lockoutTime,
576                                   lastLogon=lastLogon,
577                                   lastLogonTimestamp=lastLogonTimestamp,
578                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
579                                   msDSUserAccountControlComputed=0)
580
581         # The correct password after letting the timeout expire
582
583         creds_lockout.set_password(userpass)
584
585         creds_lockout2 = self.insta_creds(creds_lockout)
586
587         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp)
588         time.sleep(3)
589
590         res = self._check_account(userdn,
591                                   badPwdCount=0,
592                                   badPasswordTime=badPasswordTime,
593                                   logonCount=(logoncount_relation, logonCount),
594                                   lastLogon=(lastlogon_relation, lastLogon),
595                                   lastLogonTimestamp=lastLogonTimestamp,
596                                   lockoutTime=lockoutTime,
597                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
598                                   msDSUserAccountControlComputed=0,
599                                   msg="lastLogon is way off")
600
601         # The wrong password
602         creds_lockout.set_password("thatsAcomplPASS1x")
603         try:
604             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
605             self.fail()
606         except LdbError as e6:
607             (num, msg) = e6.args
608             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
609
610         res = self._check_account(userdn,
611                                   badPwdCount=1,
612                                   badPasswordTime=("greater", badPasswordTime),
613                                   logonCount=logonCount,
614                                   lockoutTime=lockoutTime,
615                                   lastLogon=lastLogon,
616                                   lastLogonTimestamp=lastLogonTimestamp,
617                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
618                                   msDSUserAccountControlComputed=0)
619         badPasswordTime = int(res[0]["badPasswordTime"][0])
620
621         # The wrong password
622         creds_lockout.set_password("thatsAcomplPASS1x")
623         try:
624             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
625             self.fail()
626         except LdbError as e7:
627             (num, msg) = e7.args
628             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
629
630         res = self._check_account(userdn,
631                                   badPwdCount=2,
632                                   badPasswordTime=("greater", badPasswordTime),
633                                   logonCount=logonCount,
634                                   lockoutTime=lockoutTime,
635                                   lastLogon=lastLogon,
636                                   lastLogonTimestamp=lastLogonTimestamp,
637                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
638                                   msDSUserAccountControlComputed=0)
639         badPasswordTime = int(res[0]["badPasswordTime"][0])
640
641         time.sleep(self.lockout_observation_window + 1)
642
643         res = self._check_account(userdn,
644                                   badPwdCount=2, effective_bad_password_count=0,
645                                   badPasswordTime=badPasswordTime,
646                                   logonCount=logonCount,
647                                   lockoutTime=lockoutTime,
648                                   lastLogon=lastLogon,
649                                   lastLogonTimestamp=lastLogonTimestamp,
650                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
651                                   msDSUserAccountControlComputed=0)
652
653         # The wrong password
654         creds_lockout.set_password("thatsAcomplPASS1x")
655         try:
656             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
657             self.fail()
658         except LdbError as e8:
659             (num, msg) = e8.args
660             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
661
662         res = self._check_account(userdn,
663                                   badPwdCount=1,
664                                   badPasswordTime=("greater", badPasswordTime),
665                                   logonCount=logonCount,
666                                   lockoutTime=lockoutTime,
667                                   lastLogon=lastLogon,
668                                   lastLogonTimestamp=lastLogonTimestamp,
669                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
670                                   msDSUserAccountControlComputed=0)
671         badPasswordTime = int(res[0]["badPasswordTime"][0])
672
673         # The correct password without letting the timeout expire
674         creds_lockout.set_password(userpass)
675         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
676
677         res = self._check_account(userdn,
678                                   badPwdCount=0,
679                                   badPasswordTime=badPasswordTime,
680                                   logonCount=(logoncount_relation, logonCount),
681                                   lockoutTime=lockoutTime,
682                                   lastLogon=("greater", lastLogon),
683                                   lastLogonTimestamp=lastLogonTimestamp,
684                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
685                                   msDSUserAccountControlComputed=0)
686
687
688 class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
689     counter = itertools.count(1).next
690
691     def force_replication(self, base=None):
692         if base is None:
693             base = self.base_dn
694
695         # XXX feels like a horrendous way to do it.
696         credstring = '-U%s%%%s' % (CREDS.get_username(),
697                                    CREDS.get_password())
698         cmd = ['bin/samba-tool',
699                'drs', 'replicate',
700                RODC, RWDC, base,
701                credstring,
702                '--sync-forced']
703
704         p = subprocess.Popen(cmd,
705                              stderr=subprocess.PIPE,
706                              stdout=subprocess.PIPE)
707         stdout, stderr = p.communicate()
708         if p.returncode:
709             print("failed with code %s" % p.returncode)
710             print(' '.join(cmd))
711             print("stdout")
712             print(stdout)
713             print("stderr")
714             print(stderr)
715             raise RodcRwdcTestException()
716
717     def _check_account_initial(self, dn):
718         self.force_replication()
719         return super(RodcRwdcTests, self)._check_account_initial(dn)
720
721     def tearDown(self):
722         super(RodcRwdcTests, self).tearDown()
723         self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
724         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
725         set_auto_replication(RWDC, True)
726
727     def setUp(self):
728         self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
729                              session_info=system_session(LP), lp=LP)
730
731         self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
732                              session_info=system_session(LP), lp=LP)
733
734         # Define variables for BasePasswordTestCase
735         self.lp = LP
736         self.global_creds = CREDS
737         self.host = RWDC
738         self.host_url = 'ldap://%s' % RWDC
739         self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
740                          credentials=self.global_creds, lp=self.lp)
741
742         super(RodcRwdcTests, self).setUp()
743         self.host = RODC
744         self.host_url = 'ldap://%s' % RODC
745         self.ldb = SamDB(url='ldap://%s' % RODC, session_info=system_session(self.lp),
746                          credentials=self.global_creds, lp=self.lp)
747
748         self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
749         self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
750         self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
751
752         self.base_dn = self.rwdc_db.domain_dn()
753
754         root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
755                                    attrs=['dsServiceName'])
756         self.service = root[0]['dsServiceName'][0]
757         self.tag = uuid.uuid4().hex
758
759         self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
760         self.rwdc_db.set_dsheuristics("000000001")
761
762         set_auto_replication(RWDC, False)
763
764         # make sure DCs are synchronized before the test
765         self.force_replication()
766         self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
767         self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
768
769     def delete_ldb_connections(self):
770         super(RodcRwdcTests, self).delete_ldb_connections()
771         del self.rwdc_db
772         del self.rodc_db
773
774     def assertReferral(self, fn, *args, **kwargs):
775         try:
776             fn(*args, **kwargs)
777             self.fail("failed to raise ldap referral")
778         except ldb.LdbError as e9:
779             (code, msg) = e9.args
780             self.assertEqual(code, ldb.ERR_REFERRAL,
781                              "expected referral, got %s %s" % (code, msg))
782
783     def _test_rodc_dsheuristics(self):
784         d = self.rodc_db.get_dsheuristics()
785         self.assertReferral(self.rodc_db.set_dsheuristics, "000000001")
786         self.assertReferral(self.rodc_db.set_dsheuristics, d)
787
788     def TEST_rodc_heuristics_kerberos(self):
789         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
790         self._test_rodc_dsheuristics()
791
792     def TEST_rodc_heuristics_ntlm(self):
793         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
794         self._test_rodc_dsheuristics()
795
796     def _test_add(self, objects, cross_ncs=False):
797         for o in objects:
798             dn = o['dn']
799             if cross_ncs:
800                 base = str(self.rwdc_db.get_config_basedn())
801                 controls = ["search_options:1:2"]
802                 cn = dn.split(',', 1)[0]
803                 expression = '(%s)' % cn
804             else:
805                 base = dn
806                 controls = []
807                 expression = None
808
809             try:
810                 res = self.rodc_db.search(base,
811                                           expression=expression,
812                                           scope=ldb.SCOPE_SUBTREE,
813                                           attrs=['dn'],
814                                           controls=controls)
815                 self.assertEqual(len(res), 0)
816             except ldb.LdbError as e:
817                 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
818                     raise
819
820             try:
821                 self.rwdc_db.add(o)
822             except ldb.LdbError as e:
823                 (ecode, emsg) = e.args
824                 self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
825                           (ecode, emsg))
826
827             if cross_ncs:
828                 self.force_replication(base=base)
829             else:
830                 self.force_replication()
831
832             try:
833                 res = self.rodc_db.search(base,
834                                           expression=expression,
835                                           scope=ldb.SCOPE_SUBTREE,
836                                           attrs=['dn'],
837                                           controls=controls)
838                 self.assertEqual(len(res), 1)
839             except ldb.LdbError as e:
840                 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
841                                     "replication seems to have failed")
842
843     def _test_add_replicated_objects(self, mode):
844         tag = "%s%s" % (self.tag, mode)
845         self._test_add([
846             {
847                 'dn': "ou=%s1,%s" % (tag, self.base_dn),
848                 "objectclass": "organizationalUnit"
849             },
850             {
851                 'dn': "cn=%s2,%s" % (tag, self.base_dn),
852                 "objectclass": "user"
853             },
854             {
855                 'dn': "cn=%s3,%s" % (tag, self.base_dn),
856                 "objectclass": "group"
857             },
858         ])
859         self.rwdc_db.delete("ou=%s1,%s" % (tag, self.base_dn))
860         self.rwdc_db.delete("cn=%s2,%s" % (tag, self.base_dn))
861         self.rwdc_db.delete("cn=%s3,%s" % (tag, self.base_dn))
862
863     def test_add_replicated_objects_kerberos(self):
864         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
865         self._test_add_replicated_objects('kerberos')
866
867     def test_add_replicated_objects_ntlm(self):
868         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
869         self._test_add_replicated_objects('ntlm')
870
871     def _test_add_replicated_connections(self, mode):
872         tag = "%s%s" % (self.tag, mode)
873         self._test_add([
874             {
875                 'dn': "cn=%sfoofoofoo,%s" % (tag, self.service),
876                 "objectclass": "NTDSConnection",
877                 'enabledConnection': 'TRUE',
878                 'fromServer': self.base_dn,
879                 'options': '0'
880             },
881         ], cross_ncs=True)
882         self.rwdc_db.delete("cn=%sfoofoofoo,%s" % (tag, self.service))
883
884     def test_add_replicated_connections_kerberos(self):
885         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
886         self._test_add_replicated_connections('kerberos')
887
888     def test_add_replicated_connections_ntlm(self):
889         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
890         self._test_add_replicated_connections('ntlm')
891
892     def _test_modify_replicated_attributes(self):
893         dn = 'CN=Guest,CN=Users,' + self.base_dn
894         value = self.tag
895         for attr in ['carLicense', 'middleName']:
896             m = ldb.Message()
897             m.dn = ldb.Dn(self.rwdc_db, dn)
898             m[attr] = ldb.MessageElement(value,
899                                          ldb.FLAG_MOD_REPLACE,
900                                          attr)
901             try:
902                 self.rwdc_db.modify(m)
903             except ldb.LdbError as e:
904                 self.fail("Failed to modify %s %s on RWDC %s with %s" %
905                           (dn, attr, RWDC, e))
906
907             self.force_replication()
908
909             try:
910                 res = self.rodc_db.search(dn,
911                                           scope=ldb.SCOPE_SUBTREE,
912                                           attrs=[attr])
913                 results = [x[attr][0] for x in res]
914                 self.assertEqual(results, [value])
915             except ldb.LdbError as e:
916                 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
917                                     "replication seems to have failed")
918
919     def test_modify_replicated_attributes_kerberos(self):
920         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
921         self._test_modify_replicated_attributes()
922
923     def test_modify_replicated_attributes_ntlm(self):
924         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
925         self._test_modify_replicated_attributes()
926
927     def _test_add_modify_delete(self):
928         dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
929         values = ["%s%s" % (i, self.tag) for i in range(3)]
930         attr = "carLicense"
931         self._test_add([
932             {
933                 'dn': dn,
934                 "objectclass": "user",
935                 attr: values[0]
936             },
937         ])
938         self.force_replication()
939         for value in values[1:]:
940
941             m = ldb.Message()
942             m.dn = ldb.Dn(self.rwdc_db, dn)
943             m[attr] = ldb.MessageElement(value,
944                                          ldb.FLAG_MOD_REPLACE,
945                                          attr)
946             try:
947                 self.rwdc_db.modify(m)
948             except ldb.LdbError as e:
949                 self.fail("Failed to modify %s %s on RWDC %s with %s" %
950                           (dn, attr, RWDC, e))
951
952             self.force_replication()
953
954             try:
955                 res = self.rodc_db.search(dn,
956                                           scope=ldb.SCOPE_SUBTREE,
957                                           attrs=[attr])
958                 results = [x[attr][0] for x in res]
959                 self.assertEqual(results, [value])
960             except ldb.LdbError as e:
961                 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
962                                     "replication seems to have failed")
963
964         self.rwdc_db.delete(dn)
965         self.force_replication()
966         try:
967             res = self.rodc_db.search(dn,
968                                       scope=ldb.SCOPE_SUBTREE,
969                                       attrs=[attr])
970             if len(res) > 0:
971                 self.fail("Failed to delete %s" % (dn))
972         except ldb.LdbError as e:
973             self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
974                              "Failed to delete %s" % (dn))
975
976     def test_add_modify_delete_kerberos(self):
977         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
978         self._test_add_modify_delete()
979
980     def test_add_modify_delete_ntlm(self):
981         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
982         self._test_add_modify_delete()
983
984     def _new_user(self):
985         username = "u%sX%s" % (self.tag[:12], self.counter())
986         password = 'password#1'
987         dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
988         o = {
989             'dn': dn,
990             "objectclass": "user",
991             'sAMAccountName': username,
992         }
993         try:
994             self.rwdc_db.add(o)
995         except ldb.LdbError as e:
996             self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))
997
998         self.rwdc_db.modify_ldif("dn: %s\n"
999                                  "changetype: modify\n"
1000                                  "delete: userPassword\n"
1001                                  "add: userPassword\n"
1002                                  "userPassword: %s\n" % (dn, password))
1003         self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
1004         return (dn, username, password)
1005
1006     def _change_password(self, user_dn, old_password, new_password):
1007         self.rwdc_db.modify_ldif(
1008             "dn: %s\n"
1009             "changetype: modify\n"
1010             "delete: userPassword\n"
1011             "userPassword: %s\n"
1012             "add: userPassword\n"
1013             "userPassword: %s\n" % (user_dn, old_password, new_password))
1014
1015     def try_ldap_logon(self, server, creds, errno=None):
1016         try:
1017             tmpdb = SamDB('ldap://%s' % server, credentials=creds,
1018                           session_info=system_session(LP), lp=LP)
1019             if errno is not None:
1020                 self.fail("logon failed to fail with ldb error %s" % errno)
1021         except ldb.LdbError as e10:
1022             (code, msg) = e10.args
1023             if code != errno:
1024                 if errno is None:
1025                     self.fail("logon incorrectly raised ldb error (code=%s)" %
1026                               code)
1027                 else:
1028                     self.fail("logon failed to raise correct ldb error"
1029                               "Expected: %s Got: %s" %
1030                               (errno, code))
1031
1032     def zero_min_password_age(self):
1033         min_pwd_age = int(self.rwdc_db.get_minPwdAge())
1034         if min_pwd_age != 0:
1035             self.rwdc_db.set_minPwdAge('0')
1036
1037     def _test_ldap_change_password(self, errno=None):
1038         self.zero_min_password_age()
1039
1040         dn, username, password = self._new_user()
1041         creds1 = make_creds(username, password)
1042
1043         # With NTLM, this should fail on RODC before replication,
1044         # because the user isn't known.
1045         self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1046         self.force_replication()
1047
1048         # Now the user is replicated to RODC, so logon should work
1049         self.try_ldap_logon(RODC, creds1)
1050
1051         passwords = ['password#%s' % i for i in range(1, 6)]
1052         for prev, password in zip(passwords[:-1], passwords[1:]):
1053             self._change_password(dn, prev, password)
1054
1055         # The password has changed enough times to make the old
1056         # password invalid (though with kerberos that doesn't matter).
1057         # For NTLM, the old creds should always fail
1058         self.try_ldap_logon(RODC, creds1, errno)
1059         self.try_ldap_logon(RWDC, creds1, errno)
1060
1061         creds2 = make_creds(username, password)
1062
1063         # new creds work straight away with NTLM, because although it
1064         # doesn't have the password, it knows the user and forwards
1065         # the query.
1066         self.try_ldap_logon(RODC, creds2)
1067         self.try_ldap_logon(RWDC, creds2)
1068
1069         self.force_replication()
1070
1071         # After another replication check RODC still works and fails,
1072         # as appropriate to various creds
1073         self.try_ldap_logon(RODC, creds2)
1074         self.try_ldap_logon(RODC, creds1, errno)
1075
1076         prev = password
1077         password = 'password#6'
1078         self._change_password(dn, prev, password)
1079         creds3 = make_creds(username, password)
1080
1081         # previous password should still work.
1082         self.try_ldap_logon(RWDC, creds2)
1083         self.try_ldap_logon(RODC, creds2)
1084
1085         # new password should still work.
1086         self.try_ldap_logon(RWDC, creds3)
1087         self.try_ldap_logon(RODC, creds3)
1088
1089         # old password should still fail (but not on kerberos).
1090         self.try_ldap_logon(RWDC, creds1, errno)
1091         self.try_ldap_logon(RODC, creds1, errno)
1092
1093     def test_ldap_change_password_kerberos(self):
1094         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
1095         self._test_ldap_change_password()
1096
1097     def test_ldap_change_password_ntlm(self):
1098         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
1099         self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS)
1100
1101     def _test_ldap_change_password_reveal_on_demand(self, errno=None):
1102         self.zero_min_password_age()
1103
1104         res = self.rodc_db.search(self.rodc_dn,
1105                                   scope=ldb.SCOPE_BASE,
1106                                   attrs=['msDS-RevealOnDemandGroup'])
1107
1108         group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
1109
1110         user_dn, username, password = self._new_user()
1111         creds1 = make_creds(username, password)
1112
1113         m = ldb.Message()
1114         m.dn = ldb.Dn(self.rwdc_db, group)
1115         m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
1116         self.rwdc_db.modify(m)
1117
1118         # Against Windows, this will just forward if no account exists on the KDC
1119         # Therefore, this does not error on Windows.
1120         self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1121
1122         self.force_replication()
1123
1124         # The proxy case
1125         self.try_ldap_logon(RODC, creds1)
1126         preload_rodc_user(user_dn)
1127
1128         # Now the user AND password are replicated to RODC, so logon should work (not proxy case)
1129         self.try_ldap_logon(RODC, creds1)
1130
1131         passwords = ['password#%s' % i for i in range(1, 6)]
1132         for prev, password in zip(passwords[:-1], passwords[1:]):
1133             self._change_password(user_dn, prev, password)
1134
1135         # The password has changed enough times to make the old
1136         # password invalid, but the RODC shouldn't know that.
1137         self.try_ldap_logon(RODC, creds1)
1138         self.try_ldap_logon(RWDC, creds1, errno)
1139
1140         creds2 = make_creds(username, password)
1141         self.try_ldap_logon(RWDC, creds2)
1142         # We can forward WRONG_PASSWORD over NTLM.
1143         # This SHOULD succeed.
1144         self.try_ldap_logon(RODC, creds2)
1145
1146     def test_change_password_reveal_on_demand_ntlm(self):
1147         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
1148         self._test_ldap_change_password_reveal_on_demand(ldb.ERR_INVALID_CREDENTIALS)
1149
1150     def test_change_password_reveal_on_demand_kerberos(self):
1151         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
1152         self._test_ldap_change_password_reveal_on_demand()
1153
1154     def test_login_lockout_krb5(self):
1155         username = self.lockout1krb5_creds.get_username()
1156         userpass = self.lockout1krb5_creds.get_password()
1157         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1158
1159         preload_rodc_user(userdn)
1160
1161         use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
1162         fail_creds = self.insta_creds(self.template_creds,
1163                                       username=username,
1164                                       userpass=userpass + "X",
1165                                       kerberos_state=use_kerberos)
1166
1167         try:
1168             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1169             self.fail()
1170         except LdbError as e11:
1171             (num, msg) = e11.args
1172             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1173
1174         # Succeed to reset everything to 0
1175         success_creds = self.insta_creds(self.template_creds,
1176                                          username=username,
1177                                          userpass=userpass,
1178                                          kerberos_state=use_kerberos)
1179
1180         ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
1181
1182         self._test_login_lockout(self.lockout1krb5_creds)
1183
1184     def test_login_lockout_ntlm(self):
1185         username = self.lockout1ntlm_creds.get_username()
1186         userpass = self.lockout1ntlm_creds.get_password()
1187         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1188
1189         preload_rodc_user(userdn)
1190
1191         use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
1192         fail_creds = self.insta_creds(self.template_creds,
1193                                       username=username,
1194                                       userpass=userpass + "X",
1195                                       kerberos_state=use_kerberos)
1196
1197         try:
1198             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1199             self.fail()
1200         except LdbError as e12:
1201             (num, msg) = e12.args
1202             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1203
1204         # Succeed to reset everything to 0
1205         ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
1206
1207         self._test_login_lockout(self.lockout1ntlm_creds)
1208
1209     def test_multiple_logon_krb5(self):
1210         username = self.lockout1krb5_creds.get_username()
1211         userpass = self.lockout1krb5_creds.get_password()
1212         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1213
1214         preload_rodc_user(userdn)
1215
1216         use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
1217         fail_creds = self.insta_creds(self.template_creds,
1218                                       username=username,
1219                                       userpass=userpass + "X",
1220                                       kerberos_state=use_kerberos)
1221
1222         try:
1223             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1224             self.fail()
1225         except LdbError as e13:
1226             (num, msg) = e13.args
1227             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1228
1229         # Succeed to reset everything to 0
1230         success_creds = self.insta_creds(self.template_creds,
1231                                          username=username,
1232                                          userpass=userpass,
1233                                          kerberos_state=use_kerberos)
1234
1235         ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
1236
1237         self._test_multiple_logon(self.lockout1krb5_creds)
1238
1239     def test_multiple_logon_ntlm(self):
1240         username = self.lockout1ntlm_creds.get_username()
1241         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1242         userpass = self.lockout1ntlm_creds.get_password()
1243
1244         preload_rodc_user(userdn)
1245
1246         use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
1247         fail_creds = self.insta_creds(self.template_creds,
1248                                       username=username,
1249                                       userpass=userpass + "X",
1250                                       kerberos_state=use_kerberos)
1251
1252         try:
1253             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1254             self.fail()
1255         except LdbError as e14:
1256             (num, msg) = e14.args
1257             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1258
1259         # Succeed to reset everything to 0
1260         ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
1261
1262         self._test_multiple_logon(self.lockout1ntlm_creds)
1263
1264
def main():
    """Parse the command line, fill in the module globals and run the tests.

    Expects exactly two positional arguments, the RODC and RWDC host names;
    otherwise the usage text is printed and the process exits with status 1.
    ``set_auto_replication(RWDC, True)`` is called before the test program
    starts and again when it finishes, whether or not it raised.
    """
    global RODC, RWDC, CREDS, LP
    usage = "rodc_rwdc.py [options] <rodc host> <rwdc host>"
    parser = optparse.OptionParser(usage)

    sambaopts = options.SambaOptions(parser)
    versionopts = options.VersionOptions(parser)
    credopts = options.CredentialsOptions(parser)
    subunitopts = SubunitOptions(parser)

    for group in (sambaopts, versionopts, credopts, subunitopts):
        parser.add_option_group(group)

    _opts, args = parser.parse_args()

    LP = sambaopts.get_loadparm()
    CREDS = credopts.get_credentials(LP)
    sealed_features = CREDS.get_gensec_features() | gensec.FEATURE_SEAL
    CREDS.set_gensec_features(sealed_features)

    if len(args) != 2:
        parser.print_usage()
        sys.exit(1)
    RODC, RWDC = args

    set_auto_replication(RWDC, True)
    try:
        TestProgram(module=__name__, opts=subunitopts)
    finally:
        set_auto_replication(RWDC, True)
1298
# Only run the suite when executed as a script, so that importing this
# module does not parse sys.argv or start the tests.
if __name__ == "__main__":
    main()