return a
+ def check_response_auth(self, rep, rep_blob, auth_context=None,
+ auth_pad_length=None):
+
+ if auth_context is None:
+ self.assertEquals(rep.auth_length, 0)
+ return rep.u.stub_and_verifier
+
+ ofs_stub = dcerpc.DCERPC_REQUEST_LENGTH
+ ofs_sig = rep.frag_length - rep.auth_length
+ ofs_trailer = ofs_sig - dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+ rep_data = rep_blob[ofs_stub:ofs_trailer]
+ rep_whole = rep_blob[0:ofs_sig]
+ rep_sig = rep_blob[ofs_sig:]
+ rep_auth_info_blob = rep_blob[ofs_trailer:]
+
+ rep_auth_info = self.parse_auth(rep_auth_info_blob,
+ auth_context=auth_context,
+ stub_len=len(rep_data))
+ if auth_pad_length is not None:
+ self.assertEquals(rep_auth_info.auth_pad_length, auth_pad_length)
+ self.assertEquals(rep_auth_info.credentials, rep_sig)
+
+ if auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PRIVACY:
+ # TODO: not yet supported here
+ self.assertTrue(False)
+ elif auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PACKET:
+ auth_context["gensec"].check_packet(rep_data, rep_whole, rep_sig)
+
+ stub_out = rep_data[0:len(rep_data)-rep_auth_info.auth_pad_length]
+
+ return stub_out
+
def generate_pdu(self, ptype, call_id, payload,
rpc_vers=5,
rpc_vers_minor=0,