PEP8: fix E225: missing whitespace around operator
[nivanova/samba-autobuild/.git] / python / samba / tests / __init__.py
index 1c5f678038ee74d8e5a668de4d965334f1f4269b..60c18c7323465704fa8bd7b9349549d5014e9ebe 100644 (file)
 """Samba Python tests."""
 
 import os
+import tempfile
 import ldb
 import samba
-import samba.auth
 from samba import param
-from samba.samdb import SamDB
 from samba import credentials
-import samba.ndr
-import samba.dcerpc.dcerpc
-import samba.dcerpc.base
-import samba.dcerpc.epmapper
+from samba.credentials import Credentials
+from samba import gensec
 import socket
 import struct
 import subprocess
 import sys
 import tempfile
 import unittest
+import re
+import samba.auth
+import samba.dcerpc.base
+from samba.compat import PY3, text_type
+from samba.compat import string_types
+from random import randint
+try:
+    from samba.samdb import SamDB
+except ImportError:
+    # We are built without samdb support,
+    # imitate it so that connect_samdb() can recover
+    def SamDB(*args, **kwargs):
+        return None
+
+import samba.ndr
+import samba.dcerpc.dcerpc
+import samba.dcerpc.epmapper
 
 try:
     from unittest import SkipTest
@@ -42,7 +56,7 @@ except ImportError:
     class SkipTest(Exception):
         """Test skipped."""
 
-HEXDUMP_FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
+HEXDUMP_FILTER = bytearray([x if ((len(repr(chr(x))) == 3) and (x < 127)) else ord('.') for x in range(256)])
 
 class TestCase(unittest.TestCase):
     """A Samba test case."""
@@ -62,21 +76,66 @@ class TestCase(unittest.TestCase):
     def get_credentials(self):
         return cmdline_credentials
 
+    def get_creds_ccache_name(self):
+        creds = self.get_credentials()
+        ccache = creds.get_named_ccache(self.get_loadparm())
+        ccache_name = ccache.get_name()
+
+        return ccache_name
+
     def hexdump(self, src):
         N = 0
         result = ''
+        is_string = isinstance(src, string_types)
         while src:
             ll = src[:8]
             lr = src[8:16]
             src = src[16:]
-            hl = ' '.join(["%02X" % ord(x) for x in ll])
-            hr = ' '.join(["%02X" % ord(x) for x in lr])
-            ll = ll.translate(HEXDUMP_FILTER)
-            lr = lr.translate(HEXDUMP_FILTER)
+            if is_string:
+                hl = ' '.join(["%02X" % ord(x) for x in ll])
+                hr = ' '.join(["%02X" % ord(x) for x in lr])
+                ll = ll.translate(HEXDUMP_FILTER)
+                lr = lr.translate(HEXDUMP_FILTER)
+            else:
+                hl = ' '.join(["%02X" % x for x in ll])
+                hr = ' '.join(["%02X" % x for x in lr])
+                ll = ll.translate(HEXDUMP_FILTER).decode('utf8')
+                lr = lr.translate(HEXDUMP_FILTER).decode('utf8')
             result += "[%04X] %-*s  %-*s  %s %s\n" % (N, 8*3, hl, 8*3, hr, ll, lr)
             N += 16
         return result
 
+    def insta_creds(self, template=None, username=None, userpass=None, kerberos_state=None):
+
+        if template is None:
+            assert template is not None
+
+        if username is not None:
+            assert userpass is not None
+
+        if username is None:
+            assert userpass is None
+
+            username = template.get_username()
+            userpass = template.get_password()
+
+        if kerberos_state is None:
+            kerberos_state = template.get_kerberos_state()
+
+        # get a copy of the global creds or a the passed in creds
+        c = Credentials()
+        c.set_username(username)
+        c.set_password(userpass)
+        c.set_domain(template.get_domain())
+        c.set_realm(template.get_realm())
+        c.set_workstation(template.get_workstation())
+        c.set_gensec_features(c.get_gensec_features()
+                              | gensec.FEATURE_SEAL)
+        c.set_kerberos_state(kerberos_state)
+        return c
+
+
+
     # These functions didn't exist before Python2.7:
     if sys.version_info < (2, 7):
         import warnings
