1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 import samba.ndr as ndr
23 from samba import credentials, param
24 from samba.tests import TestCase
25 from samba.dcerpc import dns, dnsp, dnsserver
# 256-entry translation table applied by hexdump() via str.translate():
# a character is kept when its repr() is exactly three characters long
# (quote, the character itself, quote); every other character maps to '.'.
FILTER = ''.join([
    chr(x) if len(repr(chr(x))) == 3 else '.'
    for x in range(256)
])
29 # This timeout only has relevance when testing against Windows
30 # Format errors tend to return patchy responses, so a timeout is needed.
33 def make_txt_record(records):
34 rdata_txt = dns.txt_record()
35 s_list = dnsp.string_list()
36 s_list.count = len(records)
38 rdata_txt.txt = s_list
41 class DNSTest(TestCase):
43 def get_loadparm(self):
45 lp.load(os.getenv("SMB_CONF_PATH"))
48 def errstr(self, errcode):
49 "Return a readable error code"
64 return string_codes[errcode]
def assert_dns_rcode_equals(self, packet, rcode):
    """Assert that the RCODE carried by *packet* equals *rcode*.

    The RCODE is read from the low four bits of ``packet.operation``.
    When the codes differ, the failure message names both the expected
    and the received code using ``self.errstr``.
    """
    p_errcode = packet.operation & 0x000F
    message = "Expected RCODE %s, got %s" % (self.errstr(rcode),
                                             self.errstr(p_errcode))
    # Use assertEqual rather than the deprecated assertEquals alias; the
    # rest of this file already calls assertEqual in places.
    self.assertEqual(p_errcode, rcode, message)
73 def assert_dns_opcode_equals(self, packet, opcode):
74 "Helper function to check opcode"
75 p_opcode = packet.operation & 0x7800
76 self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" %
79 def make_name_packet(self, opcode, qid=None):
80 "Helper creating a dns.name_packet"
83 p.id = random.randint(0x0, 0xffff)
def finish_name_packet(self, packet, questions):
    """Attach *questions* to *packet* and record how many there are.

    Sets ``packet.qdcount`` to the number of entries and stores the
    sequence itself in ``packet.questions``. Returns nothing.
    """
    question_count = len(questions)
    packet.qdcount = question_count
    packet.questions = questions
93 def make_name_question(self, name, qtype, qclass):
94 "Helper creating a dns.name_question"
95 q = dns.name_question()
97 q.question_type = qtype
98 q.question_class = qclass
def get_dns_domain(self):
    """Return the DNS domain under test, lower-cased.

    The value comes from the ``REALM`` environment variable and falls
    back to ``example.com`` when that variable is not set.
    """
    realm = os.environ.get('REALM', 'example.com')
    return realm.lower()
105 def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'),
106 dump=False, timeout=timeout):
107 "send a DNS query and read the reply"
110 send_packet = ndr.ndr_pack(packet)
112 print self.hexdump(send_packet)
113 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
114 s.settimeout(timeout)
115 s.connect((host, 53))
116 s.send(send_packet, 0)
117 recv_packet = s.recv(2048, 0)
119 print self.hexdump(recv_packet)
120 return ndr.ndr_unpack(dns.name_packet, recv_packet)
125 def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP'),
126 dump=False, timeout=timeout):
127 "send a DNS query and read the reply"
130 send_packet = ndr.ndr_pack(packet)
132 print self.hexdump(send_packet)
133 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
134 s.settimeout(timeout)
135 s.connect((host, 53))
136 tcp_packet = struct.pack('!H', len(send_packet))
137 tcp_packet += send_packet
138 s.send(tcp_packet, 0)
139 recv_packet = s.recv(0xffff + 2, 0)
141 print self.hexdump(recv_packet)
142 return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
147 def hexdump(self, src, length=8):
150 s,src = src[:length],src[length:]
151 hexa = ' '.join(["%02X"%ord(x) for x in s])
152 s = s.translate(FILTER)
153 result += "%04X %-*s %s\n" % (N, length*3, hexa, s)
157 def make_txt_update(self, prefix, txt_array):
158 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
161 name = self.get_dns_domain()
162 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
164 self.finish_name_packet(p, updates)
168 r.name = "%s.%s" % (prefix, self.get_dns_domain())
169 r.rr_type = dns.DNS_QTYPE_TXT
170 r.rr_class = dns.DNS_QCLASS_IN
173 rdata = make_txt_record(txt_array)
176 p.nscount = len(updates)
181 def check_query_txt(self, prefix, txt_array):
182 name = "%s.%s" % (prefix, self.get_dns_domain())
183 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
186 q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
189 self.finish_name_packet(p, questions)
190 response = self.dns_transaction_udp(p)
191 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
192 self.assertEquals(response.ancount, 1)
193 self.assertEquals(response.answers[0].rdata.txt.str, txt_array)
def assertIsNotNone(self, item, msg=None):
    """Fail unless *item* is not ``None``.

    *msg* is an optional failure message forwarded to ``assertTrue``.
    It defaults to ``None`` so that existing one-argument callers keep
    working, while callers that pass a message (as the standard
    ``unittest`` method of the same name allows) no longer hit a
    TypeError.
    """
    self.assertTrue(item is not None, msg)
