c2c41634b8c5f117d516e5b634808ee824d472f6
[nivanova/samba-autobuild/.git] / source4 / dsdb / tests / python / rodc_rwdc.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 from __future__ import print_function
4 """Test communication of credentials etc, between an RODC and a RWDC.
5
6 How does it work when the password is changed on the RWDC?
7 """
8
9 import optparse
10 import sys
11 import base64
12 import uuid
13 import subprocess
14 import itertools
15 import time
16
17 sys.path.insert(0, "bin/python")
18 import samba
19 import ldb
20
21 from samba.tests.subunitrun import SubunitOptions, TestProgram
22 import samba.getopt as options
23
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
27 from samba import gensec, dsdb
28 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
29 from samba.dcerpc import security, samr
30 import os
31
32 import password_lockout_base
33
34 def adjust_cmd_for_py_version(parts):
35     if os.getenv("PYTHON", None):
36         parts.insert(0, os.environ["PYTHON"])
37     return parts
38
39 def passwd_encode(pw):
40     return base64.b64encode(('"%s"' % pw).encode('utf-16-le')).decode('utf8')
41
42
43 class RodcRwdcTestException(Exception):
44     pass
45
46
47 def make_creds(username, password, kerberos_state=None):
48     # use the global CREDS as a template
49     c = Credentials()
50     c.set_username(username)
51     c.set_password(password)
52     c.set_domain(CREDS.get_domain())
53     c.set_realm(CREDS.get_realm())
54     c.set_workstation(CREDS.get_workstation())
55
56     if kerberos_state is None:
57         kerberos_state = CREDS.get_kerberos_state()
58     c.set_kerberos_state(kerberos_state)
59
60     print('-' * 73)
61     if kerberos_state == MUST_USE_KERBEROS:
62         print("we seem to be using kerberos for %s %s" % (username, password))
63     elif kerberos_state == DONT_USE_KERBEROS:
64         print("NOT using kerberos for %s %s" % (username, password))
65     else:
66         print("kerberos state is %s" % kerberos_state)
67
68     c.set_gensec_features(c.get_gensec_features() |
69                           gensec.FEATURE_SEAL)
70     return c
71
72
73 def set_auto_replication(dc, allow):
74     credstring = '-U%s%%%s' % (CREDS.get_username(),
75                                CREDS.get_password())
76
77     on_or_off = '-' if allow else '+'
78
79     for opt in ['DISABLE_INBOUND_REPL',
80                 'DISABLE_OUTBOUND_REPL']:
81         cmd = adjust_cmd_for_py_version(['bin/samba-tool',
82                'drs', 'options',
83                credstring, dc,
84                "--dsa-option=%s%s" % (on_or_off, opt)])
85
86         p = subprocess.Popen(cmd,
87                              stderr=subprocess.PIPE,
88                              stdout=subprocess.PIPE)
89         stdout, stderr = p.communicate()
90         if p.returncode:
91             if b'LDAP_REFERRAL' not in stderr:
92                 raise RodcRwdcTestException()
93             print("ignoring +%s REFERRAL error; assuming %s is RODC" %
94                   (opt, dc))
95
96
97 def preload_rodc_user(user_dn):
98     credstring = '-U%s%%%s' % (CREDS.get_username(),
99                                CREDS.get_password())
100
101     set_auto_replication(RWDC, True)
102     cmd = adjust_cmd_for_py_version(['bin/samba-tool',
103            'rodc', 'preload',
104            user_dn,
105            credstring,
106            '--server', RWDC, ])
107
108     print(' '.join(cmd))
109     subprocess.check_call(cmd)
110     set_auto_replication(RWDC, False)
111
112
113 def get_server_ref_from_samdb(samdb):
114     server_name = samdb.get_serverName()
115     res = samdb.search(server_name,
116                        scope=ldb.SCOPE_BASE,
117                        attrs=['serverReference'])
118
119     return res[0]['serverReference'][0]
120
121
122 class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase):
123
124     def _check_account_initial(self, dn):
125         self.force_replication()
126         return super(RodcRwdcCachedTests, self)._check_account_initial(dn)
127
128     def _check_account(self, dn,
129                        badPwdCount=None,
130                        badPasswordTime=None,
131                        logonCount=None,
132                        lastLogon=None,
133                        lastLogonTimestamp=None,
134                        lockoutTime=None,
135                        userAccountControl=None,
136                        msDSUserAccountControlComputed=None,
137                        effective_bad_password_count=None,
138                        msg=None,
139                        badPwdCountOnly=False):
140         # Wait for the RWDC to get any delayed messages
141         # e.g. SendToSam or KRB5 bad passwords via winbindd
142         if (self.kerberos and isinstance(badPasswordTime, tuple) or
143             badPwdCount == 0):
144             time.sleep(5)
145
146         return super(RodcRwdcCachedTests,
147                      self)._check_account(dn, badPwdCount, badPasswordTime,
148                                           logonCount, lastLogon,
149                                           lastLogonTimestamp, lockoutTime,
150                                           userAccountControl,
151                                           msDSUserAccountControlComputed,
152                                           effective_bad_password_count, msg,
153                                           True)
154
155     def force_replication(self, base=None):
156         if base is None:
157             base = self.base_dn
158
159         # XXX feels like a horrendous way to do it.
160         credstring = '-U%s%%%s' % (CREDS.get_username(),
161                                    CREDS.get_password())
162         cmd = adjust_cmd_for_py_version(['bin/samba-tool',
163                'drs', 'replicate',
164                RODC, RWDC, base,
165                credstring,
166                '--sync-forced'])
167
168         p = subprocess.Popen(cmd,
169                              stderr=subprocess.PIPE,
170                              stdout=subprocess.PIPE)
171         stdout, stderr = p.communicate()
172         if p.returncode:
173             print("failed with code %s" % p.returncode)
174             print(' '.join(cmd))
175             print("stdout")
176             print(stdout)
177             print("stderr")
178             print(stderr)
179             raise RodcRwdcTestException()
180
181     def _change_password(self, user_dn, old_password, new_password):
182         self.rwdc_db.modify_ldif(
183             "dn: %s\n"
184             "changetype: modify\n"
185             "delete: userPassword\n"
186             "userPassword: %s\n"
187             "add: userPassword\n"
188             "userPassword: %s\n" % (user_dn, old_password, new_password))
189
190     def tearDown(self):
191         super(RodcRwdcCachedTests, self).tearDown()
192         set_auto_replication(RWDC, True)
193
194     def setUp(self):
195         self.kerberos = False  # To be set later
196
197         self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
198                              session_info=system_session(LP), lp=LP)
199
200         self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
201                              session_info=system_session(LP), lp=LP)
202
203         # Define variables for BasePasswordTestCase
204         self.lp = LP
205         self.global_creds = CREDS
206         self.host = RWDC
207         self.host_url = 'ldap://%s' % RWDC
208         self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
209                          credentials=self.global_creds, lp=self.lp)
210
211         super(RodcRwdcCachedTests, self).setUp()
212         self.host_url = 'ldap://%s' % RODC
213
214         self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
215         self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
216         self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
217
218         self.base_dn = self.rwdc_db.domain_dn()
219
220         root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
221                                    attrs=['dsServiceName'])
222         self.service = root[0]['dsServiceName'][0]
223         self.tag = uuid.uuid4().hex
224
225         self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
226         self.rwdc_db.set_dsheuristics("000000001")
227
228         set_auto_replication(RWDC, False)
229
230         # make sure DCs are synchronized before the test
231         self.force_replication()
232
233     def delete_ldb_connections(self):
234         super(RodcRwdcCachedTests, self).delete_ldb_connections()
235         del self.rwdc_db
236         del self.rodc_db
237
238     def test_cache_and_flush_password(self):
239         username = self.lockout1krb5_creds.get_username()
240         userpass = self.lockout1krb5_creds.get_password()
241         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
242
243         ldb_system = SamDB(session_info=system_session(self.lp),
244                            credentials=self.global_creds, lp=self.lp)
245
246         res = ldb_system.search(userdn, attrs=['unicodePwd'])
247         self.assertFalse('unicodePwd' in res[0])
248
249         preload_rodc_user(userdn)
250
251         res = ldb_system.search(userdn, attrs=['unicodePwd'])
252         self.assertTrue('unicodePwd' in res[0])
253
254         newpass = userpass + '!'
255
256         # Forcing replication should blank out password (when changed)
257         self._change_password(userdn, userpass, newpass)
258         self.force_replication()
259
260         res = ldb_system.search(userdn, attrs=['unicodePwd'])
261         self.assertFalse('unicodePwd' in res[0])
262
263     def test_login_lockout_krb5(self):
264         username = self.lockout1krb5_creds.get_username()
265         userpass = self.lockout1krb5_creds.get_password()
266         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
267
268         preload_rodc_user(userdn)
269
270         self.kerberos = True
271
272         self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
273
274         res = self.rodc_db.search(self.rodc_dn,
275                                   scope=ldb.SCOPE_BASE,
276                                   attrs=['msDS-RevealOnDemandGroup'])
277
278         group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
279
280         m = ldb.Message()
281         m.dn = ldb.Dn(self.rwdc_db, group)
282         m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
283         self.rwdc_db.modify(m)
284
285         m = ldb.Message()
286         m.dn = ldb.Dn(self.ldb, self.base_dn)
287
288         self.account_lockout_duration = 10
289         account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7))
290
291         m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks),
292                                                   ldb.FLAG_MOD_REPLACE,
293                                                   "lockoutDuration")
294
295         self.lockout_observation_window = 10
296         lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7))
297
298         m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks),
299                                                            ldb.FLAG_MOD_REPLACE,
300                                                            "lockOutObservationWindow")
301
302         self.rwdc_db.modify(m)
303         self.force_replication()
304
305         self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn)
306
307     def test_login_lockout_ntlm(self):
308         username = self.lockout1ntlm_creds.get_username()
309         userpass = self.lockout1ntlm_creds.get_password()
310         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
311
312         preload_rodc_user(userdn)
313
314         self.kerberos = False
315
316         self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
317
318         res = self.rodc_db.search(self.rodc_dn,
319                                   scope=ldb.SCOPE_BASE,
320                                   attrs=['msDS-RevealOnDemandGroup'])
321
322         group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
323
324         m = ldb.Message()
325         m.dn = ldb.Dn(self.rwdc_db, group)
326         m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member')
327         self.rwdc_db.modify(m)
328
329         self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn)
330
331     def test_login_lockout_not_revealed(self):
332         '''Test that SendToSam is restricted by preloaded users/groups'''
333
334         username = self.lockout1ntlm_creds.get_username()
335         userpass = self.lockout1ntlm_creds.get_password()
336         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
337
338         # Preload but do not add to revealed group
339         preload_rodc_user(userdn)
340
341         self.kerberos = False
342
343         creds = self.lockout1ntlm_creds
344
345         # Open a second LDB connection with the user credentials. Use the
346         # command line credentials for informations like the domain, the realm
347         # and the workstation.
348         creds_lockout = self.insta_creds(creds)
349
350         # The wrong password
351         creds_lockout.set_password("thatsAcomplPASS1x")
352
353         self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
354
355         badPasswordTime = 0
356         logonCount = 0
357         lastLogon = 0
358         lastLogonTimestamp = 0
359         logoncount_relation = ''
360         lastlogon_relation = ''
361
362         res = self._check_account(userdn,
363                                   badPwdCount=1,
364                                   badPasswordTime=("greater", badPasswordTime),
365                                   logonCount=logonCount,
366                                   lastLogon=lastLogon,
367                                   lastLogonTimestamp=lastLogonTimestamp,
368                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
369                                   msDSUserAccountControlComputed=0,
370                                   msg='lastlogontimestamp with wrong password')
371         badPasswordTime = int(res[0]["badPasswordTime"][0])
372
373         # BadPwdCount on RODC increases alongside RWDC
374         res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
375         self.assertTrue('badPwdCount' in res[0])
376         self.assertEqual(int(res[0]['badPwdCount'][0]), 1)
377
378         # Correct old password
379         creds_lockout.set_password(userpass)
380
381         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
382
383         # Wait for potential SendToSam...
384         time.sleep(5)
385
386         # BadPwdCount on RODC decreases, but not the RWDC
387         res = self._check_account(userdn,
388                                   badPwdCount=1,
389                                   badPasswordTime=badPasswordTime,
390                                   logonCount=(logoncount_relation, logonCount),
391                                   lastLogon=('greater', lastLogon),
392                                   lastLogonTimestamp=lastLogonTimestamp,
393                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
394                                   msDSUserAccountControlComputed=0,
395                                   msg='badPwdCount not reset on RWDC')
396
397         res = self.rodc_db.search(userdn, attrs=['badPwdCount'])
398         self.assertTrue('badPwdCount' in res[0])
399         self.assertEqual(int(res[0]['badPwdCount'][0]), 0)
400
401     def _test_login_lockout_rodc_rwdc(self, creds, userdn):
402         username = creds.get_username()
403         userpass = creds.get_password()
404
405         # Open a second LDB connection with the user credentials. Use the
406         # command line credentials for informations like the domain, the realm
407         # and the workstation.
408         creds_lockout = self.insta_creds(creds)
409
410         # The wrong password
411         creds_lockout.set_password("thatsAcomplPASS1x")
412
413         self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
414
415         badPasswordTime = 0
416         logonCount = 0
417         lastLogon = 0
418         lastLogonTimestamp = 0
419         logoncount_relation = ''
420         lastlogon_relation = ''
421
422         res = self._check_account(userdn,
423                                   badPwdCount=1,
424                                   badPasswordTime=("greater", badPasswordTime),
425                                   logonCount=logonCount,
426                                   lastLogon=lastLogon,
427                                   lastLogonTimestamp=lastLogonTimestamp,
428                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
429                                   msDSUserAccountControlComputed=0,
430                                   msg='lastlogontimestamp with wrong password')
431         badPasswordTime = int(res[0]["badPasswordTime"][0])
432
433         # Correct old password
434         creds_lockout.set_password(userpass)
435
436         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
437
438         # lastLogonTimestamp should not change
439         # lastLogon increases if badPwdCount is non-zero (!)
440         res = self._check_account(userdn,
441                                   badPwdCount=0,
442                                   badPasswordTime=badPasswordTime,
443                                   logonCount=(logoncount_relation, logonCount),
444                                   lastLogon=('greater', lastLogon),
445                                   lastLogonTimestamp=lastLogonTimestamp,
446                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
447                                   msDSUserAccountControlComputed=0,
448                                   msg='LLTimestamp is updated to lastlogon')
449
450         logonCount = int(res[0]["logonCount"][0])
451         lastLogon = int(res[0]["lastLogon"][0])
452
453         # The wrong password
454         creds_lockout.set_password("thatsAcomplPASS1x")
455
456         self.assertLoginFailure(self.host_url, creds_lockout, self.lp)
457
458         res = self._check_account(userdn,
459                                   badPwdCount=1,
460                                   badPasswordTime=("greater", badPasswordTime),
461                                   logonCount=logonCount,
462                                   lastLogon=lastLogon,
463                                   lastLogonTimestamp=lastLogonTimestamp,
464                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
465                                   msDSUserAccountControlComputed=0)
466         badPasswordTime = int(res[0]["badPasswordTime"][0])
467
468         # The wrong password
469         creds_lockout.set_password("thatsAcomplPASS1x")
470
471         try:
472             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
473             self.fail()
474
475         except LdbError as e1:
476             (num, msg) = e1.args
477             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
478
479         res = self._check_account(userdn,
480                                   badPwdCount=2,
481                                   badPasswordTime=("greater", badPasswordTime),
482                                   logonCount=logonCount,
483                                   lastLogon=lastLogon,
484                                   lastLogonTimestamp=lastLogonTimestamp,
485                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
486                                   msDSUserAccountControlComputed=0)
487         badPasswordTime = int(res[0]["badPasswordTime"][0])
488
489         print("two failed password change")
490
491         # The wrong password
492         creds_lockout.set_password("thatsAcomplPASS1x")
493
494         try:
495             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
496             self.fail()
497
498         except LdbError as e2:
499             (num, msg) = e2.args
500             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
501
502         res = self._check_account(userdn,
503                                   badPwdCount=3,
504                                   badPasswordTime=("greater", badPasswordTime),
505                                   logonCount=logonCount,
506                                   lastLogon=lastLogon,
507                                   lastLogonTimestamp=lastLogonTimestamp,
508                                   lockoutTime=("greater", badPasswordTime),
509                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
510                                   msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
511         badPasswordTime = int(res[0]["badPasswordTime"][0])
512         lockoutTime = int(res[0]["lockoutTime"][0])
513
514         # The wrong password
515         creds_lockout.set_password("thatsAcomplPASS1x")
516         try:
517             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
518             self.fail()
519         except LdbError as e3:
520             (num, msg) = e3.args
521             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
522
523         res = self._check_account(userdn,
524                                   badPwdCount=3,
525                                   badPasswordTime=badPasswordTime,
526                                   logonCount=logonCount,
527                                   lastLogon=lastLogon,
528                                   lastLogonTimestamp=lastLogonTimestamp,
529                                   lockoutTime=lockoutTime,
530                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
531                                   msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
532
533         # The wrong password
534         creds_lockout.set_password("thatsAcomplPASS1x")
535         try:
536             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
537             self.fail()
538         except LdbError as e4:
539             (num, msg) = e4.args
540             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
541
542         res = self._check_account(userdn,
543                                   badPwdCount=3,
544                                   badPasswordTime=badPasswordTime,
545                                   logonCount=logonCount,
546                                   lastLogon=lastLogon,
547                                   lastLogonTimestamp=lastLogonTimestamp,
548                                   lockoutTime=lockoutTime,
549                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
550                                   msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
551
552         # The correct password, but we are locked out
553         creds_lockout.set_password(userpass)
554         try:
555             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
556             self.fail()
557         except LdbError as e5:
558             (num, msg) = e5.args
559             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
560
561         res = self._check_account(userdn,
562                                   badPwdCount=3,
563                                   badPasswordTime=badPasswordTime,
564                                   logonCount=logonCount,
565                                   lastLogon=lastLogon,
566                                   lastLogonTimestamp=lastLogonTimestamp,
567                                   lockoutTime=lockoutTime,
568                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
569                                   msDSUserAccountControlComputed=dsdb.UF_LOCKOUT)
570
571         # wait for the lockout to end
572         time.sleep(self.account_lockout_duration + 1)
573         print(self.account_lockout_duration + 1)
574
575         res = self._check_account(userdn,
576                                   badPwdCount=3, effective_bad_password_count=0,
577                                   badPasswordTime=badPasswordTime,
578                                   logonCount=logonCount,
579                                   lockoutTime=lockoutTime,
580                                   lastLogon=lastLogon,
581                                   lastLogonTimestamp=lastLogonTimestamp,
582                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
583                                   msDSUserAccountControlComputed=0)
584
585         # The correct password after letting the timeout expire
586
587         creds_lockout.set_password(userpass)
588
589         creds_lockout2 = self.insta_creds(creds_lockout)
590
591         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp)
592         time.sleep(3)
593
594         res = self._check_account(userdn,
595                                   badPwdCount=0,
596                                   badPasswordTime=badPasswordTime,
597                                   logonCount=(logoncount_relation, logonCount),
598                                   lastLogon=(lastlogon_relation, lastLogon),
599                                   lastLogonTimestamp=lastLogonTimestamp,
600                                   lockoutTime=lockoutTime,
601                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
602                                   msDSUserAccountControlComputed=0,
603                                   msg="lastLogon is way off")
604
605         # The wrong password
606         creds_lockout.set_password("thatsAcomplPASS1x")
607         try:
608             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
609             self.fail()
610         except LdbError as e6:
611             (num, msg) = e6.args
612             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
613
614         res = self._check_account(userdn,
615                                   badPwdCount=1,
616                                   badPasswordTime=("greater", badPasswordTime),
617                                   logonCount=logonCount,
618                                   lockoutTime=lockoutTime,
619                                   lastLogon=lastLogon,
620                                   lastLogonTimestamp=lastLogonTimestamp,
621                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
622                                   msDSUserAccountControlComputed=0)
623         badPasswordTime = int(res[0]["badPasswordTime"][0])
624
625         # The wrong password
626         creds_lockout.set_password("thatsAcomplPASS1x")
627         try:
628             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
629             self.fail()
630         except LdbError as e7:
631             (num, msg) = e7.args
632             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
633
634         res = self._check_account(userdn,
635                                   badPwdCount=2,
636                                   badPasswordTime=("greater", badPasswordTime),
637                                   logonCount=logonCount,
638                                   lockoutTime=lockoutTime,
639                                   lastLogon=lastLogon,
640                                   lastLogonTimestamp=lastLogonTimestamp,
641                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
642                                   msDSUserAccountControlComputed=0)
643         badPasswordTime = int(res[0]["badPasswordTime"][0])
644
645         time.sleep(self.lockout_observation_window + 1)
646
647         res = self._check_account(userdn,
648                                   badPwdCount=2, effective_bad_password_count=0,
649                                   badPasswordTime=badPasswordTime,
650                                   logonCount=logonCount,
651                                   lockoutTime=lockoutTime,
652                                   lastLogon=lastLogon,
653                                   lastLogonTimestamp=lastLogonTimestamp,
654                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
655                                   msDSUserAccountControlComputed=0)
656
657         # The wrong password
658         creds_lockout.set_password("thatsAcomplPASS1x")
659         try:
660             ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
661             self.fail()
662         except LdbError as e8:
663             (num, msg) = e8.args
664             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
665
666         res = self._check_account(userdn,
667                                   badPwdCount=1,
668                                   badPasswordTime=("greater", badPasswordTime),
669                                   logonCount=logonCount,
670                                   lockoutTime=lockoutTime,
671                                   lastLogon=lastLogon,
672                                   lastLogonTimestamp=lastLogonTimestamp,
673                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
674                                   msDSUserAccountControlComputed=0)
675         badPasswordTime = int(res[0]["badPasswordTime"][0])
676
677         # The correct password without letting the timeout expire
678         creds_lockout.set_password(userpass)
679         ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp)
680
681         res = self._check_account(userdn,
682                                   badPwdCount=0,
683                                   badPasswordTime=badPasswordTime,
684                                   logonCount=(logoncount_relation, logonCount),
685                                   lockoutTime=lockoutTime,
686                                   lastLogon=("greater", lastLogon),
687                                   lastLogonTimestamp=lastLogonTimestamp,
688                                   userAccountControl=dsdb.UF_NORMAL_ACCOUNT,
689                                   msDSUserAccountControlComputed=0)
690
691
692 class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
693     counter = itertools.count(1, 1)
694
695     def force_replication(self, base=None):
696         if base is None:
697             base = self.base_dn
698
699         # XXX feels like a horrendous way to do it.
700         credstring = '-U%s%%%s' % (CREDS.get_username(),
701                                    CREDS.get_password())
702         cmd = adjust_cmd_for_py_version(['bin/samba-tool',
703                'drs', 'replicate',
704                RODC, RWDC, base,
705                credstring,
706                '--sync-forced'])
707
708         p = subprocess.Popen(cmd,
709                              stderr=subprocess.PIPE,
710                              stdout=subprocess.PIPE)
711         stdout, stderr = p.communicate()
712         if p.returncode:
713             print("failed with code %s" % p.returncode)
714             print(' '.join(cmd))
715             print("stdout")
716             print(stdout)
717             print("stderr")
718             print(stderr)
719             raise RodcRwdcTestException()
720
721     def _check_account_initial(self, dn):
722         self.force_replication()
723         return super(RodcRwdcTests, self)._check_account_initial(dn)
724
725     def tearDown(self):
726         super(RodcRwdcTests, self).tearDown()
727         self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
728         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
729         set_auto_replication(RWDC, True)
730
731     def setUp(self):
732         self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
733                              session_info=system_session(LP), lp=LP)
734
735         self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
736                              session_info=system_session(LP), lp=LP)
737
738         # Define variables for BasePasswordTestCase
739         self.lp = LP
740         self.global_creds = CREDS
741         self.host = RWDC
742         self.host_url = 'ldap://%s' % RWDC
743         self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
744                          credentials=self.global_creds, lp=self.lp)
745
746         super(RodcRwdcTests, self).setUp()
747         self.host = RODC
748         self.host_url = 'ldap://%s' % RODC
749         self.ldb = SamDB(url='ldap://%s' % RODC, session_info=system_session(self.lp),
750                          credentials=self.global_creds, lp=self.lp)
751
752         self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
753         self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
754         self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
755
756         self.base_dn = self.rwdc_db.domain_dn()
757
758         root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
759                                    attrs=['dsServiceName'])
760         self.service = root[0]['dsServiceName'][0]
761         self.tag = uuid.uuid4().hex
762
763         self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
764         self.rwdc_db.set_dsheuristics("000000001")
765
766         set_auto_replication(RWDC, False)
767
768         # make sure DCs are synchronized before the test
769         self.force_replication()
770         self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
771         self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
772
773     def delete_ldb_connections(self):
774         super(RodcRwdcTests, self).delete_ldb_connections()
775         del self.rwdc_db
776         del self.rodc_db
777
778     def assertReferral(self, fn, *args, **kwargs):
779         try:
780             fn(*args, **kwargs)
781             self.fail("failed to raise ldap referral")
782         except ldb.LdbError as e9:
783             (code, msg) = e9.args
784             self.assertEqual(code, ldb.ERR_REFERRAL,
785                              "expected referral, got %s %s" % (code, msg))
786
787     def _test_rodc_dsheuristics(self):
788         d = self.rodc_db.get_dsheuristics()
789         self.assertReferral(self.rodc_db.set_dsheuristics, "000000001")
790         self.assertReferral(self.rodc_db.set_dsheuristics, d)
791
792     def TEST_rodc_heuristics_kerberos(self):
793         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
794         self._test_rodc_dsheuristics()
795
796     def TEST_rodc_heuristics_ntlm(self):
797         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
798         self._test_rodc_dsheuristics()
799
800     def _test_add(self, objects, cross_ncs=False):
801         for o in objects:
802             dn = o['dn']
803             if cross_ncs:
804                 base = str(self.rwdc_db.get_config_basedn())
805                 controls = ["search_options:1:2"]
806                 cn = dn.split(',', 1)[0]
807                 expression = '(%s)' % cn
808             else:
809                 base = dn
810                 controls = []
811                 expression = None
812
813             try:
814                 res = self.rodc_db.search(base,
815                                           expression=expression,
816                                           scope=ldb.SCOPE_SUBTREE,
817                                           attrs=['dn'],
818                                           controls=controls)
819                 self.assertEqual(len(res), 0)
820             except ldb.LdbError as e:
821                 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
822                     raise
823
824             try:
825                 self.rwdc_db.add(o)
826             except ldb.LdbError as e:
827                 (ecode, emsg) = e.args
828                 self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
829                           (o, ecode, emsg))
830
831             if cross_ncs:
832                 self.force_replication(base=base)
833             else:
834                 self.force_replication()
835
836             try:
837                 res = self.rodc_db.search(base,
838                                           expression=expression,
839                                           scope=ldb.SCOPE_SUBTREE,
840                                           attrs=['dn'],
841                                           controls=controls)
842                 self.assertEqual(len(res), 1)
843             except ldb.LdbError as e:
844                 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
845                                     "replication seems to have failed")
846
847     def _test_add_replicated_objects(self, mode):
848         tag = "%s%s" % (self.tag, mode)
849         self._test_add([
850             {
851                 'dn': "ou=%s1,%s" % (tag, self.base_dn),
852                 "objectclass": "organizationalUnit"
853             },
854             {
855                 'dn': "cn=%s2,%s" % (tag, self.base_dn),
856                 "objectclass": "user"
857             },
858             {
859                 'dn': "cn=%s3,%s" % (tag, self.base_dn),
860                 "objectclass": "group"
861             },
862         ])
863         self.rwdc_db.delete("ou=%s1,%s" % (tag, self.base_dn))
864         self.rwdc_db.delete("cn=%s2,%s" % (tag, self.base_dn))
865         self.rwdc_db.delete("cn=%s3,%s" % (tag, self.base_dn))
866
867     def test_add_replicated_objects_kerberos(self):
868         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
869         self._test_add_replicated_objects('kerberos')
870
871     def test_add_replicated_objects_ntlm(self):
872         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
873         self._test_add_replicated_objects('ntlm')
874
875     def _test_add_replicated_connections(self, mode):
876         tag = "%s%s" % (self.tag, mode)
877         self._test_add([
878             {
879                 'dn': "cn=%sfoofoofoo,%s" % (tag, self.service),
880                 "objectclass": "NTDSConnection",
881                 'enabledConnection': 'TRUE',
882                 'fromServer': self.base_dn,
883                 'options': '0'
884             },
885         ], cross_ncs=True)
886         self.rwdc_db.delete("cn=%sfoofoofoo,%s" % (tag, self.service))
887
888     def test_add_replicated_connections_kerberos(self):
889         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
890         self._test_add_replicated_connections('kerberos')
891
892     def test_add_replicated_connections_ntlm(self):
893         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
894         self._test_add_replicated_connections('ntlm')
895
896     def _test_modify_replicated_attributes(self):
897         dn = 'CN=Guest,CN=Users,' + self.base_dn
898         value = self.tag
899         for attr in ['carLicense', 'middleName']:
900             m = ldb.Message()
901             m.dn = ldb.Dn(self.rwdc_db, dn)
902             m[attr] = ldb.MessageElement(value,
903                                          ldb.FLAG_MOD_REPLACE,
904                                          attr)
905             try:
906                 self.rwdc_db.modify(m)
907             except ldb.LdbError as e:
908                 self.fail("Failed to modify %s %s on RWDC %s with %s" %
909                           (dn, attr, RWDC, e))
910
911             self.force_replication()
912
913             try:
914                 res = self.rodc_db.search(dn,
915                                           scope=ldb.SCOPE_SUBTREE,
916                                           attrs=[attr])
917                 results = [str(x[attr][0]) for x in res]
918                 self.assertEqual(results, [value])
919             except ldb.LdbError as e:
920                 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
921                                     "replication seems to have failed")
922
923     def test_modify_replicated_attributes_kerberos(self):
924         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
925         self._test_modify_replicated_attributes()
926
927     def test_modify_replicated_attributes_ntlm(self):
928         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
929         self._test_modify_replicated_attributes()
930
931     def _test_add_modify_delete(self):
932         dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
933         values = ["%s%s" % (i, self.tag) for i in range(3)]
934         attr = "carLicense"
935         self._test_add([
936             {
937                 'dn': dn,
938                 "objectclass": "user",
939                 attr: values[0]
940             },
941         ])
942         self.force_replication()
943         for value in values[1:]:
944
945             m = ldb.Message()
946             m.dn = ldb.Dn(self.rwdc_db, dn)
947             m[attr] = ldb.MessageElement(value,
948                                          ldb.FLAG_MOD_REPLACE,
949                                          attr)
950             try:
951                 self.rwdc_db.modify(m)
952             except ldb.LdbError as e:
953                 self.fail("Failed to modify %s %s on RWDC %s with %s" %
954                           (dn, attr, RWDC, e))
955
956             self.force_replication()
957
958             try:
959                 res = self.rodc_db.search(dn,
960                                           scope=ldb.SCOPE_SUBTREE,
961                                           attrs=[attr])
962                 results = [str(x[attr][0]) for x in res]
963                 self.assertEqual(results, [value])
964             except ldb.LdbError as e:
965                 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
966                                     "replication seems to have failed")
967
968         self.rwdc_db.delete(dn)
969         self.force_replication()
970         try:
971             res = self.rodc_db.search(dn,
972                                       scope=ldb.SCOPE_SUBTREE,
973                                       attrs=[attr])
974             if len(res) > 0:
975                 self.fail("Failed to delete %s" % (dn))
976         except ldb.LdbError as e:
977             self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
978                              "Failed to delete %s" % (dn))
979
980     def test_add_modify_delete_kerberos(self):
981         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
982         self._test_add_modify_delete()
983
984     def test_add_modify_delete_ntlm(self):
985         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
986         self._test_add_modify_delete()
987
988     def _new_user(self):
989         username = "u%sX%s" % (self.tag[:12], next(self.counter))
990         password = 'password#1'
991         dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
992         o = {
993             'dn': dn,
994             "objectclass": "user",
995             'sAMAccountName': username,
996         }
997         try:
998             self.rwdc_db.add(o)
999         except ldb.LdbError as e:
1000             self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))
1001
1002         self.rwdc_db.modify_ldif("dn: %s\n"
1003                                  "changetype: modify\n"
1004                                  "delete: userPassword\n"
1005                                  "add: userPassword\n"
1006                                  "userPassword: %s\n" % (dn, password))
1007         self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
1008         return (dn, username, password)
1009
1010     def _change_password(self, user_dn, old_password, new_password):
1011         self.rwdc_db.modify_ldif(
1012             "dn: %s\n"
1013             "changetype: modify\n"
1014             "delete: userPassword\n"
1015             "userPassword: %s\n"
1016             "add: userPassword\n"
1017             "userPassword: %s\n" % (user_dn, old_password, new_password))
1018
1019     def try_ldap_logon(self, server, creds, errno=None):
1020         try:
1021             tmpdb = SamDB('ldap://%s' % server, credentials=creds,
1022                           session_info=system_session(LP), lp=LP)
1023             if errno is not None:
1024                 self.fail("logon failed to fail with ldb error %s" % errno)
1025         except ldb.LdbError as e10:
1026             (code, msg) = e10.args
1027             if code != errno:
1028                 if errno is None:
1029                     self.fail("logon incorrectly raised ldb error (code=%s)" %
1030                               code)
1031                 else:
1032                     self.fail("logon failed to raise correct ldb error"
1033                               "Expected: %s Got: %s" %
1034                               (errno, code))
1035
1036     def zero_min_password_age(self):
1037         min_pwd_age = int(self.rwdc_db.get_minPwdAge())
1038         if min_pwd_age != 0:
1039             self.rwdc_db.set_minPwdAge('0')
1040
1041     def _test_ldap_change_password(self, errno=None):
1042         self.zero_min_password_age()
1043
1044         dn, username, password = self._new_user()
1045         creds1 = make_creds(username, password)
1046
1047         # With NTLM, this should fail on RODC before replication,
1048         # because the user isn't known.
1049         self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1050         self.force_replication()
1051
1052         # Now the user is replicated to RODC, so logon should work
1053         self.try_ldap_logon(RODC, creds1)
1054
1055         passwords = ['password#%s' % i for i in range(1, 6)]
1056         for prev, password in zip(passwords[:-1], passwords[1:]):
1057             self._change_password(dn, prev, password)
1058
1059         # The password has changed enough times to make the old
1060         # password invalid (though with kerberos that doesn't matter).
1061         # For NTLM, the old creds should always fail
1062         self.try_ldap_logon(RODC, creds1, errno)
1063         self.try_ldap_logon(RWDC, creds1, errno)
1064
1065         creds2 = make_creds(username, password)
1066
1067         # new creds work straight away with NTLM, because although it
1068         # doesn't have the password, it knows the user and forwards
1069         # the query.
1070         self.try_ldap_logon(RODC, creds2)
1071         self.try_ldap_logon(RWDC, creds2)
1072
1073         self.force_replication()
1074
1075         # After another replication check RODC still works and fails,
1076         # as appropriate to various creds
1077         self.try_ldap_logon(RODC, creds2)
1078         self.try_ldap_logon(RODC, creds1, errno)
1079
1080         prev = password
1081         password = 'password#6'
1082         self._change_password(dn, prev, password)
1083         creds3 = make_creds(username, password)
1084
1085         # previous password should still work.
1086         self.try_ldap_logon(RWDC, creds2)
1087         self.try_ldap_logon(RODC, creds2)
1088
1089         # new password should still work.
1090         self.try_ldap_logon(RWDC, creds3)
1091         self.try_ldap_logon(RODC, creds3)
1092
1093         # old password should still fail (but not on kerberos).
1094         self.try_ldap_logon(RWDC, creds1, errno)
1095         self.try_ldap_logon(RODC, creds1, errno)
1096
1097     def test_ldap_change_password_kerberos(self):
1098         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
1099         self._test_ldap_change_password()
1100
1101     def test_ldap_change_password_ntlm(self):
1102         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
1103         self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS)
1104
1105     def _test_ldap_change_password_reveal_on_demand(self, errno=None):
1106         self.zero_min_password_age()
1107
1108         res = self.rodc_db.search(self.rodc_dn,
1109                                   scope=ldb.SCOPE_BASE,
1110                                   attrs=['msDS-RevealOnDemandGroup'])
1111
1112         group = res[0]['msDS-RevealOnDemandGroup'][0].decode('utf8')
1113
1114         user_dn, username, password = self._new_user()
1115         creds1 = make_creds(username, password)
1116
1117         m = ldb.Message()
1118         m.dn = ldb.Dn(self.rwdc_db, group)
1119         m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
1120         self.rwdc_db.modify(m)
1121
1122         # Against Windows, this will just forward if no account exists on the KDC
1123         # Therefore, this does not error on Windows.
1124         self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
1125
1126         self.force_replication()
1127
1128         # The proxy case
1129         self.try_ldap_logon(RODC, creds1)
1130         preload_rodc_user(user_dn)
1131
1132         # Now the user AND password are replicated to RODC, so logon should work (not proxy case)
1133         self.try_ldap_logon(RODC, creds1)
1134
1135         passwords = ['password#%s' % i for i in range(1, 6)]
1136         for prev, password in zip(passwords[:-1], passwords[1:]):
1137             self._change_password(user_dn, prev, password)
1138
1139         # The password has changed enough times to make the old
1140         # password invalid, but the RODC shouldn't know that.
1141         self.try_ldap_logon(RODC, creds1)
1142         self.try_ldap_logon(RWDC, creds1, errno)
1143
1144         creds2 = make_creds(username, password)
1145         self.try_ldap_logon(RWDC, creds2)
1146         # We can forward WRONG_PASSWORD over NTLM.
1147         # This SHOULD succeed.
1148         self.try_ldap_logon(RODC, creds2)
1149
1150     def test_change_password_reveal_on_demand_ntlm(self):
1151         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
1152         self._test_ldap_change_password_reveal_on_demand(ldb.ERR_INVALID_CREDENTIALS)
1153
1154     def test_change_password_reveal_on_demand_kerberos(self):
1155         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
1156         self._test_ldap_change_password_reveal_on_demand()
1157
1158     def test_login_lockout_krb5(self):
1159         username = self.lockout1krb5_creds.get_username()
1160         userpass = self.lockout1krb5_creds.get_password()
1161         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1162
1163         preload_rodc_user(userdn)
1164
1165         use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
1166         fail_creds = self.insta_creds(self.template_creds,
1167                                       username=username,
1168                                       userpass=userpass + "X",
1169                                       kerberos_state=use_kerberos)
1170
1171         try:
1172             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1173             self.fail()
1174         except LdbError as e11:
1175             (num, msg) = e11.args
1176             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1177
1178         # Succeed to reset everything to 0
1179         success_creds = self.insta_creds(self.template_creds,
1180                                          username=username,
1181                                          userpass=userpass,
1182                                          kerberos_state=use_kerberos)
1183
1184         ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
1185
1186         self._test_login_lockout(self.lockout1krb5_creds)
1187
1188     def test_login_lockout_ntlm(self):
1189         username = self.lockout1ntlm_creds.get_username()
1190         userpass = self.lockout1ntlm_creds.get_password()
1191         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1192
1193         preload_rodc_user(userdn)
1194
1195         use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
1196         fail_creds = self.insta_creds(self.template_creds,
1197                                       username=username,
1198                                       userpass=userpass + "X",
1199                                       kerberos_state=use_kerberos)
1200
1201         try:
1202             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1203             self.fail()
1204         except LdbError as e12:
1205             (num, msg) = e12.args
1206             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1207
1208         # Succeed to reset everything to 0
1209         ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
1210
1211         self._test_login_lockout(self.lockout1ntlm_creds)
1212
1213     def test_multiple_logon_krb5(self):
1214         username = self.lockout1krb5_creds.get_username()
1215         userpass = self.lockout1krb5_creds.get_password()
1216         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1217
1218         preload_rodc_user(userdn)
1219
1220         use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
1221         fail_creds = self.insta_creds(self.template_creds,
1222                                       username=username,
1223                                       userpass=userpass + "X",
1224                                       kerberos_state=use_kerberos)
1225
1226         try:
1227             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1228             self.fail()
1229         except LdbError as e13:
1230             (num, msg) = e13.args
1231             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1232
1233         # Succeed to reset everything to 0
1234         success_creds = self.insta_creds(self.template_creds,
1235                                          username=username,
1236                                          userpass=userpass,
1237                                          kerberos_state=use_kerberos)
1238
1239         ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
1240
1241         self._test_multiple_logon(self.lockout1krb5_creds)
1242
1243     def test_multiple_logon_ntlm(self):
1244         username = self.lockout1ntlm_creds.get_username()
1245         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
1246         userpass = self.lockout1ntlm_creds.get_password()
1247
1248         preload_rodc_user(userdn)
1249
1250         use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
1251         fail_creds = self.insta_creds(self.template_creds,
1252                                       username=username,
1253                                       userpass=userpass + "X",
1254                                       kerberos_state=use_kerberos)
1255
1256         try:
1257             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
1258             self.fail()
1259         except LdbError as e14:
1260             (num, msg) = e14.args
1261             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
1262
1263         # Succeed to reset everything to 0
1264         ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
1265
1266         self._test_multiple_logon(self.lockout1ntlm_creds)
1267
1268
1269 def main():
1270     global RODC, RWDC, CREDS, LP
1271     parser = optparse.OptionParser(
1272         "rodc_rwdc.py [options] <rodc host> <rwdc host>")
1273
1274     sambaopts = options.SambaOptions(parser)
1275     versionopts = options.VersionOptions(parser)
1276     credopts = options.CredentialsOptions(parser)
1277     subunitopts = SubunitOptions(parser)
1278
1279     parser.add_option_group(sambaopts)
1280     parser.add_option_group(versionopts)
1281     parser.add_option_group(credopts)
1282     parser.add_option_group(subunitopts)
1283
1284     opts, args = parser.parse_args()
1285
1286     LP = sambaopts.get_loadparm()
1287     CREDS = credopts.get_credentials(LP)
1288     CREDS.set_gensec_features(CREDS.get_gensec_features() |
1289                               gensec.FEATURE_SEAL)
1290
1291     try:
1292         RODC, RWDC = args
1293     except ValueError:
1294         parser.print_usage()
1295         sys.exit(1)
1296
1297     set_auto_replication(RWDC, True)
1298     try:
1299         TestProgram(module=__name__, opts=subunitopts)
1300     finally:
1301         set_auto_replication(RWDC, True)
1302
1303
1304 main()