"""Samba Python tests."""
import os
+import tempfile
import ldb
import samba
-import samba.auth
from samba import param
-from samba.samdb import SamDB
from samba import credentials
-import samba.ndr
-import samba.dcerpc.dcerpc
-import samba.dcerpc.base
-import samba.dcerpc.epmapper
from samba.credentials import Credentials
from samba import gensec
import socket
import sys
import tempfile
import unittest
+import re
+import samba.auth
+import samba.dcerpc.base
+from samba.compat import PY3, text_type
+from samba.compat import string_types
+from random import randint
+try:
+ from samba.samdb import SamDB
+except ImportError:
+ # We are built without samdb support,
+ # imitate it so that connect_samdb() can recover
+ def SamDB(*args, **kwargs):
+ return None
+
+import samba.ndr
+import samba.dcerpc.dcerpc
+import samba.dcerpc.epmapper
try:
from unittest import SkipTest
class SkipTest(Exception):
"""Test skipped."""
-HEXDUMP_FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
+HEXDUMP_FILTER = bytearray([x if ((len(repr(chr(x))) == 3) and (x < 127)) else ord('.') for x in range(256)])
class TestCase(unittest.TestCase):
"""A Samba test case."""
def get_credentials(self):
return cmdline_credentials
+ def get_creds_ccache_name(self):
+ creds = self.get_credentials()
+ ccache = creds.get_named_ccache(self.get_loadparm())
+ ccache_name = ccache.get_name()
+
+ return ccache_name
+
def hexdump(self, src):
N = 0
result = ''
+ is_string = isinstance(src, string_types)
while src:
ll = src[:8]
lr = src[8:16]
src = src[16:]
- hl = ' '.join(["%02X" % ord(x) for x in ll])
- hr = ' '.join(["%02X" % ord(x) for x in lr])
- ll = ll.translate(HEXDUMP_FILTER)
- lr = lr.translate(HEXDUMP_FILTER)
+ if is_string:
+ hl = ' '.join(["%02X" % ord(x) for x in ll])
+ hr = ' '.join(["%02X" % ord(x) for x in lr])
+ ll = ll.translate(HEXDUMP_FILTER)
+ lr = lr.translate(HEXDUMP_FILTER)
+ else:
+ hl = ' '.join(["%02X" % x for x in ll])
+ hr = ' '.join(["%02X" % x for x in lr])
+ ll = ll.translate(HEXDUMP_FILTER).decode('utf8')
+ lr = lr.translate(HEXDUMP_FILTER).decode('utf8')
result += "[%04X] %-*s %-*s %s %s\n" % (N, 8*3, hl, 8*3, hr, ll, lr)
N += 16
return result
+ def insta_creds(self, template=None, username=None, userpass=None, kerberos_state=None):
+
+ if template is None:
+ assert template is not None
+
+ if username is not None:
+ assert userpass is not None
+
+ if username is None:
+ assert userpass is None
+
+ username = template.get_username()
+ userpass = template.get_password()
+
+ if kerberos_state is None:
+ kerberos_state = template.get_kerberos_state()
+
+ # get a copy of the global creds or a the passed in creds
+ c = Credentials()
+ c.set_username(username)
+ c.set_password(userpass)
+ c.set_domain(template.get_domain())
+ c.set_realm(template.get_realm())
+ c.set_workstation(template.get_workstation())
+ c.set_gensec_features(c.get_gensec_features()
+ | gensec.FEATURE_SEAL)
+ c.set_kerberos_state(kerberos_state)
+ return c
+
+
+
# These functions didn't exist before Python2.7:
if sys.version_info < (2, 7):
import warnings
self._cleanups = getattr(self, "_cleanups", []) + [
(fn, args, kwargs)]
+ def assertRegexpMatches(self, text, regex, msg=None):
+ # PY3 note: Python 3 will never see this, but we use
+ # text_type for the benefit of linters.
+ if isinstance(regex, (str, text_type)):
+ regex = re.compile(regex)
+ if not regex.search(text):
+ self.fail(msg)
+
def _addSkip(self, result, reason):
addSkip = getattr(result, 'addSkip', None)
if addSkip is not None:
try:
try:
self.setUp()
- except SkipTest, e:
+ except SkipTest as e:
self._addSkip(result, str(e))
return
except KeyboardInterrupt:
try:
testMethod()
ok = True
- except SkipTest, e:
+ except SkipTest as e:
self._addSkip(result, str(e))
return
except self.failureException:
try:
self.tearDown()
- except SkipTest, e:
+ except SkipTest as e:
self._addSkip(result, str(e))
except KeyboardInterrupt:
raise
finally:
result.stopTest(self)
+ def assertStringsEqual(self, a, b, msg=None, strip=False):
+ """Assert equality between two strings and highlight any differences.
+ If strip is true, leading and trailing whitespace is ignored."""
+ if strip:
+ a = a.strip()
+ b = b.strip()
+
+ if a != b:
+ sys.stderr.write("The strings differ %s(lengths %d vs %d); "
+ "a diff follows\n"
+ % ('when stripped ' if strip else '',
+ len(a), len(b),
+ ))
+
+ from difflib import unified_diff
+ diff = unified_diff(a.splitlines(True),
+ b.splitlines(True),
+ 'a', 'b')
+ for line in diff:
+ sys.stderr.write(line)
+
+ self.fail(msg)
+
class LdbTestCase(TestCase):
"""Trivial test case for running tests against a LDB."""
def setUp(self):
super(LdbTestCase, self).setUp()
- self.filename = os.tempnam()
+ self.tempfile = tempfile.NamedTemporaryFile(delete=False)
+ self.filename = self.tempfile.name
self.ldb = samba.Ldb(self.filename)
def set_modules(self, modules=[]):
class RpcInterfaceTestCase(TestCase):
"""DCE/RPC Test case."""
-class RawDCERPCTest(TestCase):
- """A raw DCE/RPC Test case."""
-
- def _disconnect(self, reason):
- if self.s is None:
- return
- self.s.close()
- self.s = None
- if self.do_hexdump:
- sys.stderr.write("disconnect[%s]\n" % reason)
-
- def connect(self):
- try:
- self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC,
- socket.SOCK_STREAM, socket.SOL_TCP,
- 0)
- self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
- self.s.settimeout(10)
- self.s.connect(self.a[0][4])
- except socket.error as e:
- self.s.close()
- raise
- except IOError as e:
- self.s.close()
- raise
- except Exception as e:
- raise
- finally:
- pass
-
- def setUp(self):
- super(RawDCERPCTest, self).setUp()
- self.do_ndr_print = False
- self.do_hexdump = False
-
- self.host = samba.tests.env_get_var_value('SERVER')
- self.target_hostname = samba.tests.env_get_var_value('TARGET_HOSTNAME', allow_missing=True)
- if self.target_hostname is None:
- self.target_hostname = self.host
- self.tcp_port = 135
-
- self.settings = {}
- self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
- self.settings["target_hostname"] = self.target_hostname
-
- self.connect()
-
- def get_user_creds(self):
- c = Credentials()
- c.guess()
- username = samba.tests.env_get_var_value('USERNAME')
- password = samba.tests.env_get_var_value('PASSWORD')
- c.set_username(username)
- c.set_password(password)
- return c
-
- def get_anon_creds(self):
- c = Credentials()
- c.set_anonymous()
- return c
-
- def get_auth_context_creds(self, creds, auth_type, auth_level,
- auth_context_id,
- g_auth_level=None):
-
- if g_auth_level is None:
- g_auth_level = auth_level
-
- g = gensec.Security.start_client(self.settings)
- g.set_credentials(creds)
- g.want_feature(gensec.FEATURE_DCE_STYLE)
- g.start_mech_by_authtype(auth_type, g_auth_level)
-
- auth_context = {}
- auth_context["auth_type"] = auth_type
- auth_context["auth_level"] = auth_level
- auth_context["auth_context_id"] = auth_context_id
- auth_context["g_auth_level"] = g_auth_level
- auth_context["gensec"] = g
-
- return auth_context
-
- def do_generic_bind(self, ctx, auth_context=None,
- pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- assoc_group_id=0, call_id=0,
- nak_reason=None, alter_fault=None):
- ctx_list = [ctx]
-
- if auth_context is not None:
- from_server = ""
- (finished, to_server) = auth_context["gensec"].update(from_server)
- self.assertFalse(finished)
-
- auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
- auth_level=auth_context["auth_level"],
- auth_context_id=auth_context["auth_context_id"],
- auth_blob=to_server)
- else:
- auth_info = ""
-
- req = self.generate_bind(call_id=call_id,
- pfc_flags=pfc_flags,
- ctx_list=ctx_list,
- assoc_group_id=assoc_group_id,
- auth_info=auth_info)
- self.send_pdu(req)
- rep = self.recv_pdu()
- if nak_reason is not None:
- self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
- auth_length=0)
- self.assertEquals(rep.u.reject_reason, nak_reason)
- self.assertEquals(rep.u.num_versions, 1)
- self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers)
- self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
- self.assertEquals(len(rep.u._pad), 3)
- self.assertEquals(rep.u._pad, '\0' * 3)
- return
- self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
- pfc_flags=pfc_flags)
- self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
- self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
- if assoc_group_id != 0:
- self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
- else:
- self.assertNotEquals(rep.u.assoc_group_id, 0)
- assoc_group_id = rep.u.assoc_group_id
- port_str = "%d" % self.tcp_port
- port_len = len(port_str) + 1
- mod_len = (2 + port_len) % 4
- if mod_len != 0:
- port_pad = 4 - mod_len
- else:
- port_pad = 0
- self.assertEquals(rep.u.secondary_address_size, port_len)
- self.assertEquals(rep.u.secondary_address, port_str)
- self.assertEquals(len(rep.u._pad1), port_pad)
- # sometimes windows sends random bytes
- # self.assertEquals(rep.u._pad1, '\0' * port_pad)
- self.assertEquals(rep.u.num_results, 1)
- self.assertEquals(rep.u.ctx_list[0].result,
- samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
- self.assertEquals(rep.u.ctx_list[0].reason,
- samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
- self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
- ack = rep
- if auth_context is None:
- self.assertEquals(rep.auth_length, 0)
- self.assertEquals(len(rep.u.auth_info), 0)
- return ack
- self.assertNotEquals(rep.auth_length, 0)
- self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
- self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
-
- a = self.parse_auth(rep.u.auth_info)
-
- from_server = a.credentials
- (finished, to_server) = auth_context["gensec"].update(from_server)
- self.assertFalse(finished)
-
- auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
- auth_level=auth_context["auth_level"],
- auth_context_id=auth_context["auth_context_id"],
- auth_blob=to_server)
- req = self.generate_alter(call_id=call_id,
- ctx_list=ctx_list,
- assoc_group_id=0xffffffff-assoc_group_id,
- auth_info=auth_info)
- self.send_pdu(req)
- rep = self.recv_pdu()
- if alter_fault is not None:
- self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
- pfc_flags=req.pfc_flags |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
- auth_length=0)
- self.assertNotEquals(rep.u.alloc_hint, 0)
- self.assertEquals(rep.u.context_id, 0)
- self.assertEquals(rep.u.cancel_count, 0)
- self.assertEquals(rep.u.flags, 0)
- self.assertEquals(rep.u.status, alter_fault)
- self.assertEquals(rep.u.reserved, 0)
- self.assertEquals(len(rep.u.error_and_verifier), 0)
- return None
- self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id)
- self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
- self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
- self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
- self.assertEquals(rep.u.secondary_address_size, 0)
- self.assertEquals(rep.u.secondary_address, '')
- self.assertEquals(len(rep.u._pad1), 2)
- # sometimes windows sends random bytes
- # self.assertEquals(rep.u._pad1, '\0' * 2)
- self.assertEquals(rep.u.num_results, 1)
- self.assertEquals(rep.u.ctx_list[0].result,
- samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
- self.assertEquals(rep.u.ctx_list[0].reason,
- samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
- self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
- self.assertNotEquals(rep.auth_length, 0)
- self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
- self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
-
- a = self.parse_auth(rep.u.auth_info)
-
- from_server = a.credentials
- (finished, to_server) = auth_context["gensec"].update(from_server)
- self.assertTrue(finished)
-
- return ack
-
- def prepare_presentation(self, abstract, transfer, object=None,
- context_id=0xffff, epmap=False, auth_context=None,
- pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- assoc_group_id=0,
- return_ack=False):
- if epmap:
- self.epmap_reconnect(abstract, transfer=transfer, object=object)
-
- tsf1_list = [transfer]
- ctx = samba.dcerpc.dcerpc.ctx_list()
- ctx.context_id = context_id
- ctx.num_transfer_syntaxes = len(tsf1_list)
- ctx.abstract_syntax = abstract
- ctx.transfer_syntaxes = tsf1_list
-
- ack = self.do_generic_bind(ctx=ctx,
- auth_context=auth_context,
- pfc_flags=pfc_flags,
- assoc_group_id=assoc_group_id)
- if ack is None:
- ctx = None
-
- if return_ack:
- return (ctx, ack)
- return ctx
-
- def do_single_request(self, call_id, ctx, io,
- auth_context=None,
- object=None,
- bigendian=False, ndr64=False,
- allow_remaining=False,
- send_req=True,
- recv_rep=True,
- fault_pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- fault_status=None,
- fault_context_id=None,
- timeout=None,
- ndr_print=None,
- hexdump=None):
-
- if fault_context_id is None:
- fault_context_id = ctx.context_id
-
- if ndr_print is None:
- ndr_print = self.do_ndr_print
- if hexdump is None:
- hexdump = self.do_hexdump
-
- if send_req:
- if ndr_print:
- sys.stderr.write("in: %s" % samba.ndr.ndr_print_in(io))
- stub_in = samba.ndr.ndr_pack_in(io, bigendian=bigendian, ndr64=ndr64)
- if hexdump:
- sys.stderr.write("stub_in: %d\n%s" % (len(stub_in), self.hexdump(stub_in)))
- else:
- # only used for sig_size calculation
- stub_in = '\xff' * samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
-
- sig_size = 0
- if auth_context is not None:
- mod_len = len(stub_in) % samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
- auth_pad_length = 0
- if mod_len > 0:
- auth_pad_length = samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT - mod_len
- stub_in += '\x00' * auth_pad_length
-
- if auth_context["g_auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
- sig_size = auth_context["gensec"].sig_size(len(stub_in))
- else:
- sig_size = 16
-
- zero_sig = "\x00"*sig_size
- auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
- auth_level=auth_context["auth_level"],
- auth_pad_length=auth_pad_length,
- auth_context_id=auth_context["auth_context_id"],
- auth_blob=zero_sig)
- else:
- auth_info=""
-
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
- pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
- if object is not None:
- pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_OBJECT_UUID
-
- req = self.generate_request(call_id=call_id,
- context_id=ctx.context_id,
- pfc_flags=pfc_flags,
- object=object,
- opnum=io.opnum(),
- stub=stub_in,
- auth_info=auth_info)
-
- if send_req:
- if sig_size != 0 and auth_context["auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
- req_blob = samba.ndr.ndr_pack(req)
- ofs_stub = samba.dcerpc.dcerpc.DCERPC_REQUEST_LENGTH
- ofs_sig = len(req_blob) - req.auth_length
- ofs_trailer = ofs_sig - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
- req_data = req_blob[ofs_stub:ofs_trailer]
- req_whole = req_blob[0:ofs_sig]
- sig = auth_context["gensec"].sign_packet(req_data, req_whole)
- auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
- auth_level=auth_context["auth_level"],
- auth_pad_length=auth_pad_length,
- auth_context_id=auth_context["auth_context_id"],
- auth_blob=sig)
- req = self.generate_request(call_id=call_id,
- context_id=ctx.context_id,
- pfc_flags=pfc_flags,
- object=object,
- opnum=io.opnum(),
- stub=stub_in,
- auth_info=auth_info)
- self.send_pdu(req, ndr_print=ndr_print, hexdump=hexdump)
- if recv_rep:
- (rep, rep_blob) = self.recv_pdu_raw(timeout=timeout,
- ndr_print=ndr_print,
- hexdump=hexdump)
- if fault_status:
- self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
- pfc_flags=fault_pfc_flags, auth_length=0)
- self.assertNotEquals(rep.u.alloc_hint, 0)
- self.assertEquals(rep.u.context_id, fault_context_id)
- self.assertEquals(rep.u.cancel_count, 0)
- self.assertEquals(rep.u.flags, 0)
- self.assertEquals(rep.u.status, fault_status)
- self.assertEquals(rep.u.reserved, 0)
- self.assertEquals(len(rep.u.error_and_verifier), 0)
- return
-
- self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
- auth_length=sig_size)
- self.assertNotEquals(rep.u.alloc_hint, 0)
- self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
- self.assertEquals(rep.u.cancel_count, 0)
- self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
- if sig_size != 0:
-
- ofs_stub = samba.dcerpc.dcerpc.DCERPC_REQUEST_LENGTH
- ofs_sig = rep.frag_length - rep.auth_length
- ofs_trailer = ofs_sig - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
- rep_data = rep_blob[ofs_stub:ofs_trailer]
- rep_whole = rep_blob[0:ofs_sig]
- rep_sig = rep_blob[ofs_sig:]
- rep_auth_info_blob = rep_blob[ofs_trailer:]
-
- rep_auth_info = self.parse_auth(rep_auth_info_blob)
- self.assertEquals(rep_auth_info.auth_type, auth_context["auth_type"])
- self.assertEquals(rep_auth_info.auth_level, auth_context["auth_level"])
- self.assertLessEqual(rep_auth_info.auth_pad_length, len(rep_data))
- self.assertEquals(rep_auth_info.auth_reserved, 0)
- self.assertEquals(rep_auth_info.auth_context_id, auth_context["auth_context_id"])
- self.assertEquals(rep_auth_info.credentials, rep_sig)
-
- if auth_context["auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
- auth_context["gensec"].check_packet(rep_data, rep_whole, rep_sig)
-
- stub_out = rep_data[0:-rep_auth_info.auth_pad_length]
- else:
- stub_out = rep.u.stub_and_verifier
-
- if hexdump:
- sys.stderr.write("stub_out: %d\n%s" % (len(stub_out), self.hexdump(stub_out)))
- samba.ndr.ndr_unpack_out(io, stub_out, bigendian=bigendian, ndr64=ndr64,
- allow_remaining=allow_remaining)
- if ndr_print:
- sys.stderr.write("out: %s" % samba.ndr.ndr_print_out(io))
-
- def epmap_reconnect(self, abstract, transfer=None, object=None):
- ndr32 = samba.dcerpc.base.transfer_syntax_ndr()
-
- if transfer is None:
- transfer = ndr32
-
- if object is None:
- object = samba.dcerpc.misc.GUID()
-
- ctx = self.prepare_presentation(samba.dcerpc.epmapper.abstract_syntax(),
- transfer, context_id=0)
-
- data1 = samba.ndr.ndr_pack(abstract)
- lhs1 = samba.dcerpc.epmapper.epm_lhs()
- lhs1.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
- lhs1.lhs_data = data1[:18]
- rhs1 = samba.dcerpc.epmapper.epm_rhs_uuid()
- rhs1.unknown = data1[18:]
- floor1 = samba.dcerpc.epmapper.epm_floor()
- floor1.lhs = lhs1
- floor1.rhs = rhs1
- data2 = samba.ndr.ndr_pack(transfer)
- lhs2 = samba.dcerpc.epmapper.epm_lhs()
- lhs2.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
- lhs2.lhs_data = data2[:18]
- rhs2 = samba.dcerpc.epmapper.epm_rhs_uuid()
- rhs2.unknown = data1[18:]
- floor2 = samba.dcerpc.epmapper.epm_floor()
- floor2.lhs = lhs2
- floor2.rhs = rhs2
- lhs3 = samba.dcerpc.epmapper.epm_lhs()
- lhs3.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_NCACN
- lhs3.lhs_data = ""
- floor3 = samba.dcerpc.epmapper.epm_floor()
- floor3.lhs = lhs3
- floor3.rhs.minor_version = 0
- lhs4 = samba.dcerpc.epmapper.epm_lhs()
- lhs4.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_TCP
- lhs4.lhs_data = ""
- floor4 = samba.dcerpc.epmapper.epm_floor()
- floor4.lhs = lhs4
- floor4.rhs.port = self.tcp_port
- lhs5 = samba.dcerpc.epmapper.epm_lhs()
- lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP
- lhs5.lhs_data = ""
- floor5 = samba.dcerpc.epmapper.epm_floor()
- floor5.lhs = lhs5
- floor5.rhs.ipaddr = "0.0.0.0"
-
- floors = [floor1,floor2,floor3,floor4,floor5]
- req_tower = samba.dcerpc.epmapper.epm_tower()
- req_tower.num_floors = len(floors)
- req_tower.floors = floors
- req_twr = samba.dcerpc.epmapper.epm_twr_t()
- req_twr.tower = req_tower
-
- epm_map = samba.dcerpc.epmapper.epm_Map()
- epm_map.in_object = object
- epm_map.in_map_tower = req_twr
- epm_map.in_entry_handle = samba.dcerpc.misc.policy_handle()
- epm_map.in_max_towers = 4
-
- self.do_single_request(call_id=2, ctx=ctx, io=epm_map)
-
- self.assertGreaterEqual(epm_map.out_num_towers, 1)
- rep_twr = epm_map.out_towers[0].twr
- self.assertIsNotNone(rep_twr)
- self.assertEqual(rep_twr.tower_length, 75)
- self.assertEqual(rep_twr.tower.num_floors, 5)
- self.assertEqual(len(rep_twr.tower.floors), 5)
- self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
- samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
- self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
- samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
-
- # reconnect to the given port
- self._disconnect("epmap_reconnect")
- self.tcp_port = rep_twr.tower.floors[3].rhs.port
- self.connect()
-
- def send_pdu(self, req, ndr_print=None, hexdump=None):
- if ndr_print is None:
- ndr_print = self.do_ndr_print
- if hexdump is None:
- hexdump = self.do_hexdump
- try:
- req_pdu = samba.ndr.ndr_pack(req)
- if ndr_print:
- sys.stderr.write("send_pdu: %s" % samba.ndr.ndr_print(req))
- if hexdump:
- sys.stderr.write("send_pdu: %d\n%s" % (len(req_pdu), self.hexdump(req_pdu)))
- while True:
- sent = self.s.send(req_pdu, 0)
- if sent == len(req_pdu):
- break
- req_pdu = req_pdu[sent:]
- except socket.error as e:
- self._disconnect("send_pdu: %s" % e)
- raise
- except IOError as e:
- self._disconnect("send_pdu: %s" % e)
- raise
- finally:
- pass
-
- def recv_raw(self, hexdump=None, timeout=None):
- rep_pdu = None
- if hexdump is None:
- hexdump = self.do_hexdump
- try:
- if timeout is not None:
- self.s.settimeout(timeout)
- rep_pdu = self.s.recv(0xffff, 0)
- self.s.settimeout(10)
- if len(rep_pdu) == 0:
- self._disconnect("recv_raw: EOF")
- return None
- if hexdump:
- sys.stderr.write("recv_raw: %d\n%s" % (len(rep_pdu), self.hexdump(rep_pdu)))
- except socket.timeout as e:
- self.s.settimeout(10)
- sys.stderr.write("recv_raw: TIMEOUT\n")
- pass
- except socket.error as e:
- self._disconnect("recv_raw: %s" % e)
- raise
- except IOError as e:
- self._disconnect("recv_raw: %s" % e)
- raise
- finally:
- pass
- return rep_pdu
-
- def recv_pdu_raw(self, ndr_print=None, hexdump=None, timeout=None):
- rep_pdu = None
- rep = None
- if ndr_print is None:
- ndr_print = self.do_ndr_print
- if hexdump is None:
- hexdump = self.do_hexdump
- try:
- rep_pdu = self.recv_raw(hexdump=hexdump, timeout=timeout)
- if rep_pdu is None:
- return (None,None)
- rep = samba.ndr.ndr_unpack(samba.dcerpc.dcerpc.ncacn_packet, rep_pdu, allow_remaining=True)
- if ndr_print:
- sys.stderr.write("recv_pdu: %s" % samba.ndr.ndr_print(rep))
- self.assertEqual(rep.frag_length, len(rep_pdu))
- finally:
- pass
- return (rep, rep_pdu)
-
- def recv_pdu(self, ndr_print=None, hexdump=None, timeout=None):
- (rep, rep_pdu) = self.recv_pdu_raw(ndr_print=ndr_print,
- hexdump=hexdump,
- timeout=timeout)
- return rep
-
- def generate_auth(self,
- auth_type=None,
- auth_level=None,
- auth_pad_length=0,
- auth_context_id=None,
- auth_blob=None,
- ndr_print=None, hexdump=None):
- if ndr_print is None:
- ndr_print = self.do_ndr_print
- if hexdump is None:
- hexdump = self.do_hexdump
-
- if auth_type is not None:
- a = samba.dcerpc.dcerpc.auth()
- a.auth_type = auth_type
- a.auth_level = auth_level
- a.auth_pad_length = auth_pad_length
- a.auth_context_id= auth_context_id
- a.credentials = auth_blob
-
- ai = samba.ndr.ndr_pack(a)
- if ndr_print:
- sys.stderr.write("generate_auth: %s" % samba.ndr.ndr_print(a))
- if hexdump:
- sys.stderr.write("generate_auth: %d\n%s" % (len(ai), self.hexdump(ai)))
- else:
- ai = ""
-
- return ai
-
- def parse_auth(self, auth_info, ndr_print=None, hexdump=None):
- if ndr_print is None:
- ndr_print = self.do_ndr_print
- if hexdump is None:
- hexdump = self.do_hexdump
-
- if (len(auth_info) <= samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH):
- return None
-
- if hexdump:
- sys.stderr.write("parse_auth: %d\n%s" % (len(auth_info), self.hexdump(auth_info)))
- a = samba.ndr.ndr_unpack(samba.dcerpc.dcerpc.auth, auth_info, allow_remaining=True)
- if ndr_print:
- sys.stderr.write("parse_auth: %s" % samba.ndr.ndr_print(a))
-
- return a
-
- def generate_pdu(self, ptype, call_id, payload,
- rpc_vers=5,
- rpc_vers_minor=0,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- drep = [samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
- ndr_print=None, hexdump=None):
-
- if getattr(payload, 'auth_info', None):
- ai = payload.auth_info
- else:
- ai = ""
-
- p = samba.dcerpc.dcerpc.ncacn_packet()
- p.rpc_vers = rpc_vers
- p.rpc_vers_minor = rpc_vers_minor
- p.ptype = ptype
- p.pfc_flags = pfc_flags
- p.drep = drep
- p.frag_length = 0
- if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
- p.auth_length = len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
- else:
- p.auth_length = 0
- p.call_id = call_id
- p.u = payload
-
- pdu = samba.ndr.ndr_pack(p)
- p.frag_length = len(pdu)
-
- return p
-
- def verify_pdu(self, p, ptype, call_id,
- rpc_vers=5,
- rpc_vers_minor=0,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- drep = [samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
- auth_length=None):
-
- self.assertIsNotNone(p, "No valid pdu")
-
- if getattr(p.u, 'auth_info', None):
- ai = p.u.auth_info
- else:
- ai = ""
-
- self.assertEqual(p.rpc_vers, rpc_vers)
- self.assertEqual(p.rpc_vers_minor, rpc_vers_minor)
- self.assertEqual(p.ptype, ptype)
- self.assertEqual(p.pfc_flags, pfc_flags)
- self.assertEqual(p.drep, drep)
- self.assertGreaterEqual(p.frag_length,
- samba.dcerpc.dcerpc.DCERPC_NCACN_PAYLOAD_OFFSET)
- if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
- self.assertEqual(p.auth_length,
- len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
- elif auth_length is not None:
- self.assertEqual(p.auth_length, auth_length)
- else:
- self.assertEqual(p.auth_length, 0)
- self.assertEqual(p.call_id, call_id)
-
- return
-
- def generate_bind(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- max_xmit_frag=5840,
- max_recv_frag=5840,
- assoc_group_id=0,
- ctx_list=[],
- auth_info="",
- ndr_print=None, hexdump=None):
-
- b = samba.dcerpc.dcerpc.bind()
- b.max_xmit_frag = max_xmit_frag
- b.max_recv_frag = max_recv_frag
- b.assoc_group_id = assoc_group_id
- b.num_contexts = len(ctx_list)
- b.ctx_list = ctx_list
- b.auth_info = auth_info
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_BIND,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=b,
- ndr_print=ndr_print, hexdump=hexdump)
-
- return p
-
- def generate_alter(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- max_xmit_frag=5840,
- max_recv_frag=5840,
- assoc_group_id=0,
- ctx_list=[],
- auth_info="",
- ndr_print=None, hexdump=None):
-
- a = samba.dcerpc.dcerpc.bind()
- a.max_xmit_frag = max_xmit_frag
- a.max_recv_frag = max_recv_frag
- a.assoc_group_id = assoc_group_id
- a.num_contexts = len(ctx_list)
- a.ctx_list = ctx_list
- a.auth_info = auth_info
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ALTER,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=a,
- ndr_print=ndr_print, hexdump=hexdump)
-
- return p
-
- def generate_auth3(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- auth_info="",
- ndr_print=None, hexdump=None):
-
- a = samba.dcerpc.dcerpc.auth3()
- a.auth_info = auth_info
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_AUTH3,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=a,
- ndr_print=ndr_print, hexdump=hexdump)
-
- return p
-
- def generate_request(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- alloc_hint=None,
- context_id=None,
- opnum=None,
- object=None,
- stub=None,
- auth_info="",
- ndr_print=None, hexdump=None):
-
- if alloc_hint is None:
- alloc_hint = len(stub)
-
- r = samba.dcerpc.dcerpc.request()
- r.alloc_hint = alloc_hint
- r.context_id = context_id
- r.opnum = opnum
- if object is not None:
- r.object = object
- r.stub_and_verifier = stub + auth_info
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_REQUEST,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=r,
- ndr_print=ndr_print, hexdump=hexdump)
-
- if len(auth_info) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
- p.auth_length = len(auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
-
- return p
-
- def generate_co_cancel(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- auth_info="",
- ndr_print=None, hexdump=None):
-
- c = samba.dcerpc.dcerpc.co_cancel()
- c.auth_info = auth_info
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_CO_CANCEL,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=c,
- ndr_print=ndr_print, hexdump=hexdump)
-
- return p
-
- def generate_orphaned(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- auth_info="",
- ndr_print=None, hexdump=None):
-
- o = samba.dcerpc.dcerpc.orphaned()
- o.auth_info = auth_info
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ORPHANED,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=o,
- ndr_print=ndr_print, hexdump=hexdump)
-
- return p
-
- def generate_shutdown(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- ndr_print=None, hexdump=None):
-
- s = samba.dcerpc.dcerpc.shutdown()
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_SHUTDOWN,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=s,
- ndr_print=ndr_print, hexdump=hexdump)
-
- return p
-
- def assertIsConnected(self):
- self.assertIsNotNone(self.s, msg="Not connected")
- return
-
- def assertNotConnected(self):
- self.assertIsNone(self.s, msg="Is connected")
- return
-
- def assertNDRSyntaxEquals(self, s1, s2):
- self.assertEqual(s1.uuid, s2.uuid)
- self.assertEqual(s1.if_version, s2.if_version)
- return
class ValidNetbiosNameTests(TestCase):
(S.stderr)
"""
- def __init__(self, returncode, cmd, stdout, stderr):
+ def __init__(self, returncode, cmd, stdout, stderr, msg=None):
self.returncode = returncode
self.cmd = cmd
self.stdout = stdout
self.stderr = stderr
+ self.msg = msg
def __str__(self):
- return "Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" % (self.cmd, self.returncode,
- self.stdout, self.stderr)
+ s = ("Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" %
+ (self.cmd, self.returncode, self.stdout, self.stderr))
+ if self.msg is not None:
+ s = "%s; message: %s" % (s, self.msg)
+
+ return s
class BlackboxTestCase(TestCaseInTempDir):
"""Base test case for blackbox tests."""
line = " ".join(parts)
return line
- def check_run(self, line):
+ def check_run(self, line, msg=None):
+ self.check_exit_code(line, 0, msg=msg)
+
+ def check_exit_code(self, line, expected, msg=None):
line = self._make_cmdline(line)
- p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
- retcode = p.wait()
- if retcode:
- raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read())
+ p = subprocess.Popen(line,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ shell=True)
+ stdoutdata, stderrdata = p.communicate()
+ retcode = p.returncode
+ if retcode != expected:
+ raise BlackboxProcessError(retcode,
+ line,
+ stdoutdata,
+ stderrdata,
+ msg)
def check_output(self, line):
line = self._make_cmdline(line)
p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
- retcode = p.wait()
+ stdoutdata, stderrdata = p.communicate()
+ retcode = p.returncode
if retcode:
- raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read())
- return p.stdout.read()
+ raise BlackboxProcessError(retcode, line, stdoutdata, stderrdata)
+ return stdoutdata
def connect_samdb(samdb_url, lp=None, session_info=None, credentials=None,
return connect_samdb(samdb_url, credentials=creds, lp=lp)
-def delete_force(samdb, dn):
+def delete_force(samdb, dn, **kwargs):
try:
- samdb.delete(dn)
- except ldb.LdbError, (num, errstr):
+ samdb.delete(dn, **kwargs)
+ except ldb.LdbError as error:
+ (num, errstr) = error.args
assert num == ldb.ERR_NO_SUCH_OBJECT, "ldb.delete() failed: %s" % errstr
+
+def create_test_ou(samdb, name):
+ """Creates a unique OU for the test"""
+
+ # Add some randomness to the test OU. Replication between the testenvs is
+ # constantly happening in the background. Deletion of the last test's
+ # objects can be slow to replicate out. So the OU created by a previous
+ # testenv may still exist at the point that tests start on another testenv.
+ rand = randint(1, 10000000)
+ dn = ldb.Dn(samdb, "OU=%s%d,%s" % (name, rand, samdb.get_default_basedn()))
+ samdb.add({"dn": dn, "objectclass": "organizationalUnit"})
+ return dn