self.etype = key.enctype
self.ctype = EncTypeChecksum[self.etype]
self.kvno = kvno
- return
def encrypt(self, usage, plaintext):
ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
self.forced_keys = {}
self.forced_salt = None
- return
def set_as_supported_enctypes(self, value):
self.as_supported_enctypes = int(value)
- return
def set_tgs_supported_enctypes(self, value):
self.tgs_supported_enctypes = int(value)
- return
def set_ap_supported_enctypes(self, value):
self.ap_supported_enctypes = int(value)
- return
def _get_krb5_etypes(self, supported_enctypes):
etypes = ()
def set_forced_salt(self, salt):
self.forced_salt = bytes(salt)
- return
def get_forced_salt(self):
return self.forced_salt
self.decryption_key = decryption_key
self.ticket_private = ticket_private
self.encpart_private = encpart_private
- return
class RawKerberosTest(TestCaseInTempDir):
cls.etype_test_permutations = res
cls.setup_etype_test_permutations_done = True
- return
@classmethod
def etype_test_permutation_name_idx(cls):
except IOError:
self.s.close()
raise
- except Exception:
- raise
- finally:
- pass
def connect(self):
self.assertNotConnected()
self._connect_tcp()
if self.do_hexdump:
sys.stderr.write("connected[%s]\n" % self.host)
- return
def env_get_var(self, varname, prefix,
fallback_default=True,
except IOError as e:
self._disconnect("send_pdu: %s" % e)
raise
- finally:
- pass
def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
rep_pdu = None
except socket.timeout:
self.s.settimeout(10)
sys.stderr.write("recv_raw: TIMEOUT\n")
- pass
except socket.error as e:
self._disconnect("recv_raw: %s" % e)
raise
except IOError as e:
self._disconnect("recv_raw: %s" % e)
raise
- finally:
- pass
return rep_pdu
def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
rep_pdu = None
rep = None
- try:
+ raw_pdu = self.recv_raw(
+ num_recv=4, hexdump=hexdump, timeout=timeout)
+ if raw_pdu is None:
+ return (None, None)
+ header = struct.unpack(">I", raw_pdu[0:4])
+ k5_len = header[0]
+ if k5_len == 0:
+ return (None, "")
+ missing = k5_len
+ rep_pdu = b''
+ while missing > 0:
raw_pdu = self.recv_raw(
- num_recv=4, hexdump=hexdump, timeout=timeout)
- if raw_pdu is None:
- return (None, None)
- header = struct.unpack(">I", raw_pdu[0:4])
- k5_len = header[0]
- if k5_len == 0:
- return (None, "")
- missing = k5_len
- rep_pdu = b''
- while missing > 0:
- raw_pdu = self.recv_raw(
- num_recv=missing, hexdump=hexdump, timeout=timeout)
- self.assertGreaterEqual(len(raw_pdu), 1)
- rep_pdu += raw_pdu
- missing = k5_len - len(rep_pdu)
- k5_raw = self.der_decode(
- rep_pdu,
- asn1Spec=None,
- native_encode=False,
- asn1_print=False,
- hexdump=False)
- pvno = k5_raw['field-0']
- self.assertEqual(pvno, 5)
- msg_type = k5_raw['field-1']
- self.assertIn(msg_type, [11, 13, 30])
- if msg_type == 11:
- asn1Spec = krb5_asn1.AS_REP()
- elif msg_type == 13:
- asn1Spec = krb5_asn1.TGS_REP()
- elif msg_type == 30:
- asn1Spec = krb5_asn1.KRB_ERROR()
- rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
- asn1_print=asn1_print, hexdump=False)
- finally:
- pass
+ num_recv=missing, hexdump=hexdump, timeout=timeout)
+ self.assertGreaterEqual(len(raw_pdu), 1)
+ rep_pdu += raw_pdu
+ missing = k5_len - len(rep_pdu)
+ k5_raw = self.der_decode(
+ rep_pdu,
+ asn1Spec=None,
+ native_encode=False,
+ asn1_print=False,
+ hexdump=False)
+ pvno = k5_raw['field-0']
+ self.assertEqual(pvno, 5)
+ msg_type = k5_raw['field-1']
+ self.assertIn(msg_type, [11, 13, 30])
+ if msg_type == 11:
+ asn1Spec = krb5_asn1.AS_REP()
+ elif msg_type == 13:
+ asn1Spec = krb5_asn1.TGS_REP()
+ elif msg_type == 30:
+ asn1Spec = krb5_asn1.KRB_ERROR()
+ rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
+ asn1_print=asn1_print, hexdump=False)
return (rep, rep_pdu)
def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
def assertIsConnected(self):
self.assertIsNotNone(self.s, msg="Not connected")
- return
def assertNotConnected(self):
self.assertIsNone(self.s, msg="Is connected")
- return
def send_recv_transaction(
self,
def assertNoValue(self, value):
self.assertTrue(value.isNoValue)
- return
def assertHasValue(self, value):
self.assertIsNotNone(value)
- return
def getElementValue(self, obj, elem):
v = None
def assertElementMissing(self, obj, elem):
v = self.getElementValue(obj, elem)
self.assertIsNone(v)
- return
def assertElementPresent(self, obj, elem):
v = self.getElementValue(obj, elem)
self.assertIsNotNone(v)
- return
def assertElementEqual(self, obj, elem, value):
v = self.getElementValue(obj, elem)
self.assertIsNotNone(v)
self.assertEqual(v, value)
- return
def assertElementEqualUTF8(self, obj, elem, value):
v = self.getElementValue(obj, elem)
self.assertIsNotNone(v)
self.assertEqual(v, bytes(value, 'utf8'))
- return
def assertPrincipalEqual(self, princ1, princ2):
self.assertEqual(princ1['name-type'], princ2['name-type'])
princ1['name-string'][idx],
princ2['name-string'][idx],
msg="princ1=%s != princ2=%s" % (princ1, princ2))
- return
def assertElementEqualPrincipal(self, obj, elem, value):
v = self.getElementValue(obj, elem)
self.assertIsNotNone(v)
v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
self.assertPrincipalEqual(v, value)
- return
def assertElementKVNO(self, obj, elem, value):
v = self.getElementValue(obj, elem)
self.assertEqual(v, value)
else:
self.assertIsNone(v)
- return
def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
if epoch is None:
encpart_private=encpart_private)
kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
- return
def generic_check_as_error(self,
kdc_exchange_dict,