PEP8: fix E261: at least two spaces before inline comment
[ambi/samba-autobuild/.git] / source4 / dsdb / tests / python / rodc_rwdc.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 from __future__ import print_function
4 """Test communication of credentials etc, between an RODC and a RWDC.
5
6 How does it work when the password is changed on the RWDC?
7 """
8
9 import optparse
10 import sys
11 import base64
12 import uuid
13 import subprocess
14 import itertools
15 import time
16
17 sys.path.insert(0, "bin/python")
18 import samba
19 import ldb
20
21 from samba.tests.subunitrun import SubunitOptions, TestProgram
22 import samba.getopt as options
23
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
27 from samba import gensec, dsdb
28 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
29 from samba.dcerpc import security, samr
30
31 import password_lockout_base
32
33 def passwd_encode(pw):
34     return base64.b64encode(('"%s"' % pw).encode('utf-16-le')).decode('utf8')
35
36
37 class RodcRwdcTestException(Exception):
38     pass
39
40
41 def make_creds(username, password, kerberos_state=None):
42     # use the global CREDS as a template
43     c = Credentials()
44     c.set_username(username)
45     c.set_password(password)
46     c.set_domain(CREDS.get_domain())
47     c.set_realm(CREDS.get_realm())
48     c.set_workstation(CREDS.get_workstation())
49
50     if kerberos_state is None:
51         kerberos_state = CREDS.get_kerberos_state()
52     c.set_kerberos_state(kerberos_state)
53
54     print('-' * 73)
55     if kerberos_state == MUST_USE_KERBEROS:
56         print("we seem to be using kerberos for %s %s" % (username, password))
57     elif kerberos_state == DONT_USE_KERBEROS:
58         print("NOT using kerberos for %s %s" % (username, password))
59     else:
60         print("kerberos state is %s" % kerberos_state)
61
62     c.set_gensec_features(c.get_gensec_features() |
63                           gensec.FEATURE_SEAL)
64     return c
65
66
67 def set_auto_replication(dc, allow):
68     credstring = '-U%s%%%s' % (CREDS.get_username(),
69                                CREDS.get_password())
70
71     on_or_off = '-' if allow else '+'
72
73     for opt in ['DISABLE_INBOUND_REPL',
74                 'DISABLE_OUTBOUND_REPL']:
75         cmd = ['bin/samba-tool',
76                'drs', 'options',
77                credstring, dc,
78                "--dsa-option=%s%s" % (on_or_off, opt)]
79
80         p = subprocess.Popen(cmd,
81                              stderr=subprocess.PIPE,
82                              stdout=subprocess.PIPE)
83         stdout, stderr = p.communicate()
84         if p.returncode:
85             if 'LDAP_REFERRAL' not in stderr:
86                 raise RodcRwdcTestException()
87             print("ignoring +%s REFERRAL error; assuming %s is RODC" %
88                    (opt, dc))
89
90
91 def preload_rodc_user(user_dn):
92     credstring = '-U%s%%%s' % (CREDS.get_username(),
93                                CREDS.get_password())
94
95     set_auto_replication(RWDC, True)
96     cmd = ['bin/samba-tool',
97            'rodc', 'preload',
98            user_dn,
99            credstring,
100            '--server', RWDC, ]
101
102     print(' '.join(cmd))
103     subprocess.check_call(cmd)
104     set_auto_replication(RWDC, False)
105
106
107
108 def get_server_ref_from_samdb(samdb):
109     server_name = samdb.get_serverName()
110     res = samdb.search(server_name,
111                        scope=ldb.SCOPE_BASE,
112                        attrs=['serverReference'])
113
114     return res[0]['serverReference'][0]
115
116 class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
117     counter = itertools.count(1).next
118
119     def _check_account_initial(self, dn):
120         self.force_replication()
121         return super(RodcRwdcCachedTests, self)._check_account_initial(dn)
122
123     def _check_account(self, dn,
124                        badPwdCount=None,
125                        badPasswordTime=None,
126                        logonCount=None,
127                        lastLogon=None,
128                        lastLogonTimestamp=None,
129                        lockoutTime=None,
130                        userAccountControl=None,
131                        msDSUserAccountControlComputed=None,
132                        effective_bad_password_count=None,
133                        msg=None,
134                        badPwdCountOnly=False):
135         # Wait for the RWDC to get any delayed messages
136         # e.g. SendToSam or KRB5 bad passwords via winbindd
137         if (self.kerberos and isinstance(badPasswordTime, tuple) or
138             badPwdCount == 0):
139             time.sleep(5)
140
141         return super(RodcRwdcCachedTests,
142                      self)._check_account(dn, badPwdCount, badPasswordTime,
143                                           logonCount, lastLogon,
144                                           lastLogonTimestamp, lockoutTime,
145                                           userAccountControl,
146                                           msDSUserAccountControlComputed,
147                                           effective_bad_password_count, msg,
148                                           True)
149
150     def force_replication(self, base=None):
151         if base is None:
152             base = self.base_dn
153
154         # XXX feels like a horrendous way to do it.
155         credstring = '-U%s%%%s' % (CREDS.get_username(),
156                                    CREDS.get_password())
157         cmd = ['bin/samba-tool',
158                'drs', 'replicate',
159                RODC, RWDC, base,
160                credstring,
161                '--sync-forced']
162
163         p = subprocess.Popen(cmd,
164                              stderr=subprocess.PIPE,
165                              stdout=subprocess.PIPE)
166         stdout, stderr = p.communicate()
167         if p.returncode:
168             print("failed with code %s" % p.returncode)
169             print(' '.join(cmd))
170             print("stdout")
171             print(stdout)
172             print("stderr")
173             print(stderr)
174             raise RodcRwdcTestException()
175
176     def _change_password(self, user_dn, old_password, new_password):
177         self.rwdc_db.modify_ldif(
178             "dn: %s\n"
179             "changetype: modify\n"
180             "delete: userPassword\n"
181             "userPassword: %s\n"
182             "add: userPassword\n"
183             "userPassword: %s\n" % (user_dn, old_password, new_password))
184
185     def tearDown(self):
186         super(RodcRwdcCachedTests, self).tearDown()
187         set_auto_replication(RWDC, True)
188
189     def setUp(self):
190         self.kerberos = False  # To be set later
191
192         self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
193                              session_info=system_session(LP), lp=LP)
194
195         self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
196                              session_info=system_session(LP), lp=LP)
197
198         # Define variables for BasePasswordTestCase
199         self.lp = LP
200         self.global_creds = CREDS
201         self.host = RWDC
202         self.host_url = 'ldap://%s' % RWDC
203         self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
204                          credentials=self.global_creds, lp=self.lp)
205
206         super(RodcRwdcCachedTests, self).setUp()
207         self.host_url = 'ldap://%s' % RODC
208
209         self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
210         self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
211         self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
212
213         self.base_dn = self.rwdc_db.domain_dn()
214
215         root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
216                                    attrs=['dsServiceName'])
217         self.service = root[0]['dsServiceName'][0]
218         self.tag = uuid.uuid4().hex
219
220         self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
221         self.rwdc_db.set_dsheuristics("000000001")
222
223         set_auto_replication(RWDC, False)
224
225         # make sure DCs are synchronized before the test
226         self.force_replication()
227
228     def delete_ldb_connections(self):
229         super(RodcRwdcCachedTests, self).delete_ldb_connections()
230         del self.rwdc_db
231         del self.rodc_db
232
233     def test_cache_and_flush_password(self):
234         username = self.lockout1krb5_creds.get_username()
235         userpass = self.lockout1krb5_creds.get_password()
236         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
237
238         ldb_system = SamDB(session_info=system_session(self.lp),
239                            credentials=self.global_creds, lp=self.lp)
240
241         res = ldb_system.search(userdn, attrs=['unicodePwd'])
242         self.assertFalse('unicodePwd' in res[0])
243
244         preload_rodc_user(userdn)
245
246         res = ldb_system.search(userdn, attrs=['unicodePwd'])
247         self.assertTrue('unicodePwd' in res[0])
248
249         newpass = userpass + '!'
250
251         # Forcing replication should blank out password (when changed)
252         self._change_password(userdn, userpass, newpass)
253         self.force_replication()
254
255         res = ldb_system.search(userdn, attrs=['unicodePwd'])
256         self.assertFalse('unicodePwd' in res[0])
257
258     def test_login_lockout_krb5(self):
259         username = self.lockout1krb5_creds.get_username()
260         userpass = self.lockout1krb5_creds.get_password()
261         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
262
263         preload_rodc_user(userdn)
264
265         self.kerberos = True
266
267         self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
268
269         res = self.rodc_db.search(self.rodc_dn,
270                                   scope=ldb.SCOPE_BASE,
271                                   attrs=['msDS-RevealOnDemandGroup'])
272
273         group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
274
275         m = ldb.Message()
276         m.dn = ldb.Dn(self.rwdc_db, group)
277         m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
278         self.rwdc_db.modify(m)
279
280         m = ldb.Message()
281         m.dn = ldb.Dn(self.ldb, self.base_dn)
282
283         self.account_lockout_duration = 10
284         account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7))
285
286         m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks),
287                                                   ldb.FLAG_MOD_REPLACE,
288                                                   "lockoutDuration")
289
290         self.lockout_observation_window = 10
291         lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7))
292
293         m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks),
294                                                            ldb.FLAG_MOD_REPLACE,
295                                                            "lockOutObservationWindow")
296
297         self.rwdc_db.modify(m)
298         self.force_replication()
299
300         self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
301
302     def test_login_lockout_ntlm(self):
303         username = self.lockout1ntlm_creds.get_username()
304         userpass = self.lockout1ntlm_creds.get_password()
305         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
306
307         preload_rodc_user(userdn)
308
309         self.kerberos = False
310
311         self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
312
313         res = self.rodc_db.search(self.rodc_dn,
314                                   scope=ldb.SCOPE_BASE,
315                                   attrs=['msDS-RevealOnDemandGroup'])
316
317         group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
318
319         m = ldb.Message()
320         m.dn = ldb.Dn(self.rwdc_db, group)
321         m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
322         self.rwdc_db.modify(m)
323
324         self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
325
326     def test_login_lockout_not_revealed(self):
327         '''Test that SendToSam is restricted by preloaded users/groups'''
328
329         username = self.lockout1ntlm_creds.get_username()
330         userpass = self.lockout1ntlm_creds.get_password()
331         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
332
333         # Preload but do not add to revealed group
334         preload_rodc_user(userdn)
335
336         self.kerberos = False
337
338         creds = self.lockout1ntlm_creds
339
340         # Open a second LDB connection with the user credentials. Use the
341         # command line credentials for informations like the domain, the realm
342         # and the workstation.
343         creds_lockout = self.insta_creds(creds)
344
345         # The wrong password
346         creds_lockout.set_password("thatsAcomplPASS1x")
347
348         self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
349
350         badPasswordTime = 0
351         logonCount = 0
352         lastLogon = 0
353         lastLogonTimestamp = 0
354         logoncount_relation = ''
355         lastlogon_relation = ''
356
357         res = self._check_account(userdn,
358                                   badPwdCount=1,
359                                   badPasswordTime=("greater", badPasswordTime),
360                                   logonCount=logonCount,
361                                   lastLogon=lastLogon,
362                                   lastLogonTimestamp=lastLogonTimestamp,
363                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
364                                   msDSUserAccountControlComputed=0,
365                                   msg='lastlogontimestamp with wrong password')
366         badPasswordTime = int(res[0]["badPasswordTime"][0])
367
368         # BadPwdCount on RODC increases alongside RWDC
369         res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
370         self.assertTrue('badPwdCount' in res[0])
371         self.assertEqual(int(res[0]['badPwdCount'][0]), 1)
372
373         # Correct old password
374         creds_lockout.set_password(userpass)
375
376         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
377
378         # Wait for potential SendToSam...
379         time.sleep(5)
380
381         # BadPwdCount on RODC decreases, but not the RWDC
382         res = self._check_account(userdn,
383                                   badPwdCount=1,
384                                   badPasswordTime=badPasswordTime,
385                                   logonCount=(logoncount_relation, logonCount),
386                                   lastLogon=('greater', lastLogon),
387                                   lastLogonTimestamp=lastLogonTimestamp,
388                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
389                                   msDSUserAccountControlComputed=0,
390                                   msg='badPwdCount not reset on RWDC')
391
392         res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
393         self.assertTrue('badPwdCount' in res[0])
394         self.assertEqual(int(res[0]['badPwdCount'][0]), 0)
395
396     def _test_login_lockout_rodc_rwdc(self, creds, userdn):
397         username = creds.get_username()
398         userpass = creds.get_password()
399
400         # Open a second LDB connection with the user credentials. Use the
401         # command line credentials for informations like the domain, the realm
402         # and the workstation.
403         creds_lockout = self.insta_creds(creds)
404
405         # The wrong password
406         creds_lockout.set_password("thatsAcomplPASS1x")
407
408         self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
409
410         badPasswordTime = 0
411         logonCount = 0
412         lastLogon = 0
413         lastLogonTimestamp = 0
414         logoncount_relation = ''
415         lastlogon_relation = ''
416
417         res = self._check_account(userdn,
418                                   badPwdCount=1,
419                                   badPasswordTime=("greater", badPasswordTime),
420                                   logonCount=logonCount,
421                                   lastLogon=lastLogon,
422                                   lastLogonTimestamp=lastLogonTimestamp,
423                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
424                                   msDSUserAccountControlComputed=0,
425                                   msg='lastlogontimestamp with wrong password')
426         badPasswordTime = int(res[0]["badPasswordTime"][0])
427
428         # Correct old password
429         creds_lockout.set_password(userpass)
430
431         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
432
433         # lastLogonTimestamp should not change
434         # lastLogon increases if badPwdCount is non-zero (!)
435         res = self._check_account(userdn,
436                                   badPwdCount=0,
437                                   badPasswordTime=badPasswordTime,
438                                   logonCount=(logoncount_relation, logonCount),
439                                   lastLogon=('greater', lastLogon),
440                                   lastLogonTimestamp=lastLogonTimestamp,
441                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
442                                   msDSUserAccountControlComputed=0,
443                                   msg='LLTimestamp is updated to lastlogon')
444
445         logonCount = int(res[0]["logonCount"][0])
446         lastLogon = int(res[0]["lastLogon"][0])
447
448         # The wrong password
449         creds_lockout.set_password("thatsAcomplPASS1x")
450
451         self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
452
453         res = self._check_account(userdn,
454                                   badPwdCount=1,
455                                   badPasswordTime=("greater", badPasswordTime),
456                                   logonCount=logonCount,
457                                   lastLogon=lastLogon,
458                                   lastLogonTimestamp=lastLogonTimestamp,
459                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
460                                   msDSUserAccountControlComputed=0)
461         badPasswordTime = int(res[0]["badPasswordTime"][0])
462
463         # The wrong password
464         creds_lockout.set_password("thatsAcomplPASS1x")
465
466         try:
467             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
468             self.fail()
469
470         except LdbError as e1:
471             (num, msg) = e1.args
472             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
473
474         res = self._check_account(userdn,
475                                   badPwdCount=2,
476                                   badPasswordTime=("greater", badPasswordTime),
477                                   logonCount=logonCount,
478                                   lastLogon=lastLogon,
479                                   lastLogonTimestamp=lastLogonTimestamp,
480                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
481                                   msDSUserAccountControlComputed=0)
482         badPasswordTime = int(res[0]["badPasswordTime"][0])
483
484         print("two failed password change")
485
486         # The wrong password
487         creds_lockout.set_password("thatsAcomplPASS1x")
488
489         try:
490             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
491             self.fail()
492
493         except LdbError as e2:
494             (num, msg) = e2.args
495             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
496
497         res = self._check_account(userdn,
498                                   badPwdCount=3,
499                                   badPasswordTime=("greater", badPasswordTime),
500                                   logonCount=logonCount,
501                                   lastLogon=lastLogon,
502                                   lastLogonTimestamp=lastLogonTimestamp,
503                                   lockoutTime=("greater", badPasswordTime),
504                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
505                                   msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
506         badPasswordTime = int(res[0]["badPasswordTime"][0])
507         lockoutTime = int(res[0]["lockoutTime"][0])
508
509         # The wrong password
510         creds_lockout.set_password("thatsAcomplPASS1x")
511         try:
512             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
513             self.fail()
514         except LdbError as e3:
515             (num, msg) = e3.args
516             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
517
518         res = self._check_account(userdn,
519                                   badPwdCount=3,
520                                   badPasswordTime=badPasswordTime,
521                                   logonCount=logonCount,
522                                   lastLogon=lastLogon,
523                                   lastLogonTimestamp=lastLogonTimestamp,
524                                   lockoutTime=lockoutTime,
525                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
526                                   msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
527
528         # The wrong password
529         creds_lockout.set_password("thatsAcomplPASS1x")
530         try:
531             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
532             self.fail()
533         except LdbError as e4:
534             (num, msg) = e4.args
535             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
536
537         res = self._check_account(userdn,
538                                   badPwdCount=3,
539                                   badPasswordTime=badPasswordTime,
540                                   logonCount=logonCount,
541                                   lastLogon=lastLogon,
542                                   lastLogonTimestamp=lastLogonTimestamp,
543                                   lockoutTime=lockoutTime,
544                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
545                                   msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
546
547         # The correct password, but we are locked out
548         creds_lockout.set_password(userpass)
549         try:
550             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
551             self.fail()
552         except LdbError as e5:
553             (num, msg) = e5.args
554             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
555
556         res = self._check_account(userdn,
557                                   badPwdCount=3,
558                                   badPasswordTime=badPasswordTime,
559                                   logonCount=logonCount,
560                                   lastLogon=lastLogon,
561                                   lastLogonTimestamp=lastLogonTimestamp,
562                                   lockoutTime=lockoutTime,
563                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
564                                   msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
565
566         # wait for the lockout to end
567         time.sleep(self.account_lockout_duration + 1)
568         print(self.account_lockout_duration + 1)
569
570         res = self._check_account(userdn,
571                                   badPwdCount=3, effective_bad_password_count=0,
572                                   badPasswordTime=badPasswordTime,
573                                   logonCount=logonCount,
574                                   lockoutTime=lockoutTime,
575                                   lastLogon=lastLogon,
576                                   lastLogonTimestamp=lastLogonTimestamp,
577                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
578                                   msDSUserAccountControlComputed=0)
579
580         # The correct password after letting the timeout expire
581
582         creds_lockout.set_password(userpass)
583
584         creds_lockout2 = self.insta_creds(creds_lockout)
585
586         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp)
587         time.sleep(3)
588
589         res = self._check_account(userdn,
590                                   badPwdCount=0,
591                                   badPasswordTime=badPasswordTime,
592                                   logonCount=(logoncount_relation, logonCount),
593                                   lastLogon=(lastlogon_relation, lastLogon),
594                                   lastLogonTimestamp=lastLogonTimestamp,
595                                   lockoutTime=lockoutTime,
596                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
597                                   msDSUserAccountControlComputed=0,
598                                   msg="lastLogon is way off")
599
600         # The wrong password
601         creds_lockout.set_password("thatsAcomplPASS1x")
602         try:
603             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
604             self.fail()
605         except LdbError as e6:
606             (num, msg) = e6.args
607             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
608
609         res = self._check_account(userdn,
610                                   badPwdCount=1,
611                                   badPasswordTime=("greater", badPasswordTime),
612                                   logonCount=logonCount,
613                                   lockoutTime=lockoutTime,
614                                   lastLogon=lastLogon,
615                                   lastLogonTimestamp=lastLogonTimestamp,
616                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
617                                   msDSUserAccountControlComputed=0)
618         badPasswordTime = int(res[0]["badPasswordTime"][0])
619
620         # The wrong password
621         creds_lockout.set_password("thatsAcomplPASS1x")
622         try:
623             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
624             self.fail()
625         except LdbError as e7:
626             (num, msg) = e7.args
627             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
628
629         res = self._check_account(userdn,
630                                   badPwdCount=2,
631                                   badPasswordTime=("greater", badPasswordTime),
632                                   logonCount=logonCount,
633                                   lockoutTime=lockoutTime,
634                                   lastLogon=lastLogon,
635                                   lastLogonTimestamp=lastLogonTimestamp,
636                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
637                                   msDSUserAccountControlComputed=0)
638         badPasswordTime = int(res[0]["badPasswordTime"][0])
639
640         time.sleep(self.lockout_observation_window + 1)
641
642         res = self._check_account(userdn,
643                                   badPwdCount=2, effective_bad_password_count=0,
644                                   badPasswordTime=badPasswordTime,
645                                   logonCount=logonCount,
646                                   lockoutTime=lockoutTime,
647                                   lastLogon=lastLogon,
648                                   lastLogonTimestamp=lastLogonTimestamp,
649                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
650                                   msDSUserAccountControlComputed=0)
651
652         # The wrong password
653         creds_lockout.set_password("thatsAcomplPASS1x")
654         try:
655             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
656             self.fail()
657         except LdbError as e8:
658             (num, msg) = e8.args
659             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
660
661         res = self._check_account(userdn,
662                                   badPwdCount=1,
663                                   badPasswordTime=("greater", badPasswordTime),
664                                   logonCount=logonCount,
665                                   lockoutTime=lockoutTime,
666                                   lastLogon=lastLogon,
667                                   lastLogonTimestamp=lastLogonTimestamp,
668                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
669                                   msDSUserAccountControlComputed=0)
670         badPasswordTime = int(res[0]["badPasswordTime"][0])
671
672         # The correct password without letting the timeout expire
673         creds_lockout.set_password(userpass)
674         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
675
676         res = self._check_account(userdn,
677                                   badPwdCount=0,
678                                   badPasswordTime=badPasswordTime,
679                                   logonCount=(logoncount_relation, logonCount),
680                                   lockoutTime=lockoutTime,
681                                   lastLogon=("greater", lastLogon),
682                                   lastLogonTimestamp=lastLogonTimestamp,
683                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
684                                   msDSUserAccountControlComputed=0)
685
686 class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
687     counter = itertools.count(1).next
688
689     def force_replication(self, base=None):
690         if base is None:
691             base = self.base_dn
692
693         # XXX feels like a horrendous way to do it.
694         credstring = '-U%s%%%s' % (CREDS.get_username(),
695                                    CREDS.get_password())
696         cmd = ['bin/samba-tool',
697                'drs', 'replicate',
698                RODC, RWDC, base,
699                credstring,
700                '--sync-forced']
701
702         p = subprocess.Popen(cmd,
703                              stderr=subprocess.PIPE,
704                              stdout=subprocess.PIPE)
705         stdout, stderr = p.communicate()
706         if p.returncode:
707             print("failed with code %s" % p.returncode)
708             print(' '.join(cmd))
709             print("stdout")
710             print(stdout)
711             print("stderr")
712             print(stderr)
713             raise RodcRwdcTestException()
714
715     def _check_account_initial(self, dn):
716         self.force_replication()
717         return super(RodcRwdcTests, self)._check_account_initial(dn)
718
719     def tearDown(self):
720         super(RodcRwdcTests, self).tearDown()
721         self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
722         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
723         set_auto_replication(RWDC, True)
724
725     def setUp(self):
726         self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
727                              session_info=system_session(LP), lp=LP)
728
729         self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
730                              session_info=system_session(LP), lp=LP)
731
732         # Define variables for BasePasswordTestCase
733         self.lp = LP
734         self.global_creds = CREDS
735         self.host = RWDC
736         self.host_url = 'ldap://%s' % RWDC
737         self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
738                          credentials=self.global_creds, lp=self.lp)
739
740         super(RodcRwdcTests, self).setUp()
741         self.host = RODC
742         self.host_url = 'ldap://%s' % RODC
743         self.ldb = SamDB(url='ldap://%s' % RODC, session_info=system_session(self.lp),
744                          credentials=self.global_creds, lp=self.lp)
745
746         self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
747         self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
748         self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
749
750         self.base_dn = self.rwdc_db.domain_dn()
751
752         root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
753                                    attrs=['dsServiceName'])
754         self.service = root[0]['dsServiceName'][0]
755         self.tag = uuid.uuid4().hex
756
757         self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
758         self.rwdc_db.set_dsheuristics("000000001")
759
760         set_auto_replication(RWDC, False)
761
762         # make sure DCs are synchronized before the test
763         self.force_replication()
764         self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
765         self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
766
767     def delete_ldb_connections(self):
768         super(RodcRwdcTests, self).delete_ldb_connections()
769         del self.rwdc_db
770         del self.rodc_db
771
772     def assertReferral(self, fn, *args, **kwargs):
773         try:
774             fn(*args, **kwargs)
775             self.fail("failed to raise ldap referral")
776         except ldb.LdbError as e9:
777             (code, msg) = e9.args
778             self.assertEqual(code, ldb.ERR_REFERRAL,
779                              "expected referral, got %s %s" % (code, msg))
780
781     def _test_rodc_dsheuristics(self):
782         d = self.rodc_db.get_dsheuristics()
783         self.assertReferral(self.rodc_db.set_dsheuristics, "000000001")
784         self.assertReferral(self.rodc_db.set_dsheuristics, d)
785
786     def TEST_rodc_heuristics_kerberos(self):
787         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
788         self._test_rodc_dsheuristics()
789
790     def TEST_rodc_heuristics_ntlm(self):
791         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
792         self._test_rodc_dsheuristics()
793
794     def _test_add(self, objects, cross_ncs=False):
795         for o in objects:
796             dn = o['dn']
797             if cross_ncs:
798                 base = str(self.rwdc_db.get_config_basedn())
799                 controls = ["search_options:1:2"]
800                 cn = dn.split(',', 1)[0]
801                 expression = '(%s)' % cn
802             else:
803                 base = dn
804                 controls = []
805                 expression = None
806
807             try:
808                 res = self.rodc_db.search(base,
809                                           expression=expression,
810                                           scope=ldb.SCOPE_SUBTREE,
811                                           attrs=['dn'],
812                                           controls=controls)
813                 self.assertEqual(len(res), 0)
814             except ldb.LdbError as e:
815                 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
816                     raise
817
818             try:
819                 self.rwdc_db.add(o)
820             except ldb.LdbError as e:
821                 (ecode, emsg) = e.args
822                 self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
823                           (ecode, emsg))
824
825             if cross_ncs:
826                 self.force_replication(base=base)
827             else:
828                 self.force_replication()
829
830             try:
831                 res = self.rodc_db.search(base,
832                                           expression=expression,
833                                           scope=ldb.SCOPE_SUBTREE,
834                                           attrs=['dn'],
835                                           controls=controls)
836                 self.assertEqual(len(res), 1)
837             except ldb.LdbError as e:
838                 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
839                                     "replication seems to have failed")
840
841     def _test_add_replicated_objects(self, mode):
842         tag = "%s%s" % (self.tag, mode)
843         self._test_add([
844             {
845                 'dn': "ou=%s1,%s" % (tag, self.base_dn),
846                 "objectclass": "organizationalUnit"
847             },
848             {
849                 'dn': "cn=%s2,%s" % (tag, self.base_dn),
850                 "objectclass": "user"
851             },
852             {
853                 'dn': "cn=%s3,%s" % (tag, self.base_dn),
854                 "objectclass": "group"
855             },
856         ])
857         self.rwdc_db.delete("ou=%s1,%s" % (tag, self.base_dn))
858         self.rwdc_db.delete("cn=%s2,%s" % (tag, self.base_dn))
859         self.rwdc_db.delete("cn=%s3,%s" % (tag, self.base_dn))
860
861     def test_add_replicated_objects_kerberos(self):
862         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
863         self._test_add_replicated_objects('kerberos')
864
865     def test_add_replicated_objects_ntlm(self):
866         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
867         self._test_add_replicated_objects('ntlm')
868
869     def _test_add_replicated_connections(self, mode):
870         tag = "%s%s" % (self.tag, mode)
871         self._test_add([
872             {
873                 'dn': "cn=%sfoofoofoo,%s" % (tag, self.service),
874                 "objectclass": "NTDSConnection",
875                 'enabledConnection': 'TRUE',
876                 'fromServer': self.base_dn,
877                 'options': '0'
878             },
879         ], cross_ncs=True)
880         self.rwdc_db.delete("cn=%sfoofoofoo,%s" % (tag, self.service))
881
882     def test_add_replicated_connections_kerberos(self):
883         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
884         self._test_add_replicated_connections('kerberos')
885
886     def test_add_replicated_connections_ntlm(self):
887         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
888         self._test_add_replicated_connections('ntlm')
889
890     def _test_modify_replicated_attributes(self):
891         dn = 'CN=Guest,CN=Users,' + self.base_dn
892         value = self.tag
893         for attr in ['carLicense', 'middleName']:
894             m = ldb.Message()
895             m.dn = ldb.Dn(self.rwdc_db, dn)
896             m[attr] = ldb.MessageElement(value,
897                                          ldb.FLAG_MOD_REPLACE,
898                                          attr)
899             try:
900                 self.rwdc_db.modify(m)
901             except ldb.LdbError as e:
902                 self.fail("Failed to modify %s %s on RWDC %s with %s" %
903                           (dn, attr, RWDC, e))
904
905             self.force_replication()
906
907             try:
908                 res = self.rodc_db.search(dn,
909                                           scope=ldb.SCOPE_SUBTREE,
910                                           attrs=[attr])
911                 results = [x[attr][0] for x in res]
912                 self.assertEqual(results, [value])
913             except ldb.LdbError as e:
914                 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
915                                     "replication seems to have failed")
916
917     def test_modify_replicated_attributes_kerberos(self):
918         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
919         self._test_modify_replicated_attributes()
920
921     def test_modify_replicated_attributes_ntlm(self):
922         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
923         self._test_modify_replicated_attributes()
924
925     def _test_add_modify_delete(self):
926         dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
927         values = ["%s%s" % (i, self.tag) for i in range(3)]
928         attr = "carLicense"
929         self._test_add([
930             {
931                 'dn': dn,
932                 "objectclass": "user",
933                 attr: values[0]
934             },
935         ])
936         self.force_replication()
937         for value in values[1:]:
938
939             m = ldb.Message()
940             m.dn = ldb.Dn(self.rwdc_db, dn)
941             m[attr] = ldb.MessageElement(value,
942                                          ldb.FLAG_MOD_REPLACE,
943                                          attr)
944             try:
945                 self.rwdc_db.modify(m)
946             except ldb.LdbError as e:
947                 self.fail("Failed to modify %s %s on RWDC %s with %s" %
948                           (dn, attr, RWDC, e))
949
950             self.force_replication()
951
952             try:
953                 res = self.rodc_db.search(dn,
954                                           scope=ldb.SCOPE_SUBTREE,
955                                           attrs=[attr])
956                 results = [x[attr][0] for x in res]
957                 self.assertEqual(results, [value])
958             except ldb.LdbError as e:
959                 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
960                                     "replication seems to have failed")
961
962         self.rwdc_db.delete(dn)
963         self.force_replication()
964         try:
965             res = self.rodc_db.search(dn,
966                                       scope=ldb.SCOPE_SUBTREE,
967                                       attrs=[attr])
968             if len(res) > 0:
969                 self.fail("Failed to delete %s" % (dn))
970         except ldb.LdbError as e:
971             self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
972                              "Failed to delete %s" % (dn))
973
974     def test_add_modify_delete_kerberos(self):
975         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
976         self._test_add_modify_delete()
977
978     def test_add_modify_delete_ntlm(self):
979         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
980         self._test_add_modify_delete()
981
982     def _new_user(self):
983         username = "u%sX%s" % (self.tag[:12], self.counter())
984         password = 'password#1'
985         dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
986         o = {
987             'dn': dn,
988             "objectclass": "user",
989             'sAMAccountName': username,
990         }
991         try:
992             self.rwdc_db.add(o)
993         except ldb.LdbError as e:
994             self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))
995
996         self.rwdc_db.modify_ldif("dn: %s\n"
997                                  "changetype: modify\n"
998                                  "delete: userPassword\n"
999                                  "add: userPassword\n"
1000                                  "userPassword: %s\n" % (dn, password))
1001         self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
1002         return (dn, username, password)
1003
1004     def _change_password(self, user_dn, old_password, new_password):
1005         self.rwdc_db.modify_ldif(
1006             "dn: %s\n"
1007             "changetype: modify\n"
1008             "delete: userPassword\n"
1009             "userPassword: %s\n"
1010             "add: userPassword\n"
1011             "userPassword: %s\n" % (user_dn, old_password, new_password))
1012
1013     def try_ldap_logon(self, server, creds, errno=None):
1014         try:
1015             tmpdb = SamDB('ldap://%s' % server, credentials=creds,
1016                           session_info=system_session(LP), lp=LP)
1017             if errno is not None:
1018                 self.fail("logon failed to fail with ldb error %s" % errno)
1019         except ldb.LdbError as e10:
1020             (code, msg) = e10.args
1021             if code != errno:
1022                 if errno is None:
1023                     self.fail("logon incorrectly raised ldb error (code=%s)" %
1024                               code)
1025                 else:
1026                     self.fail("logon failed to raise correct ldb error"
1027                               "Expected: %s Got: %s" %
1028                               (errno, code))
1029
1030
1031     def zero_min_password_age(self):
1032         min_pwd_age = int(self.rwdc_db.get_minPwdAge())
1033         if min_pwd_age != 0:
1034             self.rwdc_db.set_minPwdAge('0')
1035
1036     def _test_ldap_change_password(self, errno=None):
1037         self.zero_min_password_age()
1038
1039         dn, username, password = self._new_user()
1040         creds1 = make_creds(username, password)
1041
1042         # With NTLM, this should fail on RODC before replication,
1043         # because the user isn't known.
1044         self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1045         self.force_replication()
1046
1047         # Now the user is replicated to RODC, so logon should work
1048         self.try_ldap_logon(RODC, creds1)
1049
1050         passwords = ['password#%s' % i for i in range(1, 6)]
1051         for prev, password in zip(passwords[:-1], passwords[1:]):
1052             self._change_password(dn, prev, password)
1053
1054         # The password has changed enough times to make the old
1055         # password invalid (though with kerberos that doesn't matter).
1056         # For NTLM, the old creds should always fail
1057         self.try_ldap_logon(RODC, creds1, errno)
1058         self.try_ldap_logon(RWDC, creds1, errno)
1059
1060         creds2 = make_creds(username, password)
1061
1062         # new creds work straight away with NTLM, because although it
1063         # doesn't have the password, it knows the user and forwards
1064         # the query.
1065         self.try_ldap_logon(RODC, creds2)
1066         self.try_ldap_logon(RWDC, creds2)
1067
1068         self.force_replication()
1069
1070         # After another replication check RODC still works and fails,
1071         # as appropriate to various creds
1072         self.try_ldap_logon(RODC, creds2)
1073         self.try_ldap_logon(RODC, creds1, errno)
1074
1075         prev = password
1076         password = 'password#6'
1077         self._change_password(dn, prev, password)
1078         creds3 = make_creds(username, password)
1079
1080         # previous password should still work.
1081         self.try_ldap_logon(RWDC, creds2)
1082         self.try_ldap_logon(RODC, creds2)
1083
1084         # new password should still work.
1085         self.try_ldap_logon(RWDC, creds3)
1086         self.try_ldap_logon(RODC, creds3)
1087
1088         # old password should still fail (but not on kerberos).
1089         self.try_ldap_logon(RWDC, creds1, errno)
1090         self.try_ldap_logon(RODC, creds1, errno)
1091
1092     def test_ldap_change_password_kerberos(self):
1093         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
1094         self._test_ldap_change_password()
1095
1096     def test_ldap_change_password_ntlm(self):
1097         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
1098         self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS)
1099
1100     def _test_ldap_change_password_reveal_on_demand(self, errno=None):
1101         self.zero_min_password_age()
1102
1103         res = self.rodc_db.search(self.rodc_dn,
1104                                   scope=ldb.SCOPE_BASE,
1105                                   attrs=['msDS-RevealOnDemandGroup'])
1106
1107         group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
1108
1109         user_dn, username, password = self._new_user()
1110         creds1 = make_creds(username, password)
1111
1112         m = ldb.Message()
1113         m.dn = ldb.Dn(self.rwdc_db, group)
1114         m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
1115         self.rwdc_db.modify(m)
1116
1117         # Against Windows, this will just forward if no account exists on the KDC
1118         # Therefore, this does not error on Windows.
1119         self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1120
1121         self.force_replication()
1122
1123         # The proxy case
1124         self.try_ldap_logon(RODC, creds1)
1125         preload_rodc_user(user_dn)
1126
1127         # Now the user AND password are replicated to RODC, so logon should work (not proxy case)
1128         self.try_ldap_logon(RODC, creds1)
1129
1130         passwords = ['password#%s' % i for i in range(1, 6)]
1131         for prev, password in zip(passwords[:-1], passwords[1:]):
1132             self._change_password(user_dn, prev, password)
1133
1134         # The password has changed enough times to make the old
1135         # password invalid, but the RODC shouldn't know that.
1136         self.try_ldap_logon(RODC, creds1)
1137         self.try_ldap_logon(RWDC, creds1, errno)
1138
1139         creds2 = make_creds(username, password)
1140         self.try_ldap_logon(RWDC, creds2)
1141         # We can forward WRONG_PASSWORD over NTLM.
1142         # This SHOULD succeed.
1143         self.try_ldap_logon(RODC, creds2)
1144
1145
1146     def test_change_password_reveal_on_demand_ntlm(self):
1147         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
1148         self._test_ldap_change_password_reveal_on_demand(ldb.ERR_INVALID_CREDENTIALS)
1149
1150     def test_change_password_reveal_on_demand_kerberos(self):
1151         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
1152         self._test_ldap_change_password_reveal_on_demand()
1153
1154     def test_login_lockout_krb5(self):
1155         username = self.lockout1krb5_creds.get_username()
1156         userpass = self.lockout1krb5_creds.get_password()
1157         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1158
1159         preload_rodc_user(userdn)
1160
1161         use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
1162         fail_creds = self.insta_creds(self.template_creds,
1163                                       username=username,
1164                                       userpass=userpass + "X",
1165                                       kerberos_state=use_kerberos)
1166
1167         try:
1168             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1169             self.fail()
1170         except LdbError as e11:
1171             (num, msg) = e11.args
1172             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1173
1174         # Succeed to reset everything to 0
1175         success_creds = self.insta_creds(self.template_creds,
1176                                          username=username,
1177                                          userpass=userpass,
1178                                          kerberos_state=use_kerberos)
1179
1180         ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
1181
1182         self._test_login_lockout(self.lockout1krb5_creds)
1183
1184     def test_login_lockout_ntlm(self):
1185         username = self.lockout1ntlm_creds.get_username()
1186         userpass = self.lockout1ntlm_creds.get_password()
1187         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1188
1189         preload_rodc_user(userdn)
1190
1191         use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
1192         fail_creds = self.insta_creds(self.template_creds,
1193                                       username=username,
1194                                       userpass=userpass + "X",
1195                                       kerberos_state=use_kerberos)
1196
1197         try:
1198             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1199             self.fail()
1200         except LdbError as e12:
1201             (num, msg) = e12.args
1202             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1203
1204         # Succeed to reset everything to 0
1205         ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
1206
1207         self._test_login_lockout(self.lockout1ntlm_creds)
1208
1209     def test_multiple_logon_krb5(self):
1210         username = self.lockout1krb5_creds.get_username()
1211         userpass = self.lockout1krb5_creds.get_password()
1212         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1213
1214         preload_rodc_user(userdn)
1215
1216         use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
1217         fail_creds = self.insta_creds(self.template_creds,
1218                                       username=username,
1219                                       userpass=userpass + "X",
1220                                       kerberos_state=use_kerberos)
1221
1222         try:
1223             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1224             self.fail()
1225         except LdbError as e13:
1226             (num, msg) = e13.args
1227             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1228
1229         # Succeed to reset everything to 0
1230         success_creds = self.insta_creds(self.template_creds,
1231                                          username=username,
1232                                          userpass=userpass,
1233                                          kerberos_state=use_kerberos)
1234
1235         ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
1236
1237         self._test_multiple_logon(self.lockout1krb5_creds)
1238
1239     def test_multiple_logon_ntlm(self):
1240         username = self.lockout1ntlm_creds.get_username()
1241         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1242         userpass = self.lockout1ntlm_creds.get_password()
1243
1244         preload_rodc_user(userdn)
1245
1246         use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
1247         fail_creds = self.insta_creds(self.template_creds,
1248                                       username=username,
1249                                       userpass=userpass + "X",
1250                                       kerberos_state=use_kerberos)
1251
1252         try:
1253             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1254             self.fail()
1255         except LdbError as e14:
1256             (num, msg) = e14.args
1257             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1258
1259         # Succeed to reset everything to 0
1260         ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
1261
1262         self._test_multiple_logon(self.lockout1ntlm_creds)
1263
1264 def main():
1265     global RODC, RWDC, CREDS, LP
1266     parser = optparse.OptionParser(
1267         "rodc_rwdc.py [options] <rodc host> <rwdc host>")
1268
1269     sambaopts = options.SambaOptions(parser)
1270     versionopts = options.VersionOptions(parser)
1271     credopts = options.CredentialsOptions(parser)
1272     subunitopts = SubunitOptions(parser)
1273
1274     parser.add_option_group(sambaopts)
1275     parser.add_option_group(versionopts)
1276     parser.add_option_group(credopts)
1277     parser.add_option_group(subunitopts)
1278
1279     opts, args = parser.parse_args()
1280
1281     LP = sambaopts.get_loadparm()
1282     CREDS = credopts.get_credentials(LP)
1283     CREDS.set_gensec_features(CREDS.get_gensec_features() |
1284                               gensec.FEATURE_SEAL)
1285
1286     try:
1287         RODC, RWDC = args
1288     except ValueError:
1289         parser.print_usage()
1290         sys.exit(1)
1291
1292     set_auto_replication(RWDC, True)
1293     try:
1294         TestProgram(module=__name__, opts=subunitopts)
1295     finally:
1296         set_auto_replication(RWDC, True)
1297
1298 main()