py:dcerpc/raw_testcase: add check_response_auth() helper function
authorStefan Metzmacher <metze@samba.org>
Tue, 20 Nov 2018 15:02:50 +0000 (16:02 +0100)
committerJeremy Allison <jra@samba.org>
Sun, 23 Dec 2018 17:15:21 +0000 (18:15 +0100)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=7113
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11892

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
python/samba/tests/dcerpc/raw_testcase.py

index 035d3ac0c6a21c09b1f4a1766e6f8a260bc8d734..79425ad177c4c131eaeef9394a6ede34b6529a52 100644 (file)
@@ -646,6 +646,38 @@ class RawDCERPCTest(TestCase):
 
         return a
 
+    def check_response_auth(self, rep, rep_blob, auth_context=None,
+                            auth_pad_length=None):
+
+        if auth_context is None:
+            self.assertEquals(rep.auth_length, 0)
+            return rep.u.stub_and_verifier
+
+        ofs_stub = dcerpc.DCERPC_REQUEST_LENGTH
+        ofs_sig = rep.frag_length - rep.auth_length
+        ofs_trailer = ofs_sig - dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+        rep_data = rep_blob[ofs_stub:ofs_trailer]
+        rep_whole = rep_blob[0:ofs_sig]
+        rep_sig = rep_blob[ofs_sig:]
+        rep_auth_info_blob = rep_blob[ofs_trailer:]
+
+        rep_auth_info = self.parse_auth(rep_auth_info_blob,
+                                        auth_context=auth_context,
+                                        stub_len=len(rep_data))
+        if auth_pad_length is not None:
+            self.assertEquals(rep_auth_info.auth_pad_length, auth_pad_length)
+        self.assertEquals(rep_auth_info.credentials, rep_sig)
+
+        if auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PRIVACY:
+            # TODO: not yet supported here
+            self.assertTrue(False)
+        elif auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PACKET:
+            auth_context["gensec"].check_packet(rep_data, rep_whole, rep_sig)
+
+        stub_out = rep_data[0:len(rep_data)-rep_auth_info.auth_pad_length]
+
+        return stub_out
+
     def generate_pdu(self, ptype, call_id, payload,
                      rpc_vers=5,
                      rpc_vers_minor=0,