PEP8: fix E127: continuation line over-indented for visual indent
[samba.git] / source4 / dsdb / tests / python / rodc_rwdc.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 from __future__ import print_function
4 """Test communication of credentials etc, between an RODC and a RWDC.
5
6 How does it work when the password is changed on the RWDC?
7 """
8
9 import optparse
10 import sys
11 import base64
12 import uuid
13 import subprocess
14 import itertools
15 import time
16
17 sys.path.insert(0, "bin/python")
18 import samba
19 import ldb
20
21 from samba.tests.subunitrun import SubunitOptions, TestProgram
22 import samba.getopt as options
23
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
27 from samba import gensec, dsdb
28 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
29 from samba.dcerpc import security, samr
30
31 import password_lockout_base
32
def passwd_encode(pw):
    """Return *pw* wrapped in double quotes, UTF-16-LE encoded, as base64 text."""
    quoted = '"%s"' % pw
    raw = quoted.encode('utf-16-le')
    return base64.b64encode(raw).decode('utf8')
35
36
class RodcRwdcTestException(Exception):
    """Raised when a samba-tool command run by these tests exits with an error."""
39
40
def make_creds(username, password, kerberos_state=None):
    """Build a Credentials object for *username* / *password*.

    Domain, realm and workstation are copied from the global CREDS.  When
    *kerberos_state* is None the kerberos state is taken from CREDS as well.
    The selected kerberos mode is printed, and gensec.FEATURE_SEAL is OR-ed
    into the gensec features before the object is returned.
    """
    creds = Credentials()
    creds.set_username(username)
    creds.set_password(password)

    # Everything that is not user specific comes from the global template.
    creds.set_domain(CREDS.get_domain())
    creds.set_realm(CREDS.get_realm())
    creds.set_workstation(CREDS.get_workstation())

    state = kerberos_state
    if state is None:
        state = CREDS.get_kerberos_state()
    creds.set_kerberos_state(state)

    print('-' * 73)
    if state == MUST_USE_KERBEROS:
        report = "we seem to be using kerberos for %s %s" % (username, password)
    elif state == DONT_USE_KERBEROS:
        report = "NOT using kerberos for %s %s" % (username, password)
    else:
        report = "kerberos state is %s" % state
    print(report)

    sealed_features = creds.get_gensec_features() | gensec.FEATURE_SEAL
    creds.set_gensec_features(sealed_features)
    return creds
65
66
def set_auto_replication(dc, allow):
    """Switch automatic DRS replication on or off for the server *dc*.

    Runs ``bin/samba-tool drs options`` once for each of the
    DISABLE_INBOUND_REPL and DISABLE_OUTBOUND_REPL DSA options, using the
    username and password from the global CREDS.

    :param dc: server name handed to samba-tool.
    :param allow: true to clear the DISABLE_* options ('-' prefix), false to
        set them ('+' prefix).
    :raises RodcRwdcTestException: if samba-tool exits non-zero and its
        stderr does not mention LDAP_REFERRAL.
    """
    credstring = '-U%s%%%s' % (CREDS.get_username(),
                               CREDS.get_password())

    on_or_off = '-' if allow else '+'

    for opt in ['DISABLE_INBOUND_REPL',
                'DISABLE_OUTBOUND_REPL']:
        cmd = ['bin/samba-tool',
               'drs', 'options',
               credstring, dc,
               "--dsa-option=%s%s" % (on_or_off, opt)]

        p = subprocess.Popen(cmd,
                             stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE)
        stdout, stderr = p.communicate()
        if p.returncode:
            # communicate() hands back bytes on Python 3 (and str, which is
            # bytes, on Python 2), so the marker must be a bytes literal or
            # the membership test raises TypeError on Python 3.
            if b'LDAP_REFERRAL' not in stderr:
                # The command line is deliberately left out of the message:
                # it contains the password.
                raise RodcRwdcTestException(
                    "samba-tool drs options %s%s failed with code %s: %r" %
                    (on_or_off, opt, p.returncode, stderr))
            print("ignoring +%s REFERRAL error; assuming %s is RODC" %
                  (opt, dc))
89
90
def preload_rodc_user(user_dn):
    """Run ``samba-tool rodc preload`` for *user_dn* with ``--server RWDC``.

    Automatic replication on RWDC is switched on for the duration of the
    command and switched off again once it has succeeded.  The command line
    is printed before it is run; a failing command raises
    subprocess.CalledProcessError (from check_call).
    """
    auth_arg = '-U%s%%%s' % (CREDS.get_username(), CREDS.get_password())

    set_auto_replication(RWDC, True)

    argv = ['bin/samba-tool', 'rodc', 'preload']
    argv += [user_dn, auth_arg]
    argv += ['--server', RWDC]

    print(' '.join(argv))
    subprocess.check_call(argv)

    set_auto_replication(RWDC, False)
105
106
107
def get_server_ref_from_samdb(samdb):
    """Return the first serverReference value of *samdb*'s own server object.

    The server object is the one named by samdb.get_serverName(); it is read
    with a base-scope search.
    """
    result = samdb.search(samdb.get_serverName(),
                          scope=ldb.SCOPE_BASE,
                          attrs=['serverReference'])
    server_entry = result[0]
    return server_entry['serverReference'][0]
