fc1c86a1283b9f06e7ba1ddc1743999c7001b140
[samba.git] / python / samba / tests / dcerpc / raw_testcase.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
3 # Copyright (C) Stefan Metzmacher 2014,2015
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import sys
20 import socket
21 import samba.dcerpc.dcerpc as dcerpc
22 import samba.dcerpc.base
23 import samba.dcerpc.epmapper
24 import samba.dcerpc.security as security
25 import samba.tests
26 from samba import gensec
27 from samba.credentials import Credentials
28 from samba.tests import TestCase
29 from samba.ndr import ndr_pack, ndr_unpack, ndr_unpack_out
30 from samba.compat import text_type
31 from samba.ntstatus import (
32     NT_STATUS_CONNECTION_DISCONNECTED,
33     NT_STATUS_PIPE_DISCONNECTED,
34     NT_STATUS_IO_TIMEOUT
35 )
36 from samba import NTSTATUSError
37 from samba.samba3 import param as s3param
38 from samba.samba3 import libsmb_samba_internal as libsmb
39
40 class smb_pipe_socket(object):
41
42     def __init__(self, target_hostname, pipename, creds, impersonation_level, lp):
43         lp3 = s3param.get_context()
44         lp3.load(lp.configfile)
45         self.smbconn = libsmb.Conn(target_hostname, 'IPC$', lp3,
46                                    creds=creds, sign=True)
47         self.smbfid = self.smbconn.create(pipename,
48                                           DesiredAccess=0x12019f,
49                                           ShareAccess=0x7,
50                                           CreateDisposition=1,
51                                           CreateOptions=0x400040,
52                                           ImpersonationLevel=impersonation_level)
53         return
54
55     def close(self):
56         self.smbconn.close(self.smbfid)
57         del self.smbconn
58
59     def settimeout(self, timeo):
60         # The socket module we simulate there
61         # specifies the timeo as seconds as float.
62         msecs = int(timeo * 1000)
63         assert msecs >= 0
64         self.smbconn.settimeout(msecs)
65         return
66
67     def send(self, buf, flags=0):
68         return self.smbconn.write(self.smbfid, buffer=buf, offset=0, mode=8)
69
70     def recv(self, len, flags=0):
71         try:
72             return self.smbconn.read(self.smbfid, offset=0, size=len)
73         except NTSTATUSError as e:
74             if e.args[0] == NT_STATUS_CONNECTION_DISCONNECTED:
75                 return b'\0' * 0
76             if e.args[0] == NT_STATUS_PIPE_DISCONNECTED:
77                 return b'\0' * 0
78             if e.args[0] == NT_STATUS_IO_TIMEOUT:
79                 raise socket.timeout(str(e))
80             raise e
81
82 class RawDCERPCTest(TestCase):
83     """A raw DCE/RPC Test case."""
84
85     def _disconnect(self, reason):
86         if self.s is None:
87             return
88         self.s.close()
89         self.s = None
90         if self.do_hexdump:
91             sys.stderr.write("disconnect[%s]\n" % reason)
92
93     def _connect_tcp(self):
94         tcp_port = int(self.primary_address)
95         try:
96             self.a = socket.getaddrinfo(self.host, tcp_port, socket.AF_UNSPEC,
97                                         socket.SOCK_STREAM, socket.SOL_TCP,
98                                         0)
99             self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
100             self.s.settimeout(10)
101             self.s.connect(self.a[0][4])
102         except socket.error as e:
103             self.s.close()
104             raise
105         except IOError as e:
106             self.s.close()
107             raise
108         except Exception as e:
109             raise
110         finally:
111             pass
112         self.max_xmit_frag = 5840
113         self.max_recv_frag = 5840
114         if self.secondary_address is None:
115             self.secondary_address = self.primary_address
116         # compat for older tests
117         self.tcp_port = tcp_port
118
119     def _connect_smb(self):
120         a = self.primary_address.split('\\')
121         self.assertEquals(len(a), 3)
122         self.assertEquals(a[0], "")
123         self.assertEquals(a[1], "pipe")
124         pipename = a[2]
125         self.s = smb_pipe_socket(self.target_hostname,
126                                  pipename,
127                                  self.transport_creds,
128                                  self.transport_impersonation,
129                                  self.lp_ctx)
130         self.max_xmit_frag = 4280
131         self.max_recv_frag = 4280
132         if self.secondary_address is None:
133             self.secondary_address = self.primary_address
134
135     def connect(self):
136         self.assertNotConnected()
137         if self.primary_address.startswith("\\pipe\\"):
138             self._connect_smb()
139         else:
140             self._connect_tcp()
141         if self.secondary_address is None:
142             self.secondary_address = self.primary_address
143         return
144
145     def setUp(self):
146         super(RawDCERPCTest, self).setUp()
147         self.do_ndr_print = False
148         self.do_hexdump = False
149
150         self.ignore_random_pad = samba.tests.env_get_var_value('IGNORE_RANDOM_PAD',
151                                                                allow_missing=True)
152         self.host = samba.tests.env_get_var_value('SERVER')
153         self.target_hostname = samba.tests.env_get_var_value('TARGET_HOSTNAME', allow_missing=True)
154         if self.target_hostname is None:
155             self.target_hostname = self.host
156         self.primary_address = "135"
157         self.secondary_address = None
158         self.transport_creds = self.get_anon_creds()
159         self.transport_impersonation = 0x2
160
161         self.settings = {}
162         self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
163         self.settings["target_hostname"] = self.target_hostname
164
165         self.s = None
166         self.connect()
167
168     def tearDown(self):
169         self._disconnect("tearDown")
170         super(TestCase, self).tearDown()
171
172     def noop(self):
173         return
174
175     def reconnect_smb_pipe(self, primary_address, secondary_address=None,
176                            transport_creds=None, transport_impersonation=None):
177         self._disconnect("reconnect_smb_pipe")
178         self.assertIsNotNone(primary_address)
179         self.primary_address = primary_address
180         if secondary_address is not None:
181             self.secondary_address = secondary_address
182         else:
183             self.secondary_address = None
184
185         if transport_creds is not None:
186             self.transport_creds = transport_creds
187
188         if transport_impersonation is not None:
189             self.transport_impersonation = transport_impersonation
190
191         self.connect()
192         return
193
194     def second_connection(self, primary_address=None, secondary_address=None,
195                           transport_creds=None, transport_impersonation=None):
196         c = RawDCERPCTest(methodName='noop')
197         c.do_ndr_print = self.do_ndr_print
198         c.do_hexdump = self.do_hexdump
199         c.ignore_random_pad = self.ignore_random_pad
200
201         c.host = self.host
202         c.target_hostname = self.target_hostname
203         if primary_address is not None:
204             c.primary_address = primary_address
205             if secondary_address is not None:
206                 c.secondary_address = secondary_address
207             else:
208                 c.secondary_address = None
209         else:
210             self.assertIsNone(secondary_address)
211             c.primary_address = self.primary_address
212             c.secondary_address = self.secondary_address
213
214         if transport_creds is not None:
215             c.transport_creds = transport_creds
216         else:
217             c.transport_creds = self.transport_creds
218
219         if transport_impersonation is not None:
220             c.transport_impersonation = transport_impersonation
221         else:
222             c.transport_impersonation = self.transport_impersonation
223
224         c.lp_ctx = self.lp_ctx
225         c.settings = self.settings
226
227         c.s = None
228         c.connect()
229         return c
230
231     def get_user_creds(self):
232         c = Credentials()
233         c.guess()
234         domain = samba.tests.env_get_var_value('DOMAIN')
235         realm = samba.tests.env_get_var_value('REALM')
236         username = samba.tests.env_get_var_value('USERNAME')
237         password = samba.tests.env_get_var_value('PASSWORD')
238         c.set_domain(domain)
239         c.set_realm(realm)
240         c.set_username(username)
241         c.set_password(password)
242         return c
243
244     def get_anon_creds(self):
245         c = Credentials()
246         c.set_anonymous()
247         return c
248
249     def get_auth_context_creds(self, creds, auth_type, auth_level,
250                                auth_context_id,
251                                g_auth_level=None,
252                                hdr_signing=False):
253
254         if g_auth_level is None:
255             g_auth_level = auth_level
256
257         g = gensec.Security.start_client(self.settings)
258         g.set_credentials(creds)
259         g.want_feature(gensec.FEATURE_DCE_STYLE)
260         g.start_mech_by_authtype(auth_type, g_auth_level)
261
262         if auth_type == dcerpc.DCERPC_AUTH_TYPE_KRB5:
263             expect_3legs = True
264         elif auth_type == dcerpc.DCERPC_AUTH_TYPE_NTLMSSP:
265             expect_3legs = True
266         else:
267             expect_3legs = False
268
269         auth_context = {}
270         auth_context["auth_type"] = auth_type
271         auth_context["auth_level"] = auth_level
272         auth_context["auth_context_id"] = auth_context_id
273         auth_context["g_auth_level"] = g_auth_level
274         auth_context["gensec"] = g
275         auth_context["hdr_signing"] = hdr_signing
276         auth_context["expect_3legs"] = expect_3legs
277
278         return auth_context
279
280     def do_generic_bind(self, ctx, auth_context=None,
281                         pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
282                         samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
283                         assoc_group_id=0, call_id=0,
284                         nak_reason=None, alter_fault=None,
285                         start_with_alter=False):
286         ctx_list = [ctx]
287
288         if auth_context is not None:
289             if auth_context['hdr_signing']:
290                 pfc_flags |= dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN
291
292             expect_3legs = auth_context["expect_3legs"]
293
294             from_server = b""
295             (finished, to_server) = auth_context["gensec"].update(from_server)
296             self.assertFalse(finished)
297
298             auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
299                                            auth_level=auth_context["auth_level"],
300                                            auth_context_id=auth_context["auth_context_id"],
301                                            auth_blob=to_server)
302         else:
303             auth_info = b""
304
305         if start_with_alter:
306             req = self.generate_alter(call_id=call_id,
307                                       pfc_flags=pfc_flags,
308                                       ctx_list=ctx_list,
309                                       assoc_group_id=0xffffffff - assoc_group_id,
310                                       auth_info=auth_info)
311             self.send_pdu(req)
312             rep = self.recv_pdu()
313             if alter_fault is not None:
314                 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
315                                 pfc_flags=req.pfc_flags |
316                                 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
317                                 auth_length=0)
318                 self.assertNotEquals(rep.u.alloc_hint, 0)
319                 self.assertEquals(rep.u.context_id, 0)
320                 self.assertEquals(rep.u.cancel_count, 0)
321                 self.assertEquals(rep.u.flags, 0)
322                 self.assertEquals(rep.u.status, alter_fault)
323                 self.assertEquals(rep.u.reserved, 0)
324                 self.assertEquals(len(rep.u.error_and_verifier), 0)
325                 return None
326             self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id,
327                             pfc_flags=req.pfc_flags)
328             self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
329             self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
330             self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
331             self.assertEquals(rep.u.secondary_address_size, 0)
332             self.assertEquals(rep.u.secondary_address, '')
333             self.assertPadding(rep.u._pad1, 2)
334         else:
335             req = self.generate_bind(call_id=call_id,
336                                      pfc_flags=pfc_flags,
337                                      ctx_list=ctx_list,
338                                      assoc_group_id=assoc_group_id,
339                                      auth_info=auth_info)
340             self.send_pdu(req)
341             rep = self.recv_pdu()
342             if nak_reason is not None:
343                 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
344                                 auth_length=0)
345                 self.assertEquals(rep.u.reject_reason, nak_reason)
346                 self.assertEquals(rep.u.num_versions, 1)
347                 self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers)
348                 self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
349                 self.assertPadding(rep.u._pad, 3)
350                 return
351             self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
352                             pfc_flags=pfc_flags)
353             self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
354             self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
355             if assoc_group_id != 0:
356                 self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
357             else:
358                 self.assertNotEquals(rep.u.assoc_group_id, 0)
359                 assoc_group_id = rep.u.assoc_group_id
360             sda_str = self.secondary_address
361             sda_len = len(sda_str) + 1
362             mod_len = (2 + sda_len) % 4
363             if mod_len != 0:
364                 sda_pad = 4 - mod_len
365             else:
366                 sda_pad = 0
367             self.assertEquals(rep.u.secondary_address_size, sda_len)
368             self.assertEquals(rep.u.secondary_address, sda_str)
369             self.assertPadding(rep.u._pad1, sda_pad)
370
371         self.assertEquals(rep.u.num_results, 1)
372         self.assertEquals(rep.u.ctx_list[0].result,
373                           samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
374         self.assertEquals(rep.u.ctx_list[0].reason,
375                           samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
376         self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
377         ack = rep
378         if auth_context is None:
379             self.assertEquals(rep.auth_length, 0)
380             self.assertEquals(len(rep.u.auth_info), 0)
381             return ack
382         self.assertNotEquals(rep.auth_length, 0)
383         self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
384         self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
385
386         a = self.parse_auth(rep.u.auth_info, auth_context=auth_context)
387
388         from_server = a.credentials
389         (finished, to_server) = auth_context["gensec"].update(from_server)
390         if expect_3legs:
391             self.assertTrue(finished)
392             if auth_context['hdr_signing']:
393                 auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER)
394         else:
395             self.assertFalse(finished)
396
397         auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
398                                        auth_level=auth_context["auth_level"],
399                                        auth_context_id=auth_context["auth_context_id"],
400                                        auth_blob=to_server)
401         req = self.generate_alter(call_id=call_id,
402                                   ctx_list=ctx_list,
403                                   assoc_group_id=0xffffffff - assoc_group_id,
404                                   auth_info=auth_info)
405         self.send_pdu(req)
406         rep = self.recv_pdu()
407         if alter_fault is not None:
408             self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
409                             pfc_flags=req.pfc_flags |
410                             samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
411                             auth_length=0)
412             self.assertNotEquals(rep.u.alloc_hint, 0)
413             self.assertEquals(rep.u.context_id, 0)
414             self.assertEquals(rep.u.cancel_count, 0)
415             self.assertEquals(rep.u.flags, 0)
416             self.assertEquals(rep.u.status, alter_fault)
417             self.assertEquals(rep.u.reserved, 0)
418             self.assertEquals(len(rep.u.error_and_verifier), 0)
419             return None
420         self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id)
421         self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
422         self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
423         self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
424         self.assertEquals(rep.u.secondary_address_size, 0)
425         self.assertEquals(rep.u.secondary_address, '')
426         self.assertPadding(rep.u._pad1, 2)
427         self.assertEquals(rep.u.num_results, 1)
428         self.assertEquals(rep.u.ctx_list[0].result,
429                           samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
430         self.assertEquals(rep.u.ctx_list[0].reason,
431                           samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
432         self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
433         if finished:
434             self.assertEquals(rep.auth_length, 0)
435         else:
436             self.assertNotEquals(rep.auth_length, 0)
437         self.assertGreaterEqual(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
438         self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
439
440         a = self.parse_auth(rep.u.auth_info, auth_context=auth_context)
441
442         if finished:
443             return ack
444
445         from_server = a.credentials
446         (finished, to_server) = auth_context["gensec"].update(from_server)
447         self.assertTrue(finished)
448         if auth_context['hdr_signing']:
449             auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER)
450
451         return ack
452
453     def prepare_presentation(self, abstract, transfer, object=None,
454                              context_id=0xffff, epmap=False, auth_context=None,
455                              pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
456                              samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
457                              assoc_group_id=0,
458                              return_ack=False):
459         if epmap:
460             self.epmap_reconnect(abstract, transfer=transfer, object=object)
461
462         tsf1_list = [transfer]
463         ctx = samba.dcerpc.dcerpc.ctx_list()
464         ctx.context_id = context_id
465         ctx.num_transfer_syntaxes = len(tsf1_list)
466         ctx.abstract_syntax = abstract
467         ctx.transfer_syntaxes = tsf1_list
468
469         ack = self.do_generic_bind(ctx=ctx,
470                                    auth_context=auth_context,
471                                    pfc_flags=pfc_flags,
472                                    assoc_group_id=assoc_group_id)
473         if ack is None:
474             ctx = None
475
476         if return_ack:
477             return (ctx, ack)
478         return ctx
479
480     def do_single_request(self, call_id, ctx, io,
481                           auth_context=None,
482                           object=None,
483                           bigendian=False, ndr64=False,
484                           allow_remaining=False,
485                           send_req=True,
486                           recv_rep=True,
487                           fault_pfc_flags=(
488                               samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
489                               samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
490                           fault_status=None,
491                           fault_context_id=None,
492                           timeout=None,
493                           ndr_print=None,
494                           hexdump=None):
495
496         if fault_context_id is None:
497             fault_context_id = ctx.context_id
498
499         if ndr_print is None:
500             ndr_print = self.do_ndr_print
501         if hexdump is None:
502             hexdump = self.do_hexdump
503
504         if send_req:
505             if ndr_print:
506                 sys.stderr.write("in: %s" % samba.ndr.ndr_print_in(io))
507             stub_in = samba.ndr.ndr_pack_in(io, bigendian=bigendian, ndr64=ndr64)
508             if hexdump:
509                 sys.stderr.write("stub_in: %d\n%s" % (len(stub_in), self.hexdump(stub_in)))
510
511         pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
512         pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
513         if object is not None:
514             pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_OBJECT_UUID
515
516         req = self.generate_request_auth(call_id=call_id,
517                                          context_id=ctx.context_id,
518                                          pfc_flags=pfc_flags,
519                                          object=object,
520                                          opnum=io.opnum(),
521                                          stub=stub_in,
522                                          auth_context=auth_context)
523         if send_req:
524             self.send_pdu(req, ndr_print=ndr_print, hexdump=hexdump)
525         if recv_rep:
526             (rep, rep_blob) = self.recv_pdu_raw(timeout=timeout,
527                                                 ndr_print=ndr_print,
528                                                 hexdump=hexdump)
529             if fault_status:
530                 self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
531                                 pfc_flags=fault_pfc_flags, auth_length=0)
532                 self.assertNotEquals(rep.u.alloc_hint, 0)
533                 self.assertEquals(rep.u.context_id, fault_context_id)
534                 self.assertEquals(rep.u.cancel_count, 0)
535                 self.assertEquals(rep.u.flags, 0)
536                 self.assertEquals(rep.u.status, fault_status)
537                 self.assertEquals(rep.u.reserved, 0)
538                 self.assertEquals(len(rep.u.error_and_verifier), 0)
539                 return
540
541             expected_auth_length = 0
542             if auth_context is not None and \
543                auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PACKET:
544                 expected_auth_length = req.auth_length
545
546             self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
547                             auth_length=expected_auth_length)
548             self.assertNotEquals(rep.u.alloc_hint, 0)
549             self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
550             self.assertEquals(rep.u.cancel_count, 0)
551             self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
552             stub_out = self.check_response_auth(rep, rep_blob, auth_context)
553             self.assertEqual(len(stub_out), rep.u.alloc_hint)
554
555             if hexdump:
556                 sys.stderr.write("stub_out: %d\n%s" % (len(stub_out), self.hexdump(stub_out)))
557             ndr_unpack_out(io, stub_out, bigendian=bigendian, ndr64=ndr64,
558                            allow_remaining=allow_remaining)
559             if ndr_print:
560                 sys.stderr.write("out: %s" % samba.ndr.ndr_print_out(io))
561
562     def epmap_reconnect(self, abstract, transfer=None, object=None):
563         ndr32 = samba.dcerpc.base.transfer_syntax_ndr()
564
565         if transfer is None:
566             transfer = ndr32
567
568         if object is None:
569             object = samba.dcerpc.misc.GUID()
570
571         ctx = self.prepare_presentation(samba.dcerpc.epmapper.abstract_syntax(),
572                                         transfer, context_id=0)
573
574         data1 = ndr_pack(abstract)
575         lhs1 = samba.dcerpc.epmapper.epm_lhs()
576         lhs1.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
577         lhs1.lhs_data = data1[:18]
578         rhs1 = samba.dcerpc.epmapper.epm_rhs_uuid()
579         rhs1.unknown = data1[18:]
580         floor1 = samba.dcerpc.epmapper.epm_floor()
581         floor1.lhs = lhs1
582         floor1.rhs = rhs1
583         data2 = ndr_pack(transfer)
584         lhs2 = samba.dcerpc.epmapper.epm_lhs()
585         lhs2.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
586         lhs2.lhs_data = data2[:18]
587         rhs2 = samba.dcerpc.epmapper.epm_rhs_uuid()
588         rhs2.unknown = data1[18:]
589         floor2 = samba.dcerpc.epmapper.epm_floor()
590         floor2.lhs = lhs2
591         floor2.rhs = rhs2
592         lhs3 = samba.dcerpc.epmapper.epm_lhs()
593         lhs3.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_NCACN
594         lhs3.lhs_data = b""
595         floor3 = samba.dcerpc.epmapper.epm_floor()
596         floor3.lhs = lhs3
597         floor3.rhs.minor_version = 0
598         lhs4 = samba.dcerpc.epmapper.epm_lhs()
599         lhs4.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_TCP
600         lhs4.lhs_data = b""
601         floor4 = samba.dcerpc.epmapper.epm_floor()
602         floor4.lhs = lhs4
603         floor4.rhs.port = int(self.primary_address)
604         lhs5 = samba.dcerpc.epmapper.epm_lhs()
605         lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP
606         lhs5.lhs_data = b""
607         floor5 = samba.dcerpc.epmapper.epm_floor()
608         floor5.lhs = lhs5
609         floor5.rhs.ipaddr = "0.0.0.0"
610
611         floors = [floor1, floor2, floor3, floor4, floor5]
612         req_tower = samba.dcerpc.epmapper.epm_tower()
613         req_tower.num_floors = len(floors)
614         req_tower.floors = floors
615         req_twr = samba.dcerpc.epmapper.epm_twr_t()
616         req_twr.tower = req_tower
617
618         epm_map = samba.dcerpc.epmapper.epm_Map()
619         epm_map.in_object = object
620         epm_map.in_map_tower = req_twr
621         epm_map.in_entry_handle = samba.dcerpc.misc.policy_handle()
622         epm_map.in_max_towers = 4
623
624         self.do_single_request(call_id=2, ctx=ctx, io=epm_map)
625
626         self.assertGreaterEqual(epm_map.out_num_towers, 1)
627         rep_twr = epm_map.out_towers[0].twr
628         self.assertIsNotNone(rep_twr)
629         self.assertEqual(rep_twr.tower_length, 75)
630         self.assertEqual(rep_twr.tower.num_floors, 5)
631         self.assertEqual(len(rep_twr.tower.floors), 5)
632         self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
633                          samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
634         self.assertEqual(rep_twr.tower.floors[3].lhs.protocol,
635                          samba.dcerpc.epmapper.EPM_PROTOCOL_TCP)
636
637         # reconnect to the given port
638         self._disconnect("epmap_reconnect")
639         self.primary_address = "%d" % rep_twr.tower.floors[3].rhs.port
640         self.secondary_address = None
641         self.connect()
642
643     def send_pdu(self, req, ndr_print=None, hexdump=None):
644         if ndr_print is None:
645             ndr_print = self.do_ndr_print
646         if hexdump is None:
647             hexdump = self.do_hexdump
648         try:
649             req_pdu = ndr_pack(req)
650             if ndr_print:
651                 sys.stderr.write("send_pdu: %s" % samba.ndr.ndr_print(req))
652             if hexdump:
653                 sys.stderr.write("send_pdu: %d\n%s" % (len(req_pdu), self.hexdump(req_pdu)))
654             while True:
655                 sent = self.s.send(req_pdu, 0)
656                 if sent == len(req_pdu):
657                     break
658                 req_pdu = req_pdu[sent:]
659         except socket.error as e:
660             self._disconnect("send_pdu: %s" % e)
661             raise
662         except IOError as e:
663             self._disconnect("send_pdu: %s" % e)
664             raise
665         except NTSTATUSError as e:
666             self._disconnect("send_pdu: %s" % e)
667             raise
668         finally:
669             pass
670
671     def recv_raw(self, hexdump=None, timeout=None):
672         rep_pdu = None
673         if hexdump is None:
674             hexdump = self.do_hexdump
675         try:
676             if timeout is not None:
677                 self.s.settimeout(timeout)
678             rep_pdu = self.s.recv(0xffff, 0)
679             self.s.settimeout(10)
680             if len(rep_pdu) == 0:
681                 self._disconnect("recv_raw: EOF")
682                 return None
683             if hexdump:
684                 sys.stderr.write("recv_raw: %d\n%s" % (len(rep_pdu), self.hexdump(rep_pdu)))
685         except socket.timeout as e:
686             self.s.settimeout(10)
687             sys.stderr.write("recv_raw: TIMEOUT\n")
688             pass
689         except socket.error as e:
690             self._disconnect("recv_raw: %s" % e)
691             raise
692         except IOError as e:
693             self._disconnect("recv_raw: %s" % e)
694             raise
695         finally:
696             pass
697         return rep_pdu
698
699     def recv_pdu_raw(self, ndr_print=None, hexdump=None, timeout=None):
700         rep_pdu = None
701         rep = None
702         if ndr_print is None:
703             ndr_print = self.do_ndr_print
704         if hexdump is None:
705             hexdump = self.do_hexdump
706         try:
707             rep_pdu = self.recv_raw(hexdump=hexdump, timeout=timeout)
708             if rep_pdu is None:
709                 return (None, None)
710             rep = ndr_unpack(samba.dcerpc.dcerpc.ncacn_packet, rep_pdu, allow_remaining=True)
711             if ndr_print:
712                 sys.stderr.write("recv_pdu: %s" % samba.ndr.ndr_print(rep))
713             self.assertEqual(rep.frag_length, len(rep_pdu))
714         finally:
715             pass
716         return (rep, rep_pdu)
717
718     def recv_pdu(self, ndr_print=None, hexdump=None, timeout=None):
719         (rep, rep_pdu) = self.recv_pdu_raw(ndr_print=ndr_print,
720                                            hexdump=hexdump,
721                                            timeout=timeout)
722         return rep
723
724     def generate_auth(self,
725                       auth_type=None,
726                       auth_level=None,
727                       auth_pad_length=0,
728                       auth_context_id=None,
729                       auth_blob=None,
730                       ndr_print=None, hexdump=None):
731         if ndr_print is None:
732             ndr_print = self.do_ndr_print
733         if hexdump is None:
734             hexdump = self.do_hexdump
735
736         if auth_type is not None:
737             a = samba.dcerpc.dcerpc.auth()
738             a.auth_type = auth_type
739             a.auth_level = auth_level
740             a.auth_pad_length = auth_pad_length
741             a.auth_context_id = auth_context_id
742             a.credentials = auth_blob
743
744             ai = ndr_pack(a)
745             if ndr_print:
746                 sys.stderr.write("generate_auth: %s" % samba.ndr.ndr_print(a))
747             if hexdump:
748                 sys.stderr.write("generate_auth: %d\n%s" % (len(ai), self.hexdump(ai)))
749         else:
750             ai = b""
751
752         return ai
753
754     def parse_auth(self, auth_info, ndr_print=None, hexdump=None,
755                    auth_context=None, stub_len=0):
756         if ndr_print is None:
757             ndr_print = self.do_ndr_print
758         if hexdump is None:
759             hexdump = self.do_hexdump
760
761         if (len(auth_info) <= samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH):
762             return None
763
764         if hexdump:
765             sys.stderr.write("parse_auth: %d\n%s" % (len(auth_info), self.hexdump(auth_info)))
766         a = ndr_unpack(samba.dcerpc.dcerpc.auth, auth_info, allow_remaining=True)
767         if ndr_print:
768             sys.stderr.write("parse_auth: %s" % samba.ndr.ndr_print(a))
769
770         if auth_context is not None:
771             self.assertEquals(a.auth_type, auth_context["auth_type"])
772             self.assertEquals(a.auth_level, auth_context["auth_level"])
773             self.assertEquals(a.auth_reserved, 0)
774             self.assertEquals(a.auth_context_id, auth_context["auth_context_id"])
775
776             self.assertLessEqual(a.auth_pad_length, dcerpc.DCERPC_AUTH_PAD_ALIGNMENT)
777             self.assertLessEqual(a.auth_pad_length, stub_len)
778
779         return a
780
781     def check_response_auth(self, rep, rep_blob, auth_context=None,
782                             auth_pad_length=None):
783
784         if auth_context is None:
785             self.assertEquals(rep.auth_length, 0)
786             return rep.u.stub_and_verifier
787
788         if auth_context["auth_level"] == dcerpc.DCERPC_AUTH_LEVEL_CONNECT:
789             self.assertEquals(rep.auth_length, 0)
790             return rep.u.stub_and_verifier
791
792         self.assertGreater(rep.auth_length, 0)
793
794         ofs_stub = dcerpc.DCERPC_REQUEST_LENGTH
795         ofs_sig = rep.frag_length - rep.auth_length
796         ofs_trailer = ofs_sig - dcerpc.DCERPC_AUTH_TRAILER_LENGTH
797         rep_data = rep_blob[ofs_stub:ofs_trailer]
798         rep_whole = rep_blob[0:ofs_sig]
799         rep_sig = rep_blob[ofs_sig:]
800         rep_auth_info_blob = rep_blob[ofs_trailer:]
801
802         rep_auth_info = self.parse_auth(rep_auth_info_blob,
803                                         auth_context=auth_context,
804                                         stub_len=len(rep_data))
805         if auth_pad_length is not None:
806             self.assertEquals(rep_auth_info.auth_pad_length, auth_pad_length)
807         self.assertEquals(rep_auth_info.credentials, rep_sig)
808
809         if auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PRIVACY:
810             # TODO: not yet supported here
811             self.assertTrue(False)
812         elif auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PACKET:
813             auth_context["gensec"].check_packet(rep_data, rep_whole, rep_sig)
814
815         stub_out = rep_data[0:len(rep_data)-rep_auth_info.auth_pad_length]
816
817         return stub_out
818
819     def generate_pdu(self, ptype, call_id, payload,
820                      rpc_vers=5,
821                      rpc_vers_minor=0,
822                      pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
823                                 samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
824                      drep=[samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
825                      ndr_print=None, hexdump=None):
826
827         if getattr(payload, 'auth_info', None):
828             ai = payload.auth_info
829         else:
830             ai = b""
831
832         p = samba.dcerpc.dcerpc.ncacn_packet()
833         p.rpc_vers = rpc_vers
834         p.rpc_vers_minor = rpc_vers_minor
835         p.ptype = ptype
836         p.pfc_flags = pfc_flags
837         p.drep = drep
838         p.frag_length = 0
839         if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
840             p.auth_length = len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
841         else:
842             p.auth_length = 0
843         p.call_id = call_id
844         p.u = payload
845
846         pdu = ndr_pack(p)
847         p.frag_length = len(pdu)
848
849         return p
850
851     def generate_request_auth(self, call_id,
852                               pfc_flags=(dcerpc.DCERPC_PFC_FLAG_FIRST |
853                                          dcerpc.DCERPC_PFC_FLAG_LAST),
854                               alloc_hint=None,
855                               context_id=None,
856                               opnum=None,
857                               object=None,
858                               stub=None,
859                               auth_context=None,
860                               ndr_print=None, hexdump=None):
861
862         if stub is None:
863             stub = b""
864
865         sig_size = 0
866         if auth_context is not None:
867             mod_len = len(stub) % dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
868             auth_pad_length = 0
869             if mod_len > 0:
870                 auth_pad_length = dcerpc.DCERPC_AUTH_PAD_ALIGNMENT - mod_len
871             stub += b'\x00' * auth_pad_length
872
873             if auth_context["g_auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
874                 sig_size = auth_context["gensec"].sig_size(len(stub))
875             else:
876                 sig_size = 16
877
878             zero_sig = b"\x00" * sig_size
879             auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
880                                            auth_level=auth_context["auth_level"],
881                                            auth_pad_length=auth_pad_length,
882                                            auth_context_id=auth_context["auth_context_id"],
883                                            auth_blob=zero_sig)
884         else:
885             auth_info = b""
886
887         req = self.generate_request(call_id=call_id,
888                                     pfc_flags=pfc_flags,
889                                     alloc_hint=alloc_hint,
890                                     context_id=context_id,
891                                     opnum=opnum,
892                                     object=object,
893                                     stub=stub,
894                                     auth_info=auth_info,
895                                     ndr_print=ndr_print,
896                                     hexdump=hexdump)
897         if auth_context is None:
898             return req
899
900         req_blob = samba.ndr.ndr_pack(req)
901         ofs_stub = dcerpc.DCERPC_REQUEST_LENGTH
902         ofs_sig = len(req_blob) - req.auth_length
903         ofs_trailer = ofs_sig - dcerpc.DCERPC_AUTH_TRAILER_LENGTH
904         req_data = req_blob[ofs_stub:ofs_trailer]
905         req_whole = req_blob[0:ofs_sig]
906
907         if auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PRIVACY:
908             # TODO: not yet supported here
909             self.assertTrue(False)
910         elif auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_PACKET:
911             req_sig = auth_context["gensec"].sign_packet(req_data, req_whole)
912         elif auth_context["auth_level"] >= dcerpc.DCERPC_AUTH_LEVEL_CONNECT:
913             self.assertEquals(auth_context["auth_type"],
914                               dcerpc.DCERPC_AUTH_TYPE_NTLMSSP)
915             req_sig = b"\x01" +b"\x00" *15
916         else:
917             return req
918         self.assertEquals(len(req_sig), req.auth_length)
919         self.assertEquals(len(req_sig), sig_size)
920
921         stub_sig_ofs = len(req.u.stub_and_verifier) - sig_size
922         stub = req.u.stub_and_verifier[0:stub_sig_ofs] + req_sig
923         req.u.stub_and_verifier = stub
924
925         return req
926
927     def verify_pdu(self, p, ptype, call_id,
928                    rpc_vers=5,
929                    rpc_vers_minor=0,
930                    pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
931                               samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
932                    drep=[samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0],
933                    auth_length=None):
934
935         self.assertIsNotNone(p, "No valid pdu")
936
937         if getattr(p.u, 'auth_info', None):
938             ai = p.u.auth_info
939         else:
940             ai = b""
941
942         self.assertEqual(p.rpc_vers, rpc_vers)
943         self.assertEqual(p.rpc_vers_minor, rpc_vers_minor)
944         self.assertEqual(p.ptype, ptype)
945         self.assertEqual(p.pfc_flags, pfc_flags)
946         self.assertEqual(p.drep, drep)
947         self.assertGreaterEqual(p.frag_length,
948                                 samba.dcerpc.dcerpc.DCERPC_NCACN_PAYLOAD_OFFSET)
949         if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
950             self.assertEqual(p.auth_length,
951                              len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
952         elif auth_length is not None:
953             self.assertEqual(p.auth_length, auth_length)
954         else:
955             self.assertEqual(p.auth_length, 0)
956         self.assertEqual(p.call_id, call_id)
957
958         return
959
960     def generate_bind(self, call_id,
961                       pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
962                                  samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
963                       max_xmit_frag=None,
964                       max_recv_frag=None,
965                       assoc_group_id=0,
966                       ctx_list=[],
967                       auth_info=b"",
968                       ndr_print=None, hexdump=None):
969
970         if max_xmit_frag is None:
971             max_xmit_frag=self.max_xmit_frag
972         if max_recv_frag is None:
973             max_recv_frag=self.max_recv_frag
974
975         b = samba.dcerpc.dcerpc.bind()
976         b.max_xmit_frag = max_xmit_frag
977         b.max_recv_frag = max_recv_frag
978         b.assoc_group_id = assoc_group_id
979         b.num_contexts = len(ctx_list)
980         b.ctx_list = ctx_list
981         b.auth_info = auth_info
982
983         p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_BIND,
984                               pfc_flags=pfc_flags,
985                               call_id=call_id,
986                               payload=b,
987                               ndr_print=ndr_print, hexdump=hexdump)
988
989         return p
990
991     def generate_alter(self, call_id,
992                        pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
993                                   samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
994                        max_xmit_frag=None,
995                        max_recv_frag=None,
996                        assoc_group_id=0,
997                        ctx_list=[],
998                        auth_info=b"",
999                        ndr_print=None, hexdump=None):
1000
1001         if max_xmit_frag is None:
1002             max_xmit_frag=self.max_xmit_frag
1003         if max_recv_frag is None:
1004             max_recv_frag=self.max_recv_frag
1005
1006         a = samba.dcerpc.dcerpc.bind()
1007         a.max_xmit_frag = max_xmit_frag
1008         a.max_recv_frag = max_recv_frag
1009         a.assoc_group_id = assoc_group_id
1010         a.num_contexts = len(ctx_list)
1011         a.ctx_list = ctx_list
1012         a.auth_info = auth_info
1013
1014         p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ALTER,
1015                               pfc_flags=pfc_flags,
1016                               call_id=call_id,
1017                               payload=a,
1018                               ndr_print=ndr_print, hexdump=hexdump)
1019
1020         return p
1021
1022     def generate_auth3(self, call_id,
1023                        pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
1024                                   samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
1025                        auth_info=b"",
1026                        ndr_print=None, hexdump=None):
1027
1028         a = samba.dcerpc.dcerpc.auth3()
1029         a.auth_info = auth_info
1030
1031         p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_AUTH3,
1032                               pfc_flags=pfc_flags,
1033                               call_id=call_id,
1034                               payload=a,
1035                               ndr_print=ndr_print, hexdump=hexdump)
1036
1037         return p
1038
1039     def generate_request(self, call_id,
1040                          pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
1041                                     samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
1042                          alloc_hint=None,
1043                          context_id=None,
1044                          opnum=None,
1045                          object=None,
1046                          stub=None,
1047                          auth_info=b"",
1048                          ndr_print=None, hexdump=None):
1049
1050         if alloc_hint is None:
1051             alloc_hint = len(stub)
1052
1053         r = samba.dcerpc.dcerpc.request()
1054         r.alloc_hint = alloc_hint
1055         r.context_id = context_id
1056         r.opnum = opnum
1057         if object is not None:
1058             r.object = object
1059         r.stub_and_verifier = stub + auth_info
1060
1061         p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_REQUEST,
1062                               pfc_flags=pfc_flags,
1063                               call_id=call_id,
1064                               payload=r,
1065                               ndr_print=ndr_print, hexdump=hexdump)
1066
1067         if len(auth_info) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH:
1068             p.auth_length = len(auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
1069
1070         return p
1071
1072     def generate_co_cancel(self, call_id,
1073                            pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
1074                                       samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
1075                            auth_info=b"",
1076                            ndr_print=None, hexdump=None):
1077
1078         c = samba.dcerpc.dcerpc.co_cancel()
1079         c.auth_info = auth_info
1080
1081         p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_CO_CANCEL,
1082                               pfc_flags=pfc_flags,
1083                               call_id=call_id,
1084                               payload=c,
1085                               ndr_print=ndr_print, hexdump=hexdump)
1086
1087         return p
1088
1089     def generate_orphaned(self, call_id,
1090                           pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
1091                                      samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
1092                           auth_info=b"",
1093                           ndr_print=None, hexdump=None):
1094
1095         o = samba.dcerpc.dcerpc.orphaned()
1096         o.auth_info = auth_info
1097
1098         p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ORPHANED,
1099                               pfc_flags=pfc_flags,
1100                               call_id=call_id,
1101                               payload=o,
1102                               ndr_print=ndr_print, hexdump=hexdump)
1103
1104         return p
1105
1106     def generate_shutdown(self, call_id,
1107                           pfc_flags=(samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
1108                                      samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST),
1109                           ndr_print=None, hexdump=None):
1110
1111         s = samba.dcerpc.dcerpc.shutdown()
1112
1113         p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_SHUTDOWN,
1114                               pfc_flags=pfc_flags,
1115                               call_id=call_id,
1116                               payload=s,
1117                               ndr_print=ndr_print, hexdump=hexdump)
1118
1119         return p
1120
1121     def assertIsConnected(self):
1122         self.assertIsNotNone(self.s, msg="Not connected")
1123         return
1124
1125     def assertNotConnected(self):
1126         self.assertIsNone(self.s, msg="Is connected")
1127         return
1128
1129     def assertNDRSyntaxEquals(self, s1, s2):
1130         self.assertEqual(s1.uuid, s2.uuid)
1131         self.assertEqual(s1.if_version, s2.if_version)
1132         return
1133
1134     def assertPadding(self, pad, length):
1135         self.assertEquals(len(pad), length)
1136         #
1137         # sometimes windows sends random bytes
1138         #
1139         # we have IGNORE_RANDOM_PAD=1 to
1140         # disable the check
1141         #
1142         if self.ignore_random_pad:
1143             return
1144         zero_pad = b'\0' * length
1145         self.assertEquals(pad, zero_pad)