PEP8: fix E302: expected 2 blank lines, found 1
[bbaumbach/samba-autobuild/.git] / source4 / dsdb / tests / python / rodc_rwdc.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 from __future__ import print_function
4 """Test communication of credentials etc, between an RODC and a RWDC.
5
6 How does it work when the password is changed on the RWDC?
7 """
8
9 import optparse
10 import sys
11 import base64
12 import uuid
13 import subprocess
14 import itertools
15 import time
16
17 sys.path.insert(0, "bin/python")
18 import samba
19 import ldb
20
21 from samba.tests.subunitrun import SubunitOptions, TestProgram
22 import samba.getopt as options
23
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
27 from samba import gensec, dsdb
28 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
29 from samba.dcerpc import security, samr
30
31 import password_lockout_base
32
33
def passwd_encode(pw):
    """Return ``pw`` in double quotes, UTF-16-LE encoded, as base64 text.

    The result is a text string (the base64 bytes decoded as UTF-8).
    """
    quoted = '"%s"' % pw
    utf16_bytes = quoted.encode('utf-16-le')
    encoded = base64.b64encode(utf16_bytes)
    return encoded.decode('utf8')
36
37
class RodcRwdcTestException(Exception):
    """Raised by the helpers in this module when a samba-tool call fails."""
40
41
def make_creds(username, password, kerberos_state=None):
    """Build a Credentials object for ``username``/``password``.

    Domain, realm and workstation are copied from the global CREDS.  When
    ``kerberos_state`` is None the kerberos state of CREDS is used as well.
    The chosen kerberos state is reported on stdout (the message includes
    the username and password), and gensec.FEATURE_SEAL is added to the
    gensec features before the object is returned.
    """
    # use the global CREDS as a template
    creds = Credentials()
    creds.set_username(username)
    creds.set_password(password)
    creds.set_domain(CREDS.get_domain())
    creds.set_realm(CREDS.get_realm())
    creds.set_workstation(CREDS.get_workstation())

    if kerberos_state is None:
        kerberos_state = CREDS.get_kerberos_state()
    creds.set_kerberos_state(kerberos_state)

    # Pick the status line first, then emit it under a separator.
    if kerberos_state == MUST_USE_KERBEROS:
        report = "we seem to be using kerberos for %s %s" % (username,
                                                             password)
    elif kerberos_state == DONT_USE_KERBEROS:
        report = "NOT using kerberos for %s %s" % (username, password)
    else:
        report = "kerberos state is %s" % kerberos_state
    print('-' * 73)
    print(report)

    features = creds.get_gensec_features() | gensec.FEATURE_SEAL
    creds.set_gensec_features(features)
    return creds
66
67
def set_auto_replication(dc, allow):
    """Switch automatic inbound and outbound replication on ``dc``.

    Runs ``bin/samba-tool drs options`` once for each of
    DISABLE_INBOUND_REPL and DISABLE_OUTBOUND_REPL, authenticating with the
    global CREDS.  When ``allow`` is true the option is passed with a '-'
    prefix, otherwise with a '+' prefix.

    A failing command whose stderr mentions LDAP_REFERRAL is tolerated (the
    target is assumed to be an RODC); any other failure raises
    RodcRwdcTestException.
    """
    credstring = '-U%s%%%s' % (CREDS.get_username(),
                               CREDS.get_password())

    on_or_off = '-' if allow else '+'

    for opt in ('DISABLE_INBOUND_REPL',
                'DISABLE_OUTBOUND_REPL'):
        dsa_option = "--dsa-option=%s%s" % (on_or_off, opt)
        cmd = ['bin/samba-tool',
               'drs', 'options',
               credstring, dc,
               dsa_option]

        p = subprocess.Popen(cmd,
                             stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE)
        stdout, stderr = p.communicate()
        if not p.returncode:
            continue

        # communicate() hands back bytes on Python 3, so the marker must be
        # a bytes literal (which is a plain str on Python 2).
        if b'LDAP_REFERRAL' not in stderr:
            # Credentials are deliberately left out of the message.
            raise RodcRwdcTestException(
                "samba-tool drs options %s failed on %s (exit code %s): %r" %
                (dsa_option, dc, p.returncode, stderr))
        print("ignoring +%s REFERRAL error; assuming %s is RODC" %
              (opt, dc))
90
91
def preload_rodc_user(user_dn):
    """Run ``samba-tool rodc preload`` for ``user_dn`` with ``--server RWDC``.

    Automatic replication on the RWDC is switched on for the duration of
    the command and switched off again afterwards.  The command line
    (including the credentials argument) is printed before it is run, and
    subprocess.check_call raises if the command exits non-zero.
    """
    auth_arg = '-U%s%%%s' % (CREDS.get_username(),
                             CREDS.get_password())

    set_auto_replication(RWDC, True)

    preload_cmd = ['bin/samba-tool', 'rodc', 'preload']
    preload_cmd += [user_dn, auth_arg]
    preload_cmd += ['--server', RWDC]

    print(' '.join(preload_cmd))
    subprocess.check_call(preload_cmd)

    set_auto_replication(RWDC, False)
106
107
108
def get_server_ref_from_samdb(samdb):
    """Return the first ``serverReference`` value of ``samdb``'s server object.

    The server object is the one named by ``samdb.get_serverName()``; it is
    read with a base-scope search.
    """
    server_dn = samdb.get_serverName()
    matches = samdb.search(server_dn,
                           scope=ldb.SCOPE_BASE,
                           attrs=['serverReference'])
    server_entry = matches[0]
    return server_entry['serverReference'][0]