@@ -118,6 +177,14 @@ class TestCase(unittest.TestCase):
             self._cleanups = getattr(self, "_cleanups", []) + [
                 (fn, args, kwargs)]
 
+        def assertRegexpMatches(self, text, regex, msg=None):
+            # PY3 note: Python 3 will never see this, but we use
+            # text_type for the benefit of linters.
+            if isinstance(regex, (str, text_type)):
+                regex = re.compile(regex)
+            if not regex.search(text):
+                self.fail(msg)
+
         def _addSkip(self, result, reason):
             addSkip = getattr(result, 'addSkip', None)
             if addSkip is not None:
@@ -134,7 +201,7 @@ class TestCase(unittest.TestCase):
             try:
                 try:
                     self.setUp()
-                except SkipTest, e:
+                except SkipTest as e:
                     self._addSkip(result, str(e))
                     return
                 except KeyboardInterrupt:
@@ -147,7 +214,7 @@ class TestCase(unittest.TestCase):
                 try:
                     testMethod()
                     ok = True
-                except SkipTest, e:
+                except SkipTest as e:
                     self._addSkip(result, str(e))
                     return
                 except self.failureException:
@@ -159,7 +226,7 @@ class TestCase(unittest.TestCase):
 
                 try:
                     self.tearDown()
-                except SkipTest, e:
+                except SkipTest as e:
                     self._addSkip(result, str(e))
                 except KeyboardInterrupt:
                     raise
@@ -173,13 +240,37 @@ class TestCase(unittest.TestCase):
             finally:
                 result.stopTest(self)
 
+    def assertStringsEqual(self, a, b, msg=None, strip=False):
+        """Assert equality between two strings and highlight any differences.
+        If strip is true, leading and trailing whitespace is ignored."""
+        if strip:
+            a = a.strip()
+            b = b.strip()
+
+        if a != b:
+            sys.stderr.write("The strings differ %s(lengths %d vs %d); "
+                             "a diff follows\n"
+                             % ('when stripped ' if strip else '',
+                                len(a), len(b),
+                                ))
+
+            from difflib import unified_diff
+            diff = unified_diff(a.splitlines(True),
+                                b.splitlines(True),
+                                'a', 'b')
+            for line in diff:
+                sys.stderr.write(line)
+
+            self.fail(msg)
+
 
 class LdbTestCase(TestCase):
     """Trivial test case for running tests against a LDB."""
 
     def setUp(self):
         super(LdbTestCase, self).setUp()
-        self.filename = os.tempnam()
+        self.tempfile = tempfile.NamedTemporaryFile(delete=False)
+        self.filename = self.tempfile.name
         self.ldb = samba.Ldb(self.filename)
 
     def set_modules(self, modules=[]):
@@ -213,13 +304,16 @@ def env_loadparm():
     return lp
 
 
-def env_get_var_value(var_name):
+def env_get_var_value(var_name, allow_missing=False):
     """Returns value for variable in os.environ
 
     Function throws AssertionError if variable is defined.
     Unit-test based python tests require certain input params
     to be set in environment, otherwise they can't be run
     """
+    if allow_missing:
+        if var_name not in os.environ.keys():
+            return None
     assert var_name in os.environ.keys(), "Please supply %s in environment" % var_name
     return os.environ[var_name]
 
@@ -229,524 +323,6 @@ cmdline_credentials = None
 class RpcInterfaceTestCase(TestCase):
     """DCE/RPC Test case."""
 
