1 # Tests of malformed DNS packets
2 # Copyright (C) Catalyst.NET ltd
4 # written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Sanity tests for DNS and NBT server parsing.
21 We don't use a proper client library so we can make improper packets.
28 from samba.dcerpc import dns, nbt
30 from samba.tests import TestCase
35 for i in range(1, 0xffff):
39 SERVER = os.environ['SERVER_IP']
40 SERVER_NAME = f"{os.environ['SERVER']}.{os.environ['REALM']}"
44 def encode_netbios_bytes(chars):
45 """Even RFC 1002 uses distancing quotes when calling this "compression"."""
47 chars = (chars + b' ')[:16]
49 out.append((c >> 4) + 65)
50 out.append((c & 15) + 65)
54 class TestDnsPacketBase(TestCase):
58 # we need to ensure the DNS server is responsive before
61 ok = self._known_good_query()
64 print(f"the server is STILL unresponsive after {40 * TIMEOUT} seconds")
66 def decode_reply(self, data):
68 id, flags, n_q, n_a, n_rec, n_exta = struct.unpack('!6H',
74 def construct_query(self, names):
75 """Create a query packet containing one query record.
77 *names* is either a single string name in the usual dotted
78 form, or a list of names. In the latter case, each name can
79 be a dotted string or a list of byte components, which allows
80 dots in components. Where I say list, I mean non-string
85 # these 3 are all the same
88 [[b"example", b"com"]]
90 # this is three names in the same request
92 [b"example", b"com", b"..!"],
93 (b"first component", b" 2nd component")]
95 header = struct.pack('!6H',
97 0x0100, # query, with recursion
98 len(names), # number of queries
101 0x0000, # no extra records
103 tail = struct.pack('!BHH',
106 0x0001, # class IN-ternet
110 if isinstance(name, str):
111 bits = name.encode('utf8').split(b'.')
116 encoded_bits.append(b'%c%s' % (len(b), b))
117 encoded_bits.append(tail)
119 return header + b''.join(encoded_bits)
121 def _test_query(self, names=(), expected_rcode=None):
123 if isinstance(names, str):
126 packet = self.construct_query(names)
127 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
128 s.sendto(packet, self.server)
129 r, _, _ = select.select([s], [], [], TIMEOUT)
131 # It is reasonable to not reply to these packets (Windows
132 # doesn't), but it is not reasonable to render the server
135 ok = self._known_good_query()
136 self.assertTrue(ok, f"the server is unresponsive")
138 def _known_good_query(self):
139 if self.server[1] == 53:
141 expected_rcode = dns.DNS_RCODE_OK
143 name = [encode_netbios_bytes(b'nxdomain'), b'nxdomain']
144 expected_rcode = nbt.NBT_RCODE_NAM
146 packet = self.construct_query([name])
147 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
148 s.sendto(packet, self.server)
149 r, _, _ = select.select([s], [], [], TIMEOUT)
154 data, addr = s.recvfrom(4096)
156 rcode = self.decode_reply(data)['rcode']
157 return expected_rcode == rcode
159 def _test_empty_packet(self):
162 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
163 s.sendto(packet, self.server)
166 # It is reasonable not to reply to an empty packet
167 # but it is not reasonable to render the server
169 ok = self._known_good_query()
170 self.assertTrue(ok, f"the server is unresponsive")
173 class TestDnsPackets(TestDnsPacketBase):
174 server = (SERVER, 53)
175 qtype = 1 # dns type A
177 def _test_many_repeated_components(self, label, n, expected_rcode=None):
179 self._test_query([name],
180 expected_rcode=expected_rcode)
182 def test_127_very_dotty_components(self):
184 self._test_many_repeated_components(label, 127)
186 def test_127_half_dotty_components(self):
187 label = b'x.' * 31 + b'x'
188 self._test_many_repeated_components(label, 127)
190 def test_empty_packet(self):
191 self._test_empty_packet()
194 class TestNbtPackets(TestDnsPacketBase):
195 server = (SERVER, 137)
196 qtype = 0x20 # NBT_QTYPE_NETBIOS
198 def _test_nbt_encode_query(self, names, *args, **kwargs):
199 if isinstance(names, str):
204 if isinstance(name, str):
205 bits = name.encode('utf8').split(b'.')
209 encoded = [encode_netbios_bytes(bits[0])]
210 encoded.extend(bits[1:])
211 nbt_names.append(encoded)
213 self._test_query(nbt_names, *args, **kwargs)
215 def _test_many_repeated_components(self, label, n, expected_rcode=None):
217 name[0] = encode_netbios_bytes(label)
218 self._test_query([name],
219 expected_rcode=expected_rcode)
221 def test_127_very_dotty_components(self):
223 self._test_many_repeated_components(label, 127)
225 def test_127_half_dotty_components(self):
226 label = b'x.' * 31 + b'x'
227 self._test_many_repeated_components(label, 127)
229 def test_empty_packet(self):
230 self._test_empty_packet()