python/tests: add bind time feature related tests to dcerpc raw protocol tests
authorStefan Metzmacher <metze@samba.org>
Fri, 23 Oct 2015 13:39:34 +0000 (15:39 +0200)
committerAndreas Schneider <asn@cryptomilk.org>
Wed, 26 Oct 2016 09:20:14 +0000 (11:20 +0200)
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andreas Schneider <asn@samba.org>
python/samba/tests/dcerpc/raw_protocol.py

index 3c1d4adb3bca6c03a8a4cd1940e3df72c51c19c8..ef660ea32b77a17c58bea83c83c6d904263bbea9 100755 (executable)
@@ -1296,6 +1296,152 @@ class TestDCERPC_BIND(RawDCERPCTest):
         self.assertEquals(rep.u.cancel_count, 0)
         self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
 
+    def test_no_auth_bind_time_none_simple(self):
+        features = 0
+        btf = base.bind_time_features_syntax(features)
+
+        zero_syntax = misc.ndr_syntax_id()
+
+        tsf1_list = [btf]
+        ctx1 = dcerpc.ctx_list()
+        ctx1.context_id = 1
+        ctx1.num_transfer_syntaxes = len(tsf1_list)
+        ctx1.abstract_syntax = zero_syntax
+        ctx1.transfer_syntaxes = tsf1_list
+
+        req = self.generate_bind(call_id=0, ctx_list=[ctx1])
+        self.send_pdu(req)
+        rep = self.recv_pdu()
+        self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
+                        auth_length=0)
+        self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+        self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+        self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
+        self.assertEquals(rep.u.secondary_address_size, 4)
+        self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
+        self.assertEquals(len(rep.u._pad1), 2)
+        self.assertEquals(rep.u._pad1, '\0' * 2)
+        self.assertEquals(rep.u.num_results, 1)
+        self.assertEquals(rep.u.ctx_list[0].result,
+                dcerpc.DCERPC_BIND_ACK_RESULT_NEGOTIATE_ACK)
+        self.assertEquals(rep.u.ctx_list[0].reason, features)
+        self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax)
+        self.assertEquals(rep.u.auth_info, '\0' * 0)
+
+    def test_no_auth_bind_time_none_ignore_additional(self):
+        features1 = 0
+        btf1 = base.bind_time_features_syntax(features1)
+
+        features2 =  dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN
+        features2 |= dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING
+        btf2 = base.bind_time_features_syntax(features2)
+
+        zero_syntax = misc.ndr_syntax_id()
+        ndr64 = base.transfer_syntax_ndr64()
+
+        tsf1_list = [btf1,btf2,zero_syntax]
+        ctx1 = dcerpc.ctx_list()
+        ctx1.context_id = 1
+        ctx1.num_transfer_syntaxes = len(tsf1_list)
+        ctx1.abstract_syntax = ndr64
+        ctx1.transfer_syntaxes = tsf1_list
+
+        req = self.generate_bind(call_id=0, ctx_list=[ctx1])
+        self.send_pdu(req)
+        rep = self.recv_pdu()
+        self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
+                        auth_length=0)
+        self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+        self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+        self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
+        self.assertEquals(rep.u.secondary_address_size, 4)
+        self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
+        self.assertEquals(len(rep.u._pad1), 2)
+        self.assertEquals(rep.u._pad1, '\0' * 2)
+        self.assertEquals(rep.u.num_results, 1)
+        self.assertEquals(rep.u.ctx_list[0].result,
+                dcerpc.DCERPC_BIND_ACK_RESULT_NEGOTIATE_ACK)
+        self.assertEquals(rep.u.ctx_list[0].reason, features1)
+        self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax)
+        self.assertEquals(rep.u.auth_info, '\0' * 0)
+
+    def test_no_auth_bind_time_only_first(self):
+        features1 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN
+        btf1 = base.bind_time_features_syntax(features1)
+
+        features2 = dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING
+        btf2 = base.bind_time_features_syntax(features2)
+
+        zero_syntax = misc.ndr_syntax_id()
+
+        tsf1_list = [zero_syntax,btf1,btf2,zero_syntax]
+        ctx1 = dcerpc.ctx_list()
+        ctx1.context_id = 1
+        ctx1.num_transfer_syntaxes = len(tsf1_list)
+        ctx1.abstract_syntax = zero_syntax
+        ctx1.transfer_syntaxes = tsf1_list
+
+        req = self.generate_bind(call_id=0, ctx_list=[ctx1])
+        self.send_pdu(req)
+        rep = self.recv_pdu()
+        self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
+                        auth_length=0)
+        self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+        self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+        self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
+        self.assertEquals(rep.u.secondary_address_size, 4)
+        self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
+        self.assertEquals(len(rep.u._pad1), 2)
+        self.assertEquals(rep.u._pad1, '\0' * 2)
+        self.assertEquals(rep.u.num_results, 1)
+        self.assertEquals(rep.u.ctx_list[0].result,
+                dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION)
+        self.assertEquals(rep.u.ctx_list[0].reason,
+                dcerpc.DCERPC_BIND_ACK_REASON_ABSTRACT_SYNTAX_NOT_SUPPORTED)
+        self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax)
+        self.assertEquals(rep.u.auth_info, '\0' * 0)
+
+    def test_no_auth_bind_time_twice(self):
+        features1 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN
+        btf1 = base.bind_time_features_syntax(features1)
+
+        features2 = dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING
+        btf2 = base.bind_time_features_syntax(features2)
+
+        zero_syntax = misc.ndr_syntax_id()
+
+        tsf1_list = [btf1]
+        ctx1 = dcerpc.ctx_list()
+        ctx1.context_id = 1
+        ctx1.num_transfer_syntaxes = len(tsf1_list)
+        ctx1.abstract_syntax = zero_syntax
+        ctx1.transfer_syntaxes = tsf1_list
+
+        tsf2_list = [btf2]
+        ctx2 = dcerpc.ctx_list()
+        ctx2.context_id = 2
+        ctx2.num_transfer_syntaxes = len(tsf2_list)
+        ctx2.abstract_syntax = zero_syntax
+        ctx2.transfer_syntaxes = tsf2_list
+
+        req = self.generate_bind(call_id=0, ctx_list=[ctx1,ctx2])
+        self.send_pdu(req)
+        rep = self.recv_pdu()
+        self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
+                        auth_length=0)
+        self.assertEquals(rep.u.reject_reason,
+                dcerpc.DCERPC_BIND_NAK_REASON_NOT_SPECIFIED)
+        self.assertEquals(rep.u.num_versions, 1)
+        self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers)
+        self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
+        self.assertEquals(len(rep.u._pad), 3)
+        self.assertEquals(rep.u._pad, '\0' * 3)
+
+        # wait for a disconnect
+        rep = self.recv_pdu()
+        self.assertIsNone(rep)
+        self.assertNotConnected()
+
     def _test_auth_none_level_bind(self, auth_level,
                                    reason=dcerpc.DCERPC_BIND_NAK_REASON_INVALID_AUTH_TYPE):
         ndr32 = base.transfer_syntax_ndr()