-class RawDCERPCTest(TestCase):
-    """A raw DCE/RPC Test case."""
-
-    def _disconnect(self, reason):
-        if self.s is None:
-            return
-        self.s.close()
-        self.s = None
-        if self.do_hexdump:
-            sys.stderr.write("disconnect[%s]\n" % reason)
-
-    def connect(self):
-        try:
-            self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC,
-                                        socket.SOCK_STREAM, socket.SOL_TCP,
-                                        0)
-            self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
-            self.s.settimeout(10)
-            self.s.connect(self.a[0][4])
-        except socket.error as e:
-            self.s.close()
-            raise
-        except IOError as e:
-            self.s.close()
-            raise
-        except Exception as e:
-            raise
-        finally:
-            pass
-
-    def setUp(self):
-        super(RawDCERPCTest, self).setUp()
-        self.do_ndr_print = False
-        self.do_hexdump = False
-
-        self.host = samba.tests.env_get_var_value('SERVER')
-        self.tcp_port = 135
-
-        self.settings = {}
-        self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
-        self.settings["target_hostname"] = self.host
-
-        self.connect()
-
-    def epmap_reconnect(self, abstract):
-        ndr32 = samba.dcerpc.base.transfer_syntax_ndr()
-
-        tsf0_list = [ndr32]
-        ctx0 = samba.dcerpc.dcerpc.ctx_list()
-        ctx0.context_id = 1
-        ctx0.num_transfer_syntaxes = len(tsf0_list)
-        ctx0.abstract_syntax = samba.dcerpc.epmapper.abstract_syntax()
-        ctx0.transfer_syntaxes = tsf0_list
-
-        req = self.generate_bind(call_id=0, ctx_list=[ctx0])
-        self.send_pdu(req)
-        rep = self.recv_pdu()
-        self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK,
-                        req.call_id, auth_length=0)
-        self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
-        self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
-        self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id)
-        self.assertEqual(rep.u.secondary_address_size, 4)
-        self.assertEqual(rep.u.secondary_address, "%d" % self.tcp_port)
-        self.assertEqual(len(rep.u._pad1), 2)
-        self.assertEqual(rep.u._pad1, '\0' * 2)
-        self.assertEqual(rep.u.num_results, 1)
-        self.assertEqual(rep.u.ctx_list[0].result,
-                samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
-        self.assertEqual(rep.u.ctx_list[0].reason,
-                samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
-        self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
-        self.assertEqual(rep.u.auth_info, '\0' * 0)
-
-        # And now try a request
-        data1 = samba.ndr.ndr_pack(abstract)
-        lhs1 = samba.dcerpc.epmapper.epm_lhs()
-        lhs1.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
-        lhs1.lhs_data = data1[:18]
-        rhs1 = samba.dcerpc.epmapper.epm_rhs_uuid()
-        rhs1.unknown = data1[18:]
-        floor1 = samba.dcerpc.epmapper.epm_floor()
-        floor1.lhs = lhs1
-        floor1.rhs = rhs1
-        data2 = samba.ndr.ndr_pack(ndr32)
-        lhs2 = samba.dcerpc.epmapper.epm_lhs()
-        lhs2.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
-        lhs2.lhs_data = data2[:18]
-        rhs2 = samba.dcerpc.epmapper.epm_rhs_uuid()
-        rhs2.unknown = data1[18:]
-        floor2 = samba.dcerpc.epmapper.epm_floor()
-        floor2.lhs = lhs2
-        floor2.rhs = rhs2
-        lhs3 = samba.dcerpc.epmapper.epm_lhs()
-        lhs3.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_NCACN
-        lhs3.lhs_data = ""
-        floor3 = samba.dcerpc.epmapper.epm_floor()
-        floor3.lhs = lhs3
-        floor3.rhs.minor_version = 0
-        lhs4 = samba.dcerpc.epmapper.epm_lhs()
-        lhs4.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_TCP
-        lhs4.lhs_data = ""
-        floor4 = samba.dcerpc.epmapper.epm_floor()
-        floor4.lhs = lhs4
-        floor4.rhs.port = self.tcp_port
-        lhs5 = samba.dcerpc.epmapper.epm_lhs()
-        lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP
-        lhs5.lhs_data = ""
-        floor5 = samba.dcerpc.epmapper.epm_floor()
-        floor5.lhs = lhs5
-        floor5.rhs.ipaddr = "0.0.0.0"
-
-        floors = [floor1,floor2,floor3,floor4,floor5]
-        req_tower = samba.dcerpc.epmapper.epm_tower()
-        req_tower.num_floors = len(floors)
-        req_tower.floors = floors
-        req_twr = samba.dcerpc.epmapper.epm_twr_t()
-        req_twr.tower = req_tower
-
-        pack_twr = samba.ndr.ndr_pack(req_twr)
-
-        # object
-        stub =  "\x01\x00\x00\x00"
-        stub += "\x00" * 16
-        # tower
-        stub += "\x02\x00\x00\x00"
-        stub += pack_twr
-        # padding?
-        stub += "\x00" * 1
-        # handle
-        stub += "\x00" * 20
-        # max_towers
-        stub += "\x04\x00\x00\x00"
-
-        # we do an epm_Map() request
-        req = self.generate_request(call_id = 1,
-                                    context_id=ctx0.context_id,
-                                    opnum=3,
-                                    stub=stub)
-        self.send_pdu(req)
-        rep = self.recv_pdu()
-        self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE,
-                        req.call_id, auth_length=0)
-        self.assertNotEqual(rep.u.alloc_hint, 0)
-        self.assertEqual(rep.u.context_id, req.u.context_id)
-        self.assertEqual(rep.u.cancel_count, 0)
-        self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
-
-        num_towers = struct.unpack_from("<I", rep.u.stub_and_verifier, 20)
-        (array_max, array_ofs, array_cnt) = struct.unpack_from("<III", rep.u.stub_and_verifier, 24)
-        status = struct.unpack_from("<I", rep.u.stub_and_verifier, len(rep.u.stub_and_verifier) - 4)
-        self.assertEqual(status[0], 0)
-        self.assertGreaterEqual(num_towers[0], 1)
-        self.assertEqual(array_max, 4)
-        self.assertEqual(array_ofs, 0)
-        self.assertGreaterEqual(array_cnt, 1)
-
-        unpack_twr = rep.u.stub_and_verifier[(36 + 4 * array_cnt):-4]
-        rep_twr = samba.ndr.ndr_unpack(samba.dcerpc.epmapper.epm_twr_t, unpack_twr, allow_remaining=True)
-        self.assertEqual(rep_twr.tower_length, 75)
-        self.assertEqual(rep_twr.tower.num_floors, 5)
-        self.assertEqual(len(rep_twr.tower.floors), 5)
-        self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
-                          samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
-        self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
-                          samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
-
-        # reconnect to the given port
-        self._disconnect("epmap_reconnect")
-        self.tcp_port = rep_twr.tower.floors[3].rhs.port
-        self.connect()
-
-    def send_pdu(self, req, ndr_print=None, hexdump=None):
-        if ndr_print is None:
-            ndr_print = self.do_ndr_print
-        if hexdump is None:
-            hexdump = self.do_hexdump
-        try:
-            req_pdu = samba.ndr.ndr_pack(req)
-            if ndr_print:
-                sys.stderr.write("send_pdu: %s" % samba.ndr.ndr_print(req))
-            if hexdump:
-                sys.stderr.write("send_pdu: %d\n%s" % (len(req_pdu), self.hexdump(req_pdu)))
-            while True:
-                sent = self.s.send(req_pdu, 0)
-                if sent == len(req_pdu):
-                    break
-                req_pdu = req_pdu[sent:]
-        except socket.error as e:
-            self._disconnect("send_pdu: %s" % e)
-            raise
-        except IOError as e:
-            self._disconnect("send_pdu: %s" % e)
-            raise
-        finally:
-            pass
-
-    def recv_raw(self, hexdump=None, timeout=None):
-        rep_pdu = None
-        if hexdump is None:
-            hexdump = self.do_hexdump
-        try:
-            if timeout is not None:
-                self.s.settimeout(timeout)
-            rep_pdu = self.s.recv(0xffff, 0)
-            self.s.settimeout(10)
-            if len(rep_pdu) == 0:
-                self._disconnect("recv_raw: EOF")
-                return None
-            if hexdump:
-                sys.stderr.write("recv_raw: %d\n%s" % (len(rep_pdu), self.hexdump(rep_pdu)))
-        except socket.timeout as e:
-            self.s.settimeout(10)
-            sys.stderr.write("recv_raw: TIMEOUT\n")
-            pass
-        except socket.error as e:
-            self._disconnect("recv_raw: %s" % e)
-            raise
-        except IOError as e:
-            self._disconnect("recv_raw: %s" % e)
-            raise
-        finally:
-            pass
-        return rep_pdu
-
-    def recv_pdu(self, ndr_print=None, hexdump=None, timeout=None):
-        rep = None
-        if ndr_print is None:
-            ndr_print = self.do_ndr_print
-        if hexdump is None:
-            hexdump = self.do_hexdump
-        try:
-            rep_pdu = self.recv_raw(hexdump=hexdump, timeout=timeout)
-            if rep_pdu is None:
-                return None
-            rep = samba.ndr.ndr_unpack(samba.dcerpc.dcerpc.ncacn_packet, rep_pdu, allow_remaining=True)
-            if ndr_print:
-                sys.stderr.write("recv_pdu: %s" % samba.ndr.ndr_print(rep))
-            self.assertEqual(rep.frag_length, len(rep_pdu))
-        finally:
-            pass
-        return rep
-
-    def generate_auth(self,
-                      auth_type=None,
-                      auth_level=None,
-                      auth_pad_length=0,
-                      auth_context_id=None,
-                      auth_blob=None,
-                      ndr_print=None, hexdump=None):
-        if ndr_print is None:
-            ndr_print = self.do_ndr_print
-        if hexdump is None:
-            hexdump = self.do_hexdump
-
-        if auth_type is not None:
-            a = samba.dcerpc.dcerpc.auth()
-            a.auth_type = auth_type
-            a.auth_level = auth_level
-            a.auth_pad_length = auth_pad_length
-            a.auth_context_id= auth_context_id
-            a.credentials = auth_blob
-
-            ai = samba.ndr.ndr_pack(a)
-            if ndr_print:
-                sys.stderr.write("generate_auth: %s" % samba.ndr.ndr_print(a))
-            if hexdump:
-                sys.stderr.write("generate_auth: %d\n%s" % (len(ai), self.hexdump(ai)))
-        else:
-            ai = ""
-
-        return ai
-
-    def parse_auth(self, auth_info, ndr_print=None, hexdump=None):
-        if ndr_print is None:
-            ndr_print = self.do_ndr_print
-        if hexdump is None:
-            hexdump = self.do_hexdump
-
-        if (len(auth_info) <= samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH):
-            return None
-
-        if hexdump:
-            sys.stderr.write("parse_auth: %d\n%s" % (len(auth_info), self.hexdump(auth_info)))
-        a = samba.ndr.ndr_unpack(samba.dcerpc.dcerpc.auth, auth_info, allow_remaining=True)
-        if ndr_print:
-            sys.stderr.write("parse_auth: %s" % samba.ndr.ndr_print(a))
-
-        return a
-
-    def generate_pdu(self, ptype, call_id, payload,
-                     rpc_vers=5,
-                     rpc_vers_minor=0,
-                     pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
-                                 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
-                     drep = [samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
-                     ndr_print=None, hexdump=None):
-
-        if getattr(payload, 'auth_info', None):
-            ai = payload.auth_info
-        else:
-            ai = ""
-
-        p = samba.dcerpc.dcerpc.ncacn_packet()
-        p.rpc_vers = rpc_vers
-        p.rpc_vers_minor = rpc_vers_minor
-        p.ptype = ptype
-        p.pfc_flags = pfc_flags
-        p.drep = drep
-        p.frag_length = 0
-        if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
-            p.auth_length = len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
-        else:
-            p.auth_length = 0
-        p.call_id = call_id
-        p.u = payload
-
-        pdu = samba.ndr.ndr_pack(p)
-        p.frag_length = len(pdu)
-
-        return p
-
-    def verify_pdu(self, p, ptype, call_id,
-                   rpc_vers=5,
-                   rpc_vers_minor=0,
-                   pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
-                               samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
-                   drep = [samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
-                   auth_length=None):
-
-        self.assertIsNotNone(p, "No valid pdu")
-
-        if getattr(p.u, 'auth_info', None):
-            ai = p.u.auth_info
-        else:
-            ai = ""
-
-        self.assertEqual(p.rpc_vers, rpc_vers)
-        self.assertEqual(p.rpc_vers_minor, rpc_vers_minor)
-        self.assertEqual(p.ptype, ptype)
-        self.assertEqual(p.pfc_flags, pfc_flags)
-        self.assertEqual(p.drep, drep)
-        self.assertGreaterEqual(p.frag_length,
-                samba.dcerpc.dcerpc.DCERPC_NCACN_PAYLOAD_OFFSET)
-        if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
-            self.assertEqual(p.auth_length,
-                    len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
-        elif auth_length is not None:
-            self.assertEqual(p.auth_length, auth_length)
-        else:
-            self.assertEqual(p.auth_length, 0)
-        self.assertEqual(p.call_id, call_id)
-
-        return
-
-    def generate_bind(self, call_id,
-                      pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
-                                  samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
-                      max_xmit_frag=5840,
-                      max_recv_frag=5840,
-                      assoc_group_id=0,
-                      ctx_list=[],
-                      auth_info="",
-                      ndr_print=None, hexdump=None):
-
-        b = samba.dcerpc.dcerpc.bind()
-        b.max_xmit_frag = max_xmit_frag
-        b.max_recv_frag = max_recv_frag
-        b.assoc_group_id = assoc_group_id
-        b.num_contexts = len(ctx_list)
-        b.ctx_list = ctx_list
-        b.auth_info = auth_info
-
-        p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_BIND,
-                              pfc_flags=pfc_flags,
-                              call_id=call_id,
-                              payload=b,
-                              ndr_print=ndr_print, hexdump=hexdump)
-
-        return p
-
-    def generate_alter(self, call_id,
-                       pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
-                                   samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
-                       max_xmit_frag=5840,
-                       max_recv_frag=5840,
-                       assoc_group_id=0,
-                       ctx_list=[],
-                       auth_info="",
-                       ndr_print=None, hexdump=None):
-
-        a = samba.dcerpc.dcerpc.bind()
-        a.max_xmit_frag = max_xmit_frag
-        a.max_recv_frag = max_recv_frag
-        a.assoc_group_id = assoc_group_id
-        a.num_contexts = len(ctx_list)
-        a.ctx_list = ctx_list
-        a.auth_info = auth_info
-
-        p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ALTER,
-                              pfc_flags=pfc_flags,
-                              call_id=call_id,
-                              payload=a,
-                              ndr_print=ndr_print, hexdump=hexdump)
-
-        return p
-
-    def generate_auth3(self, call_id,
-                       pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
-                                   samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
-                       auth_info="",
-                       ndr_print=None, hexdump=None):
-
-        a = samba.dcerpc.dcerpc.auth3()
-        a.auth_info = auth_info
-
-        p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_AUTH3,
-                              pfc_flags=pfc_flags,
-                              call_id=call_id,
-                              payload=a,
-                              ndr_print=ndr_print, hexdump=hexdump)
-
-        return p
-
-    def generate_request(self, call_id,
-                         pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
-                                     samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
-                         alloc_hint=None,
-                         context_id=None,
-                         opnum=None,
-                         object=None,
-                         stub=None,
-                         auth_info="",
-                         ndr_print=None, hexdump=None):
-
-        if alloc_hint is None:
-            alloc_hint = len(stub)
-
-        r = samba.dcerpc.dcerpc.request()
-        r.alloc_hint = alloc_hint
-        r.context_id = context_id
-        r.opnum = opnum
-        if object is not None:
-            r.object = object
-        r.stub_and_verifier = stub + auth_info
-
-        p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_REQUEST,
-                              pfc_flags=pfc_flags,
-                              call_id=call_id,
-                              payload=r,
-                              ndr_print=ndr_print, hexdump=hexdump)
-
-        if len(auth_info) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
-            p.auth_length = len(auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
-
-        return p
-
-    def generate_co_cancel(self, call_id,
-                           pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
-                                       samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
-                           auth_info="",
-                           ndr_print=None, hexdump=None):
-
-        c = samba.dcerpc.dcerpc.co_cancel()
-        c.auth_info = auth_info
-
-        p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_CO_CANCEL,
-                              pfc_flags=pfc_flags,
-                              call_id=call_id,
-                              payload=c,
-                              ndr_print=ndr_print, hexdump=hexdump)
-
-        return p
-
-    def generate_orphaned(self, call_id,
-                          pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
-                                      samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
-                          auth_info="",
-                          ndr_print=None, hexdump=None):
-
-        o = samba.dcerpc.dcerpc.orphaned()
-        o.auth_info = auth_info
-
-        p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ORPHANED,
-                              pfc_flags=pfc_flags,
-                              call_id=call_id,
-                              payload=o,
-                              ndr_print=ndr_print, hexdump=hexdump)
-
-        return p
-
-    def generate_shutdown(self, call_id,
-                          pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
-                                      samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
-                          ndr_print=None, hexdump=None):
-
-        s = samba.dcerpc.dcerpc.shutdown()
-
-        p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_SHUTDOWN,
-                              pfc_flags=pfc_flags,
-                              call_id=call_id,
-                              payload=s,
-                              ndr_print=ndr_print, hexdump=hexdump)
-
-        return p
-
-    def assertIsConnected(self):
-        self.assertIsNotNone(self.s, msg="Not connected")
-        return
-
-    def assertNotConnected(self):
-        self.assertIsNone(self.s, msg="Is connected")
-        return
-
-    def assertNDRSyntaxEquals(self, s1, s2):
-        self.assertEqual(s1.uuid, s2.uuid)
-        self.assertEqual(s1.if_version, s2.if_version)
-        return
 
 class ValidNetbiosNameTests(TestCase):
 
