tests/rodc: Test for NTLM wrong password forwarding
[mdw/samba.git] / source4 / dsdb / tests / python / rodc_rwdc.py
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
3 """Test communication of credentials etc, between an RODC and a RWDC.
4
5 How does it work when the password is changed on the RWDC?
6 """
7
8 import optparse
9 import sys
10 import base64
11 import uuid
12 import subprocess
13 import itertools
14 import time
15
16 sys.path.insert(0, "bin/python")
17 import samba
18 import ldb
19
20 from samba.tests.subunitrun import SubunitOptions, TestProgram
21 import samba.getopt as options
22
23 from samba.auth import system_session
24 from samba.samdb import SamDB
25 from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
26 from samba import gensec, dsdb
27 from ldb import SCOPE_BASE, LdbError, ERR_INVALID_CREDENTIALS
28 from samba.dcerpc import security, samr
29
30 import password_lockout_base
31
32 def passwd_encode(pw):
33     return base64.b64encode(('"%s"' % pw).encode('utf-16-le'))
34
35
36 class RodcRwdcTestException(Exception):
37     pass
38
39
40 def make_creds(username, password, kerberos_state=None):
41     # use the global CREDS as a template
42     c = Credentials()
43     c.set_username(username)
44     c.set_password(password)
45     c.set_domain(CREDS.get_domain())
46     c.set_realm(CREDS.get_realm())
47     c.set_workstation(CREDS.get_workstation())
48
49     if kerberos_state is None:
50         kerberos_state = CREDS.get_kerberos_state()
51     c.set_kerberos_state(kerberos_state)
52
53     print '-' * 73
54     if kerberos_state == MUST_USE_KERBEROS:
55         print "we seem to be using kerberos for %s %s" % (username, password)
56     elif kerberos_state == DONT_USE_KERBEROS:
57         print "NOT using kerberos for %s %s" % (username, password)
58     else:
59         print "kerberos state is %s" % kerberos_state
60
61     c.set_gensec_features(c.get_gensec_features() |
62                           gensec.FEATURE_SEAL)
63     return c
64
65
66 def set_auto_replication(dc, allow):
67     credstring = '-U%s%%%s' % (CREDS.get_username(),
68                                CREDS.get_password())
69
70     on_or_off = '-' if allow else '+'
71
72     for opt in ['DISABLE_INBOUND_REPL',
73                 'DISABLE_OUTBOUND_REPL']:
74         cmd = ['bin/samba-tool',
75                'drs', 'options',
76                credstring, dc,
77                "--dsa-option=%s%s" % (on_or_off, opt)]
78
79         p = subprocess.Popen(cmd,
80                              stderr=subprocess.PIPE,
81                              stdout=subprocess.PIPE)
82         stdout, stderr = p.communicate()
83         if p.returncode:
84             if 'LDAP_REFERRAL' not in stderr:
85                 raise RodcRwdcTestException()
86             print ("ignoring +%s REFERRAL error; assuming %s is RODC" %
87                    (opt, dc))
88
89
90 def preload_rodc_user(user_dn):
91     credstring = '-U%s%%%s' % (CREDS.get_username(),
92                                CREDS.get_password())
93
94     set_auto_replication(RWDC, True)
95     cmd = ['bin/samba-tool',
96            'rodc', 'preload',
97            user_dn,
98            credstring,
99            '--server', RWDC,]
100
101     print ' '.join(cmd)
102     subprocess.check_call(cmd)
103     set_auto_replication(RWDC, False)
104
105
106
107 def get_server_ref_from_samdb(samdb):
108     server_name = samdb.get_serverName()
109     res = samdb.search(server_name,
110                        scope=ldb.SCOPE_BASE,
111                        attrs=['serverReference'])
112
113     return res[0]['serverReference'][0]
114
115
116
117 class RodcRwdcTests(password_lockout_base.BasePasswordTestCase):
118     counter = itertools.count(1).next
119
120     def force_replication(self, base=None):
121         if base is None:
122             base = self.base_dn
123
124         # XXX feels like a horrendous way to do it.
125         credstring = '-U%s%%%s' % (CREDS.get_username(),
126                                    CREDS.get_password())
127         cmd = ['bin/samba-tool',
128                'drs', 'replicate',
129                RODC, RWDC, base,
130                credstring,
131                '--sync-forced']
132
133         p = subprocess.Popen(cmd,
134                              stderr=subprocess.PIPE,
135                              stdout=subprocess.PIPE)
136         stdout, stderr = p.communicate()
137         if p.returncode:
138             print "failed with code %s" % p.returncode
139             print ' '.join(cmd)
140             print "stdout"
141             print stdout
142             print "stderr"
143             print stderr
144             raise RodcRwdcTestException()
145
146     def _check_account_initial(self, dn):
147         self.force_replication()
148         return super(RodcRwdcTests, self)._check_account_initial(dn)
149
150     def tearDown(self):
151         super(RodcRwdcTests, self).tearDown()
152         self.rwdc_db.set_dsheuristics(self.rwdc_dsheuristics)
153         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
154         set_auto_replication(RWDC, True)
155
156     def setUp(self):
157         self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS,
158                              session_info=system_session(LP), lp=LP)
159
160         self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS,
161                              session_info=system_session(LP), lp=LP)
162
163         # Define variables for BasePasswordTestCase
164         self.lp = LP
165         self.global_creds = CREDS
166         self.host = RWDC
167         self.host_url = 'ldap://%s' % RWDC
168         self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp),
169                          credentials=self.global_creds, lp=self.lp)
170
171         super(RodcRwdcTests, self).setUp()
172         self.host = RODC
173         self.host_url = 'ldap://%s' % RODC
174         self.ldb = SamDB(url='ldap://%s' % RODC, session_info=system_session(self.lp),
175                          credentials=self.global_creds, lp=self.lp)
176
177         self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds)
178         self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
179         self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
180
181         self.base_dn = self.rwdc_db.domain_dn()
182
183         root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE,
184                                    attrs=['dsServiceName'])
185         self.service = root[0]['dsServiceName'][0]
186         self.tag = uuid.uuid4().hex
187
188         self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics()
189         self.rwdc_db.set_dsheuristics("000000001")
190
191         set_auto_replication(RWDC, False)
192
193         # make sure DCs are synchronized before the test
194         self.force_replication()
195         self.rwdc_dn = get_server_ref_from_samdb(self.rwdc_db)
196         self.rodc_dn = get_server_ref_from_samdb(self.rodc_db)
197
198     def assertReferral(self, fn, *args, **kwargs):
199         try:
200             fn(*args, **kwargs)
201             self.fail("failed to raise ldap referral")
202         except ldb.LdbError as (code, msg):
203             self.assertEqual(code, ldb.ERR_REFERRAL,
204                              "expected referral, got %s %s" % (code, msg))
205
206     def _test_rodc_dsheuristics(self):
207         d = self.rodc_db.get_dsheuristics()
208         self.assertReferral(self.rodc_db.set_dsheuristics, "000000001")
209         self.assertReferral(self.rodc_db.set_dsheuristics, d)
210
211     def TEST_rodc_heuristics_kerberos(self):
212         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
213         self._test_rodc_dsheuristics()
214
215     def TEST_rodc_heuristics_ntlm(self):
216         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
217         self._test_rodc_dsheuristics()
218
219     def _test_add(self, objects, cross_ncs=False):
220         for o in objects:
221             dn = o['dn']
222             if cross_ncs:
223                 base = str(self.rwdc_db.get_config_basedn())
224                 controls = ["search_options:1:2"]
225                 cn = dn.split(',', 1)[0]
226                 expression = '(%s)' % cn
227             else:
228                 base = dn
229                 controls = []
230                 expression = None
231
232             try:
233                 res = self.rodc_db.search(base,
234                                           expression=expression,
235                                           scope=ldb.SCOPE_SUBTREE,
236                                           attrs=['dn'],
237                                           controls=controls)
238                 self.assertEqual(len(res), 0)
239             except ldb.LdbError, e:
240                 if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
241                     raise
242
243             try:
244                 self.rwdc_db.add(o)
245             except ldb.LdbError as (ecode, emsg):
246                 self.fail("Failed to add %s to rwdc: ldb error: %s %s" %
247                           (ecode, emsg))
248
249             if cross_ncs:
250                 self.force_replication(base=base)
251             else:
252                 self.force_replication()
253
254             try:
255                 res = self.rodc_db.search(base,
256                                           expression=expression,
257                                           scope=ldb.SCOPE_SUBTREE,
258                                           attrs=['dn'],
259                                           controls=controls)
260                 self.assertEqual(len(res), 1)
261             except ldb.LdbError, e:
262                 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
263                                     "replication seems to have failed")
264
265     def _test_add_replicated_objects(self, mode):
266         tag = "%s%s" % (self.tag, mode)
267         self._test_add([
268             {
269                 'dn': "ou=%s1,%s" % (tag, self.base_dn),
270                 "objectclass": "organizationalUnit"
271             },
272             {
273                 'dn': "cn=%s2,%s" % (tag, self.base_dn),
274                 "objectclass": "user"
275             },
276             {
277                 'dn': "cn=%s3,%s" % (tag, self.base_dn),
278                 "objectclass": "group"
279             },
280         ])
281         self.rwdc_db.delete("ou=%s1,%s" % (tag, self.base_dn))
282         self.rwdc_db.delete("cn=%s2,%s" % (tag, self.base_dn))
283         self.rwdc_db.delete("cn=%s3,%s" % (tag, self.base_dn))
284
285     def test_add_replicated_objects_kerberos(self):
286         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
287         self._test_add_replicated_objects('kerberos')
288
289     def test_add_replicated_objects_ntlm(self):
290         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
291         self._test_add_replicated_objects('ntlm')
292
293     def _test_add_replicated_connections(self, mode):
294         tag = "%s%s" % (self.tag, mode)
295         self._test_add([
296             {
297                 'dn': "cn=%sfoofoofoo,%s" % (tag, self.service),
298                 "objectclass": "NTDSConnection",
299                 'enabledConnection': 'TRUE',
300                 'fromServer': self.base_dn,
301                 'options': '0'
302             },
303         ], cross_ncs=True)
304         self.rwdc_db.delete("cn=%sfoofoofoo,%s" % (tag, self.service))
305
306     def test_add_replicated_connections_kerberos(self):
307         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
308         self._test_add_replicated_connections('kerberos')
309
310     def test_add_replicated_connections_ntlm(self):
311         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
312         self._test_add_replicated_connections('ntlm')
313
314     def _test_modify_replicated_attributes(self):
315         dn = 'CN=Guest,CN=Users,' + self.base_dn
316         value = self.tag
317         for attr in ['carLicense', 'middleName']:
318             m = ldb.Message()
319             m.dn = ldb.Dn(self.rwdc_db, dn)
320             m[attr] = ldb.MessageElement(value,
321                                          ldb.FLAG_MOD_REPLACE,
322                                          attr)
323             try:
324                 self.rwdc_db.modify(m)
325             except ldb.LdbError as e:
326                 self.fail("Failed to modify %s %s on RWDC %s with %s" %
327                           (dn, attr, RWDC, e))
328
329             self.force_replication()
330
331             try:
332                 res = self.rodc_db.search(dn,
333                                           scope=ldb.SCOPE_SUBTREE,
334                                           attrs=[attr])
335                 results = [x[attr][0] for x in res]
336                 self.assertEqual(results, [value])
337             except ldb.LdbError, e:
338                 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
339                                     "replication seems to have failed")
340
341     def test_modify_replicated_attributes_kerberos(self):
342         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
343         self._test_modify_replicated_attributes()
344
345     def test_modify_replicated_attributes_ntlm(self):
346         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
347         self._test_modify_replicated_attributes()
348
349     def _test_add_modify_delete(self):
350         dn = "cn=%s_add_modify,%s" % (self.tag, self.base_dn)
351         values = ["%s%s" % (i, self.tag) for i in range(3)]
352         attr = "carLicense"
353         self._test_add([
354             {
355                 'dn': dn,
356                 "objectclass": "user",
357                 attr: values[0]
358             },
359         ])
360         self.force_replication()
361         for value in values[1:]:
362
363             m = ldb.Message()
364             m.dn = ldb.Dn(self.rwdc_db, dn)
365             m[attr] = ldb.MessageElement(value,
366                                          ldb.FLAG_MOD_REPLACE,
367                                          attr)
368             try:
369                 self.rwdc_db.modify(m)
370             except ldb.LdbError as e:
371                 self.fail("Failed to modify %s %s on RWDC %s with %s" %
372                           (dn, attr, RWDC, e))
373
374             self.force_replication()
375
376             try:
377                 res = self.rodc_db.search(dn,
378                                           scope=ldb.SCOPE_SUBTREE,
379                                           attrs=[attr])
380                 results = [x[attr][0] for x in res]
381                 self.assertEqual(results, [value])
382             except ldb.LdbError, e:
383                 self.assertNotEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
384                                     "replication seems to have failed")
385
386         self.rwdc_db.delete(dn)
387         self.force_replication()
388         try:
389             res = self.rodc_db.search(dn,
390                                       scope=ldb.SCOPE_SUBTREE,
391                                       attrs=[attr])
392             if len(res) > 0:
393                 self.fail("Failed to delete %s" % (dn))
394         except ldb.LdbError, e:
395             self.assertEqual(e.args[0], ldb.ERR_NO_SUCH_OBJECT,
396                              "Failed to delete %s" % (dn))
397
398     def test_add_modify_delete_kerberos(self):
399         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
400         self._test_add_modify_delete()
401
402     def test_add_modify_delete_ntlm(self):
403         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
404         self._test_add_modify_delete()
405
406     def _new_user(self):
407         username = "u%sX%s" % (self.tag[:12], self.counter())
408         password = 'password#1'
409         dn = 'CN=%s,CN=Users,%s' % (username, self.base_dn)
410         o = {
411             'dn': dn,
412             "objectclass": "user",
413             'sAMAccountName': username,
414         }
415         try:
416             self.rwdc_db.add(o)
417         except ldb.LdbError as e:
418             self.fail("Failed to add %s to rwdc: ldb error: %s" % (o, e))
419
420         self.rwdc_db.modify_ldif("dn: %s\n"
421                                  "changetype: modify\n"
422                                  "delete: userPassword\n"
423                                  "add: userPassword\n"
424                                  "userPassword: %s\n" % (dn, password))
425         self.rwdc_db.enable_account("(sAMAccountName=%s)" % username)
426         return (dn, username, password)
427
428     def _change_password(self, user_dn, old_password, new_password):
429         self.rwdc_db.modify_ldif(
430             "dn: %s\n"
431             "changetype: modify\n"
432             "delete: userPassword\n"
433             "userPassword: %s\n"
434             "add: userPassword\n"
435             "userPassword: %s\n" % (user_dn, old_password, new_password))
436
437     def try_ldap_logon(self, server, creds, errno=None):
438         try:
439             tmpdb = SamDB('ldap://%s' % server, credentials=creds,
440                           session_info=system_session(LP), lp=LP)
441             if errno is not None:
442                 self.fail("logon failed to fail with ldb error %s" % errno)
443         except ldb.LdbError as (code, msg):
444             if code != errno:
445                 if errno is None:
446                     self.fail("logon incorrectly raised ldb error (code=%s)" %
447                               code)
448                 else:
449                     self.fail("logon failed to raise correct ldb error"
450                               "Expected: %s Got: %s" %
451                               (errno, code))
452
453
454     def zero_min_password_age(self):
455         min_pwd_age = int(self.rwdc_db.get_minPwdAge())
456         if min_pwd_age != 0:
457             self.rwdc_db.set_minPwdAge('0')
458
459     def _test_ldap_change_password(self, errno=None):
460         self.zero_min_password_age()
461
462         dn, username, password = self._new_user()
463         creds1 = make_creds(username, password)
464
465         # With NTLM, this should fail on RODC before replication,
466         # because the user isn't known.
467         self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
468         self.force_replication()
469
470         # Now the user is replicated to RODC, so logon should work
471         self.try_ldap_logon(RODC, creds1)
472
473         passwords = ['password#%s' % i for i in range(1, 6)]
474         for prev, password in zip(passwords[:-1], passwords[1:]):
475             self._change_password(dn, prev, password)
476
477         # The password has changed enough times to make the old
478         # password invalid (though with kerberos that doesn't matter).
479         # For NTLM, the old creds should always fail
480         self.try_ldap_logon(RODC, creds1, errno)
481         self.try_ldap_logon(RWDC, creds1, errno)
482
483         creds2 = make_creds(username, password)
484
485         # new creds work straight away with NTLM, because although it
486         # doesn't have the password, it knows the user and forwards
487         # the query.
488         self.try_ldap_logon(RODC, creds2)
489         self.try_ldap_logon(RWDC, creds2)
490
491         self.force_replication()
492
493         # After another replication check RODC still works and fails,
494         # as appropriate to various creds
495         self.try_ldap_logon(RODC, creds2)
496         self.try_ldap_logon(RODC, creds1, errno)
497
498         prev = password
499         password = 'password#6'
500         self._change_password(dn, prev, password)
501         creds3 = make_creds(username, password)
502
503         # previous password should still work.
504         self.try_ldap_logon(RWDC, creds2)
505         self.try_ldap_logon(RODC, creds2)
506
507         # new password should still work.
508         self.try_ldap_logon(RWDC, creds3)
509         self.try_ldap_logon(RODC, creds3)
510
511         # old password should still fail (but not on kerberos).
512         self.try_ldap_logon(RWDC, creds1, errno)
513         self.try_ldap_logon(RODC, creds1, errno)
514
515     def test_ldap_change_password_kerberos(self):
516         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
517         self._test_ldap_change_password()
518
519     def test_ldap_change_password_ntlm(self):
520         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
521         self._test_ldap_change_password(ldb.ERR_INVALID_CREDENTIALS)
522
523     def _test_ldap_change_password_reveal_on_demand(self, errno=None):
524         self.zero_min_password_age()
525
526         res = self.rodc_db.search(self.rodc_dn,
527                                   scope=ldb.SCOPE_BASE,
528                                   attrs=['msDS-RevealOnDemandGroup'])
529
530         group = res[0]['msDS-RevealOnDemandGroup'][0]
531
532         user_dn, username, password = self._new_user()
533         creds1 = make_creds(username, password)
534
535         m = ldb.Message()
536         m.dn = ldb.Dn(self.rwdc_db, group)
537         m['member'] = ldb.MessageElement(user_dn, ldb.FLAG_MOD_ADD, 'member')
538         self.rwdc_db.modify(m)
539
540         # Against Windows, this will just forward if no account exists on the KDC
541         # Therefore, this does not error on Windows.
542         self.try_ldap_logon(RODC, creds1, ldb.ERR_INVALID_CREDENTIALS)
543
544         self.force_replication()
545
546         # The proxy case
547         self.try_ldap_logon(RODC, creds1)
548         preload_rodc_user(user_dn)
549
550         # Now the user AND password are replicated to RODC, so logon should work (not proxy case)
551         self.try_ldap_logon(RODC, creds1)
552
553         passwords = ['password#%s' % i for i in range(1, 6)]
554         for prev, password in zip(passwords[:-1], passwords[1:]):
555             self._change_password(user_dn, prev, password)
556
557         # The password has changed enough times to make the old
558         # password invalid, but the RODC shouldn't know that.
559         self.try_ldap_logon(RODC, creds1)
560         self.try_ldap_logon(RWDC, creds1, errno)
561
562         creds2 = make_creds(username, password)
563         self.try_ldap_logon(RWDC, creds2)
564         # We can forward WRONG_PASSWORD over NTLM.
565         # This SHOULD succeed.
566         self.try_ldap_logon(RODC, creds2)
567
568
569     def test_change_password_reveal_on_demand_ntlm(self):
570         CREDS.set_kerberos_state(DONT_USE_KERBEROS)
571         self._test_ldap_change_password_reveal_on_demand(ldb.ERR_INVALID_CREDENTIALS)
572
573     def test_change_password_reveal_on_demand_kerberos(self):
574         CREDS.set_kerberos_state(MUST_USE_KERBEROS)
575         self._test_ldap_change_password_reveal_on_demand()
576
577     def test_login_lockout_krb5(self):
578         username = self.lockout1krb5_creds.get_username()
579         userpass = self.lockout1krb5_creds.get_password()
580         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
581
582         preload_rodc_user(userdn)
583
584         use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
585         fail_creds = self.insta_creds(self.template_creds,
586                                       username=username,
587                                       userpass=userpass+"X",
588                                       kerberos_state=use_kerberos)
589
590         try:
591             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
592             self.fail()
593         except LdbError, (num, msg):
594             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
595
596         # Succeed to reset everything to 0
597         success_creds = self.insta_creds(self.template_creds,
598                                          username=username,
599                                          userpass=userpass,
600                                          kerberos_state=use_kerberos)
601
602         ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
603
604         self._test_login_lockout(self.lockout1krb5_creds)
605
606     def test_login_lockout_ntlm(self):
607         username = self.lockout1ntlm_creds.get_username()
608         userpass = self.lockout1ntlm_creds.get_password()
609         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
610
611         preload_rodc_user(userdn)
612
613         use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
614         fail_creds = self.insta_creds(self.template_creds,
615                                       username=username,
616                                       userpass=userpass+"X",
617                                       kerberos_state=use_kerberos)
618
619         try:
620             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
621             self.fail()
622         except LdbError, (num, msg):
623             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
624
625         # Succeed to reset everything to 0
626         ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
627
628         self._test_login_lockout(self.lockout1ntlm_creds)
629
630     def test_multiple_logon_krb5(self):
631         username = self.lockout1krb5_creds.get_username()
632         userpass = self.lockout1krb5_creds.get_password()
633         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
634
635         preload_rodc_user(userdn)
636
637         use_kerberos = self.lockout1krb5_creds.get_kerberos_state()
638         fail_creds = self.insta_creds(self.template_creds,
639                                       username=username,
640                                       userpass=userpass+"X",
641                                       kerberos_state=use_kerberos)
642
643         try:
644             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
645             self.fail()
646         except LdbError, (num, msg):
647             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
648
649         # Succeed to reset everything to 0
650         success_creds = self.insta_creds(self.template_creds,
651                                          username=username,
652                                          userpass=userpass,
653                                          kerberos_state=use_kerberos)
654
655         ldb = SamDB(url=self.host_url, credentials=success_creds, lp=self.lp)
656
657         self._test_multiple_logon(self.lockout1krb5_creds)
658
659     def test_multiple_logon_ntlm(self):
660         username = self.lockout1ntlm_creds.get_username()
661         userdn = "cn=%s,cn=users,%s" % (username, self.base_dn)
662         userpass = self.lockout1ntlm_creds.get_password()
663
664         preload_rodc_user(userdn)
665
666         use_kerberos = self.lockout1ntlm_creds.get_kerberos_state()
667         fail_creds = self.insta_creds(self.template_creds,
668                                       username=username,
669                                       userpass=userpass+"X",
670                                       kerberos_state=use_kerberos)
671
672         try:
673             ldb = SamDB(url=self.host_url, credentials=fail_creds, lp=self.lp)
674             self.fail()
675         except LdbError, (num, msg):
676             self.assertEquals(num, ERR_INVALID_CREDENTIALS)
677
678         # Succeed to reset everything to 0
679         ldb = SamDB(url=self.host_url, credentials=self.lockout1ntlm_creds, lp=self.lp)
680
681         self._test_multiple_logon(self.lockout1ntlm_creds)
682
683 def main():
684     global RODC, RWDC, CREDS, LP
685     parser = optparse.OptionParser(
686         "rodc_rwdc.py [options] <rodc host> <rwdc host>")
687
688     sambaopts = options.SambaOptions(parser)
689     versionopts = options.VersionOptions(parser)
690     credopts = options.CredentialsOptions(parser)
691     subunitopts = SubunitOptions(parser)
692
693     parser.add_option_group(sambaopts)
694     parser.add_option_group(versionopts)
695     parser.add_option_group(credopts)
696     parser.add_option_group(subunitopts)
697
698     opts, args = parser.parse_args()
699
700     LP = sambaopts.get_loadparm()
701     CREDS = credopts.get_credentials(LP)
702     CREDS.set_gensec_features(CREDS.get_gensec_features() |
703                               gensec.FEATURE_SEAL)
704
705     try:
706         RODC, RWDC = args
707     except ValueError:
708         parser.print_usage()
709         sys.exit(1)
710
711     set_auto_replication(RWDC, True)
712     try:
713         TestProgram(module=__name__, opts=subunitopts)
714     finally:
715         set_auto_replication(RWDC, True)
716
717 main()