1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from samba import socket
22 import samba.ndr as ndr
23 import samba.dcerpc.dns as dns
24 from samba.tests import TestCase
26 class DNSTest(TestCase):
28 def errstr(self, errcode):
29 "Return a readable error code"
44 return string_codes[errcode]
47 def assert_dns_rcode_equals(self, packet, rcode):
48 "Helper function to check return code"
49 p_errcode = packet.operation & 0x000F
50 self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" % \
51 (self.errstr(rcode), self.errstr(p_errcode)))
53 def assert_dns_opcode_equals(self, packet, opcode):
54 "Helper function to check opcode"
55 p_opcode = packet.operation & 0x7800
56 self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" % \
59 def make_name_packet(self, opcode, qid=None):
60 "Helper creating a dns.name_packet"
63 p.id = random.randint(0x0, 0xffff)
68 def finish_name_packet(self, packet, questions):
69 "Helper to finalize a dns.name_packet"
70 packet.qdcount = len(questions)
71 packet.questions = questions
73 def make_name_question(self, name, qtype, qclass):
74 "Helper creating a dns.name_question"
75 q = dns.name_question()
77 q.question_type = qtype
78 q.question_class = qclass
81 def get_dns_domain(self):
82 "Helper to get dns domain"
83 return os.getenv('REALM', 'example.com').lower()
85 def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP')):
86 "send a DNS query and read the reply"
89 send_packet = ndr.ndr_pack(packet)
90 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
92 s.send(send_packet, 0)
93 recv_packet = s.recv(2048, 0)
94 return ndr.ndr_unpack(dns.name_packet, recv_packet)
99 def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP')):
100 "send a DNS query and read the reply"
103 send_packet = ndr.ndr_pack(packet)
104 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
105 s.connect((host, 53))
106 tcp_packet = struct.pack('!H', len(send_packet))
107 tcp_packet += send_packet
108 s.send(tcp_packet, 0)
109 recv_packet = s.recv(0xffff + 2, 0)
110 return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
116 class TestSimpleQueries(DNSTest):
118 def test_one_a_query(self):
119 "create a query packet containing one query record"
120 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
123 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
124 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
125 print "asking for ", q.name
128 self.finish_name_packet(p, questions)
129 response = self.dns_transaction_udp(p)
130 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
131 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
132 self.assertEquals(response.ancount, 1)
133 self.assertEquals(response.answers[0].rdata,
134 os.getenv('SERVER_IP'))
136 def test_one_a_query_tcp(self):
137 "create a query packet containing one query record via TCP"
138 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
141 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
142 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
143 print "asking for ", q.name
146 self.finish_name_packet(p, questions)
147 response = self.dns_transaction_tcp(p)
148 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
149 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
150 self.assertEquals(response.ancount, 1)
151 self.assertEquals(response.answers[0].rdata,
152 os.getenv('SERVER_IP'))
154 def test_two_queries(self):
155 "create a query packet containing two query records"
156 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
159 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
160 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
163 name = "%s.%s" % ('bogusname', self.get_dns_domain())
164 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
167 self.finish_name_packet(p, questions)
168 response = self.dns_transaction_udp(p)
169 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
171 def test_qtype_all_query(self):
172 "create a QTYPE_ALL query"
173 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
176 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
177 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
178 print "asking for ", q.name
181 self.finish_name_packet(p, questions)
182 response = self.dns_transaction_udp(p)
185 dc_ipv6 = os.getenv('SERVER_IPV6')
186 if dc_ipv6 is not None:
189 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
190 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
191 self.assertEquals(response.ancount, num_answers)
192 self.assertEquals(response.answers[0].rdata,
193 os.getenv('SERVER_IP'))
194 if dc_ipv6 is not None:
195 self.assertEquals(response.answers[1].rdata, dc_ipv6)
197 def test_qclass_none_query(self):
198 "create a QCLASS_NONE query"
199 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
202 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
203 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
206 self.finish_name_packet(p, questions)
207 response = self.dns_transaction_udp(p)
208 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
210 # Only returns an authority section entry in BIND and Win DNS
211 # FIXME: Enable one Samba implements this feature
212 # def test_soa_hostname_query(self):
213 # "create a SOA query for a hostname"
214 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
217 # name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
218 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
219 # questions.append(q)
221 # self.finish_name_packet(p, questions)
222 # response = self.dns_transaction_udp(p)
223 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
224 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
225 # # We don't get SOA records for single hosts
226 # self.assertEquals(response.ancount, 0)
228 def test_soa_domain_query(self):
229 "create a SOA query for a domain"
230 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
233 name = self.get_dns_domain()
234 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
237 self.finish_name_packet(p, questions)
238 response = self.dns_transaction_udp(p)
239 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
240 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
241 self.assertEquals(response.ancount, 1)
244 class TestDNSUpdates(DNSTest):
246 def test_two_updates(self):
247 "create two update requests"
248 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
251 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
252 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
255 name = self.get_dns_domain()
256 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
259 self.finish_name_packet(p, updates)
260 response = self.dns_transaction_udp(p)
261 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
263 def test_update_wrong_qclass(self):
264 "create update with DNS_QCLASS_NONE"
265 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
268 name = self.get_dns_domain()
269 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
272 self.finish_name_packet(p, updates)
273 response = self.dns_transaction_udp(p)
274 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
276 def test_update_prereq_with_non_null_ttl(self):
277 "test update with a non-null TTL"
278 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
281 name = self.get_dns_domain()
283 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
285 self.finish_name_packet(p, updates)
289 r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
290 r.rr_type = dns.DNS_QTYPE_TXT
291 r.rr_class = dns.DNS_QCLASS_NONE
296 p.ancount = len(prereqs)
299 response = self.dns_transaction_udp(p)
300 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
302 # I'd love to test this one, but it segfaults. :)
303 # def test_update_prereq_with_non_null_length(self):
304 # "test update with a non-null length"
305 # p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
308 # name = self.get_dns_domain()
310 # u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
312 # self.finish_name_packet(p, updates)
316 # r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
317 # r.rr_type = dns.DNS_QTYPE_TXT
318 # r.rr_class = dns.DNS_QCLASS_ANY
323 # p.ancount = len(prereqs)
324 # p.answers = prereqs
326 # response = self.dns_transaction_udp(p)
327 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
329 def test_update_prereq_nonexisting_name(self):
330 "test update with a nonexisting name"
331 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
334 name = self.get_dns_domain()
336 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
338 self.finish_name_packet(p, updates)
342 r.name = "idontexist.%s" % self.get_dns_domain()
343 r.rr_type = dns.DNS_QTYPE_TXT
344 r.rr_class = dns.DNS_QCLASS_ANY
349 p.ancount = len(prereqs)
352 response = self.dns_transaction_udp(p)
353 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
355 def test_update_add_txt_record(self):
356 "test adding records works"
357 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
360 name = self.get_dns_domain()
362 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
364 self.finish_name_packet(p, updates)
368 r.name = "textrec.%s" % self.get_dns_domain()
369 r.rr_type = dns.DNS_QTYPE_TXT
370 r.rr_class = dns.DNS_QCLASS_IN
373 r.rdata = dns.txt_record()
374 r.rdata.txt = '"This is a test"'
376 p.nscount = len(updates)
379 response = self.dns_transaction_udp(p)
380 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
382 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
385 name = "textrec.%s" % self.get_dns_domain()
386 q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
389 self.finish_name_packet(p, questions)
390 response = self.dns_transaction_udp(p)
391 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
392 self.assertEquals(response.ancount, 1)
393 self.assertEquals(response.answers[0].rdata.txt, '"This is a test"')
395 def test_update_add_two_txt_records(self):
396 "test adding two txt records works"
397 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
400 name = self.get_dns_domain()
402 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
404 self.finish_name_packet(p, updates)
408 r.name = "textrec2.%s" % self.get_dns_domain()
409 r.rr_type = dns.DNS_QTYPE_TXT
410 r.rr_class = dns.DNS_QCLASS_IN
413 r.rdata = dns.txt_record()
414 r.rdata.txt = '"This is a test" "and this is a test, too"'
416 p.nscount = len(updates)
419 response = self.dns_transaction_udp(p)
420 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
422 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
425 name = "textrec2.%s" % self.get_dns_domain()
426 q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
429 self.finish_name_packet(p, questions)
430 response = self.dns_transaction_udp(p)
431 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
432 self.assertEquals(response.ancount, 1)
433 self.assertEquals(response.answers[0].rdata.txt, '"This is a test" "and this is a test, too"')
435 def test_delete_record(self):
436 "Test if deleting records works"
437 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
440 name = self.get_dns_domain()
442 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
444 self.finish_name_packet(p, updates)
448 r.name = "textrec.%s" % self.get_dns_domain()
449 r.rr_type = dns.DNS_QTYPE_TXT
450 r.rr_class = dns.DNS_QCLASS_NONE
453 r.rdata = dns.txt_record()
454 r.rdata.txt = '"This is a test"'
456 p.nscount = len(updates)
459 response = self.dns_transaction_udp(p)
460 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
462 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
465 name = "textrec.%s" % self.get_dns_domain()
466 q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
469 self.finish_name_packet(p, questions)
470 response = self.dns_transaction_udp(p)
471 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
474 class TestComplexQueries(DNSTest):
477 super(TestComplexQueries, self).setUp()
478 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
481 name = self.get_dns_domain()
483 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
485 self.finish_name_packet(p, updates)
489 r.name = "cname_test.%s" % self.get_dns_domain()
490 r.rr_type = dns.DNS_QTYPE_CNAME
491 r.rr_class = dns.DNS_QCLASS_IN
494 r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
496 p.nscount = len(updates)
499 response = self.dns_transaction_udp(p)
500 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
503 super(TestComplexQueries, self).tearDown()
504 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
507 name = self.get_dns_domain()
509 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
511 self.finish_name_packet(p, updates)
515 r.name = "cname_test.%s" % self.get_dns_domain()
516 r.rr_type = dns.DNS_QTYPE_CNAME
517 r.rr_class = dns.DNS_QCLASS_NONE
520 r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
522 p.nscount = len(updates)
525 response = self.dns_transaction_udp(p)
526 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
528 def test_one_a_query(self):
529 "create a query packet containing one query record"
530 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
533 name = "cname_test.%s" % self.get_dns_domain()
534 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
535 print "asking for ", q.name
538 self.finish_name_packet(p, questions)
539 response = self.dns_transaction_udp(p)
540 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
541 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
542 self.assertEquals(response.ancount, 2)
543 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
544 self.assertEquals(response.answers[0].rdata, "%s.%s" %
545 (os.getenv('SERVER'), self.get_dns_domain()))
546 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
547 self.assertEquals(response.answers[1].rdata,
548 os.getenv('SERVER_IP'))
550 class TestInvalidQueries(DNSTest):
552 def test_one_a_query(self):
553 "send 0 bytes follows by create a query packet containing one query record"
557 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
558 s.connect((os.getenv('SERVER_IP'), 53))
564 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
567 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
568 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
569 print "asking for ", q.name
572 self.finish_name_packet(p, questions)
573 response = self.dns_transaction_udp(p)
574 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
575 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
576 self.assertEquals(response.ancount, 1)
577 self.assertEquals(response.answers[0].rdata,
578 os.getenv('SERVER_IP'))
580 if __name__ == "__main__":