198 class TestSimpleQueries(DNSTest):
200 def test_one_a_query(self):
201 "create a query packet containing one query record"
202 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
205 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
206 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
207 print "asking for ", q.name
210 self.finish_name_packet(p, questions)
211 response = self.dns_transaction_udp(p)
212 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
213 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
214 self.assertEquals(response.ancount, 1)
215 self.assertEquals(response.answers[0].rdata,
216 os.getenv('SERVER_IP'))
218 def test_one_a_query_tcp(self):
219 "create a query packet containing one query record via TCP"
220 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
223 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
224 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
225 print "asking for ", q.name
228 self.finish_name_packet(p, questions)
229 response = self.dns_transaction_tcp(p)
230 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
231 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
232 self.assertEquals(response.ancount, 1)
233 self.assertEquals(response.answers[0].rdata,
234 os.getenv('SERVER_IP'))
236 def test_one_mx_query(self):
237 "create a query packet causing an empty RCODE_OK answer"
238 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
241 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
242 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
243 print "asking for ", q.name
246 self.finish_name_packet(p, questions)
247 response = self.dns_transaction_udp(p)
248 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
249 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
250 self.assertEquals(response.ancount, 0)
252 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
255 name = "invalid-%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
256 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
257 print "asking for ", q.name
260 self.finish_name_packet(p, questions)
261 response = self.dns_transaction_udp(p)
262 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
263 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
264 self.assertEquals(response.ancount, 0)
266 def test_two_queries(self):
267 "create a query packet containing two query records"
268 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
271 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
272 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
275 name = "%s.%s" % ('bogusname', self.get_dns_domain())
276 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
279 self.finish_name_packet(p, questions)
281 response = self.dns_transaction_udp(p)
282 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
283 except socket.timeout:
284 # Windows chooses not to respond to incorrectly formatted queries.
285 # Although this appears to be non-deterministic even for the same
# request twice, it also appears to be based on how poorly the
287 # request is formatted.
290 def test_qtype_all_query(self):
291 "create a QTYPE_ALL query"
292 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
295 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
296 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
297 print "asking for ", q.name
300 self.finish_name_packet(p, questions)
301 response = self.dns_transaction_udp(p)
304 dc_ipv6 = os.getenv('SERVER_IPV6')
305 if dc_ipv6 is not None:
308 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
309 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
310 self.assertEquals(response.ancount, num_answers)
311 self.assertEquals(response.answers[0].rdata,
312 os.getenv('SERVER_IP'))
313 if dc_ipv6 is not None:
314 self.assertEquals(response.answers[1].rdata, dc_ipv6)
316 def test_qclass_none_query(self):
317 "create a QCLASS_NONE query"
318 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
321 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
322 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
325 self.finish_name_packet(p, questions)
327 response = self.dns_transaction_udp(p)
328 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
329 except socket.timeout:
330 # Windows chooses not to respond to incorrectly formatted queries.
331 # Although this appears to be non-deterministic even for the same
# request twice, it also appears to be based on how poorly the
333 # request is formatted.
336 # Only returns an authority section entry in BIND and Win DNS
# FIXME: Enable once Samba implements this feature
338 # def test_soa_hostname_query(self):
339 # "create a SOA query for a hostname"
340 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
343 # name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
344 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
345 # questions.append(q)
347 # self.finish_name_packet(p, questions)
348 # response = self.dns_transaction_udp(p)
349 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
350 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
351 # # We don't get SOA records for single hosts
352 # self.assertEquals(response.ancount, 0)
354 def test_soa_domain_query(self):
355 "create a SOA query for a domain"
356 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
359 name = self.get_dns_domain()
360 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
363 self.finish_name_packet(p, questions)
364 response = self.dns_transaction_udp(p)
365 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
366 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
367 self.assertEquals(response.ancount, 1)
368 self.assertEquals(response.answers[0].rdata.minimum, 3600)
371 class TestDNSUpdates(DNSTest):
373 def test_two_updates(self):
374 "create two update requests"
375 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
378 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
379 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
382 name = self.get_dns_domain()
383 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
386 self.finish_name_packet(p, updates)
388 response = self.dns_transaction_udp(p)
389 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
390 except socket.timeout:
391 # Windows chooses not to respond to incorrectly formatted queries.
392 # Although this appears to be non-deterministic even for the same
# request twice, it also appears to be based on how poorly the
394 # request is formatted.
397 def test_update_wrong_qclass(self):
398 "create update with DNS_QCLASS_NONE"
399 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
402 name = self.get_dns_domain()
403 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
406 self.finish_name_packet(p, updates)
407 response = self.dns_transaction_udp(p)
408 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
410 def test_update_prereq_with_non_null_ttl(self):
411 "test update with a non-null TTL"
412 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
415 name = self.get_dns_domain()
417 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
419 self.finish_name_packet(p, updates)
423 r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
424 r.rr_type = dns.DNS_QTYPE_TXT
425 r.rr_class = dns.DNS_QCLASS_NONE
430 p.ancount = len(prereqs)
434 response = self.dns_transaction_udp(p)
435 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
436 except socket.timeout:
437 # Windows chooses not to respond to incorrectly formatted queries.
438 # Although this appears to be non-deterministic even for the same
# request twice, it also appears to be based on how poorly the
440 # request is formatted.
443 def test_update_prereq_with_non_null_length(self):
444 "test update with a non-null length"
445 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
448 name = self.get_dns_domain()
450 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
452 self.finish_name_packet(p, updates)
456 r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
457 r.rr_type = dns.DNS_QTYPE_TXT
458 r.rr_class = dns.DNS_QCLASS_ANY
463 p.ancount = len(prereqs)
466 response = self.dns_transaction_udp(p)
467 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
469 def test_update_prereq_nonexisting_name(self):
470 "test update with a nonexisting name"
471 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
474 name = self.get_dns_domain()
476 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
478 self.finish_name_packet(p, updates)
482 r.name = "idontexist.%s" % self.get_dns_domain()
483 r.rr_type = dns.DNS_QTYPE_TXT
484 r.rr_class = dns.DNS_QCLASS_ANY
489 p.ancount = len(prereqs)
492 response = self.dns_transaction_udp(p)
493 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
def test_update_add_txt_record(self):
    """Add a single TXT record by dynamic update and query it back."""
    prefix = 'textrec'
    txt = ['"This is a test"']

    # Send the update and make sure the server accepted it.
    update = self.make_txt_update(prefix, txt)
    reply = self.dns_transaction_udp(update)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)

    # The record must now be returned by a TXT query for the same name.
    self.check_query_txt(prefix, txt)
