py:dcerpc/raw_protocol: add tests for delayed header signing activation
authorStefan Metzmacher <metze@samba.org>
Wed, 21 Nov 2018 10:49:40 +0000 (11:49 +0100)
committerJeremy Allison <jra@samba.org>
Sat, 12 Jan 2019 02:13:40 +0000 (03:13 +0100)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=7113
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11892

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
python/samba/tests/dcerpc/raw_protocol.py
selftest/knownfail.d/test_krb5_hdr_sign_delayed [new file with mode: 0644]

index aab6d86253f6b54cee68243b136c45f120114470..384b7d47bcae1d1387bc0dcbcab58ac9cfa00fa2 100755 (executable)
@@ -4988,6 +4988,192 @@ class TestDCERPC_BIND(RawDCERPCTest):
         conn2._disconnect("End of Test")
         return
 
         conn2._disconnect("End of Test")
         return
 
+    def _test_krb5_hdr_sign_delayed1(self, do_upgrade):
+        auth_type = dcerpc.DCERPC_AUTH_TYPE_KRB5
+        auth_level = dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY
+        auth_context_id = 1
+
+        creds = self.get_user_creds()
+
+        abstract = samba.dcerpc.mgmt.abstract_syntax()
+        transfer = base.transfer_syntax_ndr()
+
+        tsf1_list = [transfer]
+        ctx = samba.dcerpc.dcerpc.ctx_list()
+        ctx.context_id = 1
+        ctx.num_transfer_syntaxes = len(tsf1_list)
+        ctx.abstract_syntax = abstract
+        ctx.transfer_syntaxes = tsf1_list
+
+        auth_context = self.get_auth_context_creds(creds=creds,
+                                                   auth_type=auth_type,
+                                                   auth_level=auth_level,
+                                                   auth_context_id=auth_context_id,
+                                                   hdr_signing=False)
+
+        ack = self.do_generic_bind(call_id=1,
+                                   ctx=ctx,
+                                   auth_context=auth_context)
+
+        inq_if_ids = samba.dcerpc.mgmt.inq_if_ids()
+        self.do_single_request(call_id=2, ctx=ctx, io=inq_if_ids,
+                               auth_context=auth_context)
+
+        #
+        # This is just an alter context without authentication
+        # But it can turn on header signing for the whole connection
+        #
+        ack2 = self.do_generic_bind(call_id=3, ctx=ctx,
+                                    pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST |
+                                    dcerpc.DCERPC_PFC_FLAG_LAST |
+                                    dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN,
+                                    assoc_group_id = ack.u.assoc_group_id,
+                                    start_with_alter=True)
+
+        self.assertFalse(auth_context['hdr_signing'])
+        if do_upgrade:
+            auth_context['hdr_signing'] = True
+            auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER)
+            fault_status=None
+        else:
+            fault_status=dcerpc.DCERPC_FAULT_SEC_PKG_ERROR
+
+        self.do_single_request(call_id=4, ctx=ctx, io=inq_if_ids,
+                               auth_context=auth_context,
+                               fault_status=fault_status)
+
+        if fault_status is not None:
+            # wait for a disconnect
+            rep = self.recv_pdu()
+            self.assertIsNone(rep)
+            self.assertNotConnected()
+            return
+
+        self.do_single_request(call_id=5, ctx=ctx, io=inq_if_ids,
+                               auth_context=auth_context)
+        return
+
+    def test_krb5_hdr_sign_delayed1_ok1(self):
+        return self._test_krb5_hdr_sign_delayed1(do_upgrade=True)
+
+    def test_krb5_hdr_sign_delayed1_fail1(self):
+        return self._test_krb5_hdr_sign_delayed1(do_upgrade=False)
+
+    def _test_krb5_hdr_sign_delayed2(self, do_upgrade):
+        auth_type = dcerpc.DCERPC_AUTH_TYPE_KRB5
+        auth_level = dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY
+        auth_context_id = 1
+
+        creds = self.get_user_creds()
+
+        abstract = samba.dcerpc.mgmt.abstract_syntax()
+        transfer = base.transfer_syntax_ndr()
+
+        tsf1_list = [transfer]
+        ctx = samba.dcerpc.dcerpc.ctx_list()
+        ctx.context_id = 1
+        ctx.num_transfer_syntaxes = len(tsf1_list)
+        ctx.abstract_syntax = abstract
+        ctx.transfer_syntaxes = tsf1_list
+
+        auth_context = self.get_auth_context_creds(creds=creds,
+                                                   auth_type=auth_type,
+                                                   auth_level=auth_level,
+                                                   auth_context_id=auth_context_id,
+                                                   hdr_signing=False)
+
+        #
+        # SUPPORT_HEADER_SIGN on alter context activates header signing
+        #
+        ack = self.do_generic_bind(call_id=1,
+                                   ctx=ctx,
+                                   auth_context=auth_context,
+                                   pfc_flags_2nd=dcerpc.DCERPC_PFC_FLAG_FIRST |
+                                      dcerpc.DCERPC_PFC_FLAG_LAST |
+                                      dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN)
+
+        self.assertFalse(auth_context['hdr_signing'])
+        if do_upgrade:
+            auth_context['hdr_signing'] = True
+            auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER)
+            fault_status=None
+        else:
+            fault_status=dcerpc.DCERPC_FAULT_SEC_PKG_ERROR
+
+        inq_if_ids = samba.dcerpc.mgmt.inq_if_ids()
+        self.do_single_request(call_id=4, ctx=ctx, io=inq_if_ids,
+                               auth_context=auth_context,
+                               fault_status=fault_status)
+
+        if fault_status is not None:
+            # wait for a disconnect
+            rep = self.recv_pdu()
+            self.assertIsNone(rep)
+            self.assertNotConnected()
+            return
+
+        self.do_single_request(call_id=5, ctx=ctx, io=inq_if_ids,
+                               auth_context=auth_context)
+        return
+
+    def test_krb5_hdr_sign_delayed2_ok1(self):
+        return self._test_krb5_hdr_sign_delayed2(do_upgrade=True)
+
+    def test_krb5_hdr_sign_delayed2_fail1(self):
+        return self._test_krb5_hdr_sign_delayed2(do_upgrade=False)
+
+    def test_krb5_hdr_sign_delayed3_fail1(self):
+        auth_type = dcerpc.DCERPC_AUTH_TYPE_KRB5
+        auth_level = dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY
+        auth_context_id = 1
+
+        creds = self.get_user_creds()
+
+        abstract = samba.dcerpc.mgmt.abstract_syntax()
+        transfer = base.transfer_syntax_ndr()
+
+        tsf1_list = [transfer]
+        ctx = samba.dcerpc.dcerpc.ctx_list()
+        ctx.context_id = 1
+        ctx.num_transfer_syntaxes = len(tsf1_list)
+        ctx.abstract_syntax = abstract
+        ctx.transfer_syntaxes = tsf1_list
+
+        auth_context = self.get_auth_context_creds(creds=creds,
+                                                   auth_type=auth_type,
+                                                   auth_level=auth_level,
+                                                   auth_context_id=auth_context_id,
+                                                   hdr_signing=False)
+
+        #
+        # SUPPORT_HEADER_SIGN on auth3 doesn't activate header signing
+        #
+        ack = self.do_generic_bind(call_id=1,
+                                   ctx=ctx,
+                                   auth_context=auth_context,
+                                   pfc_flags_2nd=dcerpc.DCERPC_PFC_FLAG_FIRST |
+                                      dcerpc.DCERPC_PFC_FLAG_LAST |
+                                      dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN,
+                                   use_auth3=True)
+
+        inq_if_ids = samba.dcerpc.mgmt.inq_if_ids()
+        self.do_single_request(call_id=2, ctx=ctx, io=inq_if_ids,
+                               auth_context=auth_context)
+
+        self.assertFalse(auth_context['hdr_signing'])
+        auth_context['hdr_signing'] = True
+        auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER)
+        fault_status=dcerpc.DCERPC_FAULT_SEC_PKG_ERROR
+
+        self.do_single_request(call_id=4, ctx=ctx, io=inq_if_ids,
+                               auth_context=auth_context,
+                               fault_status=fault_status)
+
+        # wait for a disconnect
+        rep = self.recv_pdu()
+        self.assertIsNone(rep)
+        self.assertNotConnected()
+        return
 
 if __name__ == "__main__":
     global_ndr_print = True
 
 if __name__ == "__main__":
     global_ndr_print = True
diff --git a/selftest/knownfail.d/test_krb5_hdr_sign_delayed b/selftest/knownfail.d/test_krb5_hdr_sign_delayed
new file mode 100644 (file)
index 0000000..62231cb
--- /dev/null
@@ -0,0 +1,2 @@
+^samba.tests.dcerpc.raw_protocol.*.TestDCERPC_BIND.test_krb5_hdr_sign_delayed1
+^samba.tests.dcerpc.raw_protocol.*.TestDCERPC_BIND.test_krb5_hdr_sign_delayed2