"""Samba Python tests."""
import os
+import tempfile
import ldb
import samba
-import samba.auth
from samba import param
-from samba.samdb import SamDB
from samba import credentials
-import samba.ndr
-import samba.dcerpc.dcerpc
-import samba.dcerpc.base
-import samba.dcerpc.epmapper
+from samba.credentials import Credentials
+from samba import gensec
import socket
import struct
import subprocess
import sys
import tempfile
import unittest
+import re
+import samba.auth
+import samba.dcerpc.base
+from samba.compat import PY3, text_type
+from samba.compat import string_types
+from random import randint
+try:
+ from samba.samdb import SamDB
+except ImportError:
+ # We are built without samdb support,
+ # imitate it so that connect_samdb() can recover
+ def SamDB(*args, **kwargs):
+ return None
+
+import samba.ndr
+import samba.dcerpc.dcerpc
+import samba.dcerpc.epmapper
try:
from unittest import SkipTest
class SkipTest(Exception):
"""Test skipped."""
-HEXDUMP_FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
+HEXDUMP_FILTER = bytearray([x if ((len(repr(chr(x))) == 3) and (x < 127)) else ord('.') for x in range(256)])
class TestCase(unittest.TestCase):
"""A Samba test case."""
def get_credentials(self):
return cmdline_credentials
+ def get_creds_ccache_name(self):
+ creds = self.get_credentials()
+ ccache = creds.get_named_ccache(self.get_loadparm())
+ ccache_name = ccache.get_name()
+
+ return ccache_name
+
def hexdump(self, src):
N = 0
result = ''
+ is_string = isinstance(src, string_types)
while src:
ll = src[:8]
lr = src[8:16]
src = src[16:]
- hl = ' '.join(["%02X" % ord(x) for x in ll])
- hr = ' '.join(["%02X" % ord(x) for x in lr])
- ll = ll.translate(HEXDUMP_FILTER)
- lr = lr.translate(HEXDUMP_FILTER)
+ if is_string:
+ hl = ' '.join(["%02X" % ord(x) for x in ll])
+ hr = ' '.join(["%02X" % ord(x) for x in lr])
+ ll = ll.translate(HEXDUMP_FILTER)
+ lr = lr.translate(HEXDUMP_FILTER)
+ else:
+ hl = ' '.join(["%02X" % x for x in ll])
+ hr = ' '.join(["%02X" % x for x in lr])
+ ll = ll.translate(HEXDUMP_FILTER).decode('utf8')
+ lr = lr.translate(HEXDUMP_FILTER).decode('utf8')
result += "[%04X] %-*s %-*s %s %s\n" % (N, 8*3, hl, 8*3, hr, ll, lr)
N += 16
return result
+ def insta_creds(self, template=None, username=None, userpass=None, kerberos_state=None):
+
+ if template is None:
+ assert template is not None
+
+ if username is not None:
+ assert userpass is not None
+
+ if username is None:
+ assert userpass is None
+
+ username = template.get_username()
+ userpass = template.get_password()
+
+ if kerberos_state is None:
+ kerberos_state = template.get_kerberos_state()
+
+ # get a copy of the global creds or a the passed in creds
+ c = Credentials()
+ c.set_username(username)
+ c.set_password(userpass)
+ c.set_domain(template.get_domain())
+ c.set_realm(template.get_realm())
+ c.set_workstation(template.get_workstation())
+ c.set_gensec_features(c.get_gensec_features()
+ | gensec.FEATURE_SEAL)
+ c.set_kerberos_state(kerberos_state)
+ return c
+
+
+
# These functions didn't exist before Python2.7:
if sys.version_info < (2, 7):
import warnings
self._cleanups = getattr(self, "_cleanups", []) + [
(fn, args, kwargs)]
+ def assertRegexpMatches(self, text, regex, msg=None):
+ # PY3 note: Python 3 will never see this, but we use
+ # text_type for the benefit of linters.
+ if isinstance(regex, (str, text_type)):
+ regex = re.compile(regex)
+ if not regex.search(text):
+ self.fail(msg)
+
def _addSkip(self, result, reason):
addSkip = getattr(result, 'addSkip', None)
if addSkip is not None:
try:
try:
self.setUp()
- except SkipTest, e:
+ except SkipTest as e:
self._addSkip(result, str(e))
return
except KeyboardInterrupt:
try:
testMethod()
ok = True
- except SkipTest, e:
+ except SkipTest as e:
self._addSkip(result, str(e))
return
except self.failureException:
try:
self.tearDown()
- except SkipTest, e:
+ except SkipTest as e:
self._addSkip(result, str(e))
except KeyboardInterrupt:
raise
finally:
result.stopTest(self)
+ def assertStringsEqual(self, a, b, msg=None, strip=False):
+ """Assert equality between two strings and highlight any differences.
+ If strip is true, leading and trailing whitespace is ignored."""
+ if strip:
+ a = a.strip()
+ b = b.strip()
+
+ if a != b:
+ sys.stderr.write("The strings differ %s(lengths %d vs %d); "
+ "a diff follows\n"
+ % ('when stripped ' if strip else '',
+ len(a), len(b),
+ ))
+
+ from difflib import unified_diff
+ diff = unified_diff(a.splitlines(True),
+ b.splitlines(True),
+ 'a', 'b')
+ for line in diff:
+ sys.stderr.write(line)
+
+ self.fail(msg)
+
class LdbTestCase(TestCase):
"""Trivial test case for running tests against a LDB."""
def setUp(self):
super(LdbTestCase, self).setUp()
- self.filename = os.tempnam()
+ self.tempfile = tempfile.NamedTemporaryFile(delete=False)
+ self.filename = self.tempfile.name
self.ldb = samba.Ldb(self.filename)
def set_modules(self, modules=[]):
return lp
-def env_get_var_value(var_name):
+def env_get_var_value(var_name, allow_missing=False):
"""Returns value for variable in os.environ
Function throws AssertionError if variable is defined.
Unit-test based python tests require certain input params
to be set in environment, otherwise they can't be run
"""
+ if allow_missing:
+ if var_name not in os.environ.keys():
+ return None
assert var_name in os.environ.keys(), "Please supply %s in environment" % var_name
return os.environ[var_name]
class RpcInterfaceTestCase(TestCase):
"""DCE/RPC Test case."""
-class RawDCERPCTest(TestCase):
- """A raw DCE/RPC Test case."""
-
- def _disconnect(self, reason):
- if self.s is None:
- return
- self.s.close()
- self.s = None
- if self.do_hexdump:
- sys.stderr.write("disconnect[%s]\n" % reason)
-
- def connect(self):
- try:
- self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC,
- socket.SOCK_STREAM, socket.SOL_TCP,
- 0)
- self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
- self.s.settimeout(10)
- self.s.connect(self.a[0][4])
- except socket.error as e:
- self.s.close()
- raise
- except IOError as e:
- self.s.close()
- raise
- except Exception as e:
- raise
- finally:
- pass
-
- def setUp(self):
- super(RawDCERPCTest, self).setUp()
- self.do_ndr_print = False
- self.do_hexdump = False
-
- self.host = samba.tests.env_get_var_value('SERVER')
- self.tcp_port = 135
-
- self.settings = {}
- self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
- self.settings["target_hostname"] = self.host
-
- self.connect()
-
- def epmap_reconnect(self, abstract):
- ndr32 = samba.dcerpc.base.transfer_syntax_ndr()
-
- tsf0_list = [ndr32]
- ctx0 = samba.dcerpc.dcerpc.ctx_list()
- ctx0.context_id = 1
- ctx0.num_transfer_syntaxes = len(tsf0_list)
- ctx0.abstract_syntax = samba.dcerpc.epmapper.abstract_syntax()
- ctx0.transfer_syntaxes = tsf0_list
-
- req = self.generate_bind(call_id=0, ctx_list=[ctx0])
- self.send_pdu(req)
- rep = self.recv_pdu()
- self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK,
- req.call_id, auth_length=0)
- self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
- self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
- self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id)
- self.assertEqual(rep.u.secondary_address_size, 4)
- self.assertEqual(rep.u.secondary_address, "%d" % self.tcp_port)
- self.assertEqual(len(rep.u._pad1), 2)
- self.assertEqual(rep.u._pad1, '\0' * 2)
- self.assertEqual(rep.u.num_results, 1)
- self.assertEqual(rep.u.ctx_list[0].result,
- samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
- self.assertEqual(rep.u.ctx_list[0].reason,
- samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
- self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
- self.assertEqual(rep.u.auth_info, '\0' * 0)
-
- # And now try a request
- data1 = samba.ndr.ndr_pack(abstract)
- lhs1 = samba.dcerpc.epmapper.epm_lhs()
- lhs1.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
- lhs1.lhs_data = data1[:18]
- rhs1 = samba.dcerpc.epmapper.epm_rhs_uuid()
- rhs1.unknown = data1[18:]
- floor1 = samba.dcerpc.epmapper.epm_floor()
- floor1.lhs = lhs1
- floor1.rhs = rhs1
- data2 = samba.ndr.ndr_pack(ndr32)
- lhs2 = samba.dcerpc.epmapper.epm_lhs()
- lhs2.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
- lhs2.lhs_data = data2[:18]
- rhs2 = samba.dcerpc.epmapper.epm_rhs_uuid()
- rhs2.unknown = data1[18:]
- floor2 = samba.dcerpc.epmapper.epm_floor()
- floor2.lhs = lhs2
- floor2.rhs = rhs2
- lhs3 = samba.dcerpc.epmapper.epm_lhs()
- lhs3.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_NCACN
- lhs3.lhs_data = ""
- floor3 = samba.dcerpc.epmapper.epm_floor()
- floor3.lhs = lhs3
- floor3.rhs.minor_version = 0
- lhs4 = samba.dcerpc.epmapper.epm_lhs()
- lhs4.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_TCP
- lhs4.lhs_data = ""
- floor4 = samba.dcerpc.epmapper.epm_floor()
- floor4.lhs = lhs4
- floor4.rhs.port = self.tcp_port
- lhs5 = samba.dcerpc.epmapper.epm_lhs()
- lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP
- lhs5.lhs_data = ""
- floor5 = samba.dcerpc.epmapper.epm_floor()
- floor5.lhs = lhs5
- floor5.rhs.ipaddr = "0.0.0.0"
-
- floors = [floor1,floor2,floor3,floor4,floor5]
- req_tower = samba.dcerpc.epmapper.epm_tower()
- req_tower.num_floors = len(floors)
- req_tower.floors = floors
- req_twr = samba.dcerpc.epmapper.epm_twr_t()
- req_twr.tower = req_tower
-
- pack_twr = samba.ndr.ndr_pack(req_twr)
-
- # object
- stub = "\x01\x00\x00\x00"
- stub += "\x00" * 16
- # tower
- stub += "\x02\x00\x00\x00"
- stub += pack_twr
- # padding?
- stub += "\x00" * 1
- # handle
- stub += "\x00" * 20
- # max_towers
- stub += "\x04\x00\x00\x00"
-
- # we do an epm_Map() request
- req = self.generate_request(call_id = 1,
- context_id=ctx0.context_id,
- opnum=3,
- stub=stub)
- self.send_pdu(req)
- rep = self.recv_pdu()
- self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE,
- req.call_id, auth_length=0)
- self.assertNotEqual(rep.u.alloc_hint, 0)
- self.assertEqual(rep.u.context_id, req.u.context_id)
- self.assertEqual(rep.u.cancel_count, 0)
- self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
-
- num_towers = struct.unpack_from("<I", rep.u.stub_and_verifier, 20)
- (array_max, array_ofs, array_cnt) = struct.unpack_from("<III", rep.u.stub_and_verifier, 24)
- status = struct.unpack_from("<I", rep.u.stub_and_verifier, len(rep.u.stub_and_verifier) - 4)
- self.assertEqual(status[0], 0)
- self.assertGreaterEqual(num_towers[0], 1)
- self.assertEqual(array_max, 4)
- self.assertEqual(array_ofs, 0)
- self.assertGreaterEqual(array_cnt, 1)
-
- unpack_twr = rep.u.stub_and_verifier[(36 + 4 * array_cnt):-4]
- rep_twr = samba.ndr.ndr_unpack(samba.dcerpc.epmapper.epm_twr_t, unpack_twr, allow_remaining=True)
- self.assertEqual(rep_twr.tower_length, 75)
- self.assertEqual(rep_twr.tower.num_floors, 5)
- self.assertEqual(len(rep_twr.tower.floors), 5)
- self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
- samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
- self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
- samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
-
- # reconnect to the given port
- self._disconnect("epmap_reconnect")
- self.tcp_port = rep_twr.tower.floors[3].rhs.port
- self.connect()
-
- def send_pdu(self, req, ndr_print=None, hexdump=None):
- if ndr_print is None:
- ndr_print = self.do_ndr_print
- if hexdump is None:
- hexdump = self.do_hexdump
- try:
- req_pdu = samba.ndr.ndr_pack(req)
- if ndr_print:
- sys.stderr.write("send_pdu: %s" % samba.ndr.ndr_print(req))
- if hexdump:
- sys.stderr.write("send_pdu: %d\n%s" % (len(req_pdu), self.hexdump(req_pdu)))
- while True:
- sent = self.s.send(req_pdu, 0)
- if sent == len(req_pdu):
- break
- req_pdu = req_pdu[sent:]
- except socket.error as e:
- self._disconnect("send_pdu: %s" % e)
- raise
- except IOError as e:
- self._disconnect("send_pdu: %s" % e)
- raise
- finally:
- pass
-
- def recv_raw(self, hexdump=None, timeout=None):
- rep_pdu = None
- if hexdump is None:
- hexdump = self.do_hexdump
- try:
- if timeout is not None:
- self.s.settimeout(timeout)
- rep_pdu = self.s.recv(0xffff, 0)
- self.s.settimeout(10)
- if len(rep_pdu) == 0:
- self._disconnect("recv_raw: EOF")
- return None
- if hexdump:
- sys.stderr.write("recv_raw: %d\n%s" % (len(rep_pdu), self.hexdump(rep_pdu)))
- except socket.timeout as e:
- self.s.settimeout(10)
- sys.stderr.write("recv_raw: TIMEOUT\n")
- pass
- except socket.error as e:
- self._disconnect("recv_raw: %s" % e)
- raise
- except IOError as e:
- self._disconnect("recv_raw: %s" % e)
- raise
- finally:
- pass
- return rep_pdu
-
- def recv_pdu(self, ndr_print=None, hexdump=None, timeout=None):
- rep = None
- if ndr_print is None:
- ndr_print = self.do_ndr_print
- if hexdump is None:
- hexdump = self.do_hexdump
- try:
- rep_pdu = self.recv_raw(hexdump=hexdump, timeout=timeout)
- if rep_pdu is None:
- return None
- rep = samba.ndr.ndr_unpack(samba.dcerpc.dcerpc.ncacn_packet, rep_pdu, allow_remaining=True)
- if ndr_print:
- sys.stderr.write("recv_pdu: %s" % samba.ndr.ndr_print(rep))
- self.assertEqual(rep.frag_length, len(rep_pdu))
- finally:
- pass
- return rep
-
- def generate_auth(self,
- auth_type=None,
- auth_level=None,
- auth_pad_length=0,
- auth_context_id=None,
- auth_blob=None,
- ndr_print=None, hexdump=None):
- if ndr_print is None:
- ndr_print = self.do_ndr_print
- if hexdump is None:
- hexdump = self.do_hexdump
-
- if auth_type is not None:
- a = samba.dcerpc.dcerpc.auth()
- a.auth_type = auth_type
- a.auth_level = auth_level
- a.auth_pad_length = auth_pad_length
- a.auth_context_id= auth_context_id
- a.credentials = auth_blob
-
- ai = samba.ndr.ndr_pack(a)
- if ndr_print:
- sys.stderr.write("generate_auth: %s" % samba.ndr.ndr_print(a))
- if hexdump:
- sys.stderr.write("generate_auth: %d\n%s" % (len(ai), self.hexdump(ai)))
- else:
- ai = ""
-
- return ai
-
- def parse_auth(self, auth_info, ndr_print=None, hexdump=None):
- if ndr_print is None:
- ndr_print = self.do_ndr_print
- if hexdump is None:
- hexdump = self.do_hexdump
-
- if (len(auth_info) <= samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH):
- return None
-
- if hexdump:
- sys.stderr.write("parse_auth: %d\n%s" % (len(auth_info), self.hexdump(auth_info)))
- a = samba.ndr.ndr_unpack(samba.dcerpc.dcerpc.auth, auth_info, allow_remaining=True)
- if ndr_print:
- sys.stderr.write("parse_auth: %s" % samba.ndr.ndr_print(a))
-
- return a
-
- def generate_pdu(self, ptype, call_id, payload,
- rpc_vers=5,
- rpc_vers_minor=0,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- drep = [samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
- ndr_print=None, hexdump=None):
-
- if getattr(payload, 'auth_info', None):
- ai = payload.auth_info
- else:
- ai = ""
-
- p = samba.dcerpc.dcerpc.ncacn_packet()
- p.rpc_vers = rpc_vers
- p.rpc_vers_minor = rpc_vers_minor
- p.ptype = ptype
- p.pfc_flags = pfc_flags
- p.drep = drep
- p.frag_length = 0
- if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
- p.auth_length = len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
- else:
- p.auth_length = 0
- p.call_id = call_id
- p.u = payload
-
- pdu = samba.ndr.ndr_pack(p)
- p.frag_length = len(pdu)
-
- return p
-
- def verify_pdu(self, p, ptype, call_id,
- rpc_vers=5,
- rpc_vers_minor=0,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- drep = [samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
- auth_length=None):
-
- self.assertIsNotNone(p, "No valid pdu")
-
- if getattr(p.u, 'auth_info', None):
- ai = p.u.auth_info
- else:
- ai = ""
-
- self.assertEqual(p.rpc_vers, rpc_vers)
- self.assertEqual(p.rpc_vers_minor, rpc_vers_minor)
- self.assertEqual(p.ptype, ptype)
- self.assertEqual(p.pfc_flags, pfc_flags)
- self.assertEqual(p.drep, drep)
- self.assertGreaterEqual(p.frag_length,
- samba.dcerpc.dcerpc.DCERPC_NCACN_PAYLOAD_OFFSET)
- if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
- self.assertEqual(p.auth_length,
- len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
- elif auth_length is not None:
- self.assertEqual(p.auth_length, auth_length)
- else:
- self.assertEqual(p.auth_length, 0)
- self.assertEqual(p.call_id, call_id)
-
- return
-
- def generate_bind(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- max_xmit_frag=5840,
- max_recv_frag=5840,
- assoc_group_id=0,
- ctx_list=[],
- auth_info="",
- ndr_print=None, hexdump=None):
-
- b = samba.dcerpc.dcerpc.bind()
- b.max_xmit_frag = max_xmit_frag
- b.max_recv_frag = max_recv_frag
- b.assoc_group_id = assoc_group_id
- b.num_contexts = len(ctx_list)
- b.ctx_list = ctx_list
- b.auth_info = auth_info
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_BIND,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=b,
- ndr_print=ndr_print, hexdump=hexdump)
-
- return p
-
- def generate_alter(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- max_xmit_frag=5840,
- max_recv_frag=5840,
- assoc_group_id=0,
- ctx_list=[],
- auth_info="",
- ndr_print=None, hexdump=None):
-
- a = samba.dcerpc.dcerpc.bind()
- a.max_xmit_frag = max_xmit_frag
- a.max_recv_frag = max_recv_frag
- a.assoc_group_id = assoc_group_id
- a.num_contexts = len(ctx_list)
- a.ctx_list = ctx_list
- a.auth_info = auth_info
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ALTER,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=a,
- ndr_print=ndr_print, hexdump=hexdump)
-
- return p
-
- def generate_auth3(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- auth_info="",
- ndr_print=None, hexdump=None):
-
- a = samba.dcerpc.dcerpc.auth3()
- a.auth_info = auth_info
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_AUTH3,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=a,
- ndr_print=ndr_print, hexdump=hexdump)
-
- return p
-
- def generate_request(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- alloc_hint=None,
- context_id=None,
- opnum=None,
- object=None,
- stub=None,
- auth_info="",
- ndr_print=None, hexdump=None):
-
- if alloc_hint is None:
- alloc_hint = len(stub)
-
- r = samba.dcerpc.dcerpc.request()
- r.alloc_hint = alloc_hint
- r.context_id = context_id
- r.opnum = opnum
- if object is not None:
- r.object = object
- r.stub_and_verifier = stub + auth_info
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_REQUEST,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=r,
- ndr_print=ndr_print, hexdump=hexdump)
-
- if len(auth_info) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
- p.auth_length = len(auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
-
- return p
-
- def generate_co_cancel(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- auth_info="",
- ndr_print=None, hexdump=None):
-
- c = samba.dcerpc.dcerpc.co_cancel()
- c.auth_info = auth_info
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_CO_CANCEL,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=c,
- ndr_print=ndr_print, hexdump=hexdump)
-
- return p
-
- def generate_orphaned(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- auth_info="",
- ndr_print=None, hexdump=None):
-
- o = samba.dcerpc.dcerpc.orphaned()
- o.auth_info = auth_info
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ORPHANED,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=o,
- ndr_print=ndr_print, hexdump=hexdump)
-
- return p
-
- def generate_shutdown(self, call_id,
- pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
- samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
- ndr_print=None, hexdump=None):
-
- s = samba.dcerpc.dcerpc.shutdown()
-
- p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_SHUTDOWN,
- pfc_flags=pfc_flags,
- call_id=call_id,
- payload=s,
- ndr_print=ndr_print, hexdump=hexdump)
-
- return p
-
- def assertIsConnected(self):
- self.assertIsNotNone(self.s, msg="Not connected")
- return
-
- def assertNotConnected(self):
- self.assertIsNone(self.s, msg="Is connected")
- return
-
- def assertNDRSyntaxEquals(self, s1, s2):
- self.assertEqual(s1.uuid, s2.uuid)
- self.assertEqual(s1.if_version, s2.if_version)
- return
class ValidNetbiosNameTests(TestCase):
(S.stderr)
"""
- def __init__(self, returncode, cmd, stdout, stderr):
+ def __init__(self, returncode, cmd, stdout, stderr, msg=None):
self.returncode = returncode
self.cmd = cmd
self.stdout = stdout
self.stderr = stderr
+ self.msg = msg
def __str__(self):
- return "Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" % (self.cmd, self.returncode,
- self.stdout, self.stderr)
+ s = ("Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" %
+ (self.cmd, self.returncode, self.stdout, self.stderr))
+ if self.msg is not None:
+ s = "%s; message: %s" % (s, self.msg)
+
+ return s
class BlackboxTestCase(TestCaseInTempDir):
"""Base test case for blackbox tests."""
line = " ".join(parts)
return line
- def check_run(self, line):
+ def check_run(self, line, msg=None):
+ self.check_exit_code(line, 0, msg=msg)
+
+ def check_exit_code(self, line, expected, msg=None):
line = self._make_cmdline(line)
- p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
- retcode = p.wait()
- if retcode:
- raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read())
+ p = subprocess.Popen(line,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ shell=True)
+ stdoutdata, stderrdata = p.communicate()
+ retcode = p.returncode
+ if retcode != expected:
+ raise BlackboxProcessError(retcode,
+ line,
+ stdoutdata,
+ stderrdata,
+ msg)
def check_output(self, line):
line = self._make_cmdline(line)
p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
- retcode = p.wait()
+ stdoutdata, stderrdata = p.communicate()
+ retcode = p.returncode
if retcode:
- raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read())
- return p.stdout.read()
+ raise BlackboxProcessError(retcode, line, stdoutdata, stderrdata)
+ return stdoutdata
def connect_samdb(samdb_url, lp=None, session_info=None, credentials=None,
return connect_samdb(samdb_url, credentials=creds, lp=lp)
-def delete_force(samdb, dn):
+def delete_force(samdb, dn, **kwargs):
try:
- samdb.delete(dn)
- except ldb.LdbError, (num, errstr):
+ samdb.delete(dn, **kwargs)
+ except ldb.LdbError as error:
+ (num, errstr) = error.args
assert num == ldb.ERR_NO_SUCH_OBJECT, "ldb.delete() failed: %s" % errstr
+
+def create_test_ou(samdb, name):
+ """Creates a unique OU for the test"""
+
+ # Add some randomness to the test OU. Replication between the testenvs is
+ # constantly happening in the background. Deletion of the last test's
+ # objects can be slow to replicate out. So the OU created by a previous
+ # testenv may still exist at the point that tests start on another testenv.
+ rand = randint(1, 10000000)
+ dn = ldb.Dn(samdb, "OU=%s%d,%s" % (name, rand, samdb.get_default_basedn()))
+ samdb.add({"dn": dn, "objectclass": "organizationalUnit"})
+ return dn