116
117
class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
    """Password caching and bad-password lockout tests against an RODC.

    Logins are attempted against the RODC (``self.host_url`` after setUp)
    while account state is checked through the inherited ``_check_account``
    machinery.  Automatic replication on the RWDC is switched off for the
    duration of each test and replication is forced explicitly instead.
    """

    # ``itertools.count(1).next`` only exists on Python 2 and breaks class
    # creation on Python 3; the builtin next() works on both.  staticmethod
    # keeps ``self.counter()`` and ``RodcRwdcCachedTests.counter()`` callable
    # without binding an instance.
    _counter_source = itertools.count(1)
    counter = staticmethod(lambda source=_counter_source: next(source))

    def _check_account_initial(self, dn):
        """Force a replication, then run the inherited initial check."""
        self.force_replication()
        return super(RodcRwdcCachedTests, self)._check_account_initial(dn)

    def _check_account(self, dn,
                       badPwdCount=None,
                       badPasswordTime=None,
                       logonCount=None,
                       lastLogon=None,
                       lastLogonTimestamp=None,
                       lockoutTime=None,
                       userAccountControl=None,
                       msDSUserAccountControlComputed=None,
                       effective_bad_password_count=None,
                       msg=None,
                       badPwdCountOnly=False):
        """Delegate to the inherited check, pausing first when needed.

        Sleeps five seconds when a kerberos test passes a tuple for
        ``badPasswordTime`` or when ``badPwdCount`` is 0.  The inherited
        check is always called with True in the final (badPwdCountOnly)
        position; the ``badPwdCountOnly`` argument given here is ignored.
        """
        # Wait for the RWDC to get any delayed messages
        # e.g. SendToSam or KRB5 bad passwords via winbindd
        kerberos_relation = (self.kerberos and
                             isinstance(badPasswordTime, tuple))
        if kerberos_relation or badPwdCount == 0:
            time.sleep(5)

        return super(RodcRwdcCachedTests,
                     self)._check_account(dn, badPwdCount, badPasswordTime,
                                          logonCount, lastLogon,
                                          lastLogonTimestamp, lockoutTime,
                                          userAccountControl,
                                          msDSUserAccountControlComputed,
                                          effective_bad_password_count, msg,
                                          True)

    def force_replication(self, base=None):
        """Run ``samba-tool drs replicate RODC RWDC <base> --sync-forced``.

        ``base`` defaults to ``self.base_dn``.  On a non-zero exit the
        command and its output are printed and RodcRwdcTestException is
        raised.
        """
        if base is None:
            base = self.base_dn

        # XXX feels like a horrendous way to do it.
        credstring = '-U%s%%%s' % (CREDS.get_username(),
                                   CREDS.get_password())
        cmd = ['bin/samba-tool',
               'drs', 'replicate',
               RODC, RWDC, base,
               credstring,
               '--sync-forced']

        p = subprocess.Popen(cmd,
                             stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE)
        stdout, stderr = p.communicate()
        if p.returncode:
            print("failed with code %s" % p.returncode)
            print(' '.join(cmd))
            print("stdout")
            print(stdout)
            print("stderr")
            print(stderr)
            raise RodcRwdcTestException(
                "forced replication of %s from %s to %s failed with code %s" %
                (base, RWDC, RODC, p.returncode))

    def _change_password(self, user_dn, old_password, new_password):
        """Replace ``userPassword`` on the RWDC via a delete/add modify."""
        self.rwdc_db.modify_ldif(
            "dn: %s\n"
            "changetype: modify\n"
            "delete: userPassword\n"
            "userPassword: %s\n"
            "add: userPassword\n"
            "userPassword: %s\n" % (user_dn, old_password, new_password))

    def tearDown(self):
        """Run the inherited tearDown and re-enable automatic replication."""
        super(RodcRwdcCachedTests, self).tearDown()
        set_auto_replication(RWDC, True)

    def setUp(self):
        """Connect to both DCs and prepare state for a lockout test."""
        self.kerberos = False  # To be set later

        self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
                             session_info=system_session(LP), lp=LP)

        self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
                             session_info=system_session(LP), lp=LP)

        # Define variables for BasePasswordTestCase
        self.lp = LP
        self.global_creds = CREDS
        self.host = RWDC
        self.host_url = 'ldap://%s' % RWDC
        self.ldb = SamDB(url='ldap://%s' % RWDC,
                         session_info=system_session(self.lp),
                         credentials=self.global_creds, lp=self.lp)

        super(RodcRwdcCachedTests, self).setUp()
        # The inherited setUp ran with the RWDC URL; from here on logins
        # that use self.host_url go to the RODC.
        self.host_url = 'ldap://%s' % RODC

        self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host,
                              self.lp, self.global_creds)
        self.samr_handle = self.samr.Connect2(
            None, security.SEC_FLAG_MAXIMUM_ALLOWED)
        self.samr_domain = self.samr.OpenDomain(
            self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED,
            self.domain_sid)

        self.base_dn = self.rwdc_db.domain_dn()

        root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
                                   attrs=['dsServiceName'])
        self.service = root[0]['dsServiceName'][0]
        self.tag = uuid.uuid4().hex

        # NOTE(review): the previous dsheuristics value is saved here but
        # this class's tearDown does not restore it -- confirm intended.
        self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
        self.rwdc_db.set_dsheuristics("000000001")

        set_auto_replication(RWDC, False)

        # make sure DCs are synchronized before the test
        self.force_replication()

    def delete_ldb_connections(self):
        """Drop the inherited connections plus both DC connections."""
        super(RodcRwdcCachedTests, self).delete_ldb_connections()
        del self.rwdc_db
        del self.rodc_db

    def _add_to_reveal_on_demand_group(self, userdn):
        """Add ``userdn`` to the RODC's msDS-RevealOnDemandGroup.

        The group is read from the RODC's server reference object (also
        stored as ``self.rodc_dn``); the membership change is written on
        the RWDC.
        """
        self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)

        res = self.rodc_db.search(self.rodc_dn,
                                  scope=ldb.SCOPE_BASE,
                                  attrs=['msDS-RevealOnDemandGroup'])

        group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')

        m = ldb.Message()
        m.dn = ldb.Dn(self.rwdc_db, group)
        m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
        self.rwdc_db.modify(m)

    def _expect_invalid_credentials(self, creds):
        """Assert that connecting to ``self.host_url`` with ``creds`` fails.

        The connection attempt must raise LdbError with
        ERR_INVALID_CREDENTIALS; a successful connection fails the test.
        """
        try:
            SamDB(url=self.host_url, credentials=creds, lp=self.lp)
        except LdbError as e:
            (num, msg) = e.args
            self.assertEqual(num, ERR_INVALID_CREDENTIALS)
        else:
            self.fail()

    def test_cache_and_flush_password(self):
        """A preloaded unicodePwd disappears after a replicated change."""
        username = self.lockout1krb5_creds.get_username()
        userpass = self.lockout1krb5_creds.get_password()
        userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

        # No URL is given, so this opens whatever database the loadparm
        # context points at rather than one of the ldap:// connections.
        ldb_system = SamDB(session_info=system_session(self.lp),
                           credentials=self.global_creds, lp=self.lp)

        res = ldb_system.search(userdn, attrs=['unicodePwd'])
        self.assertFalse('unicodePwd' in res[0])

        preload_rodc_user(userdn)

        res = ldb_system.search(userdn, attrs=['unicodePwd'])
        self.assertTrue('unicodePwd' in res[0])

        newpass = userpass + '!'

        # Forcing replication should blank out password (when changed)
        self._change_password(userdn, userpass, newpass)
        self.force_replication()

        res = ldb_system.search(userdn, attrs=['unicodePwd'])
        self.assertFalse('unicodePwd' in res[0])

    def test_login_lockout_krb5(self):
        """Run the lockout scenario with the kerberos test credentials."""
        username = self.lockout1krb5_creds.get_username()
        userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

        preload_rodc_user(userdn)

        self.kerberos = True

        self._add_to_reveal_on_demand_group(userdn)

        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb, self.base_dn)

        # Both intervals are written as negative tick counts, seconds
        # multiplied by 1e7.
        self.account_lockout_duration = 10
        account_lockout_duration_ticks = -int(
            self.account_lockout_duration * (1e7))

        m["lockoutDuration"] = ldb.MessageElement(
            str(account_lockout_duration_ticks),
            ldb.FLAG_MOD_REPLACE,
            "lockoutDuration")

        self.lockout_observation_window = 10
        lockout_observation_window_ticks = -int(
            self.lockout_observation_window * (1e7))

        m["lockOutObservationWindow"] = ldb.MessageElement(
            str(lockout_observation_window_ticks),
            ldb.FLAG_MOD_REPLACE,
            "lockOutObservationWindow")

        self.rwdc_db.modify(m)
        self.force_replication()

        self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)

    def test_login_lockout_ntlm(self):
        """Run the lockout scenario with the NTLM test credentials."""
        username = self.lockout1ntlm_creds.get_username()
        userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

        preload_rodc_user(userdn)

        self.kerberos = False

        self._add_to_reveal_on_demand_group(userdn)

        self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)

    def test_login_lockout_not_revealed(self):
        """Test that SendToSam is restricted by preloaded users/groups"""

        username = self.lockout1ntlm_creds.get_username()
        userpass = self.lockout1ntlm_creds.get_password()
        userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

        # Preload but do not add to revealed group
        preload_rodc_user(userdn)

        self.kerberos = False

        creds = self.lockout1ntlm_creds

        # Open a second LDB connection with the user credentials. Use the
        # command line credentials for information like the domain, the
        # realm and the workstation.
        creds_lockout = self.insta_creds(creds)

        # The wrong password
        creds_lockout.set_password("thatsAcomplPASS1x")

        self.assertLoginFailure(self.host_url, creds_lockout, self.lp)

        badPasswordTime = 0
        logonCount = 0
        lastLogon = 0
        lastLogonTimestamp = 0
        logoncount_relation = ''

        res = self._check_account(userdn,
                                  badPwdCount=1,
                                  badPasswordTime=("greater", badPasswordTime),
                                  logonCount=logonCount,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=0,
                                  msg='lastlogontimestamp with wrong password')
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        # BadPwdCount on RODC increases alongside RWDC
        res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
        self.assertTrue('badPwdCount' in res[0])
        self.assertEqual(int(res[0]['badPwdCount'][0]), 1)

        # Correct old password
        creds_lockout.set_password(userpass)

        # The name keeps the connection referenced for the rest of the test.
        ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout,
                            lp=self.lp)

        # Wait for potential SendToSam...
        time.sleep(5)

        # BadPwdCount on RODC decreases, but not the RWDC
        self._check_account(userdn,
                            badPwdCount=1,
                            badPasswordTime=badPasswordTime,
                            logonCount=(logoncount_relation, logonCount),
                            lastLogon=('greater', lastLogon),
                            lastLogonTimestamp=lastLogonTimestamp,
                            userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
                            msDSUserAccountControlComputed=0,
                            msg='badPwdCount not reset on RWDC')

        res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
        self.assertTrue('badPwdCount' in res[0])
        self.assertEqual(int(res[0]['badPwdCount'][0]), 0)

    def _test_login_lockout_rodc_rwdc(self, creds, userdn):
        """Drive one account through lockout and recovery via the RODC.

        Alternates wrong-password and correct-password logins against
        ``self.host_url`` and checks the account counters after each step:
        three bad passwords lock the account, the lock expires after
        ``self.account_lockout_duration`` seconds, and the bad password
        count stops being effective after
        ``self.lockout_observation_window`` seconds.
        """
        userpass = creds.get_password()
        wrong_password = "thatsAcomplPASS1x"

        # Open a second LDB connection with the user credentials. Use the
        # command line credentials for information like the domain, the
        # realm and the workstation.
        creds_lockout = self.insta_creds(creds)

        badPasswordTime = 0
        logonCount = 0
        lastLogon = 0
        lastLogonTimestamp = 0
        logoncount_relation = ''
        lastlogon_relation = ''

        def check(**expected):
            # Every step expects a normal account whose lastLogonTimestamp
            # is still the initial value.
            expected.setdefault('lastLogonTimestamp', lastLogonTimestamp)
            expected.setdefault('userAccountControl', dsdb.UF_NORMAL_ACCOUNT)
            return self._check_account(userdn, **expected)

        # The wrong password
        creds_lockout.set_password(wrong_password)
        self.assertLoginFailure(self.host_url, creds_lockout, self.lp)

        res = check(badPwdCount=1,
                    badPasswordTime=("greater", badPasswordTime),
                    logonCount=logonCount,
                    lastLogon=lastLogon,
                    msDSUserAccountControlComputed=0,
                    msg='lastlogontimestamp with wrong password')
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        # Correct old password
        creds_lockout.set_password(userpass)

        # The name keeps the connection referenced until it is rebound.
        ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout,
                            lp=self.lp)

        # lastLogonTimestamp should not change
        # lastLogon increases if badPwdCount is non-zero (!)
        res = check(badPwdCount=0,
                    badPasswordTime=badPasswordTime,
                    logonCount=(logoncount_relation, logonCount),
                    lastLogon=('greater', lastLogon),
                    msDSUserAccountControlComputed=0,
                    msg='LLTimestamp is updated to lastlogon')

        logonCount = int(res[0]["logonCount"][0])
        lastLogon = int(res[0]["lastLogon"][0])

        # The wrong password
        creds_lockout.set_password(wrong_password)
        self.assertLoginFailure(self.host_url, creds_lockout, self.lp)

        res = check(badPwdCount=1,
                    badPasswordTime=("greater", badPasswordTime),
                    logonCount=logonCount,
                    lastLogon=lastLogon,
                    msDSUserAccountControlComputed=0)
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        # The wrong password
        creds_lockout.set_password(wrong_password)
        self._expect_invalid_credentials(creds_lockout)

        res = check(badPwdCount=2,
                    badPasswordTime=("greater", badPasswordTime),
                    logonCount=logonCount,
                    lastLogon=lastLogon,
                    msDSUserAccountControlComputed=0)
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        print("two failed password change")

        # The wrong password; the third failure locks the account
        creds_lockout.set_password(wrong_password)
        self._expect_invalid_credentials(creds_lockout)

        res = check(badPwdCount=3,
                    badPasswordTime=("greater", badPasswordTime),
                    logonCount=logonCount,
                    lastLogon=lastLogon,
                    lockoutTime=("greater", badPasswordTime),
                    msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
        badPasswordTime = int(res[0]["badPasswordTime"][0])
        lockoutTime = int(res[0]["lockoutTime"][0])

        # While locked out, two more wrong passwords and then the correct
        # one are all rejected, and none of them changes the counters.
        for attempt_password in (wrong_password, wrong_password, userpass):
            creds_lockout.set_password(attempt_password)
            self._expect_invalid_credentials(creds_lockout)

            check(badPwdCount=3,
                  badPasswordTime=badPasswordTime,
                  logonCount=logonCount,
                  lastLogon=lastLogon,
                  lockoutTime=lockoutTime,
                  msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)

        # wait for the lockout to end
        time.sleep(self.account_lockout_duration + 1)
        print(self.account_lockout_duration + 1)

        check(badPwdCount=3, effective_bad_password_count=0,
              badPasswordTime=badPasswordTime,
              logonCount=logonCount,
              lockoutTime=lockoutTime,
              lastLogon=lastLogon,
              msDSUserAccountControlComputed=0)

        # The correct password after letting the timeout expire

        creds_lockout.set_password(userpass)

        creds_lockout2 = self.insta_creds(creds_lockout)

        ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2,
                            lp=self.lp)
        time.sleep(3)

        check(badPwdCount=0,
              badPasswordTime=badPasswordTime,
              logonCount=(logoncount_relation, logonCount),
              lastLogon=(lastlogon_relation, lastLogon),
              lockoutTime=lockoutTime,
              msDSUserAccountControlComputed=0,
              msg="lastLogon is way off")

        # The wrong password
        creds_lockout.set_password(wrong_password)
        self._expect_invalid_credentials(creds_lockout)

        res = check(badPwdCount=1,
                    badPasswordTime=("greater", badPasswordTime),
                    logonCount=logonCount,
                    lockoutTime=lockoutTime,
                    lastLogon=lastLogon,
                    msDSUserAccountControlComputed=0)
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        # The wrong password
        creds_lockout.set_password(wrong_password)
        self._expect_invalid_credentials(creds_lockout)

        res = check(badPwdCount=2,
                    badPasswordTime=("greater", badPasswordTime),
                    logonCount=logonCount,
                    lockoutTime=lockoutTime,
                    lastLogon=lastLogon,
                    msDSUserAccountControlComputed=0)
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        time.sleep(self.lockout_observation_window + 1)

        check(badPwdCount=2, effective_bad_password_count=0,
              badPasswordTime=badPasswordTime,
              logonCount=logonCount,
              lockoutTime=lockoutTime,
              lastLogon=lastLogon,
              msDSUserAccountControlComputed=0)

        # The wrong password
        creds_lockout.set_password(wrong_password)
        self._expect_invalid_credentials(creds_lockout)

        res = check(badPwdCount=1,
                    badPasswordTime=("greater", badPasswordTime),
                    logonCount=logonCount,
                    lockoutTime=lockoutTime,
                    lastLogon=lastLogon,
                    msDSUserAccountControlComputed=0)
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        # The correct password without letting the timeout expire
        creds_lockout.set_password(userpass)
        ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout,
                            lp=self.lp)

        check(badPwdCount=0,
              badPasswordTime=badPasswordTime,
              logonCount=(logoncount_relation, logonCount),
              lockoutTime=lockoutTime,
              lastLogon=("greater", lastLogon),
              msDSUserAccountControlComputed=0)
