python:samba:tests: Fix code spelling
[samba.git] / python / samba / tests / ldap_raw.py
1 # Integration tests for the ldap server, using raw socket IO
2 #
3 # Tests for handling of malformed or large packets.
4 #
5 # Copyright (C) Catalyst.Net Ltd 2020
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20
21 import socket
22 import ssl
23
24 import samba.tests
25 from samba.tests import TestCase
26
27
28 #
29 # LDAP Operations
30 #
31 DELETE = b'\x4a'
32 DELETE_RES = b'\x6b'
33
34 # Bind
35 BIND = b'\x60'
36 BIND_RES = b'\x61'
37 SIMPLE_AUTH = b'\x80'
38 SASL_AUTH = b'\xa3'
39
40 # Search
41 SEARCH = b'\x63'
42 SEARCH_RES = b'\x64'
43 EQUALS = b'\xa3'
44
45
46 #
47 # LDAP response codes.
48 #
49 SUCCESS = b'\x00'
50 OPERATIONS_ERROR = b'\x01'
51 INVALID_CREDENTIALS = b'\x31'
52 INVALID_DN_SYNTAX = b'\x22'
53
54 #
55 # ASN.1 Element types
56 #
57 BOOLEAN = b'\x01'
58 INTEGER = b'\x02'
59 OCTET_STRING = b'\x04'
60 NULL = b'\x05'
61 ENUMERATED = b'\x0a'
62 SEQUENCE = b'\x30'
63 SET = b'\x31'
64
65
66 #
67 # ASN.1 Helper functions.
68 #
69 def encode_element(ber_type, data):
70     ''' Encode an ASN.1 BER element. '''
71     if data is None:
72         return ber_type + encode_length(0)
73     return ber_type + encode_length(len(data)) + data
74
75
76 def encode_length(length):
77     ''' Encode the length of an ASN.1 BER element.  '''
78
79     if length > 0xFFFFFF:
80         return b'\x84' + length.to_bytes(4, "big")
81     if length > 0xFFFF:
82         return b'\x83' + length.to_bytes(3, "big")
83     if length > 0xFF:
84         return b'\x82' + length.to_bytes(2, "big")
85     if length > 0x7F:
86         return b'\x81' + length.to_bytes(1, "big")
87     return length.to_bytes(1, "big")
88
89
90 def encode_string(string):
91     ''' Encode an octet string '''
92     return encode_element(OCTET_STRING, string)
93
94
95 def encode_boolean(boolean):
96     ''' Encode a boolean value '''
97     if boolean:
98         return encode_element(BOOLEAN, b'\xFF')
99     return encode_element(BOOLEAN, b'\x00')
100
101
102 def encode_integer(integer):
103     ''' Encode an integer value '''
104     bit_len = integer.bit_length()
105     byte_len = (bit_len // 8) + 1
106     return encode_element(INTEGER, integer.to_bytes(byte_len, "big"))
107
108
109 def encode_enumerated(enum):
110     ''' Encode an enumerated value '''
111     return encode_element(ENUMERATED, enum.to_bytes(1, "big"))
112
113
114 def encode_sequence(sequence):
115     ''' Encode a sequence '''
116     return encode_element(SEQUENCE, sequence)
117
118
119 def decode_element(data):
120     '''
121     decode an ASN.1 element
122     '''
123     if data is None:
124         return None
125
126     if len(data) < 2:
127         return None
128
129     ber_type = data[0:1]
130     enc = int.from_bytes(data[1:2], byteorder='big')
131     if enc & 0x80:
132         l_end = 2 + (enc & ~0x80)
133         length = int.from_bytes(data[2:l_end], byteorder='big')
134         element = data[l_end:l_end + length]
135         rest = data[l_end + length:]
136     else:
137         length = enc
138         element = data[2:2 + length]
139         rest = data[2 + length:]
140
141     return (ber_type, length, element, rest)
142
143
144 class RawLdapTest(TestCase):
145     """
146     A raw Ldap Test case.
147     The ldap connections are made over https on port 636
148
149     Uses the following environment variables:
150         SERVER
151         USERNAME
152         PASSWORD
153         DNSNAME
154     """
155
156     def setUp(self):
157         super(RawLdapTest, self).setUp()
158
159         self.host = samba.tests.env_get_var_value('SERVER')
160         self.port = 636
161         self.socket = None
162         self.user = samba.tests.env_get_var_value('USERNAME')
163         self.password = samba.tests.env_get_var_value('PASSWORD')
164         self.dns_name = samba.tests.env_get_var_value('DNSNAME')
165         self.connect()
166
167     def tearDown(self):
168         self.disconnect()
169         super(RawLdapTest, self).tearDown()
170
171     def disconnect(self):
172         ''' Disconnect from and clean up the connection to the server '''
173         if self.socket is None:
174             return
175         self.socket.close()
176         self.socket = None
177
178     def connect(self):
179         ''' Establish an ldaps connection to the test server '''
180         #
181         # Disable host name and certificate verification
182         context = ssl.create_default_context()
183         context.check_hostname = False
184         context.verify_mode = ssl.CERT_NONE
185
186         sock = None
187         try:
188             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
189             sock.settimeout(10)
190             sock.connect((self.host, self.port))
191             self.socket = context.wrap_socket(sock, server_hostname=self.host)
192         except socket.error:
193             sock.close()
194             if self.socket is not None:
195                 self.socket.close()
196             raise
197
198     def send(self, req):
199         ''' Send the request to the server '''
200         try:
201             self.socket.sendall(req)
202         except socket.error:
203             self.disconnect()
204             raise
205
206     def recv(self, num_recv=0xffff, timeout=None):
207         ''' receive an array of bytes from the server '''
208         data = None
209         try:
210             if timeout is not None:
211                 self.socket.settimeout(timeout)
212             data = self.socket.recv(num_recv, 0)
213             self.socket.settimeout(10)
214             if len(data) == 0:
215                 self.disconnect()
216                 return None
217         except socket.timeout:
218             # We ignore timeout's as the ldap server will drop the connection
219             # on the errors we're testing. So returning None on a timeout is
220             # the desired behaviour.
221             self.socket.settimeout(10)
222         except socket.error:
223             self.disconnect()
224             raise
225         return data
226
227     def bind(self):
228         '''
229             Perform a simple bind
230         '''
231
232         user = self.user.encode('UTF8')
233         ou = self.dns_name.replace('.', ',dc=').encode('UTF8')
234         dn = b'cn=' + user + b',cn=users,dc=' + ou
235
236         password = self.password.encode('UTF8')
237
238         # Lets build an simple bind request
239         bind = encode_integer(3)                  # ldap version
240         bind += encode_string(dn)
241         bind += encode_element(SIMPLE_AUTH, password)
242
243         bind_op = encode_element(BIND, bind)
244
245         msg_no = encode_integer(1)
246         packet = encode_sequence(msg_no + bind_op)
247
248         self.send(packet)
249         data = self.recv()
250         self.assertIsNotNone(data)
251
252         #
253         # Decode and validate the response
254
255         # Should be a sequence
256         (ber_type, length, element, rest) = decode_element(data)
257         self.assertEqual(SEQUENCE.hex(), ber_type.hex())
258         self.assertTrue(length > 0)
259         self.assertEqual(0, len(rest))
260
261         # message id should be 1
262         (ber_type, length, element, rest) = decode_element(element)
263         self.assertEqual(INTEGER.hex(), ber_type.hex())
264         msg_no = int.from_bytes(element, byteorder='big')
265         self.assertEqual(1, msg_no)
266         self.assertGreater(len(rest), 0)
267
268         # Should have a Bind response element
269         (ber_type, length, element, rest) = decode_element(rest)
270         self.assertEqual(BIND_RES.hex(), ber_type.hex())
271         self.assertEqual(0, len(rest))
272
273         # Check the response code
274         (ber_type, length, element, rest) = decode_element(element)
275         self.assertEqual(ENUMERATED.hex(), ber_type.hex())
276         self.assertEqual(SUCCESS.hex(), element.hex())
277         self.assertGreater(len(rest), 0)
278
279     def test_decode_element(self):
280         ''' Tests for the decode_element method '''
281
282         # Boolean true value
283         data = b'\x01\x01\xff'
284         (ber_type, length, element, rest) = decode_element(data)
285         self.assertEqual(BOOLEAN.hex(), ber_type.hex())
286         self.assertEqual(1, length)
287         self.assertEqual(b'\xff'.hex(), element.hex())
288         self.assertEqual(0, len(rest))
289
290         # Boolean false value
291         data = b'\x01\x01\x00'
292         (ber_type, length, element, rest) = decode_element(data)
293         self.assertEqual(BOOLEAN.hex(), ber_type.hex())
294         self.assertEqual(1, length)
295         self.assertEqual(b'\x00'.hex(), element.hex())
296         self.assertEqual(0, len(rest))
297
298         # Boolean true value with trailing data
299         data = b'\x01\x01\xff\x05\x00'
300         (ber_type, length, element, rest) = decode_element(data)
301         self.assertEqual(BOOLEAN.hex(), ber_type.hex())
302         self.assertEqual(1, length)
303         self.assertEqual(b'\xff'.hex(), element.hex())
304         self.assertEqual(b'\x05\x00'.hex(), rest.hex())
305
306         # Octet string byte length encoding
307         data = b'\x04\x02\xca\xfe\x05\x00'
308         (ber_type, length, element, rest) = decode_element(data)
309         self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
310         self.assertEqual(2, length)
311         self.assertEqual(b'\xca\xfe'.hex(), element.hex())
312         self.assertEqual(b'\x05\x00'.hex(), rest.hex())
313
314         # Octet string 81 byte length encoding
315         data = b'\x04\x81\x02\xca\xfe\x05\x00'
316         (ber_type, length, element, rest) = decode_element(data)
317         self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
318         self.assertEqual(2, length)
319         self.assertEqual(b'\xca\xfe'.hex(), element.hex())
320         self.assertEqual(b'\x05\x00'.hex(), rest.hex())
321
322         # Octet string 82 byte length encoding
323         data = b'\x04\x82\x00\x02\xca\xfe\x05\x00'
324         (ber_type, length, element, rest) = decode_element(data)
325         self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
326         self.assertEqual(2, length)
327         self.assertEqual(b'\xca\xfe'.hex(), element.hex())
328         self.assertEqual(b'\x05\x00'.hex(), rest.hex())
329
330         # Octet string 85 byte length encoding
331         # For Samba we limit the length encoding to 4 bytes, but it's useful
332         # to be able to decode longer lengths in a test.
333         data = b'\x04\x85\x00\x00\x00\x00\x02\xca\xfe\x05\x00'
334         (ber_type, length, element, rest) = decode_element(data)
335         self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
336         self.assertEqual(2, length)
337         self.assertEqual(b'\xca\xfe'.hex(), element.hex())
338         self.assertEqual(b'\x05\x00'.hex(), rest.hex())
339
340     def test_search_equals_maximum_permitted_size(self):
341         '''
342         Check that an LDAP search request equal to the maximum size is accepted
343         This test is done on a authenticated connection so that the maximum
344         non search request is 16MiB.
345         '''
346         self.bind()
347
348         # Lets build an ldap search packet to query the RootDSE
349         header = encode_string(None)        # Base DN, ""
350         header += encode_enumerated(0)      # Enumeration scope
351         header += encode_enumerated(0)      # Enumeration dereference
352         header += encode_integer(0)         # Integer size limit
353         header += encode_integer(0)         # Integer time limit
354         header += encode_boolean(False)     # Boolean attributes only
355
356         #
357         # build an equality search of the form x...x=y...y
358         # With the length of x...x and y...y chosen to generate an
359         # ldap request of 256000 bytes.
360         x = encode_string(b'x' * 127974)
361         y = encode_string(b'y' * 127979)
362         equals = encode_element(EQUALS, x + y)
363         trailer = encode_sequence(None)
364         search = encode_element(SEARCH, header + equals + trailer)
365
366         msg_no = encode_integer(2)
367         packet = encode_sequence(msg_no + search)
368         #
369         # The length of the packet should be equal to the
370         # Maximum length of a search query
371         self.assertEqual(256000, len(packet))
372
373         self.send(packet)
374         data = self.recv()
375         self.assertIsNotNone(data)
376
377         #
378         # Decode and validate the response
379
380         # Should be a sequence
381         (ber_type, length, element, rest) = decode_element(data)
382         self.assertEqual(SEQUENCE.hex(), ber_type.hex())
383         self.assertTrue(length > 0)
384         self.assertEqual(0, len(rest))
385
386         # message id should be 2
387         (ber_type, length, element, rest) = decode_element(element)
388         self.assertEqual(INTEGER.hex(), ber_type.hex())
389         msg_no = int.from_bytes(element, byteorder='big')
390         self.assertEqual(2, msg_no)
391         self.assertGreater(len(rest), 0)
392
393         # Should have a Search response element
394         (ber_type, length, element, rest) = decode_element(rest)
395         self.assertEqual(SEARCH_RES.hex(), ber_type.hex())
396         self.assertEqual(0, len(rest))
397
398         # Should have an empty matching DN
399         (ber_type, length, element, rest) = decode_element(element)
400         self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
401         self.assertEqual(0, len(element))
402         self.assertGreater(len(rest), 0)
403
404         # Then a sequence of attribute sequences
405         (ber_type, length, element, rest) = decode_element(rest)
406         self.assertEqual(SEQUENCE.hex(), ber_type.hex())
407         self.assertEqual(0, len(rest))
408
409         # Check the first attribute sequence, it should  be
410         # "configurationNamingContext"
411         # The remaining attribute sequences will be ignored but
412         # check that they exist.
413         (ber_type, length, element, rest) = decode_element(element)
414         self.assertEqual(SEQUENCE.hex(), ber_type.hex())
415         # Check that there are remaining attribute sequences.
416         self.assertGreater(len(rest), 0)
417
418         # Check the name of the first attribute
419         (ber_type, length, element, rest) = decode_element(element)
420         self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
421         self.assertGreater(len(rest), 0)
422         self.assertEqual(b'configurationNamingContext', element)
423
424         # And check that there is an attribute value set
425         (ber_type, length, element, rest) = decode_element(rest)
426         self.assertEqual(SET.hex(), ber_type.hex())
427         self.assertGreater(len(element), 0)
428         self.assertEqual(0, len(rest))
429
430     def test_search_exceeds_maximum_permitted_size(self):
431         '''
432         Test that a search query longer than the maximum permitted
433         size is rejected.
434         This test is done on a authenticated connection so that the maximum
435         non search request is 16MiB.
436         '''
437
438         self.bind()
439
440         # Lets build an ldap search packet to query the RootDSE
441         header = encode_string(None)        # Base DN, ""
442         header += encode_enumerated(0)      # Enumeration scope
443         header += encode_enumerated(0)      # Enumeration dereference
444         header += encode_integer(0)         # Integer size limit
445         header += encode_integer(0)         # Integer time limit
446         header += encode_boolean(False)     # Boolean attributes only
447
448         #
449         # build an equality search of the form x...x=y...y
450         # With the length of x...x and y...y chosen to generate an
451         # ldap request of 256001 bytes.
452         x = encode_string(b'x' * 127979)
453         y = encode_string(b'y' * 127975)
454         equals = encode_element(EQUALS, x + y)
455         trailer = encode_sequence(None)
456         search = encode_element(SEARCH, header + equals + trailer)
457
458         msg_no = encode_integer(2)
459         packet = encode_sequence(msg_no + search)
460         #
461         # The length of the sequence data should be one greater than the
462         # Maximum length of a search query
463         self.assertEqual(256001, len(packet))
464
465         self.send(packet)
466         data = self.recv()
467         #
468         # The connection should be closed by the server and we should not
469         # see any data.
470         self.assertIsNone(data)
471
472     def test_simple_anonymous_bind(self):
473         '''
474             Test a simple anonymous bind
475         '''
476
477         # Lets build an anonymous simple bind request
478         bind = encode_integer(3)                  # ldap version
479         bind += encode_string(b'')                # Empty name
480         bind += encode_element(SIMPLE_AUTH, b'')  # Empty password
481
482         bind_op = encode_element(BIND, bind)
483
484         msg_no = encode_integer(1)
485         packet = encode_sequence(msg_no + bind_op)
486
487         self.send(packet)
488         data = self.recv()
489         self.assertIsNotNone(data)
490
491         #
492         # Decode and validate the response
493
494         # Should be a sequence
495         (ber_type, length, element, rest) = decode_element(data)
496         self.assertEqual(SEQUENCE.hex(), ber_type.hex())
497         self.assertTrue(length > 0)
498         self.assertEqual(0, len(rest))
499
500         # message id should be 1
501         (ber_type, length, element, rest) = decode_element(element)
502         self.assertEqual(INTEGER.hex(), ber_type.hex())
503         msg_no = int.from_bytes(element, byteorder='big')
504         self.assertEqual(1, msg_no)
505         self.assertGreater(len(rest), 0)
506
507         # Should have a Bind response element
508         (ber_type, length, element, rest) = decode_element(rest)
509         self.assertEqual(BIND_RES.hex(), ber_type.hex())
510         self.assertEqual(0, len(rest))
511
512         # Check the response code
513         (ber_type, length, element, rest) = decode_element(element)
514         self.assertEqual(ENUMERATED.hex(), ber_type.hex())
515         self.assertEqual(SUCCESS.hex(), element.hex())
516         self.assertGreater(len(rest), 0)
517
518     def test_simple_bind_at_limit(self):
519         '''
520             Test a simple bind, with a large invalid
521             user name. As the resulting packet is equal
522             to the maximum unauthenticated packet size we should see
523             an INVALID_CREDENTIALS response
524         '''
525
526         # Lets build a simple bind request
527         bind = encode_integer(3)                  # ldap version
528         bind += encode_string(b' ' * 255977)      # large name
529         bind += encode_element(SIMPLE_AUTH, b'')  # Empty password
530
531         bind_op = encode_element(BIND, bind)
532
533         msg_no = encode_integer(1)
534         packet = encode_sequence(msg_no + bind_op)
535         #
536         # The length of the sequence data should be equal to the maximum
537         # Unauthenticated packet length
538         self.assertEqual(256000, len(packet))
539
540         self.send(packet)
541         data = self.recv()
542         self.assertIsNotNone(data)
543
544         #
545         # Decode and validate the response
546
547         # Should be a sequence
548         (ber_type, length, element, rest) = decode_element(data)
549         self.assertEqual(SEQUENCE.hex(), ber_type.hex())
550         self.assertTrue(length > 0)
551         self.assertEqual(0, len(rest))
552
553         # message id should be 1
554         (ber_type, length, element, rest) = decode_element(element)
555         self.assertEqual(INTEGER.hex(), ber_type.hex())
556         msg_no = int.from_bytes(element, byteorder='big')
557         self.assertEqual(1, msg_no)
558         self.assertGreater(len(rest), 0)
559
560         # Should have a Bind response element
561         (ber_type, length, element, rest) = decode_element(rest)
562         self.assertEqual(BIND_RES.hex(), ber_type.hex())
563         self.assertEqual(0, len(rest))
564
565         # Check the response code
566         (ber_type, length, element, rest) = decode_element(element)
567         self.assertEqual(ENUMERATED.hex(), ber_type.hex())
568         self.assertEqual(INVALID_CREDENTIALS.hex(), element.hex())
569         self.assertGreater(len(rest), 0)
570
571     def test_simple_bind_gt_limit(self):
572         '''
573             Test a simple bind, with a large invalid
574             user name. As the resulting packet is one greater than
575             the maximum unauthenticated packet size we should see
576             the connection reset.
577         '''
578
579         # Lets build a simple bind request
580         bind = encode_integer(3)                  # ldap version
581         bind += encode_string(b' ' * 255978)      # large name
582         bind += encode_element(SIMPLE_AUTH, b'')  # Empty password
583
584         bind_op = encode_element(BIND, bind)
585
586         msg_no = encode_integer(1)
587         packet = encode_sequence(msg_no + bind_op)
588         #
589         # The length of the sequence data should be equal to the maximum
590         # Unauthenticated packet length
591         self.assertEqual(256001, len(packet))
592
593         self.send(packet)
594         data = self.recv()
595         self.assertIsNone(data)
596
597     def test_unauthenticated_delete_at_limit(self):
598         '''
599             Test a delete, with a large invalid DN
600             As the resulting packet is equal to the maximum unauthenticated
601             packet size we should see an INVALID_DN_SYNTAX response
602         '''
603
604         # Lets build a delete request, with a large invalid DN
605         dn = b' ' * 255987
606         del_op = encode_element(DELETE, dn)
607
608         msg_no = encode_integer(1)
609         packet = encode_sequence(msg_no + del_op)
610         #
611         # The length of the sequence data should be equal to the maximum
612         # Unauthenticated packet length
613         self.assertEqual(256000, len(packet))
614
615         self.send(packet)
616         data = self.recv()
617         self.assertIsNotNone(data)
618
619         #
620         # Decode and validate the response
621
622         # Should be a sequence
623         (ber_type, length, element, rest) = decode_element(data)
624         self.assertEqual(SEQUENCE.hex(), ber_type.hex())
625         self.assertTrue(length > 0)
626         self.assertEqual(0, len(rest))
627
628         # message id should be 1
629         (ber_type, length, element, rest) = decode_element(element)
630         self.assertEqual(INTEGER.hex(), ber_type.hex())
631         msg_no = int.from_bytes(element, byteorder='big')
632         self.assertEqual(1, msg_no)
633         self.assertGreater(len(rest), 0)
634
635         # Should have a delete response element
636         (ber_type, length, element, rest) = decode_element(rest)
637         self.assertEqual(DELETE_RES.hex(), ber_type.hex())
638         self.assertEqual(0, len(rest))
639
640         # Check the response code
641         (ber_type, length, element, rest) = decode_element(element)
642         self.assertEqual(ENUMERATED.hex(), ber_type.hex())
643         self.assertEqual(INVALID_DN_SYNTAX.hex(), element.hex())
644         self.assertGreater(len(rest), 0)
645
646     def test_unauthenticated_delete_gt_limit(self):
647         '''
648             Test a delete, with a large invalid DN
649             As the resulting packet is greater than the maximum unauthenticated
650             packet size we should see a connection reset
651         '''
652
653         # Lets build a delete request, with a large invalid DN
654         dn = b' ' * 255988
655         del_op = encode_element(DELETE, dn)
656
657         msg_no = encode_integer(1)
658         packet = encode_sequence(msg_no + del_op)
659         #
660         # The length of the sequence data should one greater than the maximum
661         # unauthenticated packet length
662         self.assertEqual(256001, len(packet))
663
664         self.send(packet)
665         data = self.recv()
666         self.assertIsNone(data)
667
668     def test_authenticated_delete_at_limit(self):
669         '''
670             Test a delete, with a large invalid DN
671             As the resulting packet is equal to the maximum authenticated
672             packet size we should see an INVALID_DN_SYNTAX response
673         '''
674
675         # Lets build a delete request, with a large invalid DN
676         dn = b' ' * 16777203
677         del_op = encode_element(DELETE, dn)
678
679         self.bind()
680
681         msg_no = encode_integer(2)
682         packet = encode_sequence(msg_no + del_op)
683         #
684         # The length of the sequence data should be equal to the maximum
685         # authenticated packet length currently 16MiB
686         self.assertEqual(16 * 1024 * 1024, len(packet))
687
688         self.send(packet)
689         data = self.recv()
690         self.assertIsNotNone(data)
691
692         #
693         # Decode and validate the response
694
695         # Should be a sequence
696         (ber_type, length, element, rest) = decode_element(data)
697         self.assertEqual(SEQUENCE.hex(), ber_type.hex())
698         self.assertTrue(length > 0)
699         self.assertEqual(0, len(rest))
700
701         # message id should be 2
702         (ber_type, length, element, rest) = decode_element(element)
703         self.assertEqual(INTEGER.hex(), ber_type.hex())
704         msg_no = int.from_bytes(element, byteorder='big')
705         self.assertEqual(2, msg_no)
706         self.assertGreater(len(rest), 0)
707
708         # Should have a delete response element
709         (ber_type, length, element, rest) = decode_element(rest)
710         self.assertEqual(DELETE_RES.hex(), ber_type.hex())
711         self.assertEqual(0, len(rest))
712
713         # Check the response code
714         (ber_type, length, element, rest) = decode_element(element)
715         self.assertEqual(ENUMERATED.hex(), ber_type.hex())
716         self.assertEqual(INVALID_DN_SYNTAX.hex(), element.hex())
717         self.assertGreater(len(rest), 0)
718
719     def test_authenticated_delete_gt_limit(self):
720         '''
721             Test a delete, with a large invalid DN
722             As the resulting packet is one greater than the maximum
723             authenticated packet size we should see a connection reset
724         '''
725
726         # Lets build a delete request, with a large invalid DN
727         dn = b' ' * 16777204
728         del_op = encode_element(DELETE, dn)
729
730         self.bind()
731
732         msg_no = encode_integer(2)
733         packet = encode_sequence(msg_no + del_op)
734         #
735         # The length of the sequence data should be one greater than the
736         # maximum authenticated packet length currently 16MiB
737         self.assertEqual(16 * 1024 * 1024 + 1, len(packet))
738
739         self.send(packet)
740         data = self.recv()
741         self.assertIsNone(data)
742
743
744 class RawCldapTest(TestCase):
745     """
746     A raw cldap Test case.
747     The ldap connections are made over UDP port 389
748
749     Uses the following environment variables:
750         SERVER
751     """
752
753     def setUp(self):
754         super(RawCldapTest, self).setUp()
755
756         self.host = samba.tests.env_get_var_value('SERVER')
757         self.port = 389
758         self.socket = None
759         self.connect()
760
761     def tearDown(self):
762         self.disconnect()
763         super(RawCldapTest, self).tearDown()
764
765     def disconnect(self):
766         ''' Disconnect from and clean up the connection to the server '''
767         if self.socket is None:
768             return
769         self.socket.close()
770         self.socket = None
771
772     def connect(self):
773         ''' Establish an UDP connection to the test server '''
774
775         try:
776             self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
777             self.socket.settimeout(10)
778             self.socket.connect((self.host, self.port))
779         except socket.error:
780             if self.socket is not None:
781                 self.socket.close()
782             raise
783
784     def send(self, req):
785         ''' Send the request to the server '''
786         try:
787             self.socket.sendall(req)
788         except socket.error:
789             self.disconnect()
790             raise
791
792     def recv(self, num_recv=0xffff, timeout=None):
793         ''' receive an array of bytes from the server '''
794         data = None
795         try:
796             if timeout is not None:
797                 self.socket.settimeout(timeout)
798             data = self.socket.recv(num_recv, 0)
799             self.socket.settimeout(10)
800             if len(data) == 0:
801                 self.disconnect()
802                 return None
803         except socket.timeout:
804             # We ignore timeout's as the ldap server will drop the connection
805             # on the errors we're testing. So returning None on a timeout is
806             # the desired behaviour.
807             self.socket.settimeout(10)
808         except socket.error:
809             self.disconnect()
810             raise
811         return data
812
813     def test_search_equals_maximum_permitted_size(self):
814         '''
815         Check that an CLDAP search request equal to the maximum size is
816         accepted
817         '''
818
819         # Lets build an ldap search packet to query the RootDSE
820         header = encode_string(None)        # Base DN, ""
821         header += encode_enumerated(0)      # Enumeration scope
822         header += encode_enumerated(0)      # Enumeration dereference
823         header += encode_integer(0)         # Integer size limit
824         header += encode_integer(0)         # Integer time limit
825         header += encode_boolean(False)     # Boolean attributes only
826
827         #
828         # build an equality search of the form x...x=y...y
829         # With the length of x...x and y...y chosen to generate an
830         # cldap request of 4096 bytes.
831         x = encode_string(b'x' * 2027)
832         y = encode_string(b'y' * 2027)
833         equals = encode_element(EQUALS, x + y)
834         trailer = encode_sequence(None)
835         search = encode_element(SEARCH, header + equals + trailer)
836
837         msg_no = encode_integer(2)
838         packet = encode_sequence(msg_no + search)
839         #
840         # The length of the packet should be equal to the
841         # Maximum length of a cldap packet
842         self.assertEqual(4096, len(packet))
843
844         self.send(packet)
845         data = self.recv()
846         self.assertIsNotNone(data)
847
848         #
849         # Decode and validate the response
850
851         # Should be a sequence
852         (ber_type, length, element, rest) = decode_element(data)
853         self.assertEqual(SEQUENCE.hex(), ber_type.hex())
854         self.assertTrue(length > 0)
855         self.assertGreater(len(rest), 0)
856         # rest should contain a Search request done element, but it's
857         # not validated in this test.
858
859         # message id should be 2
860         (ber_type, length, element, rest) = decode_element(element)
861         self.assertEqual(INTEGER.hex(), ber_type.hex())
862         msg_no = int.from_bytes(element, byteorder='big')
863         self.assertEqual(2, msg_no)
864         self.assertGreater(len(rest), 0)
865
866         # Should have a Search response element
867         (ber_type, length, element, rest) = decode_element(rest)
868         self.assertEqual(SEARCH_RES.hex(), ber_type.hex())
869         self.assertEqual(0, len(rest))
870
871         # Should have an empty matching DN
872         (ber_type, length, element, rest) = decode_element(element)
873         self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
874         self.assertEqual(0, len(element))
875         self.assertGreater(len(rest), 0)
876
877         # Then a sequence of attribute sequences
878         (ber_type, length, element, rest) = decode_element(rest)
879         self.assertEqual(SEQUENCE.hex(), ber_type.hex())
880         self.assertEqual(0, len(rest))
881
882         # Check the first attribute sequence, it should  be
883         # "configurationNamingContext"
884         # The remaining attribute sequences will be ignored but
885         # check that they exist.
886         (ber_type, length, element, rest) = decode_element(element)
887         self.assertEqual(SEQUENCE.hex(), ber_type.hex())
888         # Check that there are remaining attribute sequences.
889         self.assertGreater(len(rest), 0)
890
891         # Check the name of the first attribute
892         (ber_type, length, element, rest) = decode_element(element)
893         self.assertEqual(OCTET_STRING.hex(), ber_type.hex())
894         self.assertGreater(len(rest), 0)
895         self.assertEqual(b'configurationNamingContext', element)
896
897         # And check that there is an attribute value set
898         (ber_type, length, element, rest) = decode_element(rest)
899         self.assertEqual(SET.hex(), ber_type.hex())
900         self.assertGreater(len(element), 0)
901         self.assertEqual(0, len(rest))
902
903     def test_search_exceeds_maximum_permitted_size(self):
904         '''
905         Test that a cldap request longer than the maximum permitted
906         size is rejected.
907         '''
908
909         # Lets build an ldap search packet to query the RootDSE
910         header = encode_string(None)        # Base DN, ""
911         header += encode_enumerated(0)      # Enumeration scope
912         header += encode_enumerated(0)      # Enumeration dereference
913         header += encode_integer(0)         # Integer size limit
914         header += encode_integer(0)         # Integer time limit
915         header += encode_boolean(False)     # Boolean attributes only
916
917         #
918         # build an equality search of the form x...x=y...y
919         # With the length of x...x and y...y chosen to generate an
920         # cldap request of 4097 bytes.
921         x = encode_string(b'x' * 2027)
922         y = encode_string(b'y' * 2028)
923         equals = encode_element(EQUALS, x + y)
924         trailer = encode_sequence(None)
925         search = encode_element(SEARCH, header + equals + trailer)
926
927         msg_no = encode_integer(2)
928         packet = encode_sequence(msg_no + search)
929         #
930         # The length of the sequence data should be one greater than the
931         # Maximum length of a cldap packet
932         self.assertEqual(4097, len(packet))
933
934         self.send(packet)
935         data = self.recv()
936         #
937         # The connection should be closed by the server and we should not
938         # see any data.
939         self.assertIsNone(data)