115
116 class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
117     counter = itertools.count(1).next
118
119     def _check_account_initial(self, dn):
120         self.force_replication()
121         return super(RodcRwdcCachedTests, self)._check_account_initial(dn)
122
123     def _check_account(self, dn,
124                        badPwdCount=None,
125                        badPasswordTime=None,
126                        logonCount=None,
127                        lastLogon=None,
128                        lastLogonTimestamp=None,
129                        lockoutTime=None,
130                        userAccountControl=None,
131                        msDSUserAccountControlComputed=None,
132                        effective_bad_password_count=None,
133                        msg=None,
134                        badPwdCountOnly=False):
135         # Wait for the RWDC to get any delayed messages
136         # e.g. SendToSam or KRB5 bad passwords via winbindd
137         if (self.kerberos and isinstance(badPasswordTime, tuple) or
138             badPwdCount == 0):
139             time.sleep(5)
140
141         return super(RodcRwdcCachedTests,
142                      self)._check_account(dn, badPwdCount, badPasswordTime,
143                                           logonCount, lastLogon,
144                                           lastLogonTimestamp, lockoutTime,
145                                           userAccountControl,
146                                           msDSUserAccountControlComputed,
147                                           effective_bad_password_count, msg,
148                                           True)
149
150     def force_replication(self, base=None):
151         if base is None:
152             base = self.base_dn
153
154         # XXX feels like a horrendous way to do it.
155         credstring = '-U%s%%%s' % (CREDS.get_username(),
156                                    CREDS.get_password())
157         cmd = ['bin/samba-tool',
158                'drs', 'replicate',
159                RODC, RWDC, base,
160                credstring,
161                '--sync-forced']
162
163         p = subprocess.Popen(cmd,
164                              stderr=subprocess.PIPE,
165                              stdout=subprocess.PIPE)
166         stdout, stderr = p.communicate()
167         if p.returncode:
168             print("failed with code %s" % p.returncode)
169             print(' '.join(cmd))
170             print("stdout")
171             print(stdout)
172             print("stderr")
173             print(stderr)
174             raise RodcRwdcTestException()
175
176     def _change_password(self, user_dn, old_password, new_password):
177         self.rwdc_db.modify_ldif(
178             "dn: %s\n"
179             "changetype: modify\n"
180             "delete: userPassword\n"
181             "userPassword: %s\n"
182             "add: userPassword\n"
183             "userPassword: %s\n" % (user_dn, old_password, new_password))
184
185     def tearDown(self):
186         super(RodcRwdcCachedTests, self).tearDown()
187         set_auto_replication(RWDC, True)
188
189     def setUp(self):
190         self.kerberos = False # To be set later
191
192         self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
193                              session_info=system_session(LP), lp=LP)
194
195         self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
196                              session_info=system_session(LP), lp=LP)
197
198         # Define variables for BasePasswordTestCase
199         self.lp = LP
200         self.global_creds = CREDS
201         self.host = RWDC
202         self.host_url = 'ldap://%s' % RWDC
203         self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
204                          credentials=self.global_creds, lp=self.lp)
205
206         super(RodcRwdcCachedTests, self).setUp()
207         self.host_url = 'ldap://%s' % RODC
208
209         self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
210         self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
211         self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
212
213         self.base_dn = self.rwdc_db.domain_dn()
214
215         root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
216                                    attrs=['dsServiceName'])
217         self.service = root[0]['dsServiceName'][0]
218         self.tag = uuid.uuid4().hex
219
220         self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
221         self.rwdc_db.set_dsheuristics("000000001")
222
223         set_auto_replication(RWDC, False)
224
225         # make sure DCs are synchronized before the test
226         self.force_replication()
227
228     def delete_ldb_connections(self):
229         super(RodcRwdcCachedTests, self).delete_ldb_connections()
230         del self.rwdc_db
231         del self.rodc_db
232
233     def test_cache_and_flush_password(self):
234         username = self.lockout1krb5_creds.get_username()
235         userpass = self.lockout1krb5_creds.get_password()
236         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
237
238         ldb_system = SamDB(session_info=system_session(self.lp),
239                            credentials=self.global_creds, lp=self.lp)
240
241         res = ldb_system.search(userdn, attrs=['unicodePwd'])
242         self.assertFalse('unicodePwd' in res[0])
243
244         preload_rodc_user(userdn)
245
246         res = ldb_system.search(userdn, attrs=['unicodePwd'])
247         self.assertTrue('unicodePwd' in res[0])
248
249         newpass = userpass + '!'
250
251         # Forcing replication should blank out password (when changed)
252         self._change_password(userdn, userpass, newpass)
253         self.force_replication()
254
255         res = ldb_system.search(userdn, attrs=['unicodePwd'])
256         self.assertFalse('unicodePwd' in res[0])
257
258     def test_login_lockout_krb5(self):
259         username = self.lockout1krb5_creds.get_username()
260         userpass = self.lockout1krb5_creds.get_password()
261         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
262
263         preload_rodc_user(userdn)
264
265         self.kerberos = True
266
267         self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
268
269         res = self.rodc_db.search(self.rodc_dn,
270                                   scope=ldb.SCOPE_BASE,
271                                   attrs=['msDS-RevealOnDemandGroup'])
272
273         group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
274
275         m = ldb.Message()
276         m.dn = ldb.Dn(self.rwdc_db, group)
277         m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
278         self.rwdc_db.modify(m)
279
280         m = ldb.Message()
281         m.dn = ldb.Dn(self.ldb, self.base_dn)
282
283         self.account_lockout_duration = 10
284         account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7))
285
286         m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks),
287                                                   ldb.FLAG_MOD_REPLACE,
288                                                   "lockoutDuration")
289
290         self.lockout_observation_window = 10
291         lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7))
292
293         m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks),
294                                                            ldb.FLAG_MOD_REPLACE,
295                                                            "lockOutObservationWindow")
296
297         self.rwdc_db.modify(m)
298         self.force_replication()
299
300         self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
301
302     def test_login_lockout_ntlm(self):
303         username = self.lockout1ntlm_creds.get_username()
304         userpass = self.lockout1ntlm_creds.get_password()
305         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
306
307         preload_rodc_user(userdn)
308
309         self.kerberos = False
310
311         self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
312
313         res = self.rodc_db.search(self.rodc_dn,
314                                   scope=ldb.SCOPE_BASE,
315                                   attrs=['msDS-RevealOnDemandGroup'])
316
317         group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
318
319         m = ldb.Message()
320         m.dn = ldb.Dn(self.rwdc_db, group)
321         m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
322         self.rwdc_db.modify(m)
323
324         self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
325
326     def test_login_lockout_not_revealed(self):
327         '''Test that SendToSam is restricted by preloaded users/groups'''
328
329         username = self.lockout1ntlm_creds.get_username()
330         userpass = self.lockout1ntlm_creds.get_password()
331         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
332
333         # Preload but do not add to revealed group
334         preload_rodc_user(userdn)
335
336         self.kerberos = False
337
338         creds = self.lockout1ntlm_creds
339
340         # Open a second LDB connection with the user credentials. Use the
341         # command line credentials for informations like the domain, the realm
342         # and the workstation.
343         creds_lockout = self.insta_creds(creds)
344
345         # The wrong password
346         creds_lockout.set_password("thatsAcomplPASS1x")
347
348         self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
349
350         badPasswordTime = 0
351         logonCount = 0
352         lastLogon = 0
353         lastLogonTimestamp=0
354         logoncount_relation = ''
355         lastlogon_relation = ''
356
357         res = self._check_account(userdn,
358                                   badPwdCount=1,
359                                   badPasswordTime=("greater", badPasswordTime),
360                                   logonCount=logonCount,
361                                   lastLogon=lastLogon,
362                                   lastLogonTimestamp=lastLogonTimestamp,
363                                   userAccountControl=
364                                   dsdb.UF_NORMAL_ACCOUNT,
365                                   msDSUserAccountControlComputed=0,
366                                   msg='lastlogontimestamp with wrong password')
367         badPasswordTime = int(res[0]["badPasswordTime"][0])
368
369         # BadPwdCount on RODC increases alongside RWDC
370         res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
371         self.assertTrue('badPwdCount' in res[0])
372         self.assertEqual(int(res[0]['badPwdCount'][0]), 1)
373
374         # Correct old password
375         creds_lockout.set_password(userpass)
376
377         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
378
379         # Wait for potential SendToSam...
380         time.sleep(5)
381
382         # BadPwdCount on RODC decreases, but not the RWDC
383         res = self._check_account(userdn,
384                                   badPwdCount=1,
385                                   badPasswordTime=badPasswordTime,
386                                   logonCount=(logoncount_relation, logonCount),
387                                   lastLogon=('greater', lastLogon),
388                                   lastLogonTimestamp=lastLogonTimestamp,
389                                   userAccountControl=
390                                   dsdb.UF_NORMAL_ACCOUNT,
391                                   msDSUserAccountControlComputed=0,
392                                   msg='badPwdCount not reset on RWDC')
393
394         res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
395         self.assertTrue('badPwdCount' in res[0])
396         self.assertEqual(int(res[0]['badPwdCount'][0]), 0)
397
    def _test_login_lockout_rodc_rwdc(self, creds, userdn):
        """Drive *userdn* through a full lockout cycle via self.host_url.

        Shared body of the krb5 and NTLM lockout tests.  It alternates login
        attempts (made with a copy of *creds* whose password is switched
        between a wrong value and the real one) with self._check_account
        calls that state the expected counters after each step:

        1. a failed and a successful login (badPwdCount 1, then 0);
        2. three failed logins, after which UF_LOCKOUT is expected;
        3. further logins, wrong and correct, all rejected while locked out;
        4. a sleep of self.account_lockout_duration + 1 seconds, then a
           successful login;
        5. two failed logins, a sleep of self.lockout_observation_window + 1
           seconds, one more failed login (badPwdCount expected to restart
           at 1) and a final successful login.

        :param creds: credentials of the account under test.
        :param userdn: DN of that account, passed to _check_account.
        """
        # NOTE(review): username is not used anywhere below.
        username = creds.get_username()
        userpass = creds.get_password()

        # Open a second LDB connection with the user credentials. Use the
        # command line credentials for information like the domain, the realm
        # and the workstation.
        creds_lockout = self.insta_creds(creds)

        # The wrong password
        creds_lockout.set_password("thatsAcomplPASS1x")

        self.assertLoginFailure(self.host_url, creds_lockout, self.lp)

        # Baseline values the following checks compare against.
        badPasswordTime = 0
        logonCount = 0
        lastLogon = 0
        lastLogonTimestamp=0
        # NOTE(review): the empty relation strings are handed to the
        # base-class check as the first tuple element; presumably they relax
        # the comparison - confirm in password_lockout_base.
        logoncount_relation = ''
        lastlogon_relation = ''

        res = self._check_account(userdn,
                                  badPwdCount=1,
                                  badPasswordTime=("greater", badPasswordTime),
                                  logonCount=logonCount,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=0,
                                  msg='lastlogontimestamp with wrong password')
        # Remember the reported time as the baseline for the next step.
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        # Correct old password
        creds_lockout.set_password(userpass)

        # Opening the connection is the login attempt itself.
        ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)

        # lastLogonTimestamp should not change
        # lastLogon increases if badPwdCount is non-zero (!)
        res = self._check_account(userdn,
                                  badPwdCount=0,
                                  badPasswordTime=badPasswordTime,
                                  logonCount=(logoncount_relation, logonCount),
                                  lastLogon=('greater', lastLogon),
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=0,
                                  msg='LLTimestamp is updated to lastlogon')

        # New baselines after the successful login.
        logonCount = int(res[0]["logonCount"][0])
        lastLogon = int(res[0]["lastLogon"][0])

        # The wrong password
        creds_lockout.set_password("thatsAcomplPASS1x")

        self.assertLoginFailure(self.host_url, creds_lockout, self.lp)

        res = self._check_account(userdn,
                                  badPwdCount=1,
                                  badPasswordTime=("greater", badPasswordTime),
                                  logonCount=logonCount,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=0)
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        # The wrong password
        creds_lockout.set_password("thatsAcomplPASS1x")

        # The connect must raise LdbError with ERR_INVALID_CREDENTIALS;
        # reaching self.fail() means the login unexpectedly succeeded.
        try:
            ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
            self.fail()

        except LdbError as e1:
            (num, msg) = e1.args
            self.assertEquals(num, ERR_INVALID_CREDENTIALS)

        res = self._check_account(userdn,
                                  badPwdCount=2,
                                  badPasswordTime=("greater", badPasswordTime),
                                  logonCount=logonCount,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=0)
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        print("two failed password change")

        # The wrong password
        creds_lockout.set_password("thatsAcomplPASS1x")

        try:
            ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
            self.fail()

        except LdbError as e2:
            (num, msg) = e2.args
            self.assertEquals(num, ERR_INVALID_CREDENTIALS)

        # Third failure in a row: a lockoutTime and the computed UF_LOCKOUT
        # flag are now expected.
        res = self._check_account(userdn,
                                  badPwdCount=3,
                                  badPasswordTime=("greater", badPasswordTime),
                                  logonCount=logonCount,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  lockoutTime=("greater", badPasswordTime),
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
        badPasswordTime = int(res[0]["badPasswordTime"][0])
        lockoutTime = int(res[0]["lockoutTime"][0])

        # The wrong password
        creds_lockout.set_password("thatsAcomplPASS1x")
        try:
            ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
            self.fail()
        except LdbError as e3:
            (num, msg) = e3.args
            self.assertEquals(num, ERR_INVALID_CREDENTIALS)

        # While locked out the counters are expected to stay exactly as they
        # were (no "greater" relation on badPasswordTime any more).
        res = self._check_account(userdn,
                                  badPwdCount=3,
                                  badPasswordTime=badPasswordTime,
                                  logonCount=logonCount,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  lockoutTime=lockoutTime,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)

        # The wrong password
        creds_lockout.set_password("thatsAcomplPASS1x")
        try:
            ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
            self.fail()
        except LdbError as e4:
            (num, msg) = e4.args
            self.assertEquals(num, ERR_INVALID_CREDENTIALS)

        res = self._check_account(userdn,
                                  badPwdCount=3,
                                  badPasswordTime=badPasswordTime,
                                  logonCount=logonCount,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  lockoutTime=lockoutTime,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)

        # The correct password, but we are locked out
        creds_lockout.set_password(userpass)
        try:
            ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
            self.fail()
        except LdbError as e5:
            (num, msg) = e5.args
            self.assertEquals(num, ERR_INVALID_CREDENTIALS)

        res = self._check_account(userdn,
                                  badPwdCount=3,
                                  badPasswordTime=badPasswordTime,
                                  logonCount=logonCount,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  lockoutTime=lockoutTime,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)

        # wait for the lockout to end
        time.sleep(self.account_lockout_duration + 1)
        print(self.account_lockout_duration + 1)

        # After the wait the stored badPwdCount is still expected to be 3,
        # with an effective count of 0 and no computed UF_LOCKOUT flag.
        res = self._check_account(userdn,
                                  badPwdCount=3, effective_bad_password_count=0,
                                  badPasswordTime=badPasswordTime,
                                  logonCount=logonCount,
                                  lockoutTime=lockoutTime,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=0)

        # The correct password after letting the timeout expire

        creds_lockout.set_password(userpass)

        # This login uses a fresh copy of the credentials.
        creds_lockout2 = self.insta_creds(creds_lockout)

        ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp)
        time.sleep(3)

        res = self._check_account(userdn,
                                  badPwdCount=0,
                                  badPasswordTime=badPasswordTime,
                                  logonCount=(logoncount_relation, logonCount),
                                  lastLogon=(lastlogon_relation, lastLogon),
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  lockoutTime=lockoutTime,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=0,
                                  msg="lastLogon is way off")

        # The wrong password
        creds_lockout.set_password("thatsAcomplPASS1x")
        try:
            ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
            self.fail()
        except LdbError as e6:
            (num, msg) = e6.args
            self.assertEquals(num, ERR_INVALID_CREDENTIALS)

        res = self._check_account(userdn,
                                  badPwdCount=1,
                                  badPasswordTime=("greater", badPasswordTime),
                                  logonCount=logonCount,
                                  lockoutTime=lockoutTime,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=0)
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        # The wrong password
        creds_lockout.set_password("thatsAcomplPASS1x")
        try:
            ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
            self.fail()
        except LdbError as e7:
            (num, msg) = e7.args
            self.assertEquals(num, ERR_INVALID_CREDENTIALS)

        res = self._check_account(userdn,
                                  badPwdCount=2,
                                  badPasswordTime=("greater", badPasswordTime),
                                  logonCount=logonCount,
                                  lockoutTime=lockoutTime,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=0)
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        # Let the observation window pass: the stored count is expected to
        # stay at 2 while the effective count drops to 0.
        time.sleep(self.lockout_observation_window + 1)

        res = self._check_account(userdn,
                                  badPwdCount=2, effective_bad_password_count=0,
                                  badPasswordTime=badPasswordTime,
                                  logonCount=logonCount,
                                  lockoutTime=lockoutTime,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=0)

        # The wrong password
        creds_lockout.set_password("thatsAcomplPASS1x")
        try:
            ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
            self.fail()
        except LdbError as e8:
            (num, msg) = e8.args
            self.assertEquals(num, ERR_INVALID_CREDENTIALS)

        # The count is expected to restart at 1 rather than continue to 3.
        res = self._check_account(userdn,
                                  badPwdCount=1,
                                  badPasswordTime=("greater", badPasswordTime),
                                  logonCount=logonCount,
                                  lockoutTime=lockoutTime,
                                  lastLogon=lastLogon,
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=0)
        badPasswordTime = int(res[0]["badPasswordTime"][0])

        # The correct password without letting the timeout expire
        creds_lockout.set_password(userpass)
        ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)

        res = self._check_account(userdn,
                                  badPwdCount=0,
                                  badPasswordTime=badPasswordTime,
                                  logonCount=(logoncount_relation, logonCount),
                                  lockoutTime=lockoutTime,
                                  lastLogon=("greater", lastLogon),
                                  lastLogonTimestamp=lastLogonTimestamp,
                                  userAccountControl=
                                  dsdb.UF_NORMAL_ACCOUNT,
                                  msDSUserAccountControlComputed=0)
