py:dcerpc/raw_testcase: add helper functions for ncacn_np: SMB connection support
authorStefan Metzmacher <metze@samba.org>
Thu, 22 Nov 2018 17:21:03 +0000 (18:21 +0100)
committerJeremy Allison <jra@samba.org>
Sun, 23 Dec 2018 17:15:22 +0000 (18:15 +0100)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=7113
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11892

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
python/samba/tests/dcerpc/raw_testcase.py

index 77722d82b93fab28b1efd634b013c8cbf37704fb..ca1b546a399f9da0070da166f0839614e9e281aa 100644 (file)
@@ -21,13 +21,63 @@ import socket
 import samba.dcerpc.dcerpc as dcerpc
 import samba.dcerpc.base
 import samba.dcerpc.epmapper
+import samba.dcerpc.security as security
 import samba.tests
 from samba import gensec
 from samba.credentials import Credentials
 from samba.tests import TestCase
 from samba.ndr import ndr_pack, ndr_unpack, ndr_unpack_out
 from samba.compat import text_type
+from samba.ntstatus import (
+    NT_STATUS_CONNECTION_DISCONNECTED,
+    NT_STATUS_PIPE_DISCONNECTED,
+    NT_STATUS_IO_TIMEOUT
+)
+from samba import NTSTATUSError
+from samba.samba3 import param as s3param
+from samba.samba3 import libsmb_samba_internal
+
+class smb_pipe_socket(object):
+
+    def __init__(self, target_hostname, pipename, creds, impersonation_level, lp):
+        lp3 = s3param.get_context()
+        lp3.load(lp.configfile)
+        self.smbconn = libsmb_samba_internal.Conn(target_hostname, 'IPC$',
+                                                  credentials=creds, sign=True)
+        self.smbfid = self.smbconn.create(pipename,
+                                          DesiredAccess=0x12019f,
+                                          ShareAccess=0x7,
+                                          CreateDisposition=1,
+                                          CreateOptions=0x400040,
+                                          ImpersonationLevel=impersonation_level)
+        return
+
+    def close(self):
+        self.smbconn.close(self.smbfid)
+        del self.smbconn
+
+    def settimeout(self, timeo):
+        # The socket module we simulate there
+        # specifies the timeo as seconds as float.
+        msecs = int(timeo * 1000)
+        assert msecs >= 0
+        self.smbconn.settimeout(msecs)
+        return
 
+    def send(self, buf, flags=0):
+        return self.smbconn.write(self.smbfid, buffer=buf, offset=0, mode=8)
+
+    def recv(self, len, flags=0):
+        try:
+            return self.smbconn.read(self.smbfid, offset=0, size=len)
+        except NTSTATUSError as e:
+            if e.args[0] == NT_STATUS_CONNECTION_DISCONNECTED:
+                return b'\0' * 0
+            if e.args[0] == NT_STATUS_PIPE_DISCONNECTED:
+                return b'\0' * 0
+            if e.args[0] == NT_STATUS_IO_TIMEOUT:
+                raise socket.timeout(str(e))
+            raise e
 
 class RawDCERPCTest(TestCase):
     """A raw DCE/RPC Test case."""
@@ -40,9 +90,10 @@ class RawDCERPCTest(TestCase):
         if self.do_hexdump:
             sys.stderr.write("disconnect[%s]\n" % reason)
 
-    def connect(self):
+    def _connect_tcp(self):
+        tcp_port = int(self.primary_address)
         try:
-            self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC,
+            self.a = socket.getaddrinfo(self.host, tcp_port, socket.AF_UNSPEC,
                                         socket.SOCK_STREAM, socket.SOL_TCP,
                                         0)
             self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
@@ -60,7 +111,36 @@ class RawDCERPCTest(TestCase):
             pass
         self.max_xmit_frag = 5840
         self.max_recv_frag = 5840
-        self.secondary_address = "%d" % self.tcp_port
+        if self.secondary_address is None:
+            self.secondary_address = self.primary_address
+        # compat for older tests
+        self.tcp_port = tcp_port
+
+    def _connect_smb(self):
+        a = self.primary_address.split('\\')
+        self.assertEquals(len(a), 3)
+        self.assertEquals(a[0], "")
+        self.assertEquals(a[1], "pipe")
+        pipename = a[2]
+        self.s = smb_pipe_socket(self.target_hostname,
+                                 pipename,
+                                 self.transport_creds,
+                                 self.transport_impersonation,
+                                 self.lp_ctx)
+        self.max_xmit_frag = 4280
+        self.max_recv_frag = 4280
+        if self.secondary_address is None:
+            self.secondary_address = self.primary_address
+
+    def connect(self):
+        self.assertNotConnected()
+        if self.primary_address.startswith("\\pipe\\"):
+            self._connect_smb()
+        else:
+            self._connect_tcp()
+        if self.secondary_address is None:
+            self.secondary_address = self.primary_address
+        return
 
     def setUp(self):
         super(RawDCERPCTest, self).setUp()
