1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 import samba.ndr as ndr
23 import samba.dcerpc.dns as dns
24 from samba import credentials, param
25 from samba.tests import TestCase
26 from samba.dcerpc import dnsp, dnsserver
29 class DNSTest(TestCase):
31 def errstr(self, errcode):
32 "Return a readable error code"
47 return string_codes[errcode]
50 def assert_dns_rcode_equals(self, packet, rcode):
51 "Helper function to check return code"
52 p_errcode = packet.operation & 0x000F
53 self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" %
54 (self.errstr(rcode), self.errstr(p_errcode)))
56 def assert_dns_opcode_equals(self, packet, opcode):
57 "Helper function to check opcode"
58 p_opcode = packet.operation & 0x7800
59 self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" %
62 def make_name_packet(self, opcode, qid=None):
63 "Helper creating a dns.name_packet"
66 p.id = random.randint(0x0, 0xffff)
71 def finish_name_packet(self, packet, questions):
72 "Helper to finalize a dns.name_packet"
73 packet.qdcount = len(questions)
74 packet.questions = questions
76 def make_name_question(self, name, qtype, qclass):
77 "Helper creating a dns.name_question"
78 q = dns.name_question()
80 q.question_type = qtype
81 q.question_class = qclass
84 def get_dns_domain(self):
85 "Helper to get dns domain"
86 return os.getenv('REALM', 'example.com').lower()
88 def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
89 "send a DNS query and read the reply"
92 send_packet = ndr.ndr_pack(packet)
94 print self.hexdump(send_packet)
95 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
97 s.send(send_packet, 0)
98 recv_packet = s.recv(2048, 0)
100 print self.hexdump(recv_packet)
101 return ndr.ndr_unpack(dns.name_packet, recv_packet)
106 def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
107 "send a DNS query and read the reply"
110 send_packet = ndr.ndr_pack(packet)
112 print self.hexdump(send_packet)
113 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
114 s.connect((host, 53))
115 tcp_packet = struct.pack('!H', len(send_packet))
116 tcp_packet += send_packet
117 s.send(tcp_packet, 0)
118 recv_packet = s.recv(0xffff + 2, 0)
120 print self.hexdump(recv_packet)
121 return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
126 class TestSimpleQueries(DNSTest):
128 def test_one_a_query(self):
129 "create a query packet containing one query record"
130 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
133 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
134 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
135 print "asking for ", q.name
138 self.finish_name_packet(p, questions)
139 response = self.dns_transaction_udp(p)
140 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
141 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
142 self.assertEquals(response.ancount, 1)
143 self.assertEquals(response.answers[0].rdata,
144 os.getenv('SERVER_IP'))
146 def test_one_a_query_tcp(self):
147 "create a query packet containing one query record via TCP"
148 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
151 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
152 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
153 print "asking for ", q.name
156 self.finish_name_packet(p, questions)
157 response = self.dns_transaction_tcp(p)
158 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
159 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
160 self.assertEquals(response.ancount, 1)
161 self.assertEquals(response.answers[0].rdata,
162 os.getenv('SERVER_IP'))
164 def test_one_mx_query(self):
165 "create a query packet causing an empty RCODE_OK answer"
166 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
169 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
170 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
171 print "asking for ", q.name
174 self.finish_name_packet(p, questions)
175 response = self.dns_transaction_udp(p)
176 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
177 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
178 self.assertEquals(response.ancount, 0)
180 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
183 name = "invalid-%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
184 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
185 print "asking for ", q.name
188 self.finish_name_packet(p, questions)
189 response = self.dns_transaction_udp(p)
190 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
191 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
192 self.assertEquals(response.ancount, 0)
194 def test_two_queries(self):
195 "create a query packet containing two query records"
196 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
199 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
200 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
203 name = "%s.%s" % ('bogusname', self.get_dns_domain())
204 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
207 self.finish_name_packet(p, questions)
208 response = self.dns_transaction_udp(p)
209 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
211 def test_qtype_all_query(self):
212 "create a QTYPE_ALL query"
213 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
216 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
217 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
218 print "asking for ", q.name
221 self.finish_name_packet(p, questions)
222 response = self.dns_transaction_udp(p)
225 dc_ipv6 = os.getenv('SERVER_IPV6')
226 if dc_ipv6 is not None:
229 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
230 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
231 self.assertEquals(response.ancount, num_answers)
232 self.assertEquals(response.answers[0].rdata,
233 os.getenv('SERVER_IP'))
234 if dc_ipv6 is not None:
235 self.assertEquals(response.answers[1].rdata, dc_ipv6)
237 def test_qclass_none_query(self):
238 "create a QCLASS_NONE query"
239 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
242 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
243 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
246 self.finish_name_packet(p, questions)
247 response = self.dns_transaction_udp(p)
248 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
250 # Only returns an authority section entry in BIND and Win DNS
251 # FIXME: Enable one Samba implements this feature
252 # def test_soa_hostname_query(self):
253 # "create a SOA query for a hostname"
254 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
257 # name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
258 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
259 # questions.append(q)
261 # self.finish_name_packet(p, questions)
262 # response = self.dns_transaction_udp(p)
263 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
264 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
265 # # We don't get SOA records for single hosts
266 # self.assertEquals(response.ancount, 0)
268 def test_soa_domain_query(self):
269 "create a SOA query for a domain"
270 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
273 name = self.get_dns_domain()
274 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
277 self.finish_name_packet(p, questions)
278 response = self.dns_transaction_udp(p)
279 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
280 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
281 self.assertEquals(response.ancount, 1)
282 self.assertEquals(response.answers[0].rdata.minimum, 3600)
285 class TestDNSUpdates(DNSTest):
287 def test_two_updates(self):
288 "create two update requests"
289 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
292 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
293 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
296 name = self.get_dns_domain()
297 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
300 self.finish_name_packet(p, updates)
301 response = self.dns_transaction_udp(p)
302 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
304 def test_update_wrong_qclass(self):
305 "create update with DNS_QCLASS_NONE"
306 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
309 name = self.get_dns_domain()
310 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
313 self.finish_name_packet(p, updates)
314 response = self.dns_transaction_udp(p)
315 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
317 def test_update_prereq_with_non_null_ttl(self):
318 "test update with a non-null TTL"
319 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
322 name = self.get_dns_domain()
324 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
326 self.finish_name_packet(p, updates)
330 r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
331 r.rr_type = dns.DNS_QTYPE_TXT
332 r.rr_class = dns.DNS_QCLASS_NONE
337 p.ancount = len(prereqs)
340 response = self.dns_transaction_udp(p)
341 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
343 # I'd love to test this one, but it segfaults. :)
344 # def test_update_prereq_with_non_null_length(self):
345 # "test update with a non-null length"
346 # p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
349 # name = self.get_dns_domain()
351 # u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
353 # self.finish_name_packet(p, updates)
357 # r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
358 # r.rr_type = dns.DNS_QTYPE_TXT
359 # r.rr_class = dns.DNS_QCLASS_ANY
364 # p.ancount = len(prereqs)
365 # p.answers = prereqs
367 # response = self.dns_transaction_udp(p)
368 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
370 def test_update_prereq_nonexisting_name(self):
371 "test update with a nonexisting name"
372 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
375 name = self.get_dns_domain()
377 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
379 self.finish_name_packet(p, updates)
383 r.name = "idontexist.%s" % self.get_dns_domain()
384 r.rr_type = dns.DNS_QTYPE_TXT
385 r.rr_class = dns.DNS_QCLASS_ANY
390 p.ancount = len(prereqs)
393 response = self.dns_transaction_udp(p)
394 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
396 def test_update_add_txt_record(self):
397 "test adding records works"
398 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
401 name = self.get_dns_domain()
403 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
405 self.finish_name_packet(p, updates)
409 r.name = "textrec.%s" % self.get_dns_domain()
410 r.rr_type = dns.DNS_QTYPE_TXT
411 r.rr_class = dns.DNS_QCLASS_IN
414 rdata = dns.txt_record()
415 rdata.txt = '"This is a test"'
418 p.nscount = len(updates)
421 response = self.dns_transaction_udp(p)
422 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
424 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
427 name = "textrec.%s" % self.get_dns_domain()
428 q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
431 self.finish_name_packet(p, questions)
432 response = self.dns_transaction_udp(p)
433 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
434 self.assertEquals(response.ancount, 1)
435 self.assertEquals(response.answers[0].rdata.txt, '"This is a test"')
437 def test_update_add_two_txt_records(self):
438 "test adding two txt records works"
439 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
442 name = self.get_dns_domain()
444 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
446 self.finish_name_packet(p, updates)
450 r.name = "textrec2.%s" % self.get_dns_domain()
451 r.rr_type = dns.DNS_QTYPE_TXT
452 r.rr_class = dns.DNS_QCLASS_IN
455 rdata = dns.txt_record()
456 rdata.txt = '"This is a test" "and this is a test, too"'
459 p.nscount = len(updates)
462 response = self.dns_transaction_udp(p)
463 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
465 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
468 name = "textrec2.%s" % self.get_dns_domain()
469 q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
472 self.finish_name_packet(p, questions)
473 response = self.dns_transaction_udp(p)
474 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
475 self.assertEquals(response.ancount, 1)
476 self.assertEquals(response.answers[0].rdata.txt, '"This is a test" "and this is a test, too"')
478 def test_delete_record(self):
479 "Test if deleting records works"
481 NAME = "deleterec.%s" % self.get_dns_domain()
483 # First, create a record to make sure we have a record to delete.
484 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
487 name = self.get_dns_domain()
489 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
491 self.finish_name_packet(p, updates)
496 r.rr_type = dns.DNS_QTYPE_TXT
497 r.rr_class = dns.DNS_QCLASS_IN
500 rdata = dns.txt_record()
501 rdata.txt = '"This is a test"'
504 p.nscount = len(updates)
507 response = self.dns_transaction_udp(p)
508 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
510 # Now check the record is around
511 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
513 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
516 self.finish_name_packet(p, questions)
517 response = self.dns_transaction_udp(p)
518 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
520 # Now delete the record
521 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
524 name = self.get_dns_domain()
526 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
528 self.finish_name_packet(p, updates)
533 r.rr_type = dns.DNS_QTYPE_TXT
534 r.rr_class = dns.DNS_QCLASS_NONE
537 rdata = dns.txt_record()
538 rdata.txt = '"This is a test"'
541 p.nscount = len(updates)
544 response = self.dns_transaction_udp(p)
545 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
547 # And finally check it's gone
548 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
551 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
554 self.finish_name_packet(p, questions)
555 response = self.dns_transaction_udp(p)
556 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
558 def test_readd_record(self):
559 "Test if adding, deleting and then readding a records works"
561 NAME = "readdrec.%s" % self.get_dns_domain()
564 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
567 name = self.get_dns_domain()
569 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
571 self.finish_name_packet(p, updates)
576 r.rr_type = dns.DNS_QTYPE_TXT
577 r.rr_class = dns.DNS_QCLASS_IN
580 rdata = dns.txt_record()
581 rdata.txt = '"This is a test"'
584 p.nscount = len(updates)
587 response = self.dns_transaction_udp(p)
588 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
590 # Now check the record is around
591 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
593 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
596 self.finish_name_packet(p, questions)
597 response = self.dns_transaction_udp(p)
598 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
600 # Now delete the record
601 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
604 name = self.get_dns_domain()
606 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
608 self.finish_name_packet(p, updates)
613 r.rr_type = dns.DNS_QTYPE_TXT
614 r.rr_class = dns.DNS_QCLASS_NONE
617 rdata = dns.txt_record()
618 rdata.txt = '"This is a test"'
621 p.nscount = len(updates)
624 response = self.dns_transaction_udp(p)
625 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
628 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
631 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
634 self.finish_name_packet(p, questions)
635 response = self.dns_transaction_udp(p)
636 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
638 # recreate the record
639 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
642 name = self.get_dns_domain()
644 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
646 self.finish_name_packet(p, updates)
651 r.rr_type = dns.DNS_QTYPE_TXT
652 r.rr_class = dns.DNS_QCLASS_IN
655 rdata = dns.txt_record()
656 rdata.txt = '"This is a test"'
659 p.nscount = len(updates)
662 response = self.dns_transaction_udp(p)
663 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
665 # Now check the record is around
666 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
668 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
671 self.finish_name_packet(p, questions)
672 response = self.dns_transaction_udp(p)
673 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
675 def test_update_add_mx_record(self):
676 "test adding MX records works"
677 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
680 name = self.get_dns_domain()
682 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
684 self.finish_name_packet(p, updates)
688 r.name = "%s" % self.get_dns_domain()
689 r.rr_type = dns.DNS_QTYPE_MX
690 r.rr_class = dns.DNS_QCLASS_IN
693 rdata = dns.mx_record()
694 rdata.preference = 10
695 rdata.exchange = 'mail.%s' % self.get_dns_domain()
698 p.nscount = len(updates)
701 response = self.dns_transaction_udp(p)
702 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
704 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
707 name = "%s" % self.get_dns_domain()
708 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
711 self.finish_name_packet(p, questions)
712 response = self.dns_transaction_udp(p)
713 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
714 self.assertEqual(response.ancount, 1)
715 ans = response.answers[0]
716 self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
717 self.assertEqual(ans.rdata.preference, 10)
718 self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
721 class TestComplexQueries(DNSTest):
724 super(TestComplexQueries, self).setUp()
725 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
728 name = self.get_dns_domain()
730 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
732 self.finish_name_packet(p, updates)
736 r.name = "cname_test.%s" % self.get_dns_domain()
737 r.rr_type = dns.DNS_QTYPE_CNAME
738 r.rr_class = dns.DNS_QCLASS_IN
741 r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
743 p.nscount = len(updates)
746 response = self.dns_transaction_udp(p)
747 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
750 super(TestComplexQueries, self).tearDown()
751 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
754 name = self.get_dns_domain()
756 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
758 self.finish_name_packet(p, updates)
762 r.name = "cname_test.%s" % self.get_dns_domain()
763 r.rr_type = dns.DNS_QTYPE_CNAME
764 r.rr_class = dns.DNS_QCLASS_NONE
767 r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
769 p.nscount = len(updates)
772 response = self.dns_transaction_udp(p)
773 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
775 def test_one_a_query(self):
776 "create a query packet containing one query record"
777 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
780 name = "cname_test.%s" % self.get_dns_domain()
781 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
782 print "asking for ", q.name
785 self.finish_name_packet(p, questions)
786 response = self.dns_transaction_udp(p)
787 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
788 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
789 self.assertEquals(response.ancount, 2)
790 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
791 self.assertEquals(response.answers[0].rdata, "%s.%s" %
792 (os.getenv('SERVER'), self.get_dns_domain()))
793 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
794 self.assertEquals(response.answers[1].rdata,
795 os.getenv('SERVER_IP'))
797 class TestInvalidQueries(DNSTest):
799 def test_one_a_query(self):
800 "send 0 bytes follows by create a query packet containing one query record"
804 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
805 s.connect((os.getenv('SERVER_IP'), 53))
811 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
814 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
815 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
816 print "asking for ", q.name
819 self.finish_name_packet(p, questions)
820 response = self.dns_transaction_udp(p)
821 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
822 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
823 self.assertEquals(response.ancount, 1)
824 self.assertEquals(response.answers[0].rdata,
825 os.getenv('SERVER_IP'))
827 def test_one_a_reply(self):
828 "send a reply instead of a query"
830 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
833 name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
834 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
835 print "asking for ", q.name
838 self.finish_name_packet(p, questions)
839 p.operation |= dns.DNS_FLAG_REPLY
842 send_packet = ndr.ndr_pack(p)
843 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
844 host=os.getenv('SERVER_IP')
845 s.connect((host, 53))
846 tcp_packet = struct.pack('!H', len(send_packet))
847 tcp_packet += send_packet
848 s.send(tcp_packet, 0)
849 recv_packet = s.recv(0xffff + 2, 0)
850 self.assertEquals(0, len(recv_packet))
855 class TestZones(DNSTest):
856 def get_loadparm(self):
857 lp = param.LoadParm()
858 lp.load(os.getenv("SMB_CONF_PATH"))
861 def get_credentials(self, lp):
862 creds = credentials.Credentials()
864 creds.set_machine_account(lp)
865 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
869 super(TestZones, self).setUp()
870 self.lp = self.get_loadparm()
871 self.creds = self.get_credentials(self.lp)
872 self.server = os.getenv("SERVER_IP")
873 self.zone = "test.lan"
874 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server),
878 super(TestZones, self).tearDown()
880 self.delete_zone(self.zone)
881 except RuntimeError, (num, string):
882 if num != 9601: #WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST
885 def create_zone(self, zone):
886 zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
887 zone_create.pszZoneName = zone
888 zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
889 zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
890 zone_create.fAging = 0
891 zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
892 self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
898 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
901 def delete_zone(self, zone):
902 self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
908 dnsserver.DNSSRV_TYPEID_NULL,
911 def test_soa_query(self):
913 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
916 q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
918 self.finish_name_packet(p, questions)
920 response = self.dns_transaction_udp(p)
921 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
922 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
923 self.assertEquals(response.ancount, 0)
925 self.create_zone(zone)
926 response = self.dns_transaction_udp(p)
927 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
928 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
929 self.assertEquals(response.ancount, 1)
930 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
932 self.delete_zone(zone)
933 response = self.dns_transaction_udp(p)
934 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
935 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
936 self.assertEquals(response.ancount, 0)
940 if __name__ == "__main__":