py:dcerpc/raw_testcase: add generate_request_auth() helper function
authorStefan Metzmacher <metze@samba.org>
Tue, 20 Nov 2018 16:16:05 +0000 (17:16 +0100)
committerJeremy Allison <jra@samba.org>
Sun, 23 Dec 2018 17:15:21 +0000 (18:15 +0100)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=7113
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11892

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
python/samba/tests/dcerpc/raw_testcase.py

index 79425ad177c4c131eaeef9394a6ede34b6529a52..0bcfd23823e9a645cec7432983ad6fa030a332b1 100644 (file)
@@ -710,6 +710,78 @@ class RawDCERPCTest(TestCase):
 
         return p
 
+    def generate_request_auth(self, call_id,
+                              pfc_flags=(dcerpc.DCERPC_PFC_FLAG_FIRST |
+                                         dcerpc.DCERPC_PFC_FLAG_LAST),
+                              alloc_hint=None,
+                              context_id=None,
+                              opnum=None,
+                              object=None,
+                              stub=None,
+                              auth_context=None,
+                              ndr_print=None, hexdump=None):
+
+        if stub is None:
+            stub = b""
+
+        sig_size = 0
+        if auth_context is not None:
+            mod_len = len(stub) % dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
+            auth_pad_length = 0
+            if mod_len > 0:
+                auth_pad_length = dcerpc.DCERPC_AUTH_PAD_ALIGNMENT - mod_len
+            stub += b'\x00' * auth_pad_length
+
+            if auth_context["g_auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
+                sig_size = auth_context["gensec"].sig_size(len(stub))
+            else:
+                sig_size = 16
+
+            zero_sig = b"\x00" * sig_size
+            auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
+                                           auth_level=auth_context["auth_level"],
+                                           auth_pad_length=auth_pad_length,
+                                           auth_context_id=auth_context["auth_context_id"],
+                                           auth_blob=zero_sig)
+        else:
+            auth_info = b""
+
+        req = self.generate_request(call_id=call_id,
+                                    pfc_flags=pfc_flags,
+                                    alloc_hint=alloc_hint,
+                                    context_id=context_id,
+                                    opnum=opnum,
+                                    object=object,
+                                    stub=stub,
+                                    auth_info=auth_info,
+                                    ndr_print=ndr_print,
+                                    hexdump=hexdump)
+        if auth_context is None:
+            return req
+
+        req_blob = samba.ndr.ndr_pack(req)
+        ofs_stub = dcerpc.DCERPC_REQUEST_LENGTH
+        ofs_sig = len(req_blob) - req.auth_length
+        ofs_trailer = ofs_sig - dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+        req_data = req_blob[ofs_stub:ofs_trailer]
+        req_whole = req_blob[0:ofs_sig]
+
+        if auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PRIVACY:
+            # TODO: not yet supported here
+            self.assertTrue(False)
+        elif auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PACKET:
+            req_sig = auth_context["gensec"].sign_packet(req_data, req_whole)
+        else:
+            return req
+        self.assertEquals(len(req_sig), req.auth_length)
+        self.assertEquals(len(req_sig), sig_size)
+
+        stub_sig_ofs = len(req.u.stub_and_verifier) - sig_size
+        stub = req.u.stub_and_verifier[0:stub_sig_ofs] + req_sig
+        req.u.stub_and_verifier = stub
+
+        return req
+
     def verify_pdu(self, p, ptype, call_id,
                    rpc_vers=5,
                    rpc_vers_minor=0,