@@ -73,12 +153,16 @@ class RawDCERPCTest(TestCase):
         self.target_hostname = samba.tests.env_get_var_value('TARGET_HOSTNAME', allow_missing=True)
         if self.target_hostname is None:
             self.target_hostname = self.host
-        self.tcp_port = 135
+        self.primary_address = "135"
+        self.secondary_address = None
+        self.transport_creds = self.get_anon_creds()
+        self.transport_impersonation = 0x2
 
         self.settings = {}
         self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
         self.settings["target_hostname"] = self.target_hostname
 
+        self.s = None
         self.connect()
 
     def tearDown(self):
@@ -88,7 +172,27 @@ class RawDCERPCTest(TestCase):
     def noop(self):
         return
 
-    def second_connection(self, tcp_port=None):
+    def reconnect_smb_pipe(self, primary_address, secondary_address=None,
+                           transport_creds=None, transport_impersonation=None):
+        self._disconnect("reconnect_smb_pipe")
+        self.assertIsNotNone(primary_address)
+        self.primary_address = primary_address
+        if secondary_address is not None:
+            self.secondary_address = secondary_address
+        else:
+            self.secondary_address = None
+
+        if transport_creds is not None:
+            self.transport_creds = transport_creds
+
+        if transport_impersonation is not None:
+            self.transport_impersonation = transport_impersonation
+
+        self.connect()
+        return
+
+    def second_connection(self, primary_address=None, secondary_address=None,
+                          transport_creds=None, transport_impersonation=None):
         c = RawDCERPCTest(methodName='noop')
         c.do_ndr_print = self.do_ndr_print
         c.do_hexdump = self.do_hexdump
@@ -96,13 +200,31 @@ class RawDCERPCTest(TestCase):
 
         c.host = self.host
         c.target_hostname = self.target_hostname
-        if tcp_port is not None:
-            c.tcp_port = tcp_port
+        if primary_address is not None:
+            c.primary_address = primary_address
+            if secondary_address is not None:
+                c.secondary_address = secondary_address
+            else:
+                c.secondary_address = None
         else:
-            c.tcp_port = self.tcp_port
+            self.assertIsNone(secondary_address)
+            c.primary_address = self.primary_address
+            c.secondary_address = self.secondary_address
 
+        if transport_creds is not None:
+            c.transport_creds = transport_creds
+        else:
+            c.transport_creds = self.transport_creds
+
+        if transport_impersonation is not None:
+            c.transport_impersonation = transport_impersonation
+        else:
+            c.transport_impersonation = self.transport_impersonation
+
+        c.lp_ctx = self.lp_ctx
         c.settings = self.settings
 
+        c.s = None
         c.connect()
         return c
 
@@ -478,7 +600,7 @@ class RawDCERPCTest(TestCase):
         lhs4.lhs_data = b""
         floor4 = samba.dcerpc.epmapper.epm_floor()
         floor4.lhs = lhs4
-        floor4.rhs.port = self.tcp_port
+        floor4.rhs.port = int(self.primary_address)
         lhs5 = samba.dcerpc.epmapper.epm_lhs()
         lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP
         lhs5.lhs_data = b""
@@ -514,7 +636,8 @@ class RawDCERPCTest(TestCase):
 
         # reconnect to the given port
         self._disconnect("epmap_reconnect")
-        self.tcp_port = rep_twr.tower.floors[3].rhs.port
+        self.primary_address = "%d" % rep_twr.tower.floors[3].rhs.port
+        self.secondary_address = None
         self.connect()
 
     def send_pdu(self, req, ndr_print=None, hexdump=None):
@@ -539,6 +662,9 @@ class RawDCERPCTest(TestCase):
         except IOError as e:
             self._disconnect("send_pdu: %s" % e)
             raise
+        except NTSTATUSError as e:
+            self._disconnect("send_pdu: %s" % e)
+            raise
         finally:
             pass