503 def test_delete_record(self):
504 "Test if deleting records works"
506 NAME = "deleterec.%s" % self.get_dns_domain()
508 # First, create a record to make sure we have a record to delete.
509 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
512 name = self.get_dns_domain()
514 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
516 self.finish_name_packet(p, updates)
521 r.rr_type = dns.DNS_QTYPE_TXT
522 r.rr_class = dns.DNS_QCLASS_IN
525 rdata = make_txt_record(['"This is a test"'])
528 p.nscount = len(updates)
531 response = self.dns_transaction_udp(p)
532 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
534 # Now check the record is around
535 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
537 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
540 self.finish_name_packet(p, questions)
541 response = self.dns_transaction_udp(p)
542 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
544 # Now delete the record
545 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
548 name = self.get_dns_domain()
550 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
552 self.finish_name_packet(p, updates)
557 r.rr_type = dns.DNS_QTYPE_TXT
558 r.rr_class = dns.DNS_QCLASS_NONE
561 rdata = make_txt_record(['"This is a test"'])
564 p.nscount = len(updates)
567 response = self.dns_transaction_udp(p)
568 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
570 # And finally check it's gone
571 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
574 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
577 self.finish_name_packet(p, questions)
578 response = self.dns_transaction_udp(p)
579 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
581 def test_readd_record(self):
582 "Test if adding, deleting and then readding a records works"
584 NAME = "readdrec.%s" % self.get_dns_domain()
587 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
590 name = self.get_dns_domain()
592 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
594 self.finish_name_packet(p, updates)
599 r.rr_type = dns.DNS_QTYPE_TXT
600 r.rr_class = dns.DNS_QCLASS_IN
603 rdata = make_txt_record(['"This is a test"'])
606 p.nscount = len(updates)
609 response = self.dns_transaction_udp(p)
610 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
612 # Now check the record is around
613 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
615 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
618 self.finish_name_packet(p, questions)
619 response = self.dns_transaction_udp(p)
620 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
622 # Now delete the record
623 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
626 name = self.get_dns_domain()
628 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
630 self.finish_name_packet(p, updates)
635 r.rr_type = dns.DNS_QTYPE_TXT
636 r.rr_class = dns.DNS_QCLASS_NONE
639 rdata = make_txt_record(['"This is a test"'])
642 p.nscount = len(updates)
645 response = self.dns_transaction_udp(p)
646 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
649 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
652 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
655 self.finish_name_packet(p, questions)
656 response = self.dns_transaction_udp(p)
657 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
659 # recreate the record
660 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
663 name = self.get_dns_domain()
665 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
667 self.finish_name_packet(p, updates)
672 r.rr_type = dns.DNS_QTYPE_TXT
673 r.rr_class = dns.DNS_QCLASS_IN
676 rdata = make_txt_record(['"This is a test"'])
679 p.nscount = len(updates)
682 response = self.dns_transaction_udp(p)
683 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
685 # Now check the record is around
686 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
688 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
691 self.finish_name_packet(p, questions)
692 response = self.dns_transaction_udp(p)
693 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
695 def test_update_add_mx_record(self):
696 "test adding MX records works"
697 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
700 name = self.get_dns_domain()
702 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
704 self.finish_name_packet(p, updates)
708 r.name = "%s" % self.get_dns_domain()
709 r.rr_type = dns.DNS_QTYPE_MX
710 r.rr_class = dns.DNS_QCLASS_IN
713 rdata = dns.mx_record()
714 rdata.preference = 10
715 rdata.exchange = 'mail.%s' % self.get_dns_domain()
718 p.nscount = len(updates)
721 response = self.dns_transaction_udp(p)
722 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
724 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
727 name = "%s" % self.get_dns_domain()
728 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
731 self.finish_name_packet(p, questions)
732 response = self.dns_transaction_udp(p)
733 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
734 self.assertEqual(response.ancount, 1)
735 ans = response.answers[0]
736 self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
737 self.assertEqual(ans.rdata.preference, 10)
738 self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
741 class TestComplexQueries(DNSTest):
744 super(TestComplexQueries, self).setUp()
745 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
748 name = self.get_dns_domain()
750 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
752 self.finish_name_packet(p, updates)
756 r.name = "cname_test.%s" % self.get_dns_domain()
757 r.rr_type = dns.DNS_QTYPE_CNAME
758 r.rr_class = dns.DNS_QCLASS_IN
761 r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
763 p.nscount = len(updates)
766 response = self.dns_transaction_udp(p)
767 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
770 super(TestComplexQueries, self).tearDown()
771 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
774 name = self.get_dns_domain()
776 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
778 self.finish_name_packet(p, updates)
782 r.name = "cname_test.%s" % self.get_dns_domain()
783 r.rr_type = dns.DNS_QTYPE_CNAME
784 r.rr_class = dns.DNS_QCLASS_NONE
787 r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
789 p.nscount = len(updates)
792 response = self.dns_transaction_udp(p)
793 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
795 def test_one_a_query(self):
796 "create a query packet containing one query record"
797 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
800 name = "cname_test.%s" % self.get_dns_domain()
801 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
802 print "asking for ", q.name
805 self.finish_name_packet(p, questions)
806 response = self.dns_transaction_udp(p)
807 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
808 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
809 self.assertEquals(response.ancount, 2)
810 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
811 self.assertEquals(response.answers[0].rdata, "%s.%s" %
812 (os.getenv('SERVER'), self.get_dns_domain()))
813 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
814 self.assertEquals(response.answers[1].rdata,
815 os.getenv('SERVER_IP'))
817 class TestInvalidQueries(DNSTest):
819 def test_one_a_query(self):
820 "send 0 bytes follows by create a query packet containing one query record"
824 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
825 s.connect((os.getenv('SERVER_IP'), 53))
831 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
834 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
835 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
836 print "asking for ", q.name
839 self.finish_name_packet(p, questions)
840 response = self.dns_transaction_udp(p)
841 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
842 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
843 self.assertEquals(response.ancount, 1)
844 self.assertEquals(response.answers[0].rdata,
845 os.getenv('SERVER_IP'))
847 def test_one_a_reply(self):
848 "send a reply instead of a query"
851 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
854 name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
855 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
856 print "asking for ", q.name
859 self.finish_name_packet(p, questions)
860 p.operation |= dns.DNS_FLAG_REPLY
863 send_packet = ndr.ndr_pack(p)
864 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
865 s.settimeout(timeout)
866 host=os.getenv('SERVER_IP')
867 s.connect((host, 53))
868 tcp_packet = struct.pack('!H', len(send_packet))
869 tcp_packet += send_packet
870 s.send(tcp_packet, 0)
871 recv_packet = s.recv(0xffff + 2, 0)
872 self.assertEquals(0, len(recv_packet))
873 except socket.timeout:
874 # Windows chooses not to respond to incorrectly formatted queries.
875 # Although this appears to be non-deterministic even for the same
# request twice, it also appears to be based on how poorly the
877 # request is formatted.
883 class TestRPCRoundtrip(DNSTest):
884 def get_credentials(self, lp):
885 creds = credentials.Credentials()
887 creds.set_machine_account(lp)
888 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
892 super(TestRPCRoundtrip, self).setUp()
893 self.lp = self.get_loadparm()
894 self.creds = self.get_credentials(self.lp)
895 self.server = os.getenv("SERVER_IP")
896 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
900 super(TestRPCRoundtrip, self).tearDown()
def test_update_add_null_padded_txt_record(self):
    """Add TXT records containing empty strings and verify them.

    Each case is added by DNS update, read back with a DNS query, and
    then looked up through dns_record_match on the RPC connection using
    the expected textual form of the record.
    """
    # (name prefix, TXT strings sent and expected back, RPC text form)
    cases = (
        ('pad1textrec',
         ['"This is a test"', '', ''],
         '"\\"This is a test\\"" "" ""'),
        ('pad2textrec',
         ['"This is a test"', '', '', 'more text'],
         '"\\"This is a test\\"" "" "" "more text"'),
        ('pad3textrec',
         ['', '', '"This is a test"'],
         '"" "" "\\"This is a test\\""'),
    )

    for prefix, txt, rpc_text in cases:
        update = self.make_txt_update(prefix, txt)
        reply = self.dns_transaction_udp(update)
        self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, txt)

        domain = self.get_dns_domain()
        fqdn = "%s.%s" % (prefix, domain)
        match = dns_record_match(self.rpc_conn, self.server, domain,
                                 fqdn, dnsp.DNS_TYPE_TXT, rpc_text)
        self.assertIsNotNone(match)