702
class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
    """Test communication of credentials etc, between an RODC and a RWDC.

    Objects and passwords are created or changed on the RWDC, replication
    to the RODC is forced with ``samba-tool drs replicate``, and the result
    is then checked through LDAP searches and logons against the RODC.

    The module globals RODC, RWDC, CREDS and LP must be set (see main())
    before any test in this class runs.

    The test_* methods are described with ``#`` comments rather than
    docstrings so that unittest's shortDescription() output is unchanged.
    """

    # One sequence shared by every test in this class; it keeps the
    # account names generated by _new_user() unique within a run.
    _user_sequence = itertools.count(1)

    @classmethod
    def counter(cls):
        """Return the next number from the shared user-name sequence.

        The builtin next() is used because the iterator's ``.next``
        method only exists on Python 2.
        """
        return next(cls._user_sequence)

    def force_replication(self, base=None):
        """Run ``samba-tool drs replicate RODC RWDC <base> --sync-forced``.

        :param base: naming context to replicate; defaults to
            self.base_dn when None.
        :raises RodcRwdcTestException: if samba-tool exits with a
            non-zero return code (its output is printed first).
        """
        if base is None:
            base = self.base_dn

        # XXX feels like a horrendous way to do it.
        credstring = '-U%s%%%s' % (CREDS.get_username(),
                                   CREDS.get_password())
        cmd = ['bin/samba-tool',
               'drs', 'replicate',
               RODC, RWDC, base,
               credstring,
               '--sync-forced']

        p = subprocess.Popen(cmd,
                             stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE)
        stdout, stderr = p.communicate()
        if p.returncode:
            print("failed with code %s" % p.returncode)
            # NOTE: the printed command line includes the password.
            print(' '.join(cmd))
            print("stdout")
            print(stdout)
            print("stderr")
            print(stderr)
            raise RodcRwdcTestException(
                "samba-tool drs replicate failed with code %s" %
                p.returncode)

    def _check_account_initial(self, dn):
        """Force a replication, then defer to the base class check."""
        self.force_replication()
        return super(RodcRwdcTests, self)._check_account_initial(dn)

    def tearDown(self):
        """Undo the global state changed by setUp()."""
        super(RodcRwdcTests, self).tearDown()
        # Put back the dsheuristics value saved in setUp().
        self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        set_auto_replication(RWDC, True)

    def setUp(self):
        """Connect to both DCs and bring them into a known, synced state."""
        self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
                             session_info=system_session(LP), lp=LP)

        self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
                             session_info=system_session(LP), lp=LP)

        # Define variables for BasePasswordTestCase.  The base setUp()
        # runs with host/ldb pointing at the RWDC ...
        self.lp = LP
        self.global_creds = CREDS
        self.host = RWDC
        self.host_url = 'ldap://%s' % RWDC
        self.ldb = SamDB(url='ldap://%s' % RWDC,
                         session_info=system_session(self.lp),
                         credentials=self.global_creds, lp=self.lp)

        super(RodcRwdcTests, self).setUp()

        # ... and everything after it talks to the RODC instead.
        self.host = RODC
        self.host_url = 'ldap://%s' % RODC
        self.ldb = SamDB(url='ldap://%s' % RODC,
                         session_info=system_session(self.lp),
                         credentials=self.global_creds, lp=self.lp)

        self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host,
                              self.lp, self.global_creds)
        self.samr_handle = self.samr.Connect2(
            None, security.SEC_FLAG_MAXIMUM_ALLOWED)
        # self.domain_sid is not assigned in this class; presumably the
        # base class setUp() provides it -- confirm.
        self.samr_domain = self.samr.OpenDomain(
            self.samr_handle,
            security.SEC_FLAG_MAXIMUM_ALLOWED,
            self.domain_sid)

        self.base_dn = self.rwdc_db.domain_dn()

        root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
                                   attrs=['dsServiceName'])
        self.service = root[0]['dsServiceName'][0]
        # Random per-test tag used to build unique object names.
        self.tag = uuid.uuid4().hex

        # Save the current dsheuristics so tearDown() can restore it.
        # NOTE(review): "000000001" presumably switches on userPassword
        # support, which the password helpers below rely on -- confirm.
        self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
        self.rwdc_db.set_dsheuristics("000000001")

        set_auto_replication(RWDC, False)

        # make sure DCs are synchronized before the test
        self.force_replication()
        self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
        self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)

    def delete_ldb_connections(self):
        """Drop the connections opened by setUp() as well as the base ones."""
        super(RodcRwdcTests, self).delete_ldb_connections()
        del self.rwdc_db
        del self.rodc_db

    def assertReferral(self, fn, *args, **kwargs):
        """Fail unless fn(*args, **kwargs) raises an LDAP referral error."""
        try:
            fn(*args, **kwargs)
            self.fail("failed to raise ldap referral")
        except ldb.LdbError as e9:
            (code, msg) = e9.args
            self.assertEqual(code, ldb.ERR_REFERRAL,
                             "expected referral, got %s %s" % (code, msg))

    def _test_rodc_dsheuristics(self):
        """Check that setting dsheuristics on the RODC yields a referral."""
        d = self.rodc_db.get_dsheuristics()
        self.assertReferral(self.rodc_db.set_dsheuristics, "000000001")
        self.assertReferral(self.rodc_db.set_dsheuristics, d)

    def TEST_rodc_heuristics_kerberos(self):
        # The upper-case prefix keeps unittest from collecting this.
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_rodc_dsheuristics()

    def TEST_rodc_heuristics_ntlm(self):
        # The upper-case prefix keeps unittest from collecting this.
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_rodc_dsheuristics()

    def _test_add(self, objects, cross_ncs=False):
        """Add objects on the RWDC and check they replicate to the RODC.

        For each object: check it is absent from the RODC, add it on the
        RWDC, force a replication, then check the RODC has exactly one
        match.

        :param objects: iterable of dicts suitable for SamDB.add(); each
            must have a 'dn' key.
        :param cross_ncs: if True, search (and replicate) from the
            configuration base DN with the "search_options:1:2" control
            and match on the object's RDN, instead of searching at the
            object's own DN.
        """
        for o in objects:
            dn = o['dn']
            if cross_ncs:
                base = str(self.rwdc_db.get_config_basedn())
                controls = ["search_options:1:2"]
                cn = dn.split(',', 1)[0]
                expression = '(%s)' % cn
            else:
                base = dn
                controls = []
                expression = None

            # Before the add the RODC must not know the object; a
            # "no such object" error is an acceptable way of saying so.
            try:
                res = self.rodc_db.search(base,
                                          expression=expression,
                                          scope=ldb.SCOPE_SUBTREE,
                                          attrs=['dn'],
                                          controls=controls)
                self.assertEqual(len(res), 0)
            except ldb.LdbError as e:
                if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
                    raise

            try:
                self.rwdc_db.add(o)
            except ldb.LdbError as e:
                (ecode, emsg) = e.args
                # Three placeholders need three values: the object, then
                # the ldb error code and message.
                self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
                          (o, ecode, emsg))

            if cross_ncs:
                self.force_replication(base=base)
            else:
                self.force_replication()

            try:
                res = self.rodc_db.search(base,
                                          expression=expression,
                                          scope=ldb.SCOPE_SUBTREE,
                                          attrs=['dn'],
                                          controls=controls)
                self.assertEqual(len(res), 1)
            except ldb.LdbError as e:
                # NOTE(review): any ldb error other than "no such object"
                # passes silently here -- confirm that is intended.
                self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
                                    "replication seems to have failed")

    def _test_add_replicated_objects(self, mode):
        """Replicate a new OU, user and group, then delete them again.

        :param mode: string mixed into the object names so the kerberos
            and ntlm variants do not collide.
        """
        tag = "%s%s" % (self.tag, mode)
        ou_dn = "ou=%s1,%s" % (tag, self.base_dn)
        user_dn = "cn=%s2,%s" % (tag, self.base_dn)
        group_dn = "cn=%s3,%s" % (tag, self.base_dn)
        self._test_add([
            {
                'dn': ou_dn,
                "objectclass": "organizationalUnit"
            },
            {
                'dn': user_dn,
                "objectclass": "user"
            },
            {
                'dn': group_dn,
                "objectclass": "group"
            },
        ])
        for dn in (ou_dn, user_dn, group_dn):
            self.rwdc_db.delete(dn)

    def test_add_replicated_objects_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_add_replicated_objects('kerberos')

    def test_add_replicated_objects_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_add_replicated_objects('ntlm')

    def _test_add_replicated_connections(self, mode):
        """Replicate a new NTDSConnection under self.service, then delete it.

        :param mode: string mixed into the object name so the kerberos
            and ntlm variants do not collide.
        """
        tag = "%s%s" % (self.tag, mode)
        connection_dn = "cn=%sfoofoofoo,%s" % (tag, self.service)
        self._test_add([
            {
                'dn': connection_dn,
                "objectclass": "NTDSConnection",
                'enabledConnection': 'TRUE',
                'fromServer': self.base_dn,
                'options': '0'
            },
        ], cross_ncs=True)
        self.rwdc_db.delete(connection_dn)

    def test_add_replicated_connections_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_add_replicated_connections('kerberos')

    def test_add_replicated_connections_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_add_replicated_connections('ntlm')

    def _replace_and_verify(self, dn, attr, value):
        """Replace attr on the RWDC and check the RODC sees the new value.

        :param dn: DN of the object to modify.
        :param attr: name of the attribute to replace.
        :param value: new value for the attribute.
        """
        m = ldb.Message()
        m.dn = ldb.Dn(self.rwdc_db, dn)
        m[attr] = ldb.MessageElement(value,
                                     ldb.FLAG_MOD_REPLACE,
                                     attr)
        try:
            self.rwdc_db.modify(m)
        except ldb.LdbError as e:
            self.fail("Failed to modify %s %s on RWDC %s with %s" %
                      (dn, attr, RWDC, e))

        self.force_replication()

        try:
            res = self.rodc_db.search(dn,
                                      scope=ldb.SCOPE_SUBTREE,
                                      attrs=[attr])
            results = [x[attr][0] for x in res]
            self.assertEqual(results, [value])
        except ldb.LdbError as e:
            self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
                                "replication seems to have failed")

    def _test_modify_replicated_attributes(self):
        """Check that attribute changes on the Guest account replicate."""
        dn = 'CN=Guest,CN=Users,' + self.base_dn
        value = self.tag
        for attr in ['carLicense', 'middleName']:
            self._replace_and_verify(dn, attr, value)

    def test_modify_replicated_attributes_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_modify_replicated_attributes()

    def test_modify_replicated_attributes_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_modify_replicated_attributes()

    def _test_add_modify_delete(self):
        """Follow one user through add, two modifies and a delete.

        After each step a replication is forced and the RODC is checked
        for the expected state.
        """
        dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
        values = ["%s%s" % (i, self.tag) for i in range(3)]
        attr = "carLicense"
        self._test_add([
            {
                'dn': dn,
                "objectclass": "user",
                attr: values[0]
            },
        ])
        self.force_replication()

        # values[0] went in with the add; replace it with each of the
        # remaining values in turn.
        for value in values[1:]:
            self._replace_and_verify(dn, attr, value)

        self.rwdc_db.delete(dn)
        self.force_replication()
        try:
            res = self.rodc_db.search(dn,
                                      scope=ldb.SCOPE_SUBTREE,
                                      attrs=[attr])
            if len(res) > 0:
                self.fail("Failed to delete %s" % (dn))
        except ldb.LdbError as e:
            self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
                             "Failed to delete %s" % (dn))

    def test_add_modify_delete_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_add_modify_delete()

    def test_add_modify_delete_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_add_modify_delete()

    def _new_user(self):
        """Create and enable a uniquely named user on the RWDC.

        :return: tuple (dn, username, password) of the new account; the
            password is always 'password#1'.
        """
        username = "u%sX%s" % (self.tag[:12], self.counter())
        password = 'password#1'
        dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
        o = {
            'dn': dn,
            "objectclass": "user",
            'sAMAccountName': username,
        }
        try:
            self.rwdc_db.add(o)
        except ldb.LdbError as e:
            self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))

        self.rwdc_db.modify_ldif("dn: %s\n"
                                 "changetype: modify\n"
                                 "delete: userPassword\n"
                                 "add: userPassword\n"
                                 "userPassword: %s\n" % (dn, password))
        self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
        return (dn, username, password)

    def _change_password(self, user_dn, old_password, new_password):
        """Change a user's password on the RWDC via userPassword.

        The LDIF deletes the old value and adds the new one in a single
        modify.

        :param user_dn: DN of the account to change.
        :param old_password: current password.
        :param new_password: password to set.
        """
        self.rwdc_db.modify_ldif(
            "dn: %s\n"
            "changetype: modify\n"
            "delete: userPassword\n"
            "userPassword: %s\n"
            "add: userPassword\n"
            "userPassword: %s\n" % (user_dn, old_password, new_password))

    def try_ldap_logon(self, server, creds, errno=None):
        """Attempt an LDAP connection and check that it ends as expected.

        :param server: host name of the DC to connect to.
        :param creds: credentials to log on with.
        :param errno: ldb error code the logon must fail with, or None
            if the logon must succeed.
        """
        try:
            SamDB('ldap://%s' % server, credentials=creds,
                  session_info=system_session(LP), lp=LP)
        except ldb.LdbError as e10:
            (code, msg) = e10.args
            if code != errno:
                if errno is None:
                    self.fail("logon incorrectly raised ldb error (code=%s)" %
                              code)
                else:
                    self.fail("logon failed to raise correct ldb error. "
                              "Expected: %s Got: %s" %
                              (errno, code))
        else:
            # The connection succeeded; that is only right when no
            # error was expected.
            if errno is not None:
                self.fail("logon failed to fail with ldb error %s" % errno)

    def zero_min_password_age(self):
        """Set minPwdAge on the RWDC to 0 unless it already is."""
        min_pwd_age = int(self.rwdc_db.get_minPwdAge())
        if min_pwd_age != 0:
            self.rwdc_db.set_minPwdAge('0')

    def _test_ldap_change_password(self, errno=None):
        """Check logons on both DCs while a password is changed repeatedly.

        :param errno: ldb error code expected when logging on with the
            original (superseded) password, or None if that logon is
            expected to succeed.
        """
        self.zero_min_password_age()

        dn, username, password = self._new_user()
        creds1 = make_creds(username, password)

        # With NTLM, this should fail on RODC before replication,
        # because the user isn't known.
        self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
        self.force_replication()

        # Now the user is replicated to RODC, so logon should work
        self.try_ldap_logon(RODC, creds1)

        # Step through password#1 -> password#2 -> ... -> password#5;
        # after the loop `password` holds the last one set.
        passwords = ['password#%s' % i for i in range(1, 6)]
        for prev, password in zip(passwords[:-1], passwords[1:]):
            self._change_password(dn, prev, password)

        # The password has changed enough times to make the old
        # password invalid (though with kerberos that doesn't matter).
        # For NTLM, the old creds should always fail
        self.try_ldap_logon(RODC, creds1, errno)
        self.try_ldap_logon(RWDC, creds1, errno)

        creds2 = make_creds(username, password)

        # new creds work straight away with NTLM, because although it
        # doesn't have the password, it knows the user and forwards
        # the query.
        self.try_ldap_logon(RODC, creds2)
        self.try_ldap_logon(RWDC, creds2)

        self.force_replication()

        # After another replication check RODC still works and fails,
        # as appropriate to various creds
        self.try_ldap_logon(RODC, creds2)
        self.try_ldap_logon(RODC, creds1, errno)

        prev = password
        password = 'password#6'
        self._change_password(dn, prev, password)
        creds3 = make_creds(username, password)

        # previous password should still work.
        self.try_ldap_logon(RWDC, creds2)
        self.try_ldap_logon(RODC, creds2)

        # new password should still work.
        self.try_ldap_logon(RWDC, creds3)
        self.try_ldap_logon(RODC, creds3)

        # old password should still fail (but not on kerberos).
        self.try_ldap_logon(RWDC, creds1, errno)
        self.try_ldap_logon(RODC, creds1, errno)

    def test_ldap_change_password_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_ldap_change_password()

    def test_ldap_change_password_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS)

    def _test_ldap_change_password_reveal_on_demand(self, errno=None):
        """Check logons for a user in the RODC's reveal-on-demand group.

        :param errno: ldb error code expected when logging on to the
            RWDC with the original (superseded) password, or None if
            that logon is expected to succeed.
        """
        self.zero_min_password_age()

        res = self.rodc_db.search(self.rodc_dn,
                                  scope=ldb.SCOPE_BASE,
                                  attrs=['msDS-RevealOnDemandGroup'])

        group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')

        user_dn, username, password = self._new_user()
        creds1 = make_creds(username, password)

        # Make the new user a member of the reveal-on-demand group.
        m = ldb.Message()
        m.dn = ldb.Dn(self.rwdc_db, group)
        m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
        self.rwdc_db.modify(m)

        # Against Windows, this will just forward if no account exists on
        # the KDC. Therefore, this does not error on Windows.
        self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)

        self.force_replication()

        # The proxy case
        self.try_ldap_logon(RODC, creds1)
        preload_rodc_user(user_dn)

        # Now the user AND password are replicated to RODC, so logon
        # should work (not proxy case)
        self.try_ldap_logon(RODC, creds1)

        passwords = ['password#%s' % i for i in range(1, 6)]
        for prev, password in zip(passwords[:-1], passwords[1:]):
            self._change_password(user_dn, prev, password)

        # The password has changed enough times to make the old
        # password invalid, but the RODC shouldn't know that.
        self.try_ldap_logon(RODC, creds1)
        self.try_ldap_logon(RWDC, creds1, errno)

        creds2 = make_creds(username, password)
        self.try_ldap_logon(RWDC, creds2)
        # We can forward WRONG_PASSWORD over NTLM.
        # This SHOULD succeed.
        self.try_ldap_logon(RODC, creds2)

    def test_change_password_reveal_on_demand_ntlm(self):
        CREDS.set_kerberos_state(DONT_USE_KERBEROS)
        self._test_ldap_change_password_reveal_on_demand(
            ldb.ERR_INVALID_CREDENTIALS)

    def test_change_password_reveal_on_demand_kerberos(self):
        CREDS.set_kerberos_state(MUST_USE_KERBEROS)
        self._test_ldap_change_password_reveal_on_demand()

    def _bad_then_good_logon(self, creds, fresh_success_creds):
        """Preload a user on the RODC, then log on wrongly and rightly.

        The wrong-password logon must fail with ERR_INVALID_CREDENTIALS;
        the following correct logon resets everything to 0.

        :param creds: credentials of the account to exercise.
        :param fresh_success_creds: if True, the successful logon uses a
            new credentials object built with insta_creds(); otherwise
            `creds` itself is used.
        """
        username = creds.get_username()
        userpass = creds.get_password()
        userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)

        preload_rodc_user(userdn)

        use_kerberos = creds.get_kerberos_state()
        fail_creds = self.insta_creds(self.template_creds,
                                      username=username,
                                      userpass=userpass + "X",
                                      kerberos_state=use_kerberos)

        try:
            SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
            self.fail()
        except LdbError as e:
            (num, msg) = e.args
            self.assertEqual(num, ERR_INVALID_CREDENTIALS)

        # Succeed to reset everything to 0
        if fresh_success_creds:
            success_creds = self.insta_creds(self.template_creds,
                                             username=username,
                                             userpass=userpass,
                                             kerberos_state=use_kerberos)
        else:
            success_creds = creds

        SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)

    def test_login_lockout_krb5(self):
        self._bad_then_good_logon(self.lockout1krb5_creds,
                                  fresh_success_creds=True)
        self._test_login_lockout(self.lockout1krb5_creds)

    def test_login_lockout_ntlm(self):
        self._bad_then_good_logon(self.lockout1ntlm_creds,
                                  fresh_success_creds=False)
        self._test_login_lockout(self.lockout1ntlm_creds)

    def test_multiple_logon_krb5(self):
        self._bad_then_good_logon(self.lockout1krb5_creds,
                                  fresh_success_creds=True)
        self._test_multiple_logon(self.lockout1krb5_creds)

    def test_multiple_logon_ntlm(self):
        self._bad_then_good_logon(self.lockout1ntlm_creds,
                                  fresh_success_creds=False)
        self._test_multiple_logon(self.lockout1ntlm_creds)