@@ -768,15 +344,20 @@ class BlackboxProcessError(Exception):
     (S.stderr)
     """
 
-    def __init__(self, returncode, cmd, stdout, stderr):
+    def __init__(self, returncode, cmd, stdout, stderr, msg=None):
         self.returncode = returncode
         self.cmd = cmd
         self.stdout = stdout
         self.stderr = stderr
+        self.msg = msg
 
     def __str__(self):
-        return "Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" % (self.cmd, self.returncode,
-                                                                             self.stdout, self.stderr)
+        s = ("Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" %
+             (self.cmd, self.returncode, self.stdout, self.stderr))
+        if self.msg is not None:
+            s = "%s; message: %s" % (s, self.msg)
+
+        return s
 
 class BlackboxTestCase(TestCaseInTempDir):
     """Base test case for blackbox tests."""
@@ -789,20 +370,32 @@ class BlackboxTestCase(TestCaseInTempDir):
         line = " ".join(parts)
         return line
 
-    def check_run(self, line):
+    def check_run(self, line, msg=None):
+        self.check_exit_code(line, 0, msg=msg)
+
+    def check_exit_code(self, line, expected, msg=None):
         line = self._make_cmdline(line)
-        p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
-        retcode = p.wait()
-        if retcode:
-            raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read())
+        p = subprocess.Popen(line,
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             shell=True)
+        stdoutdata, stderrdata = p.communicate()
+        retcode = p.returncode
+        if retcode != expected:
+            raise BlackboxProcessError(retcode,
+                                       line,
+                                       stdoutdata,
+                                       stderrdata,
+                                       msg)
 
     def check_output(self, line):
         line = self._make_cmdline(line)
         p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
-        retcode = p.wait()
+        stdoutdata, stderrdata = p.communicate()
+        retcode = p.returncode
         if retcode:
-            raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read())
-        return p.stdout.read()
+            raise BlackboxProcessError(retcode, line, stdoutdata, stderrdata)
+        return stdoutdata
 
 
 def connect_samdb(samdb_url, lp=None, session_info=None, credentials=None,
@@ -890,8 +483,21 @@ def connect_samdb_env(env_url, env_username, env_password, lp=None):
     return connect_samdb(samdb_url, credentials=creds, lp=lp)
 
 
-def delete_force(samdb, dn):
+def delete_force(samdb, dn, **kwargs):
     try:
-        samdb.delete(dn)
-    except ldb.LdbError, (num, errstr):
+        samdb.delete(dn, **kwargs)
+    except ldb.LdbError as error:
+        (num, errstr) = error.args
         assert num == ldb.ERR_NO_SUCH_OBJECT, "ldb.delete() failed: %s" % errstr
+
+def create_test_ou(samdb, name):
+    """Creates a unique OU for the test"""
+
+    # Add some randomness to the test OU. Replication between the testenvs is
+    # constantly happening in the background. Deletion of the last test's
+    # objects can be slow to replicate out. So the OU created by a previous
+    # testenv may still exist at the point that tests start on another testenv.
+    rand = randint(1, 10000000)
+    dn = ldb.Dn(samdb, "OU=%s%d,%s" % (name, rand, samdb.get_default_basedn()))
+    samdb.add({"dn": dn, "objectclass": "organizationalUnit"})
+    return dn