934 # Test is incomplete due to strlen against txt records
def test_update_add_null_char_txt_record(self):
    """Add TXT records whose strings contain an embedded NUL byte.

    The strings sent contain 'NULL\\x00BYTE'; the test expects both the
    DNS query and the RPC lookup to report only the part before the NUL.
    """
    # (name prefix, TXT strings sent, strings expected from the query,
    #  RPC text form expected from dns_record_match)
    cases = (
        ('nulltextrec',
         ['NULL\x00BYTE'],
         ['NULL'],
         '"NULL"'),
        ('nulltextrec2',
         ['NULL\x00BYTE', 'NULL\x00BYTE'],
         ['NULL', 'NULL'],
         '"NULL" "NULL"'),
    )

    for prefix, txt, queried_txt, rpc_text in cases:
        update = self.make_txt_update(prefix, txt)
        reply = self.dns_transaction_udp(update)
        self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, queried_txt)

        domain = self.get_dns_domain()
        fqdn = "%s.%s" % (prefix, domain)
        match = dns_record_match(self.rpc_conn, self.server, domain,
                                 fqdn, dnsp.DNS_TYPE_TXT, rpc_text)
        self.assertIsNotNone(match)
def test_update_add_hex_char_txt_record(self):
    """Add a TXT record containing a 0xFF byte and verify it round-trips."""
    prefix = 'hextextrec'
    txt = ['HIGH\xFFBYTE']

    update = self.make_txt_update(prefix, txt)
    reply = self.dns_transaction_udp(update)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    # The same record must also be found through the RPC connection.
    domain = self.get_dns_domain()
    fqdn = "%s.%s" % (prefix, domain)
    match = dns_record_match(self.rpc_conn, self.server, domain,
                             fqdn, dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
    self.assertIsNotNone(match)
def test_update_add_slash_txt_record(self):
    """Add a TXT record containing a backslash and verify it round-trips."""
    prefix = 'slashtextrec'
    txt = ['Th\\=is=is a test']

    update = self.make_txt_update(prefix, txt)
    reply = self.dns_transaction_udp(update)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    # The RPC text form passed to the lookup has the backslash doubled.
    domain = self.get_dns_domain()
    fqdn = "%s.%s" % (prefix, domain)
    match = dns_record_match(self.rpc_conn, self.server, domain,
                             fqdn, dnsp.DNS_TYPE_TXT,
                             '"Th\\\\=is=is a test"')
    self.assertIsNotNone(match)
def test_update_add_two_txt_records(self):
    """Add a TXT record made of two strings and verify both come back."""
    prefix = 'textrec2'
    txt = ['"This is a test"', '"and this is a test, too"']

    update = self.make_txt_update(prefix, txt)
    reply = self.dns_transaction_udp(update)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    # Textual form passed to the RPC lookup: both strings, space-separated.
    rpc_text = ('"\\"This is a test\\""'
                ' "\\"and this is a test, too\\""')
    domain = self.get_dns_domain()
    fqdn = "%s.%s" % (prefix, domain)
    match = dns_record_match(self.rpc_conn, self.server, domain,
                             fqdn, dnsp.DNS_TYPE_TXT, rpc_text)
    self.assertIsNotNone(match)
def test_update_add_empty_txt_records(self):
    """Add a TXT record with an empty string list and verify it."""
    prefix = 'emptytextrec'
    txt = []

    update = self.make_txt_update(prefix, txt)
    reply = self.dns_transaction_udp(update)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(prefix, txt)

    # The RPC lookup is given an empty textual form for this record.
    domain = self.get_dns_domain()
    fqdn = "%s.%s" % (prefix, domain)
    match = dns_record_match(self.rpc_conn, self.server, domain,
                             fqdn, dnsp.DNS_TYPE_TXT, '')
    self.assertIsNotNone(match)
1007 if __name__ == "__main__":