1280
def main():
    """Parse the command line, set the module globals and run the tests.

    Expects exactly two positional arguments, the RODC and RWDC host
    names; otherwise the usage text is printed and the process exits
    with status 1.  Automatic replication on the RWDC is switched on
    both before the test run and, whatever happens, after it.
    """
    global RODC, RWDC, CREDS, LP

    parser = optparse.OptionParser(
        "rodc_rwdc.py [options] <rodc host> <rwdc host>")

    # Built and registered in this order.
    option_groups = [
        options.SambaOptions(parser),
        options.VersionOptions(parser),
        options.CredentialsOptions(parser),
        SubunitOptions(parser),
    ]
    sambaopts, _versionopts, credopts, subunitopts = option_groups
    for group in option_groups:
        parser.add_option_group(group)

    _opts, args = parser.parse_args()

    LP = sambaopts.get_loadparm()
    CREDS = credopts.get_credentials(LP)
    sealed_features = CREDS.get_gensec_features() | gensec.FEATURE_SEAL
    CREDS.set_gensec_features(sealed_features)

    if len(args) != 2:
        parser.print_usage()
        sys.exit(1)
    RODC, RWDC = args

    set_auto_replication(RWDC, True)
    try:
        TestProgram(module=__name__, opts=subunitopts)
    finally:
        set_auto_replication(RWDC, True)


main()