687
688
class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
    """Replication and logon tests run against an RODC and a RWDC.

    The tests change objects and passwords on the writable DC (RWDC),
    force replication to the read-only DC (RODC) and check what the RODC
    sees and how logons against either DC behave.

    The module globals RODC, RWDC, CREDS and LP must be set (main() does
    this) before any test runs.
    """

    # One sequence shared by every instance, so successive calls to
    # counter() never hand out the same number twice.
    _user_numbers = itertools.count(1)

    @classmethod
    def counter(cls):
        """Return the next number of the class-wide sequence 1, 2, 3, ...

        Implemented with the next() builtin; the iterator's ``.next``
        attribute used previously only exists on Python 2.
        """
        return next(cls._user_numbers)

    def force_replication(self, base=None):
        """Run ``samba-tool drs replicate RODC RWDC <base> --sync-forced``.

        :param base: naming context DN to replicate; defaults to
            self.base_dn.
        :raises RodcRwdcTestException: if samba-tool exits non-zero. The
            command line and its captured output are printed first.
        """
        if base is None:
            base = self.base_dn

        # XXX feels like a horrendous way to do it.
        credstring = '-U%s%%%s' % (CREDS.get_username(),
                                   CREDS.get_password())
        cmd = ['bin/samba-tool',
               'drs', 'replicate',
               RODC, RWDC, base,
               credstring,
               '--sync-forced']

        p = subprocess.Popen(cmd,
                             stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE)
        stdout, stderr = p.communicate()
        if p.returncode:
            print("failed with code %s" % p.returncode)
            print(' '.join(cmd))
            print("stdout")
            print(stdout)
            print("stderr")
            print(stderr)
            raise RodcRwdcTestException(
                "samba-tool drs replicate exited with code %s" %
                p.returncode)

    def _check_account_initial(self, dn):
        """Force replication, then run the base class initial check."""
        self.force_replication()
        return super(RodcRwdcTests, self)._check_account_initial(dn)

    def tearDown(self):
        """Undo the global changes made by setUp().

        The restore steps sit in a ``finally`` so that a failure in the
        base class tearDown cannot leave the RWDC with the test's
        dsHeuristics value or with auto-replication switched off.
        """
        try:
            super(RodcRwdcTests, self).tearDown()
        finally:
            self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
            CREDS.set_kerberos_state(DONT_USE_KERBEROS)
            set_auto_replication(RWDC, True)

    def setUp(self):
        """Connect to both DCs and bring them into a synchronized state."""
        self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
                             session_info=system_session(LP), lp=LP)

        self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
                             session_info=system_session(LP), lp=LP)

        # Define variables for BasePasswordTestCase
        self.lp = LP
        self.global_creds = CREDS
        self.host = RWDC
        self.host_url = 'ldap://%s' % RWDC
        self.ldb = SamDB(url='ldap://%s' % RWDC,
                         session_info=system_session(self.lp),
                         credentials=self.global_creds, lp=self.lp)

        super(RodcRwdcTests, self).setUp()

        # The base class set-up ran with the RWDC values assigned above;
        # from here on host, host_url and ldb refer to the RODC.
        self.host = RODC
        self.host_url = 'ldap://%s' % RODC
        self.ldb = SamDB(url='ldap://%s' % RODC,
                         session_info=system_session(self.lp),
                         credentials=self.global_creds, lp=self.lp)

        self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host,
                              self.lp, self.global_creds)
        self.samr_handle = self.samr.Connect2(
            None, security.SEC_FLAG_MAXIMUM_ALLOWED)
        self.samr_domain = self.samr.OpenDomain(
            self.samr_handle,
            security.SEC_FLAG_MAXIMUM_ALLOWED,
            self.domain_sid)

        self.base_dn = self.rwdc_db.domain_dn()

        root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
                                   attrs=['dsServiceName'])
        self.service = root[0]['dsServiceName'][0]
        # Unique per test, used to build object names that cannot clash.
        self.tag = uuid.uuid4().hex

        # Remember the current dsHeuristics so tearDown() can restore it.
        # NOTE(review): "000000001" presumably switches on userPassword
        # handling, which _new_user()/_change_password() rely on - confirm.
        self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
        self.rwdc_db.set_dsheuristics("000000001")

        set_auto_replication(RWDC, False)

        # make sure DCs are synchronized before the test
        self.force_replication()
        self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
        self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)

    def delete_ldb_connections(self):
        """Delete the RWDC/RODC connections after the base class ones."""
        super(RodcRwdcTests, self).delete_ldb_connections()
        del self.rwdc_db
        del self.rodc_db

    def assertReferral(self, fn, *args, **kwargs):
        """Assert that ``fn(*args, **kwargs)`` raises an LDAP referral.

        Fails if the call succeeds, or if it raises an ldb.LdbError whose
        code is not ldb.ERR_REFERRAL.
        """
        try:
            fn(*args, **kwargs)
        except ldb.LdbError as e:
            (code, msg) = e.args
            self.assertEqual(code, ldb.ERR_REFERRAL,
                             "expected referral, got %s %s" % (code, msg))
        else:
            self.fail("failed to raise ldap referral")

    def _test_rodc_dsheuristics(self):
        """Setting dsHeuristics through the RODC must yield a referral."""
        d = self.rodc_db.get_dsheuristics()
        self.assertReferral(self.rodc_db.set_dsheuristics, "000000001")
        self.assertReferral(self.rodc_db.set_dsheuristics, d)

    # NOTE(review): the two TEST_* methods below do not match unittest's
    # default "test" method prefix, so a default loader will not collect
    # them - presumably they are disabled on purpose; confirm.
    def TEST_rodc_heuristics_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_rodc_dsheuristics()

    def TEST_rodc_heuristics_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_rodc_dsheuristics()

    def _test_add(self, objects, cross_ncs=False):
        """Add objects on the RWDC and check they replicate to the RODC.

        For every object: check the RODC does not have it yet, add it on
        the RWDC, force replication, then expect exactly one match on the
        RODC.

        :param objects: iterable of dicts accepted by SamDB.add(); each
            must contain a string 'dn'.
        :param cross_ncs: if true, search (and replicate) from the
            configuration base DN using the object's RDN as the filter,
            instead of using the object's own DN as the search base.
        """
        for o in objects:
            dn = o['dn']
            if cross_ncs:
                base = str(self.rwdc_db.get_config_basedn())
                controls = ["search_options:1:2"]
                cn = dn.split(',', 1)[0]
                expression = '(%s)' % cn
            else:
                base = dn
                controls = []
                expression = None

            # Before the add: either nothing is found or the base does
            # not exist yet.
            try:
                res = self.rodc_db.search(base,
                                          expression=expression,
                                          scope=ldb.SCOPE_SUBTREE,
                                          attrs=['dn'],
                                          controls=controls)
                self.assertEqual(len(res), 0)
            except ldb.LdbError as e:
                if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
                    raise

            try:
                self.rwdc_db.add(o)
            except ldb.LdbError as e:
                (ecode, emsg) = e.args
                # Three placeholders need three values; the DN was
                # missing, which turned this report into a TypeError.
                self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
                          (dn, ecode, emsg))

            if cross_ncs:
                self.force_replication(base=base)
            else:
                self.force_replication()

            try:
                res = self.rodc_db.search(base,
                                          expression=expression,
                                          scope=ldb.SCOPE_SUBTREE,
                                          attrs=['dn'],
                                          controls=controls)
                self.assertEqual(len(res), 1)
            except ldb.LdbError as e:
                self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
                                    "replication seems to have failed")

    def _test_add_replicated_objects(self, mode):
        """Replicate a new OU, user and group, then delete them again.

        :param mode: string appended to self.tag to keep the object names
            of the kerberos and ntlm variants apart.
        """
        tag = "%s%s" % (self.tag, mode)
        ou_dn = "ou=%s1,%s" % (tag, self.base_dn)
        user_dn = "cn=%s2,%s" % (tag, self.base_dn)
        group_dn = "cn=%s3,%s" % (tag, self.base_dn)

        self._test_add([
            {
                'dn': ou_dn,
                "objectclass": "organizationalUnit"
            },
            {
                'dn': user_dn,
                "objectclass": "user"
            },
            {
                'dn': group_dn,
                "objectclass": "group"
            },
        ])
        for dn in (ou_dn, user_dn, group_dn):
            self.rwdc_db.delete(dn)

    def test_add_replicated_objects_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_add_replicated_objects('kerberos')

    def test_add_replicated_objects_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_add_replicated_objects('ntlm')

    def _test_add_replicated_connections(self, mode):
        """Replicate a new NTDSConnection below self.service, then delete it.

        :param mode: string appended to self.tag to keep the object names
            of the kerberos and ntlm variants apart.
        """
        tag = "%s%s" % (self.tag, mode)
        connection_dn = "cn=%sfoofoofoo,%s" % (tag, self.service)
        self._test_add([
            {
                'dn': connection_dn,
                "objectclass": "NTDSConnection",
                'enabledConnection': 'TRUE',
                'fromServer': self.base_dn,
                'options': '0'
            },
        ], cross_ncs=True)
        self.rwdc_db.delete(connection_dn)

    def test_add_replicated_connections_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_add_replicated_connections('kerberos')

    def test_add_replicated_connections_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_add_replicated_connections('ntlm')

    def _modify_and_check_replicated(self, dn, attr, value):
        """Replace attr on the RWDC and check the RODC sees the new value.

        :param dn: DN (string) of the object to modify.
        :param attr: name of the attribute to replace.
        :param value: new value for the attribute.
        """
        m = ldb.Message()
        m.dn = ldb.Dn(self.rwdc_db, dn)
        m[attr] = ldb.MessageElement(value,
                                     ldb.FLAG_MOD_REPLACE,
                                     attr)
        try:
            self.rwdc_db.modify(m)
        except ldb.LdbError as e:
            self.fail("Failed to modify %s %s on RWDC %s with %s" %
                      (dn, attr, RWDC, e))

        self.force_replication()

        try:
            res = self.rodc_db.search(dn,
                                      scope=ldb.SCOPE_SUBTREE,
                                      attrs=[attr])
            results = [x[attr][0] for x in res]
            self.assertEqual(results, [value])
        except ldb.LdbError as e:
            self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
                                "replication seems to have failed")

    def _test_modify_replicated_attributes(self):
        """Modify two attributes of the Guest user and check replication."""
        dn = 'CN=Guest,CN=Users,' + self.base_dn
        value = self.tag
        for attr in ['carLicense', 'middleName']:
            self._modify_and_check_replicated(dn, attr, value)

    def test_modify_replicated_attributes_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_modify_replicated_attributes()

    def test_modify_replicated_attributes_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_modify_replicated_attributes()

    def _test_add_modify_delete(self):
        """Add a user, modify it twice and delete it, checking the RODC."""
        dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
        values = ["%s%s" % (i, self.tag) for i in range(3)]
        attr = "carLicense"
        self._test_add([
            {
                'dn': dn,
                "objectclass": "user",
                attr: values[0]
            },
        ])
        self.force_replication()
        for value in values[1:]:
            self._modify_and_check_replicated(dn, attr, value)

        self.rwdc_db.delete(dn)
        self.force_replication()
        # After the delete the RODC must return nothing or NO_SUCH_OBJECT.
        try:
            res = self.rodc_db.search(dn,
                                      scope=ldb.SCOPE_SUBTREE,
                                      attrs=[attr])
            if len(res) > 0:
                self.fail("Failed to delete %s" % (dn))
        except ldb.LdbError as e:
            self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
                             "Failed to delete %s" % (dn))

    def test_add_modify_delete_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_add_modify_delete()

    def test_add_modify_delete_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_add_modify_delete()

    def _new_user(self):
        """Create and enable a fresh user on the RWDC.

        :returns: tuple (dn, username, password) of the new account; the
            password is always 'password#1'.
        """
        username = "u%sX%s" % (self.tag[:12], self.counter())
        password = 'password#1'
        dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
        o = {
            'dn': dn,
            "objectclass": "user",
            'sAMAccountName': username,
        }
        try:
            self.rwdc_db.add(o)
        except ldb.LdbError as e:
            self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))

        self.rwdc_db.modify_ldif("dn: %s\n"
                                 "changetype: modify\n"
                                 "delete: userPassword\n"
                                 "add: userPassword\n"
                                 "userPassword: %s\n" % (dn, password))
        self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
        return (dn, username, password)

    def _change_password(self, user_dn, old_password, new_password):
        """Change a user's password on the RWDC via userPassword.

        The LDIF deletes the old value and adds the new one in a single
        modify.
        """
        self.rwdc_db.modify_ldif(
            "dn: %s\n"
            "changetype: modify\n"
            "delete: userPassword\n"
            "userPassword: %s\n"
            "add: userPassword\n"
            "userPassword: %s\n" % (user_dn, old_password, new_password))

    def try_ldap_logon(self, server, creds, errno=None):
        """Attempt an LDAP connection and check how it ends.

        :param server: host name to connect to.
        :param creds: credentials to log on with.
        :param errno: expected ldb error code, or None if the logon is
            expected to succeed.
        """
        try:
            SamDB('ldap://%s' % server, credentials=creds,
                  session_info=system_session(LP), lp=LP)
        except ldb.LdbError as e:
            code = e.args[0]
            if code == errno:
                return
            if errno is None:
                self.fail("logon incorrectly raised ldb error (code=%s)" %
                          code)
            self.fail("logon failed to raise correct ldb error. "
                      "Expected: %s Got: %s" %
                      (errno, code))
        else:
            if errno is not None:
                self.fail("logon failed to fail with ldb error %s" % errno)

    def zero_min_password_age(self):
        """Set minPwdAge on the RWDC to 0 unless it already is."""
        min_pwd_age = int(self.rwdc_db.get_minPwdAge())
        if min_pwd_age != 0:
            self.rwdc_db.set_minPwdAge('0')

    def _test_ldap_change_password(self, errno=None):
        """Check logons on both DCs while a user's password changes.

        :param errno: ldb error expected when logging on with the
            original, outdated password; None means that logon is
            expected to succeed.
        """
        self.zero_min_password_age()

        dn, username, password = self._new_user()
        creds1 = make_creds(username, password)

        # With NTLM, this should fail on RODC before replication,
        # because the user isn't known.
        self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
        self.force_replication()

        # Now the user is replicated to RODC, so logon should work
        self.try_ldap_logon(RODC, creds1)

        passwords = ['password#%s' % i for i in range(1, 6)]
        for prev, password in zip(passwords[:-1], passwords[1:]):
            self._change_password(dn, prev, password)

        # The password has changed enough times to make the old
        # password invalid (though with kerberos that doesn't matter).
        # For NTLM, the old creds should always fail
        self.try_ldap_logon(RODC, creds1, errno)
        self.try_ldap_logon(RWDC, creds1, errno)

        # `password` still holds the last value set by the loop above.
        creds2 = make_creds(username, password)

        # new creds work straight away with NTLM, because although it
        # doesn't have the password, it knows the user and forwards
        # the query.
        self.try_ldap_logon(RODC, creds2)
        self.try_ldap_logon(RWDC, creds2)

        self.force_replication()

        # After another replication check RODC still works and fails,
        # as appropriate to various creds
        self.try_ldap_logon(RODC, creds2)
        self.try_ldap_logon(RODC, creds1, errno)

        prev = password
        password = 'password#6'
        self._change_password(dn, prev, password)
        creds3 = make_creds(username, password)

        # previous password should still work.
        self.try_ldap_logon(RWDC, creds2)
        self.try_ldap_logon(RODC, creds2)

        # new password should still work.
        self.try_ldap_logon(RWDC, creds3)
        self.try_ldap_logon(RODC, creds3)

        # old password should still fail (but not on kerberos).
        self.try_ldap_logon(RWDC, creds1, errno)
        self.try_ldap_logon(RODC, creds1, errno)

    def test_ldap_change_password_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_ldap_change_password()

    def test_ldap_change_password_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS)

    def _test_ldap_change_password_reveal_on_demand(self, errno=None):
        """Like _test_ldap_change_password, for a reveal-on-demand user.

        The new user is added to the group named by the RODC's
        msDS-RevealOnDemandGroup attribute and preloaded on the RODC
        before the password is changed on the RWDC.

        :param errno: ldb error expected when logging on to the RWDC with
            the original, outdated password; None means success.
        """
        self.zero_min_password_age()

        res = self.rodc_db.search(self.rodc_dn,
                                  scope=ldb.SCOPE_BASE,
                                  attrs=['msDS-RevealOnDemandGroup'])

        group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')

        user_dn, username, password = self._new_user()
        creds1 = make_creds(username, password)

        m = ldb.Message()
        m.dn = ldb.Dn(self.rwdc_db, group)
        m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
        self.rwdc_db.modify(m)

        # Against Windows, this will just forward if no account exists on
        # the KDC. Therefore, this does not error on Windows.
        self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)

        self.force_replication()

        # The proxy case
        self.try_ldap_logon(RODC, creds1)
        preload_rodc_user(user_dn)

        # Now the user AND password are replicated to RODC, so logon
        # should work (not proxy case)
        self.try_ldap_logon(RODC, creds1)

        passwords = ['password#%s' % i for i in range(1, 6)]
        for prev, password in zip(passwords[:-1], passwords[1:]):
            self._change_password(user_dn, prev, password)

        # The password has changed enough times to make the old
        # password invalid, but the RODC shouldn't know that.
        self.try_ldap_logon(RODC, creds1)
        self.try_ldap_logon(RWDC, creds1, errno)

        creds2 = make_creds(username, password)
        self.try_ldap_logon(RWDC, creds2)
        # We can forward WRONG_PASSWORD over NTLM.
        # This SHOULD succeed.
        self.try_ldap_logon(RODC, creds2)

    def test_change_password_reveal_on_demand_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_ldap_change_password_reveal_on_demand(
            ldb.ERR_INVALID_CREDENTIALS)

    def test_change_password_reveal_on_demand_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_ldap_change_password_reveal_on_demand()

    def _assert_wrong_password_rejected(self, creds):
        """Preload the user on the RODC and check a bad password is refused.

        Logs on to self.host_url with the password from creds plus a
        trailing "X" and expects ERR_INVALID_CREDENTIALS.

        :param creds: credentials of an existing user below cn=users.
        :returns: tuple (username, userpass, kerberos_state) read from
            creds.
        """
        username = creds.get_username()
        userpass = creds.get_password()
        userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

        preload_rodc_user(userdn)

        use_kerberos = creds.get_kerberos_state()
        fail_creds = self.insta_creds(self.template_creds,
                                      username=username,
                                      userpass=userpass + "X",
                                      kerberos_state=use_kerberos)

        try:
            SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
        except LdbError as e:
            (num, msg) = e.args
            self.assertEqual(num, ERR_INVALID_CREDENTIALS)
        else:
            self.fail("logon with a wrong password unexpectedly succeeded")

        return (username, userpass, use_kerberos)

    def test_login_lockout_krb5(self):
        username, userpass, use_kerberos = \
            self._assert_wrong_password_rejected(self.lockout1krb5_creds)

        # Succeed to reset everything to 0
        success_creds = self.insta_creds(self.template_creds,
                                         username=username,
                                         userpass=userpass,
                                         kerberos_state=use_kerberos)

        # Held in a local (not named `ldb`, which would shadow the module)
        # so the connection object lives until the test returns.
        conn = SamDB(url=self.host_url, credentials=success_creds,
                     lp=self.lp)

        self._test_login_lockout(self.lockout1krb5_creds)

    def test_login_lockout_ntlm(self):
        self._assert_wrong_password_rejected(self.lockout1ntlm_creds)

        # Succeed to reset everything to 0
        # Held in a local (not named `ldb`, which would shadow the module)
        # so the connection object lives until the test returns.
        conn = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds,
                     lp=self.lp)

        self._test_login_lockout(self.lockout1ntlm_creds)

    def test_multiple_logon_krb5(self):
        username, userpass, use_kerberos = \
            self._assert_wrong_password_rejected(self.lockout1krb5_creds)

        # Succeed to reset everything to 0
        success_creds = self.insta_creds(self.template_creds,
                                         username=username,
                                         userpass=userpass,
                                         kerberos_state=use_kerberos)

        # Held in a local (not named `ldb`, which would shadow the module)
        # so the connection object lives until the test returns.
        conn = SamDB(url=self.host_url, credentials=success_creds,
                     lp=self.lp)

        self._test_multiple_logon(self.lockout1krb5_creds)

    def test_multiple_logon_ntlm(self):
        self._assert_wrong_password_rejected(self.lockout1ntlm_creds)

        # Succeed to reset everything to 0
        # Held in a local (not named `ldb`, which would shadow the module)
        # so the connection object lives until the test returns.
        conn = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds,
                     lp=self.lp)

        self._test_multiple_logon(self.lockout1ntlm_creds)
