import samba.dcerpc.dcerpc as dcerpc
import samba.dcerpc.base
import samba.dcerpc.epmapper
+import samba.dcerpc.security as security
import samba.tests
from samba import gensec
from samba.credentials import Credentials
from samba.tests import TestCase
from samba.ndr import ndr_pack, ndr_unpack, ndr_unpack_out
from samba.compat import text_type
+from samba.ntstatus import (
+ NT_STATUS_CONNECTION_DISCONNECTED,
+ NT_STATUS_PIPE_DISCONNECTED,
+ NT_STATUS_IO_TIMEOUT
+)
+from samba import NTSTATUSError
+from samba.samba3 import param as s3param
+from samba.samba3 import libsmb_samba_internal
+
+class smb_pipe_socket(object):
+
+ def __init__(self, target_hostname, pipename, creds, impersonation_level, lp):
+ lp3 = s3param.get_context()
+ lp3.load(lp.configfile)
+ self.smbconn = libsmb_samba_internal.Conn(target_hostname, 'IPC$',
+ credentials=creds, sign=True)
+ self.smbfid = self.smbconn.create(pipename,
+ DesiredAccess=0x12019f,
+ ShareAccess=0x7,
+ CreateDisposition=1,
+ CreateOptions=0x400040,
+ ImpersonationLevel=impersonation_level)
+ return
+
+ def close(self):
+ self.smbconn.close(self.smbfid)
+ del self.smbconn
+
+ def settimeout(self, timeo):
+ # The socket module we simulate there
+ # specifies the timeo as seconds as float.
+ msecs = int(timeo * 1000)
+ assert msecs >= 0
+ self.smbconn.settimeout(msecs)
+ return
+ def send(self, buf, flags=0):
+ return self.smbconn.write(self.smbfid, buffer=buf, offset=0, mode=8)
+
+ def recv(self, len, flags=0):
+ try:
+ return self.smbconn.read(self.smbfid, offset=0, size=len)
+ except NTSTATUSError as e:
+ if e.args[0] == NT_STATUS_CONNECTION_DISCONNECTED:
+ return b'\0' * 0
+ if e.args[0] == NT_STATUS_PIPE_DISCONNECTED:
+ return b'\0' * 0
+ if e.args[0] == NT_STATUS_IO_TIMEOUT:
+ raise socket.timeout(str(e))
+ raise e
class RawDCERPCTest(TestCase):
"""A raw DCE/RPC Test case."""
if self.do_hexdump:
sys.stderr.write("disconnect[%s]\n" % reason)
- def connect(self):
+ def _connect_tcp(self):
+ tcp_port = int(self.primary_address)
try:
- self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC,
+ self.a = socket.getaddrinfo(self.host, tcp_port, socket.AF_UNSPEC,
socket.SOCK_STREAM, socket.SOL_TCP,
0)
self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
pass
self.max_xmit_frag = 5840
self.max_recv_frag = 5840
- self.secondary_address = "%d" % self.tcp_port
+ if self.secondary_address is None:
+ self.secondary_address = self.primary_address
+ # compat for older tests
+ self.tcp_port = tcp_port
+
+ def _connect_smb(self):
+ a = self.primary_address.split('\\')
+ self.assertEquals(len(a), 3)
+ self.assertEquals(a[0], "")
+ self.assertEquals(a[1], "pipe")
+ pipename = a[2]
+ self.s = smb_pipe_socket(self.target_hostname,
+ pipename,
+ self.transport_creds,
+ self.transport_impersonation,
+ self.lp_ctx)
+ self.max_xmit_frag = 4280
+ self.max_recv_frag = 4280
+ if self.secondary_address is None:
+ self.secondary_address = self.primary_address
+
+ def connect(self):
+ self.assertNotConnected()
+ if self.primary_address.startswith("\\pipe\\"):
+ self._connect_smb()
+ else:
+ self._connect_tcp()
+ if self.secondary_address is None:
+ self.secondary_address = self.primary_address
+ return
def setUp(self):
super(RawDCERPCTest, self).setUp()
self.target_hostname = samba.tests.env_get_var_value('TARGET_HOSTNAME', allow_missing=True)
if self.target_hostname is None:
self.target_hostname = self.host
- self.tcp_port = 135
+ self.primary_address = "135"
+ self.secondary_address = None
+ self.transport_creds = self.get_anon_creds()
+ self.transport_impersonation = 0x2
self.settings = {}
self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
self.settings["target_hostname"] = self.target_hostname
+ self.s = None
self.connect()
def tearDown(self):
def noop(self):
return
- def second_connection(self, tcp_port=None):
+ def reconnect_smb_pipe(self, primary_address, secondary_address=None,
+ transport_creds=None, transport_impersonation=None):
+ self._disconnect("reconnect_smb_pipe")
+ self.assertIsNotNone(primary_address)
+ self.primary_address = primary_address
+ if secondary_address is not None:
+ self.secondary_address = secondary_address
+ else:
+ self.secondary_address = None
+
+ if transport_creds is not None:
+ self.transport_creds = transport_creds
+
+ if transport_impersonation is not None:
+ self.transport_impersonation = transport_impersonation
+
+ self.connect()
+ return
+
+ def second_connection(self, primary_address=None, secondary_address=None,
+ transport_creds=None, transport_impersonation=None):
c = RawDCERPCTest(methodName='noop')
c.do_ndr_print = self.do_ndr_print
c.do_hexdump = self.do_hexdump
c.host = self.host
c.target_hostname = self.target_hostname
- if tcp_port is not None:
- c.tcp_port = tcp_port
+ if primary_address is not None:
+ c.primary_address = primary_address
+ if secondary_address is not None:
+ c.secondary_address = secondary_address
+ else:
+ c.secondary_address = None
else:
- c.tcp_port = self.tcp_port
+ self.assertIsNone(secondary_address)
+ c.primary_address = self.primary_address
+ c.secondary_address = self.secondary_address
+ if transport_creds is not None:
+ c.transport_creds = transport_creds
+ else:
+ c.transport_creds = self.transport_creds
+
+ if transport_impersonation is not None:
+ c.transport_impersonation = transport_impersonation
+ else:
+ c.transport_impersonation = self.transport_impersonation
+
+ c.lp_ctx = self.lp_ctx
c.settings = self.settings
+ c.s = None
c.connect()
return c
lhs4.lhs_data = b""
floor4 = samba.dcerpc.epmapper.epm_floor()
floor4.lhs = lhs4
- floor4.rhs.port = self.tcp_port
+ floor4.rhs.port = int(self.primary_address)
lhs5 = samba.dcerpc.epmapper.epm_lhs()
lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP
lhs5.lhs_data = b""
# reconnect to the given port
self._disconnect("epmap_reconnect")
- self.tcp_port = rep_twr.tower.floors[3].rhs.port
+ self.primary_address = "%d" % rep_twr.tower.floors[3].rhs.port
+ self.secondary_address = None
self.connect()
def send_pdu(self, req, ndr_print=None, hexdump=None):
except IOError as e:
self._disconnect("send_pdu: %s" % e)
raise
+ except NTSTATUSError as e:
+ self._disconnect("send_pdu: %s" % e)
+ raise
finally:
pass