self.assertEquals(rep.u.cancel_count, 0)
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+ def test_no_auth_bind_time_none_simple(self):
+ features = 0
+ btf = base.bind_time_features_syntax(features)
+
+ zero_syntax = misc.ndr_syntax_id()
+
+ tsf1_list = [btf]
+ ctx1 = dcerpc.ctx_list()
+ ctx1.context_id = 1
+ ctx1.num_transfer_syntaxes = len(tsf1_list)
+ ctx1.abstract_syntax = zero_syntax
+ ctx1.transfer_syntaxes = tsf1_list
+
+ req = self.generate_bind(call_id=0, ctx_list=[ctx1])
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
+ auth_length=0)
+ self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
+ self.assertEquals(rep.u.secondary_address_size, 4)
+ self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
+ self.assertEquals(len(rep.u._pad1), 2)
+ self.assertEquals(rep.u._pad1, '\0' * 2)
+ self.assertEquals(rep.u.num_results, 1)
+ self.assertEquals(rep.u.ctx_list[0].result,
+ dcerpc.DCERPC_BIND_ACK_RESULT_NEGOTIATE_ACK)
+ self.assertEquals(rep.u.ctx_list[0].reason, features)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax)
+ self.assertEquals(rep.u.auth_info, '\0' * 0)
+
+ def test_no_auth_bind_time_none_ignore_additional(self):
+ features1 = 0
+ btf1 = base.bind_time_features_syntax(features1)
+
+ features2 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN
+ features2 |= dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING
+ btf2 = base.bind_time_features_syntax(features2)
+
+ zero_syntax = misc.ndr_syntax_id()
+ ndr64 = base.transfer_syntax_ndr64()
+
+ tsf1_list = [btf1,btf2,zero_syntax]
+ ctx1 = dcerpc.ctx_list()
+ ctx1.context_id = 1
+ ctx1.num_transfer_syntaxes = len(tsf1_list)
+ ctx1.abstract_syntax = ndr64
+ ctx1.transfer_syntaxes = tsf1_list
+
+ req = self.generate_bind(call_id=0, ctx_list=[ctx1])
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
+ auth_length=0)
+ self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
+ self.assertEquals(rep.u.secondary_address_size, 4)
+ self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
+ self.assertEquals(len(rep.u._pad1), 2)
+ self.assertEquals(rep.u._pad1, '\0' * 2)
+ self.assertEquals(rep.u.num_results, 1)
+ self.assertEquals(rep.u.ctx_list[0].result,
+ dcerpc.DCERPC_BIND_ACK_RESULT_NEGOTIATE_ACK)
+ self.assertEquals(rep.u.ctx_list[0].reason, features1)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax)
+ self.assertEquals(rep.u.auth_info, '\0' * 0)
+
+ def test_no_auth_bind_time_only_first(self):
+ features1 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN
+ btf1 = base.bind_time_features_syntax(features1)
+
+ features2 = dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING
+ btf2 = base.bind_time_features_syntax(features2)
+
+ zero_syntax = misc.ndr_syntax_id()
+
+ tsf1_list = [zero_syntax,btf1,btf2,zero_syntax]
+ ctx1 = dcerpc.ctx_list()
+ ctx1.context_id = 1
+ ctx1.num_transfer_syntaxes = len(tsf1_list)
+ ctx1.abstract_syntax = zero_syntax
+ ctx1.transfer_syntaxes = tsf1_list
+
+ req = self.generate_bind(call_id=0, ctx_list=[ctx1])
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
+ auth_length=0)
+ self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
+ self.assertEquals(rep.u.secondary_address_size, 4)
+ self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
+ self.assertEquals(len(rep.u._pad1), 2)
+ self.assertEquals(rep.u._pad1, '\0' * 2)
+ self.assertEquals(rep.u.num_results, 1)
+ self.assertEquals(rep.u.ctx_list[0].result,
+ dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION)
+ self.assertEquals(rep.u.ctx_list[0].reason,
+ dcerpc.DCERPC_BIND_ACK_REASON_ABSTRACT_SYNTAX_NOT_SUPPORTED)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax)
+ self.assertEquals(rep.u.auth_info, '\0' * 0)
+
+ def test_no_auth_bind_time_twice(self):
+ features1 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN
+ btf1 = base.bind_time_features_syntax(features1)
+
+ features2 = dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING
+ btf2 = base.bind_time_features_syntax(features2)
+
+ zero_syntax = misc.ndr_syntax_id()
+
+ tsf1_list = [btf1]
+ ctx1 = dcerpc.ctx_list()
+ ctx1.context_id = 1
+ ctx1.num_transfer_syntaxes = len(tsf1_list)
+ ctx1.abstract_syntax = zero_syntax
+ ctx1.transfer_syntaxes = tsf1_list
+
+ tsf2_list = [btf2]
+ ctx2 = dcerpc.ctx_list()
+ ctx2.context_id = 2
+ ctx2.num_transfer_syntaxes = len(tsf2_list)
+ ctx2.abstract_syntax = zero_syntax
+ ctx2.transfer_syntaxes = tsf2_list
+
+ req = self.generate_bind(call_id=0, ctx_list=[ctx1,ctx2])
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
+ auth_length=0)
+ self.assertEquals(rep.u.reject_reason,
+ dcerpc.DCERPC_BIND_NAK_REASON_NOT_SPECIFIED)
+ self.assertEquals(rep.u.num_versions, 1)
+ self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers)
+ self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
+ self.assertEquals(len(rep.u._pad), 3)
+ self.assertEquals(rep.u._pad, '\0' * 3)
+
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+
def _test_auth_none_level_bind(self, auth_level,
reason=dcerpc.DCERPC_BIND_NAK_REASON_INVALID_AUTH_TYPE):
ndr32 = base.transfer_syntax_ndr()