1266
1267
def main():
    """Parse the command line, set the module globals and run the tests.

    Exactly two positional arguments are required, ``<rodc host>`` and
    ``<rwdc host>``; otherwise the usage text is printed and the process
    exits with status 1.
    """
    global RODC, RWDC, CREDS, LP

    parser = optparse.OptionParser(
        "rodc_rwdc.py [options] <rodc host> <rwdc host>")

    # Build every option group first, then register them in the same order.
    option_groups = [
        options.SambaOptions(parser),
        options.VersionOptions(parser),
        options.CredentialsOptions(parser),
        SubunitOptions(parser),
    ]
    for option_group in option_groups:
        parser.add_option_group(option_group)
    sambaopts, _versionopts, credopts, subunitopts = option_groups

    _opts, positional = parser.parse_args()

    LP = sambaopts.get_loadparm()
    CREDS = credopts.get_credentials(LP)
    sealed_features = CREDS.get_gensec_features() | gensec.FEATURE_SEAL
    CREDS.set_gensec_features(sealed_features)

    if len(positional) != 2:
        parser.print_usage()
        sys.exit(1)
    RODC, RWDC = positional

    # set_auto_replication(RWDC, True) is called both before the run and,
    # via finally, after it - whether or not the test program raises.
    set_auto_replication(RWDC, True)
    try:
        TestProgram(module=__name__, opts=subunitopts)
    finally:
        set_auto_replication(RWDC, True)
1301
# Entry point: main() reads the RODC and RWDC host names from the command
# line and then runs the test program. It is called unconditionally, so
# importing this module also runs it.
main()