class TallocTests(samba.tests.TestCase):
- '''test talloc behaviour of pidl generated python code'''
+ """test talloc behaviour of pidl generated python code"""
def check_blocks(self, object, num_expected):
- '''check that the number of allocated blocks is correct'''
+ """check that the number of allocated blocks is correct"""
nblocks = talloc.total_blocks(object)
if object is None:
nblocks -= self.initial_blocks
self.assertEqual(nblocks, num_expected)
def get_rodc_partial_attribute_set(self):
- '''get a list of attributes for RODC replication'''
+ """get a list of attributes for RODC replication"""
partial_attribute_set = drsuapi.DsPartialAttributeSet()
# we expect one block for the object
class RpcTests(object):
- '''test type behaviour of pidl generated python RPC code'''
+ """test type behaviour of pidl generated python RPC code"""
def check_blocks(self, object, num_expected):
- '''check that the number of allocated blocks is correct'''
+ """check that the number of allocated blocks is correct"""
nblocks = talloc.total_blocks(object)
if object is None:
nblocks -= self.initial_blocks
return mac
def bad_sign_packet(self, packet, key_name):
- '''Add bad signature for a packet by bitflipping
- the final byte in the MAC'''
+ """Add bad signature for a packet by bitflipping
+ the final byte in the MAC"""
mac_list = [x if isinstance(x, int) else ord(x) for x in list("badmac")]
def setUpDynamicTestCases(cls):
def skip(ct, options):
- ''' Filter out any mutually exclusive test options '''
+ """ Filter out any mutually exclusive test options """
if ct != CredentialsType.Machine and\
TestOptions.RemoveDollar.is_set(options):
return True
ou=None, account_control=0, add_dollar=None,
expired_password=False, force_nt4_hash=False,
preserve=True):
- '''Create an account for testing.
+ """Create an account for testing.
The dn of the created account is added to self.accounts,
which is used by tearDownClass to clean up the created accounts.
- '''
+ """
if add_dollar is None and account_type is not self.AccountType.USER:
add_dollar = True
return creds, sname
def as_req(self, cname, sname, realm, etypes, padata=None, kdc_options=0):
- '''Send a Kerberos AS_REQ, returns the undecoded response
- '''
+ """Send a Kerberos AS_REQ, returns the undecoded response
+ """
till = self.get_KerberosTime(offset=36000)
return rep
def get_as_rep_key(self, creds, rep):
- '''Extract the session key from an AS-REP
- '''
+ """Extract the session key from an AS-REP
+ """
rep_padata = self.der_decode(
rep['e-data'],
asn1Spec=krb5_asn1.METHOD_DATA())
return key
def get_enc_timestamp_pa_data(self, creds, rep, skew=0):
- '''generate the pa_data data element for an AS-REQ
- '''
+ """generate the pa_data data element for an AS-REQ
+ """
key = self.get_as_rep_key(creds, rep)
return padata
def get_as_rep_enc_data(self, key, rep):
- ''' Decrypt and Decode the encrypted data in an AS-REP
- '''
+ """ Decrypt and Decode the encrypted data in an AS-REP
+ """
enc_part = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
# MIT KDC encodes both EncASRepPart and EncTGSRepPart with
# application tag 26
expected_error_mode=0, padata=None, kdc_options=0,
to_rodc=False, creds=None, service_creds=None, expect_pac=True,
expect_edata=None, expected_flags=None, unexpected_flags=None):
- '''Send a TGS-REQ, returns the response and the decrypted and
+ """Send a TGS-REQ, returns the response and the decrypted and
decoded enc-part
- '''
+ """
subkey = self.RandomKey(key.etype)
"account_name account_sid logon_name upn domain_name")
def get_pac_data(self, authorization_data):
- '''Decode the PAC element contained in the authorization-data element
- '''
+ """Decode the PAC element contained in the authorization-data element
+ """
account_name = None
user_sid = None
logon_name = None
domain_name)
def decode_service_ticket(self, creds, ticket):
- '''Decrypt and decode a service ticket
- '''
+ """Decrypt and decode a service ticket
+ """
enc_part = ticket['enc-part']
return enc_part
def get_objectSid(self, samdb, dn):
- ''' Get the objectSID for a DN
+ """ Get the objectSID for a DN
Note: performs an Ldb query.
- '''
+ """
res = samdb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
self.assertTrue(len(res) == 1, "did not get objectSid for %s" % dn)
sid = samdb.schema_format_value("objectSID", res[0]["objectSID"][0])
self.do_hexdump = global_hexdump
def test_tgs_req_cname_does_not_not_match_authenticator_cname(self):
- ''' Try and obtain a ticket from the TGS, but supply a cname
+ """ Try and obtain a ticket from the TGS, but supply a cname
that differs from that provided to the krbtgt
- '''
+ """
# Create the user account
samdb = self.get_samdb()
user_name = "tsttktusr"
"rep = {%s}" % rep)
def test_ldap_service_ticket(self):
- '''Get a ticket to the ldap service
- '''
+ """Get a ticket to the ldap service
+ """
# Create the user account
samdb = self.get_samdb()
user_name = "tsttktusr"
class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
- ''' Tests for MS-KILE client principal look-up
+ """ Tests for MS-KILE client principal look-up
See [MS-KILE]: Kerberos Protocol Extensions
section 3.3.5.6.1 Client Principal Lookup
- '''
+ """
def setUp(self):
super().setUp()
"pac_data = {%s}" % str(pac_data))
def test_nt_principal_step_1(self):
- ''' Step 1
+ """ Step 1
For an NT_PRINCIPAL cname with no realm or the realm matches the
DC's domain
search for an account with the
sAMAccountName matching the cname.
- '''
+ """
# Create user and machine accounts for the test.
#
self.assertEqual(realm.upper().encode('UTF8'), enc_part['crealm'])
def test_nt_principal_step_2(self):
- ''' Step 2
+ """ Step 2
If not found
search for sAMAccountName equal to the cname + "$"
- '''
+ """
# Create a machine account for the test.
#
self.assertEqual(realm.upper().encode('UTF8'), enc_part['crealm'])
def test_nt_principal_step_3(self):
- ''' Step 3
+ """ Step 3
If not found
search for a matching UPN name where the UPN is set to
cname@realm or cname@DC's domain name
- '''
+ """
# Create a user account for the test.
#
samdb = self.get_samdb()
self.assertEqual(realm.upper().encode('UTF8'), enc_part['crealm'])
def test_nt_principal_step_4_a(self):
- ''' Step 4, no pre-authentication
+ """ Step 4, no pre-authentication
If not found and no pre-authentication
search for a matching altSecurityIdentity
- '''
+ """
# Create a user account for the test.
# with an altSecurityIdentity, and with UF_DONT_REQUIRE_PREAUTH
# set.
self.check_error_rep(rep, KDC_ERR_TGT_REVOKED)
def test_nt_principal_step_4_b(self):
- ''' Step 4, pre-authentication
+ """ Step 4, pre-authentication
If not found and pre-authentication
search for a matching user principal name
- '''
+ """
# Create user and machine accounts for the test.
#
self.assertEqual(realm.upper().encode('UTF8'), enc_part['crealm'])
def test_nt_principal_step_4_c(self):
- ''' Step 4, pre-authentication
+ """ Step 4, pre-authentication
If not found and pre-authentication
search for a matching user principal name
This test uses the altsecid, so the AS-REQ should fail.
- '''
+ """
# Create user and machine accounts for the test.
#
self.check_error_rep(rep, KDC_ERR_C_PRINCIPAL_UNKNOWN)
def test_enterprise_principal_step_1_3(self):
- ''' Steps 1-3
+ """ Steps 1-3
For an NT_ENTERPRISE_PRINCIPAL cname
search for a user principal name matching the cname
- '''
+ """
# Create a user account for the test.
#
self.assertEqual(realm.upper().encode('UTF8'), crealm)
def test_enterprise_principal_step_4(self):
- ''' Step 4
+ """ Step 4
If that fails
search for an account where the sAMAccountName matches
the name before the @
- '''
+ """
# Create a user account for the test.
#
self.assertEqual(realm.upper().encode('UTF8'), crealm)
def test_enterprise_principal_step_5(self):
- ''' Step 5
+ """ Step 5
If that fails
search for an account where the sAMAccountName matches
the name before the @ with a $ appended.
- '''
+ """
# Create a user account for the test.
#
self.assertEqual(realm.upper().encode('UTF8'), crealm)
def test_enterprise_principal_step_6_a(self):
- ''' Step 6, no pre-authentication
+ """ Step 6, no pre-authentication
If not found and no pre-authentication
search for a matching altSecurityIdentity
- '''
+ """
# Create a user account for the test.
# with an altSecurityIdentity, and with UF_DONT_REQUIRE_PREAUTH
# set.
self.check_error_rep(rep, KDC_ERR_TGT_REVOKED)
def test_nt_enterprise_principal_step_6_b(self):
- ''' Step 4, pre-authentication
+ """ Step 4, pre-authentication
If not found and pre-authentication
search for a matching user principal name
- '''
+ """
# Create user and machine accounts for the test.
#
self.assertEqual(realm.upper().encode('UTF8'), enc_part['crealm'])
def test_nt_principal_step_6_c(self):
- ''' Step 4, pre-authentication
+ """ Step 4, pre-authentication
If not found and pre-authentication
search for a matching user principal name
This test uses the altsecid, so the AS-REQ should fail.
- '''
+ """
# Create user and machine accounts for the test.
#
# ASN.1 Helper functions.
#
def encode_element(ber_type, data):
- ''' Encode an ASN.1 BER element. '''
+ """ Encode an ASN.1 BER element. """
if data is None:
return ber_type + encode_length(0)
return ber_type + encode_length(len(data)) + data
def encode_length(length):
- ''' Encode the length of an ASN.1 BER element. '''
+ """ Encode the length of an ASN.1 BER element. """
if length > 0xFFFFFF:
return b'\x84' + length.to_bytes(4, "big")
def encode_string(string):
- ''' Encode an octet string '''
+ """ Encode an octet string """
return encode_element(OCTET_STRING, string)
def encode_boolean(boolean):
- ''' Encode a boolean value '''
+ """ Encode a boolean value """
if boolean:
return encode_element(BOOLEAN, b'\xFF')
return encode_element(BOOLEAN, b'\x00')
def encode_integer(integer):
- ''' Encode an integer value '''
+ """ Encode an integer value """
bit_len = integer.bit_length()
byte_len = (bit_len // 8) + 1
return encode_element(INTEGER, integer.to_bytes(byte_len, "big"))
def encode_enumerated(enum):
- ''' Encode an enumerated value '''
+ """ Encode an enumerated value """
return encode_element(ENUMERATED, enum.to_bytes(1, "big"))
def encode_sequence(sequence):
- ''' Encode a sequence '''
+ """ Encode a sequence """
return encode_element(SEQUENCE, sequence)
def decode_element(data):
- '''
+ """
decode an ASN.1 element
- '''
+ """
if data is None:
return None
super(RawLdapTest, self).tearDown()
def disconnect(self):
- ''' Disconnect from and clean up the connection to the server '''
+ """ Disconnect from and clean up the connection to the server """
if self.socket is None:
return
self.socket.close()
self.socket = None
def connect(self):
- ''' Establish an ldaps connection to the test server '''
+ """ Establish an ldaps connection to the test server """
#
# Disable host name and certificate verification
context = ssl.create_default_context()
raise
def send(self, req):
- ''' Send the request to the server '''
+ """ Send the request to the server """
try:
self.socket.sendall(req)
except socket.error:
raise
def recv(self, num_recv=0xffff, timeout=None):
- ''' receive an array of bytes from the server '''
+ """ receive an array of bytes from the server """
data = None
try:
if timeout is not None:
return data
def bind(self):
- '''
+ """
Perform a simple bind
- '''
+ """
user = self.user.encode('UTF8')
ou = self.dns_name.replace('.', ',dc=').encode('UTF8')
self.assertGreater(len(rest), 0)
def test_decode_element(self):
- ''' Tests for the decode_element method '''
+ """ Tests for the decode_element method """
# Boolean true value
data = b'\x01\x01\xff'
self.assertEqual(b'\x05\x00'.hex(), rest.hex())
def test_search_equals_maximum_permitted_size(self):
- '''
+ """
Check that an LDAP search request equal to the maximum size is accepted
This test is done on a authenticated connection so that the maximum
non search request is 16MiB.
- '''
+ """
self.bind()
# Lets build an ldap search packet to query the RootDSE
self.assertEqual(0, len(rest))
def test_search_exceeds_maximum_permitted_size(self):
- '''
+ """
Test that a search query longer than the maximum permitted
size is rejected.
This test is done on a authenticated connection so that the maximum
non search request is 16MiB.
- '''
+ """
self.bind()
self.assertIsNone(data)
def test_simple_anonymous_bind(self):
- '''
+ """
Test a simple anonymous bind
- '''
+ """
# Lets build an anonymous simple bind request
bind = encode_integer(3) # ldap version
self.assertGreater(len(rest), 0)
def test_simple_bind_at_limit(self):
- '''
+ """
Test a simple bind, with a large invalid
user name. As the resulting packet is equal
to the maximum unauthenticated packet size we should see
an INVALID_CREDENTIALS response
- '''
+ """
# Lets build a simple bind request
bind = encode_integer(3) # ldap version
self.assertGreater(len(rest), 0)
def test_simple_bind_gt_limit(self):
- '''
+ """
Test a simple bind, with a large invalid
user name. As the resulting packet is one greater than
the maximum unauthenticated packet size we should see
the connection reset.
- '''
+ """
# Lets build a simple bind request
bind = encode_integer(3) # ldap version
self.assertIsNone(data)
def test_unauthenticated_delete_at_limit(self):
- '''
+ """
Test a delete, with a large invalid DN
As the resulting packet is equal to the maximum unauthenticated
packet size we should see an INVALID_DN_SYNTAX response
- '''
+ """
# Lets build a delete request, with a large invalid DN
dn = b' ' * 255987
self.assertGreater(len(rest), 0)
def test_unauthenticated_delete_gt_limit(self):
- '''
+ """
Test a delete, with a large invalid DN
As the resulting packet is greater than the maximum unauthenticated
packet size we should see a connection reset
- '''
+ """
# Lets build a delete request, with a large invalid DN
dn = b' ' * 255988
self.assertIsNone(data)
def test_authenticated_delete_at_limit(self):
- '''
+ """
Test a delete, with a large invalid DN
As the resulting packet is equal to the maximum authenticated
packet size we should see an INVALID_DN_SYNTAX response
- '''
+ """
# Lets build a delete request, with a large invalid DN
dn = b' ' * 16777203
self.assertGreater(len(rest), 0)
def test_authenticated_delete_gt_limit(self):
- '''
+ """
Test a delete, with a large invalid DN
As the resulting packet is one greater than the maximum
authenticated packet size we should see a connection reset
- '''
+ """
# Lets build a delete request, with a large invalid DN
dn = b' ' * 16777204
super(RawCldapTest, self).tearDown()
def disconnect(self):
- ''' Disconnect from and clean up the connection to the server '''
+ """ Disconnect from and clean up the connection to the server """
if self.socket is None:
return
self.socket.close()
self.socket = None
def connect(self):
- ''' Establish an UDP connection to the test server '''
+ """ Establish an UDP connection to the test server """
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
raise
def send(self, req):
- ''' Send the request to the server '''
+ """ Send the request to the server """
try:
self.socket.sendall(req)
except socket.error:
raise
def recv(self, num_recv=0xffff, timeout=None):
- ''' receive an array of bytes from the server '''
+ """ receive an array of bytes from the server """
data = None
try:
if timeout is not None:
return data
def test_search_equals_maximum_permitted_size(self):
- '''
+ """
Check that an CLDAP search request equal to the maximum size is
accepted
- '''
+ """
# Lets build an ldap search packet to query the RootDSE
header = encode_string(None) # Base DN, ""
self.assertEqual(0, len(rest))
def test_search_exceeds_maximum_permitted_size(self):
- '''
+ """
Test that a cldap request longer than the maximum permitted
size is rejected.
- '''
+ """
# Lets build an ldap search packet to query the RootDSE
header = encode_string(None) # Base DN, ""
"Total groups not reported correctly")
def _random_user(self, base=None):
- '''
+ """
create a user with random attribute values, you can specify
base attributes
- '''
+ """
if base is None:
base = {}
user = {
self.assertEqual(0, dotdot['dev'], 'The dev for .. was not 0')
def test_create_context_basic1(self):
- '''
+ """
Check basic CreateContexts response
- '''
+ """
try:
c = libsmb.Conn(
self.server_ip,
self.assertTrue('0000052D' in msg)
def test_old_password_simple_bind(self):
- '''Shows that we can log in with the immediate previous password, but not any earlier passwords.'''
+ """Shows that we can log in with the immediate previous password, but not any earlier passwords."""
user_dn_str = f'CN=testuser,CN=Users,{self.base_dn}'
user_dn = Dn(self.ldb, user_dn_str)
self.fail('should have failed to login with previous password!')
def test_old_password_attempt_reuse(self):
- '''Shows that we cannot reuse the original password after changing the password twice.'''
+ """Shows that we cannot reuse the original password after changing the password twice."""
res = self.ldb.search(self.ldb.domain_dn(), scope=SCOPE_BASE,
attrs=['pwdHistoryLength'])
previous_pwd = new_pwd
def test_old_password_rename_simple_bind(self):
- '''Shows that we can log in with the previous password after renaming the account.'''
+ """Shows that we can log in with the previous password after renaming the account."""
user_dn_str = f'CN=testuser,CN=Users,{self.base_dn}'
user_dn = Dn(self.ldb, user_dn_str)
self.fail('failed to login with previous password!')
def test_old_password_rename_simple_bind_2(self):
- '''Shows that we can rename the account, change the password and log in with the previous password.'''
+ """Shows that we can rename the account, change the password and log in with the previous password."""
user_dn_str = f'CN=testuser,CN=Users,{self.base_dn}'
user_dn = Dn(self.ldb, user_dn_str)
self.fail('failed to login with previous password!')
def test_old_password_rename_attempt_reuse(self):
- '''Shows that we cannot reuse the original password after renaming the account.'''
+ """Shows that we cannot reuse the original password after renaming the account."""
user_dn_str = f'CN=testuser,CN=Users,{self.base_dn}'
user_dn = Dn(self.ldb, user_dn_str)
self.fail('should not have been able to reuse password!')
def test_old_password_rename_attempt_reuse_2(self):
- '''Shows that we cannot reuse the original password after renaming the account and changing the password.'''
+ """Shows that we cannot reuse the original password after renaming the account and changing the password."""
user_dn_str = f'CN=testuser,CN=Users,{self.base_dn}'
user_dn = Dn(self.ldb, user_dn_str)