py:dcerpc/raw_testcase: use generate_request_auth() in do_single_request()
authorStefan Metzmacher <metze@samba.org>
Tue, 20 Nov 2018 16:19:32 +0000 (17:19 +0100)
committerJeremy Allison <jra@samba.org>
Sun, 23 Dec 2018 17:15:21 +0000 (18:15 +0100)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=7113
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11892

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
python/samba/tests/dcerpc/raw_testcase.py

index 32d626e..596eaf5 100644 (file)
@@ -318,66 +318,20 @@ class RawDCERPCTest(TestCase):
             stub_in = samba.ndr.ndr_pack_in(io, bigendian=bigendian, ndr64=ndr64)
             if hexdump:
                 sys.stderr.write("stub_in: %d\n%s" % (len(stub_in), self.hexdump(stub_in)))
-        else:
-            # only used for sig_size calculation
-            stub_in = b'\xff' * samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
-
-        sig_size = 0
-        if auth_context is not None:
-            mod_len = len(stub_in) % samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
-            auth_pad_length = 0
-            if mod_len > 0:
-                auth_pad_length = samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT - mod_len
-            stub_in += b'\x00' * auth_pad_length
-
-            if auth_context["g_auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
-                sig_size = auth_context["gensec"].sig_size(len(stub_in))
-            else:
-                sig_size = 16
-
-            zero_sig = b"\x00" * sig_size
-            auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
-                                           auth_level=auth_context["auth_level"],
-                                           auth_pad_length=auth_pad_length,
-                                           auth_context_id=auth_context["auth_context_id"],
-                                           auth_blob=zero_sig)
-        else:
-            auth_info = b""
 
         pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
         pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
         if object is not None:
             pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_OBJECT_UUID
 
-        req = self.generate_request(call_id=call_id,
-                                    context_id=ctx.context_id,
-                                    pfc_flags=pfc_flags,
-                                    object=object,
-                                    opnum=io.opnum(),
-                                    stub=stub_in,
-                                    auth_info=auth_info)
-
+        req = self.generate_request_auth(call_id=call_id,
+                                         context_id=ctx.context_id,
+                                         pfc_flags=pfc_flags,
+                                         object=object,
+                                         opnum=io.opnum(),
+                                         stub=stub_in,
+                                         auth_context=auth_context)
         if send_req:
-            if sig_size != 0 and auth_context["auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
-                req_blob = samba.ndr.ndr_pack(req)
-                ofs_stub = samba.dcerpc.dcerpc.DCERPC_REQUEST_LENGTH
-                ofs_sig = len(req_blob) - req.auth_length
-                ofs_trailer = ofs_sig - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
-                req_data = req_blob[ofs_stub:ofs_trailer]
-                req_whole = req_blob[0:ofs_sig]
-                sig = auth_context["gensec"].sign_packet(req_data, req_whole)
-                auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
-                                               auth_level=auth_context["auth_level"],
-                                               auth_pad_length=auth_pad_length,
-                                               auth_context_id=auth_context["auth_context_id"],
-                                               auth_blob=sig)
-                req = self.generate_request(call_id=call_id,
-                                            context_id=ctx.context_id,
-                                            pfc_flags=pfc_flags,
-                                            object=object,
-                                            opnum=io.opnum(),
-                                            stub=stub_in,
-                                            auth_info=auth_info)
             self.send_pdu(req, ndr_print=ndr_print, hexdump=hexdump)
         if recv_rep:
             (rep, rep_blob) = self.recv_pdu_raw(timeout=timeout,
@@ -396,7 +350,7 @@ class RawDCERPCTest(TestCase):
                 return
 
             self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
-                            auth_length=sig_size)
+                            auth_length=req.auth_length)
             self.assertNotEquals(rep.u.alloc_hint, 0)
             self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
             self.assertEquals(rep.u.cancel_count, 0)