1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
25 from ldb import ERR_OPERATIONS_ERROR
30 import samba.ndr as ndr
31 from samba import credentials
32 from samba.dcerpc import dns, dnsp, dnsserver
33 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
34 from samba.tests.subunitrun import SubunitOptions, TestProgram
35 from samba import werror, WERRORError
36 from samba.tests.dns_base import DNSTest
37 import samba.getopt as options
39 import samba.dcerpc.dnsp
42 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
43 sambaopts = options.SambaOptions(parser)
44 parser.add_option_group(sambaopts)
46 # This timeout only has relevance when testing against Windows.
47 # Format errors tend to return patchy responses, so a timeout is needed.
48 parser.add_option("--timeout", type="int", dest="timeout",
49 help="Specify timeout for DNS requests")
51 # use command line creds if available
52 credopts = options.CredentialsOptions(parser)
53 parser.add_option_group(credopts)
54 subunitopts = SubunitOptions(parser)
55 parser.add_option_group(subunitopts)
57 opts, args = parser.parse_args()
59 lp = sambaopts.get_loadparm()
60 creds = credopts.get_credentials(lp)
62 timeout = opts.timeout
70 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
73 class TestSimpleQueries(DNSTest):
75 super(TestSimpleQueries, self).setUp()
76 global server, server_ip, lp, creds, timeout
77 self.server = server_name
78 self.server_ip = server_ip
81 self.timeout = timeout
83 def test_one_a_query(self):
84 "create a query packet containing one query record"
85 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
88 name = "%s.%s" % (self.server, self.get_dns_domain())
89 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
90 print("asking for ", q.name)
93 self.finish_name_packet(p, questions)
94 (response, response_packet) =\
95 self.dns_transaction_udp(p, host=server_ip)
96 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
97 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
98 self.assertEquals(response.ancount, 1)
99 self.assertEquals(response.answers[0].rdata,
102 def test_one_SOA_query(self):
103 "create a query packet containing one query record for the SOA"
104 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
107 name = "%s" % (self.get_dns_domain())
108 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
109 print("asking for ", q.name)
112 self.finish_name_packet(p, questions)
113 (response, response_packet) =\
114 self.dns_transaction_udp(p, host=server_ip)
115 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
116 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
117 self.assertEquals(response.ancount, 1)
119 response.answers[0].rdata.mname.upper(),
120 ("%s.%s" % (self.server, self.get_dns_domain())).upper())
122 def test_one_a_query_tcp(self):
123 "create a query packet containing one query record via TCP"
124 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
127 name = "%s.%s" % (self.server, self.get_dns_domain())
128 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
129 print("asking for ", q.name)
132 self.finish_name_packet(p, questions)
133 (response, response_packet) =\
134 self.dns_transaction_tcp(p, host=server_ip)
135 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
136 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
137 self.assertEquals(response.ancount, 1)
138 self.assertEquals(response.answers[0].rdata,
141 def test_one_mx_query(self):
142 "create a query packet causing an empty RCODE_OK answer"
143 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
146 name = "%s.%s" % (self.server, self.get_dns_domain())
147 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
148 print("asking for ", q.name)
151 self.finish_name_packet(p, questions)
152 (response, response_packet) =\
153 self.dns_transaction_udp(p, host=server_ip)
154 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
155 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
156 self.assertEquals(response.ancount, 0)
158 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
161 name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
162 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
163 print("asking for ", q.name)
166 self.finish_name_packet(p, questions)
167 (response, response_packet) =\
168 self.dns_transaction_udp(p, host=server_ip)
169 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
170 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
171 self.assertEquals(response.ancount, 0)
173 def test_two_queries(self):
174 "create a query packet containing two query records"
175 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
178 name = "%s.%s" % (self.server, self.get_dns_domain())
179 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
182 name = "%s.%s" % ('bogusname', self.get_dns_domain())
183 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
186 self.finish_name_packet(p, questions)
188 (response, response_packet) =\
189 self.dns_transaction_udp(p, host=server_ip)
190 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
191 except socket.timeout:
192 # Windows chooses not to respond to incorrectly formatted queries.
193 # Although this appears to be non-deterministic even for the same
194 # request twice, it also appears to be based on how poorly the
195 # request is formatted.
198 def test_qtype_all_query(self):
199 "create a QTYPE_ALL query"
200 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
203 name = "%s.%s" % (self.server, self.get_dns_domain())
204 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
205 print("asking for ", q.name)
208 self.finish_name_packet(p, questions)
209 (response, response_packet) =\
210 self.dns_transaction_udp(p, host=server_ip)
213 dc_ipv6 = os.getenv('SERVER_IPV6')
214 if dc_ipv6 is not None:
217 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
218 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
219 self.assertEquals(response.ancount, num_answers)
220 self.assertEquals(response.answers[0].rdata,
222 if dc_ipv6 is not None:
223 self.assertEquals(response.answers[1].rdata, dc_ipv6)
225 def test_qclass_none_query(self):
226 "create a QCLASS_NONE query"
227 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
230 name = "%s.%s" % (self.server, self.get_dns_domain())
231 q = self.make_name_question(
237 self.finish_name_packet(p, questions)
239 (response, response_packet) =\
240 self.dns_transaction_udp(p, host=server_ip)
241 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
242 except socket.timeout:
243 # Windows chooses not to respond to incorrectly formatted queries.
244 # Although this appears to be non-deterministic even for the same
245 # request twice, it also appears to be based on how poorly the
246 # request is formatted.
249 def test_soa_hostname_query(self):
250 "create a SOA query for a hostname"
251 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
254 name = "%s.%s" % (self.server, self.get_dns_domain())
255 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
258 self.finish_name_packet(p, questions)
259 (response, response_packet) =\
260 self.dns_transaction_udp(p, host=server_ip)
261 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
262 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
263 # We don't get SOA records for single hosts
264 self.assertEquals(response.ancount, 0)
265 # But we do respond with an authority section
266 self.assertEqual(response.nscount, 1)
268 def test_soa_domain_query(self):
269 "create a SOA query for a domain"
270 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
273 name = self.get_dns_domain()
274 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
277 self.finish_name_packet(p, questions)
278 (response, response_packet) =\
279 self.dns_transaction_udp(p, host=server_ip)
280 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
281 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
282 self.assertEquals(response.ancount, 1)
283 self.assertEquals(response.answers[0].rdata.minimum, 3600)
286 class TestDNSUpdates(DNSTest):
288 super(TestDNSUpdates, self).setUp()
289 global server, server_ip, lp, creds, timeout
290 self.server = server_name
291 self.server_ip = server_ip
294 self.timeout = timeout
296 def test_two_updates(self):
297 "create two update requests"
298 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
301 name = "%s.%s" % (self.server, self.get_dns_domain())
302 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
305 name = self.get_dns_domain()
306 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
309 self.finish_name_packet(p, updates)
311 (response, response_packet) =\
312 self.dns_transaction_udp(p, host=server_ip)
313 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
314 except socket.timeout:
315 # Windows chooses not to respond to incorrectly formatted queries.
316 # Although this appears to be non-deterministic even for the same
317 # request twice, it also appears to be based on how poorly the
318 # request is formatted.
321 def test_update_wrong_qclass(self):
322 "create update with DNS_QCLASS_NONE"
323 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
326 name = self.get_dns_domain()
327 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
330 self.finish_name_packet(p, updates)
331 (response, response_packet) =\
332 self.dns_transaction_udp(p, host=server_ip)
333 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
335 def test_update_prereq_with_non_null_ttl(self):
336 "test update with a non-null TTL"
337 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
340 name = self.get_dns_domain()
342 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
344 self.finish_name_packet(p, updates)
348 r.name = "%s.%s" % (self.server, self.get_dns_domain())
349 r.rr_type = dns.DNS_QTYPE_TXT
350 r.rr_class = dns.DNS_QCLASS_NONE
355 p.ancount = len(prereqs)
359 (response, response_packet) =\
360 self.dns_transaction_udp(p, host=server_ip)
361 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
362 except socket.timeout:
363 # Windows chooses not to respond to incorrectly formatted queries.
364 # Although this appears to be non-deterministic even for the same
365 # request twice, it also appears to be based on how poorly the
366 # request is formatted.
369 def test_update_prereq_with_non_null_length(self):
370 "test update with a non-null length"
371 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
374 name = self.get_dns_domain()
376 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
378 self.finish_name_packet(p, updates)
382 r.name = "%s.%s" % (self.server, self.get_dns_domain())
383 r.rr_type = dns.DNS_QTYPE_TXT
384 r.rr_class = dns.DNS_QCLASS_ANY
389 p.ancount = len(prereqs)
392 (response, response_packet) =\
393 self.dns_transaction_udp(p, host=server_ip)
394 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
396 def test_update_prereq_nonexisting_name(self):
397 "test update with a nonexisting name"
398 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
401 name = self.get_dns_domain()
403 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
405 self.finish_name_packet(p, updates)
409 r.name = "idontexist.%s" % self.get_dns_domain()
410 r.rr_type = dns.DNS_QTYPE_TXT
411 r.rr_class = dns.DNS_QCLASS_ANY
416 p.ancount = len(prereqs)
419 (response, response_packet) =\
420 self.dns_transaction_udp(p, host=server_ip)
421 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
def test_update_add_txt_record(self):
    """test adding records works

    Builds an update packet with ``make_txt_update`` for the ``textrec``
    prefix, sends it over UDP, expects DNS_RCODE_OK in the response and
    then calls ``check_query_txt`` with the same prefix and text.
    """
    record_prefix = 'textrec'
    record_text = ['"This is a test"']
    update_packet = self.make_txt_update(record_prefix, record_text)
    # The request is sent to the module-level server_ip, not self.server_ip.
    reply = self.dns_transaction_udp(update_packet, host=server_ip)
    (response, response_packet) = reply
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.check_query_txt(record_prefix, record_text)
432 def test_delete_record(self):
433 "Test if deleting records works"
435 NAME = "deleterec.%s" % self.get_dns_domain()
437 # First, create a record to make sure we have a record to delete.
438 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
441 name = self.get_dns_domain()
443 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
445 self.finish_name_packet(p, updates)
450 r.rr_type = dns.DNS_QTYPE_TXT
451 r.rr_class = dns.DNS_QCLASS_IN
454 rdata = self.make_txt_record(['"This is a test"'])
457 p.nscount = len(updates)
460 (response, response_packet) =\
461 self.dns_transaction_udp(p, host=server_ip)
462 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
464 # Now check the record is around
465 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
467 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
470 self.finish_name_packet(p, questions)
471 (response, response_packet) =\
472 self.dns_transaction_udp(p, host=server_ip)
473 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
475 # Now delete the record
476 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
479 name = self.get_dns_domain()
481 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
483 self.finish_name_packet(p, updates)
488 r.rr_type = dns.DNS_QTYPE_TXT
489 r.rr_class = dns.DNS_QCLASS_NONE
492 rdata = self.make_txt_record(['"This is a test"'])
495 p.nscount = len(updates)
498 (response, response_packet) =\
499 self.dns_transaction_udp(p, host=server_ip)
500 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
502 # And finally check it's gone
503 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
506 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
509 self.finish_name_packet(p, questions)
510 (response, response_packet) =\
511 self.dns_transaction_udp(p, host=server_ip)
512 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
514 def test_readd_record(self):
515 "Test if adding, deleting and then readding a records works"
517 NAME = "readdrec.%s" % self.get_dns_domain()
520 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
523 name = self.get_dns_domain()
525 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
527 self.finish_name_packet(p, updates)
532 r.rr_type = dns.DNS_QTYPE_TXT
533 r.rr_class = dns.DNS_QCLASS_IN
536 rdata = self.make_txt_record(['"This is a test"'])
539 p.nscount = len(updates)
542 (response, response_packet) =\
543 self.dns_transaction_udp(p, host=server_ip)
544 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
546 # Now check the record is around
547 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
549 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
552 self.finish_name_packet(p, questions)
553 (response, response_packet) =\
554 self.dns_transaction_udp(p, host=server_ip)
555 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
557 # Now delete the record
558 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
561 name = self.get_dns_domain()
563 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
565 self.finish_name_packet(p, updates)
570 r.rr_type = dns.DNS_QTYPE_TXT
571 r.rr_class = dns.DNS_QCLASS_NONE
574 rdata = self.make_txt_record(['"This is a test"'])
577 p.nscount = len(updates)
580 (response, response_packet) =\
581 self.dns_transaction_udp(p, host=server_ip)
582 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
585 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
588 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
591 self.finish_name_packet(p, questions)
592 (response, response_packet) =\
593 self.dns_transaction_udp(p, host=server_ip)
594 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
596 # recreate the record
597 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
600 name = self.get_dns_domain()
602 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
604 self.finish_name_packet(p, updates)
609 r.rr_type = dns.DNS_QTYPE_TXT
610 r.rr_class = dns.DNS_QCLASS_IN
613 rdata = self.make_txt_record(['"This is a test"'])
616 p.nscount = len(updates)
619 (response, response_packet) =\
620 self.dns_transaction_udp(p, host=server_ip)
621 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
623 # Now check the record is around
624 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
626 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
629 self.finish_name_packet(p, questions)
630 (response, response_packet) =\
631 self.dns_transaction_udp(p, host=server_ip)
632 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
634 def test_update_add_mx_record(self):
635 "test adding MX records works"
636 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
639 name = self.get_dns_domain()
641 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
643 self.finish_name_packet(p, updates)
647 r.name = "%s" % self.get_dns_domain()
648 r.rr_type = dns.DNS_QTYPE_MX
649 r.rr_class = dns.DNS_QCLASS_IN
652 rdata = dns.mx_record()
653 rdata.preference = 10
654 rdata.exchange = 'mail.%s' % self.get_dns_domain()
657 p.nscount = len(updates)
660 (response, response_packet) =\
661 self.dns_transaction_udp(p, host=server_ip)
662 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
664 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
667 name = "%s" % self.get_dns_domain()
668 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
671 self.finish_name_packet(p, questions)
672 (response, response_packet) =\
673 self.dns_transaction_udp(p, host=server_ip)
674 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
675 self.assertEqual(response.ancount, 1)
676 ans = response.answers[0]
677 self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
678 self.assertEqual(ans.rdata.preference, 10)
679 self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
682 class TestComplexQueries(DNSTest):
683 def make_dns_update(self, key, value, qtype):
684 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
686 name = self.get_dns_domain()
687 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
688 self.finish_name_packet(p, [u])
693 r.rr_class = dns.DNS_QCLASS_IN
699 (response, response_packet) =\
700 self.dns_transaction_udp(p, host=server_ip)
701 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
704 super(TestComplexQueries, self).setUp()
706 global server, server_ip, lp, creds, timeout
707 self.server = server_name
708 self.server_ip = server_ip
711 self.timeout = timeout
713 def test_one_a_query(self):
714 "create a query packet containing one query record"
719 name = "cname_test.%s" % self.get_dns_domain()
720 rdata = "%s.%s" % (self.server, self.get_dns_domain())
721 self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
723 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
727 name = "cname_test.%s" % self.get_dns_domain()
728 q = self.make_name_question(name,
731 print("asking for ", q.name)
734 self.finish_name_packet(p, questions)
735 (response, response_packet) =\
736 self.dns_transaction_udp(p, host=self.server_ip)
737 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
738 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
739 self.assertEquals(response.ancount, 2)
740 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
741 self.assertEquals(response.answers[0].rdata, "%s.%s" %
742 (self.server, self.get_dns_domain()))
743 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
744 self.assertEquals(response.answers[1].rdata,
749 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
752 name = self.get_dns_domain()
754 u = self.make_name_question(name,
758 self.finish_name_packet(p, updates)
762 r.name = "cname_test.%s" % self.get_dns_domain()
763 r.rr_type = dns.DNS_QTYPE_CNAME
764 r.rr_class = dns.DNS_QCLASS_NONE
767 r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
769 p.nscount = len(updates)
772 (response, response_packet) =\
773 self.dns_transaction_udp(p, host=self.server_ip)
774 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
776 def test_cname_two_chain(self):
777 name0 = "cnamechain0.%s" % self.get_dns_domain()
778 name1 = "cnamechain1.%s" % self.get_dns_domain()
779 name2 = "cnamechain2.%s" % self.get_dns_domain()
780 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
781 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
782 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
784 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
786 q = self.make_name_question(name1, dns.DNS_QTYPE_A,
790 self.finish_name_packet(p, questions)
791 (response, response_packet) =\
792 self.dns_transaction_udp(p, host=server_ip)
793 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
794 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
795 self.assertEquals(response.ancount, 3)
797 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
798 self.assertEquals(response.answers[0].name, name1)
799 self.assertEquals(response.answers[0].rdata, name2)
801 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
802 self.assertEquals(response.answers[1].name, name2)
803 self.assertEquals(response.answers[1].rdata, name0)
805 self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
806 self.assertEquals(response.answers[2].rdata,
809 def test_invalid_empty_cname(self):
810 name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
812 self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
813 except AssertionError:
816 self.fail("Successfully added empty CNAME, which is invalid.")
818 def test_cname_two_chain_not_matching_qtype(self):
819 name0 = "cnamechain0.%s" % self.get_dns_domain()
820 name1 = "cnamechain1.%s" % self.get_dns_domain()
821 name2 = "cnamechain2.%s" % self.get_dns_domain()
822 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
823 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
824 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
826 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
828 q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
832 self.finish_name_packet(p, questions)
833 (response, response_packet) =\
834 self.dns_transaction_udp(p, host=server_ip)
835 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
836 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
838 # CNAME should return all intermediate results!
839 # Only the A record exists, not the TXT.
840 self.assertEquals(response.ancount, 2)
842 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
843 self.assertEquals(response.answers[0].name, name1)
844 self.assertEquals(response.answers[0].rdata, name2)
846 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
847 self.assertEquals(response.answers[1].name, name2)
848 self.assertEquals(response.answers[1].rdata, name0)
851 class TestInvalidQueries(DNSTest):
853 super(TestInvalidQueries, self).setUp()
854 global server, server_ip, lp, creds, timeout
855 self.server = server_name
856 self.server_ip = server_ip
859 self.timeout = timeout
861 def test_one_a_query(self):
862 """send 0 bytes follows by create a query packet
863 containing one query record"""
867 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
868 s.connect((self.server_ip, 53))
874 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
877 name = "%s.%s" % (self.server, self.get_dns_domain())
878 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
879 print("asking for ", q.name)
882 self.finish_name_packet(p, questions)
883 (response, response_packet) =\
884 self.dns_transaction_udp(p, host=self.server_ip)
885 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
886 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
887 self.assertEquals(response.ancount, 1)
888 self.assertEquals(response.answers[0].rdata,
891 def test_one_a_reply(self):
892 "send a reply instead of a query"
895 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
898 name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
899 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
900 print("asking for ", q.name)
903 self.finish_name_packet(p, questions)
904 p.operation |= dns.DNS_FLAG_REPLY
907 send_packet = ndr.ndr_pack(p)
908 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
909 s.settimeout(timeout)
910 host = self.server_ip
911 s.connect((host, 53))
912 tcp_packet = struct.pack('!H', len(send_packet))
913 tcp_packet += send_packet
914 s.send(tcp_packet, 0)
915 recv_packet = s.recv(0xffff + 2, 0)
916 self.assertEquals(0, len(recv_packet))
917 except socket.timeout:
918 # Windows chooses not to respond to incorrectly formatted queries.
919 # Although this appears to be non-deterministic even for the same
920 # request twice, it also appears to be based on how poorly the
921 # request is formatted.
928 class TestZones(DNSTest):
930 super(TestZones, self).setUp()
931 global server, server_ip, lp, creds, timeout
932 self.server = server_name
933 self.server_ip = server_ip
936 self.timeout = timeout
938 self.zone = "test.lan"
939 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
943 self.samdb = SamDB(url="ldap://" + self.server_ip,
944 lp=self.get_loadparm(),
945 session_info=system_session(),
946 credentials=self.creds)
947 self.zone_dn = "DC=" + self.zone +\
948 ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
949 str(self.samdb.get_default_basedn())
952 super(TestZones, self).tearDown()
955 self.delete_zone(self.zone)
956 except RuntimeError as e:
957 (num, string) = e.args
958 if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
961 def create_zone(self, zone, aging_enabled=False):
962 zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
963 zone_create.pszZoneName = zone
964 zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
965 zone_create.fAging = int(aging_enabled)
966 zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
967 zone_create.fDsIntegrated = 1
968 zone_create.fLoadExisting = 1
969 zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
971 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
972 self.rpc_conn.DnssrvOperation2(client_version,
978 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
980 except WERRORError as e:
983 def set_params(self, **kwargs):
984 zone = kwargs.pop('zone', None)
985 for key, val in kwargs.items():
986 name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
987 name_param.dwParam = val
988 name_param.pszNodeName = key
990 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
991 nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
993 self.rpc_conn.DnssrvOperation2(client_version,
998 'ResetDwordProperty',
1001 except WERRORError as e:
1004 def ldap_modify_dnsrecs(self, name, func):
1005 dn = 'DC={},{}'.format(name, self.zone_dn)
1006 dns_recs = self.ldap_get_dns_records(name)
1007 for rec in dns_recs:
1009 update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]}
1010 self.samdb.modify(ldb.Message.from_dict(self.samdb,
1012 ldb.FLAG_MOD_REPLACE))
1014 def dns_update_record(self, prefix, txt):
1015 p = self.make_txt_update(prefix, txt, self.zone)
1016 (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
1017 self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
1018 recs = self.ldap_get_dns_records(prefix)
1019 recs = [r for r in recs if r.data.str == txt]
1020 self.assertEqual(len(recs), 1)
def dns_tombstone(self, prefix, txt, zone):
    """Call ``samdb.dns_replace`` for ``prefix.zone`` with one tombstone.

    The single record passed has wType DNS_TYPE_TOMBSTONE and a fixed
    dwTimeStamp of 1000.  ``txt`` is accepted but not read by this helper.
    """
    node_name = prefix + "." + zone

    tombstone = dnsp.DnssrvRpcRecord()
    tombstone.dwTimeStamp = 1000
    tombstone.wType = dnsp.DNS_TYPE_TOMBSTONE

    replacement_records = [tombstone]
    self.samdb.dns_replace(node_name, replacement_records)
def ldap_get_records(self, name):
    """Search below the test zone DN for dnsNode objects named ``name``.

    Returns whatever ``samdb.search`` returns, with all attributes
    requested.
    """
    # The use of SCOPE_SUBTREE here avoids raising an exception in the
    # 0 results case for a test below.
    return self.samdb.search(
        base=self.zone_dn,
        scope=ldb.SCOPE_SUBTREE,
        expression="(&(objectClass=dnsNode)(name={}))".format(name),
        attrs=["*"])
def ldap_get_dns_records(self, name):
    """Return the unpacked ``dnsRecord`` values of the first node for ``name``.

    Only the first search result from ``ldap_get_records`` is inspected;
    each of its ``dnsRecord`` values is decoded with ``ndr_unpack`` as a
    ``dnsp.DnssrvRpcRecord``.
    """
    nodes = self.ldap_get_records(name)
    packed_records = nodes[0].get('dnsRecord')
    unpacked = []
    for blob in packed_records:
        unpacked.append(ndr_unpack(dnsp.DnssrvRpcRecord, blob))
    return unpacked
1045 def ldap_get_zone_settings(self):
1046 records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
1047 expression="(&(objectClass=dnsZone)" +
1048 "(name={}))".format(self.zone),
1049 attrs=["dNSProperty"])
1050 self.assertEqual(len(records), 1)
1051 props = [ndr_unpack(dnsp.DnsProperty, r)
1052 for r in records[0].get('dNSProperty')]
1054 # We have no choice but to repeat these here.
1055 zone_prop_ids = {0x00: "EMPTY",
1057 0x02: "ALLOW_UPDATE",
1058 0x08: "SECURE_TIME",
1059 0x10: "NOREFRESH_INTERVAL",
1060 0x11: "SCAVENGING_SERVERS",
1061 0x12: "AGING_ENABLED_TIME",
1062 0x20: "REFRESH_INTERVAL",
1063 0x40: "AGING_STATE",
1064 0x80: "DELETED_FROM_HOSTNAME",
1065 0x81: "MASTER_SERVERS",
1066 0x82: "AUTO_NS_SERVERS",
1067 0x83: "DCPROMO_CONVERT",
1068 0x90: "SCAVENGING_SERVERS_DA",
1069 0x91: "MASTER_SERVERS_DA",
1070 0x92: "NS_SERVERS_DA",
1071 0x100: "NODE_DBFLAGS"}
1072 return {zone_prop_ids[p.id].lower(): p.data for p in props}
def set_aging(self, enable=False):
    """Create the test zone and push its aging-related parameters.

    The zone is created with ``aging_enabled=enable``; afterwards
    ``set_params`` is called with both refresh intervals set to 1, the
    Aging flag normalised to 0 or 1, and unsecure updates allowed.
    """
    self.create_zone(self.zone, aging_enabled=enable)
    # Keyword order is kept as in the call it replaces.
    zone_params = {
        'NoRefreshInterval': 1,
        'RefreshInterval': 1,
        'Aging': int(bool(enable)),
        'zone': self.zone,
        'AllowUpdate': dnsp.DNS_ZONE_UPDATE_UNSECURE,
    }
    self.set_params(**zone_params)
def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
    """Check that enabling aging is reflected in the zone and in new records.

    After ``set_aging(enable=True)`` the zone settings must carry a truthy
    ``aging_state``, and a record created through ``dns_update_record``
    must have a non-zero dwTimeStamp.

    NOTE: the ``enable``, ``name`` and ``txt`` parameters are not read by
    the body; the values used below are hard-coded.
    """
    self.set_aging(enable=True)
    zone_settings = self.ldap_get_zone_settings()
    self.assertTrue(zone_settings['aging_state'] is not None)
    self.assertTrue(zone_settings['aging_state'])

    created = self.dns_update_record('agingtest', ['test txt'])
    self.assertNotEqual(created.dwTimeStamp, 0)
def test_set_aging_disabled(self):
    """Check the zone and new records when aging is switched off.

    After ``set_aging(enable=False)`` the zone settings must still contain
    an ``aging_state`` entry, but a falsy one.  A record created through
    ``dns_update_record`` is still expected to have a non-zero
    dwTimeStamp.
    """
    self.set_aging(enable=False)
    zone_settings = self.ldap_get_zone_settings()
    self.assertTrue(zone_settings['aging_state'] is not None)
    self.assertFalse(zone_settings['aging_state'])

    created = self.dns_update_record('agingtest', ['test txt'])
    self.assertNotEqual(created.dwTimeStamp, 0)
1098 def test_aging_update(self, enable=True):
1099 name, txt = 'agingtest', ['test txt']
1100 self.set_aging(enable=True)
1101 before_mod = self.dns_update_record(name, txt)
1103 self.set_params(zone=self.zone, Aging=0)
1107 self.assertTrue(rec.dwTimeStamp > 0)
1108 rec.dwTimeStamp -= dec
1109 self.ldap_modify_dnsrecs(name, mod_ts)
1110 after_mod = self.ldap_get_dns_records(name)
1111 self.assertEqual(len(after_mod), 1)
1112 after_mod = after_mod[0]
1113 self.assertEqual(after_mod.dwTimeStamp,
1114 before_mod.dwTimeStamp - dec)
1115 after_update = self.dns_update_record(name, txt)
1116 after_should_equal = before_mod if enable else after_mod
1117 self.assertEqual(after_should_equal.dwTimeStamp,
1118 after_update.dwTimeStamp)
def test_aging_update_disabled(self):
    """Run the ``test_aging_update`` scenario with ``enable=False``."""
    self.test_aging_update(enable=False)
def test_aging_refresh(self):
    """Check timestamp handling inside and after the no-refresh interval.

    A record aged by half an interval and then updated is expected to
    keep the aged timestamp. A record aged by a further one and a half
    intervals and then updated is expected to return to the original
    timestamp.
    """
    name, txt = 'agingtest', ['test txt']
    self.create_zone(self.zone, aging_enabled=True)
    # NOTE(review): the concrete value only needs to be an even positive
    # number for the arithmetic below; confirm against the zone defaults.
    interval = 10
    # Floor division keeps the timestamp arithmetic integral on Python 3
    # as well as Python 2.
    half_interval = interval // 2
    self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
                    Aging=1, zone=self.zone,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
    before_mod = self.dns_update_record(name, txt)

    def age_by(amount):
        """Build a record modifier that pushes the timestamp back."""
        def mod_ts(rec):
            self.assertTrue(rec.dwTimeStamp > 0)
            rec.dwTimeStamp -= amount
        return mod_ts

    self.ldap_modify_dnsrecs(name, age_by(half_interval))
    update_during_norefresh = self.dns_update_record(name, txt)

    self.ldap_modify_dnsrecs(name, age_by(interval + half_interval))
    update_during_refresh = self.dns_update_record(name, txt)
    self.assertEqual(update_during_norefresh.dwTimeStamp,
                     before_mod.dwTimeStamp - half_interval)
    self.assertEqual(update_during_refresh.dwTimeStamp,
                     before_mod.dwTimeStamp)
def test_rpc_add_no_timestamp(self):
    """Check that a record added over RPC is stored with a zero timestamp."""
    name, txt = 'agingtest', ['test txt']
    self.set_aging(enable=True)
    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = TXTRecord(txt)
    # NOTE(review): argument order assumed to be (client version, flags,
    # server, zone, node name, record to add, record to delete) -- confirm
    # against the DnssrvUpdateRecord2 binding.
    self.rpc_conn.DnssrvUpdateRecord2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,
        self.server_ip,
        self.zone,
        name,
        rec_buf,
        None)
    recs = self.ldap_get_dns_records(name)
    self.assertEqual(len(recs), 1)
    self.assertEqual(recs[0].dwTimeStamp, 0)
def test_static_record_dynamic_update(self):
    """Check a dynamic update on a name that already has an RPC-added record.

    The record returned by the dynamic update is expected to have a zero
    timestamp.
    """
    name, txt = 'agingtest', ['test txt']
    txt2 = ['test txt2']
    self.set_aging(enable=True)
    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = TXTRecord(txt)
    # NOTE(review): argument order assumed to be (client version, flags,
    # server, zone, node name, record to add, record to delete) -- confirm
    # against the DnssrvUpdateRecord2 binding.
    self.rpc_conn.DnssrvUpdateRecord2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,
        self.server_ip,
        self.zone,
        name,
        rec_buf,
        None)

    rec2 = self.dns_update_record(name, txt2)
    self.assertEqual(rec2.dwTimeStamp, 0)
def test_dynamic_record_static_update(self):
    """Check timestamps when an RPC add follows a dynamic update.

    The first, dynamically added record is expected to keep a non-zero
    timestamp. The RPC-added record and the dynamic update made after it
    are both expected to have a zero timestamp.
    """
    name, txt = 'agingtest', ['test txt']
    txt2 = ['test txt2']
    txt3 = ['test txt3']
    self.set_aging(enable=True)

    self.dns_update_record(name, txt)

    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = TXTRecord(txt2)
    # NOTE(review): argument order assumed to be (client version, flags,
    # server, zone, node name, record to add, record to delete) -- confirm
    # against the DnssrvUpdateRecord2 binding.
    self.rpc_conn.DnssrvUpdateRecord2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,
        self.server_ip,
        self.zone,
        name,
        rec_buf,
        None)

    self.dns_update_record(name, txt3)

    recs = self.ldap_get_dns_records(name)
    # Put in dict because ldap recs might be out of order
    recs = {str(r.data.str): r for r in recs}
    self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0)
    self.assertEqual(recs[str(txt2)].dwTimeStamp, 0)
    self.assertEqual(recs[str(txt3)].dwTimeStamp, 0)
def test_dns_tombstone_custom_match_rule(self):
    """Check which names the dnsRecord extended match rule selects.

    Three names get dynamic records and the ``txt`` record of the first
    two is aged by 2. A record added over RPC and a tombstoned record are
    created as well. A search using the match rule with the last update's
    timestamp minus one is expected to return only the two aged names.
    """
    lp = self.get_loadparm()
    self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
                       session_info=system_session(),
                       credentials=self.creds)

    name, txt = 'agingtest', ['test txt']
    name2, txt2 = 'agingtest2', ['test txt2']
    name3, txt3 = 'agingtest3', ['test txt3']
    name4, txt4 = 'agingtest4', ['test txt4']
    name5, txt5 = 'agingtest5', ['test txt5']

    self.create_zone(self.zone, aging_enabled=True)
    # NOTE(review): interval value assumed; nothing below depends on the
    # exact number.
    interval = 10
    self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
                    Aging=1, zone=self.zone,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)

    self.dns_update_record(name, txt)

    self.dns_update_record(name2, txt)
    self.dns_update_record(name2, txt2)

    self.dns_update_record(name3, txt)
    self.dns_update_record(name3, txt2)
    last_update = self.dns_update_record(name3, txt3)

    # Modify txt1 of the first 2 names
    def mod_ts(rec):
        if rec.data.str == txt:
            rec.dwTimeStamp -= 2
    self.ldap_modify_dnsrecs(name, mod_ts)
    self.ldap_modify_dnsrecs(name2, mod_ts)

    # Create a static DNS record.
    rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    rec_buf.rec = TXTRecord(txt4)
    # NOTE(review): argument order assumed to be (client version, flags,
    # server, zone, node name, record to add, record to delete) -- confirm
    # against the DnssrvUpdateRecord2 binding.
    self.rpc_conn.DnssrvUpdateRecord2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,
        self.server_ip,
        self.zone,
        name4,
        rec_buf,
        None)

    # Create a tombstoned record.
    self.dns_update_record(name5, txt5)
    self.dns_tombstone(name5, txt5, self.zone)

    self.ldap_get_dns_records(name3)
    expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
    expr = expr.format(int(last_update.dwTimeStamp) - 1)
    try:
        res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
                                expression=expr, attrs=["*"])
    except ldb.LdbError as e:
        self.fail(str(e))
    updated_names = {str(r.get('name')) for r in res}
    self.assertEqual(updated_names, {name, name2})
def test_dns_tombstone_custom_match_rule_no_records(self):
    """Check that the dnsRecord match rule works on a zone with no records.

    The search must succeed and return nothing.
    """
    lp = self.get_loadparm()
    self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
                       session_info=system_session(),
                       credentials=self.creds)

    self.create_zone(self.zone, aging_enabled=True)
    # NOTE(review): interval value assumed; nothing below depends on the
    # exact number.
    interval = 10
    self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
                    Aging=1, zone=self.zone,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)

    expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
    expr = expr.format(1)

    try:
        res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
                                expression=expr, attrs=["*"])
    except ldb.LdbError as e:
        self.fail(str(e))
    self.assertEqual(0, len(res))
def test_dns_tombstone_custom_match_rule_fail(self):
    """Check how the dnsRecord match rule handles invalid use.

    Using the rule on an attribute other than dnsRecord must match
    nothing. A malformed tombstone time, or a search over a connection
    opened without the system session, must raise ``ldb.LdbError`` with
    ``ERR_OPERATIONS_ERROR``.
    """
    self.create_zone(self.zone, aging_enabled=True)
    # `lp` is the module-level loadparm object here, not a local.
    samdb = SamDB(url=lp.samdb_url(),
                  lp=lp,
                  session_info=system_session(),
                  credentials=self.creds)

    def expect_operations_error(open_db, expr):
        """Run the search and require it to fail with an operations error."""
        try:
            # The connection is opened inside the try block so that an
            # LdbError raised while connecting is checked the same way.
            res = open_db().search(base=self.zone_dn,
                                   scope=ldb.SCOPE_SUBTREE,
                                   expression=expr, attrs=["*"])
            self.assertEqual(len(res), 0)
            self.fail("Exception: ldb.ldbError not generated")
        except ldb.LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_OPERATIONS_ERROR)

    def system_db():
        return samdb

    def non_system_db():
        return SamDB(url="ldap://" + self.server_ip,
                     lp=self.get_loadparm(),
                     credentials=self.creds)

    # Property name is not dnsRecord
    expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
    res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
                       expression=expr, attrs=["*"])
    self.assertEqual(len(res), 0)

    rule = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
    malformed_times = (
        "",          # No value for tombstone time
        "-",         # Tombstone time = -
        "1" * 65,    # Tombstone time longer than 64 characters
        "expired",   # Non numeric Tombstone time
    )
    for value in malformed_times:
        expect_operations_error(system_db, rule.format(value))

    # Non system session
    expect_operations_error(non_system_db, rule.format(2))
def test_basic_scavenging(self):
    """Check record scavenging and the later deletion of tombstones.

    The ``txt`` record on three names is aged by five intervals and the
    scavenger is run. The name that held only that record is expected to
    become a single tombstone, while the other names keep their
    remaining TXT records. The tombstone's data is then set just after
    and just before a cut-off, and is expected to survive the first
    tombstone deletion and be removed by the second.
    """
    lp = self.get_loadparm()
    self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
                       session_info=system_session(),
                       credentials=self.creds)

    self.create_zone(self.zone, aging_enabled=True)
    # NOTE(review): interval value assumed; the records are aged by a
    # multiple of it, so any positive integer should behave the same.
    interval = 10
    self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
                    zone=self.zone, Aging=1,
                    AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
    name, txt = 'agingtest', ['test txt']
    name2, txt2 = 'agingtest2', ['test txt2']
    name3, txt3 = 'agingtest3', ['test txt3']
    self.dns_update_record(name, txt)
    self.dns_update_record(name2, txt)
    self.dns_update_record(name2, txt2)
    self.dns_update_record(name3, txt)
    self.dns_update_record(name3, txt2)
    last_add = self.dns_update_record(name3, txt3)

    def mod_ts(rec):
        self.assertTrue(rec.dwTimeStamp > 0)
        if rec.data.str == txt:
            rec.dwTimeStamp -= interval * 5
    self.ldap_modify_dnsrecs(name, mod_ts)
    self.ldap_modify_dnsrecs(name2, mod_ts)
    self.ldap_modify_dnsrecs(name3, mod_ts)
    self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
    dsdb._scavenge_dns_records(self.samdb)

    recs = self.ldap_get_dns_records(name)
    self.assertEqual(len(recs), 1)
    self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)

    recs = self.ldap_get_dns_records(name2)
    self.assertEqual(len(recs), 1)
    self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
    self.assertEqual(recs[0].data.str, txt2)

    recs = self.ldap_get_dns_records(name3)
    self.assertEqual(len(recs), 2)
    txts = {str(r.data.str) for r in recs}
    self.assertEqual(txts, {str(txt2), str(txt3)})
    self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
    self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT)

    for make_it_work in [False, True]:
        inc = -1 if make_it_work else 1

        # NOTE(review): 24 * 14 presumably expresses a 14-day tombstone
        # lifetime in hours -- confirm against the server-side default.
        def mod_ts(rec):
            rec.data = (last_add.dwTimeStamp - 24 * 14) + inc
        self.ldap_modify_dnsrecs(name, mod_ts)
        dsdb._dns_delete_tombstones(self.samdb)
        recs = self.ldap_get_records(name)
        if make_it_work:
            self.assertEqual(len(recs), 0)
        else:
            self.assertEqual(len(recs), 1)
def delete_zone(self, zone):
    """Delete ``zone`` from the server over the DNS management RPC.

    :param zone: name of the zone to remove.
    """
    # NOTE(review): the arguments between the client version and the
    # type id (flags, server, zone, context, operation name) and the
    # trailing None were restored from the usual DnssrvOperation2 calling
    # pattern -- confirm the operation name against the server's
    # supported zone operations.
    self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                   0,
                                   self.server_ip,
                                   zone,
                                   0,
                                   'DeleteZoneFromDs',
                                   dnsserver.DNSSRV_TYPEID_NULL,
                                   None)
def test_soa_query(self):
    """Check SOA answers before a zone exists, after creation and after deletion.

    The query must get NXDOMAIN with no answers while the zone is absent,
    and exactly one SOA answer while it exists.
    """
    # NOTE(review): zone name assumed; it only has to be a zone that does
    # not exist on the server when the test starts.
    zone = "test.lan"
    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    questions = []

    q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
    questions.append(q)
    self.finish_name_packet(p, questions)

    (response, response_packet) =\
        self.dns_transaction_udp(p, host=server_ip)
    # Windows returns OK while BIND logically seems to return NXDOMAIN
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
    self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
    self.assertEqual(response.ancount, 0)

    self.create_zone(zone)
    (response, response_packet) =\
        self.dns_transaction_udp(p, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
    self.assertEqual(response.ancount, 1)
    self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)

    self.delete_zone(zone)
    (response, response_packet) =\
        self.dns_transaction_udp(p, host=server_ip)
    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
    self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
    self.assertEqual(response.ancount, 0)
class TestRPCRoundtrip(DNSTest):
    """Round-trip TXT records between DNS updates and the management RPC.

    Each scenario exists in two directions. The ``*_txt_record(s)`` tests
    add a record with a DNS update and look it up over RPC. The
    ``*_rpc_to_dns`` tests add a record over RPC, query it over DNS and
    then remove it again.
    """

    def setUp(self):
        super(TestRPCRoundtrip, self).setUp()
        # server_name, server_ip, lp and creds are module-level values;
        # they are only read here, so no `global` statement is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
                                            (self.server_ip),
                                            self.lp,
                                            self.creds)

    def tearDown(self):
        super(TestRPCRoundtrip, self).tearDown()

    def _rpc_update_record(self, name, add_rec_buf, del_rec_buf):
        """Add and/or delete a record for ``name`` in the DNS domain over RPC."""
        # NOTE(review): argument order assumed to be (client version,
        # flags, server, zone, node name, record to add, record to
        # delete) -- confirm against the DnssrvUpdateRecord2 binding.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,
            self.server_ip,
            self.get_dns_domain(),
            name,
            add_rec_buf,
            del_rec_buf)

    def _check_rpc_to_dns(self, prefix, rpc_data, expected_txt):
        """Add a TXT record over RPC, check it over DNS, then delete it.

        :param prefix: host part of the record name.
        :param rpc_data: TXT data string passed to data_to_dns_record().
        :param expected_txt: list of strings the DNS query must return.
        """
        name = "%s.%s" % (prefix, self.get_dns_domain())

        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, rpc_data)
        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
        add_rec_buf.rec = rec
        try:
            self._rpc_update_record(name, add_rec_buf, None)
        except WERRORError as e:
            self.fail(str(e))

        try:
            self.check_query_txt(prefix, expected_txt)
        finally:
            # Always remove the record so a failed check does not leak it
            # into later tests.
            self._rpc_update_record(name, None, add_rec_buf)

    def _check_dns_to_rpc(self, prefix, txt, rpc_data, expected_txt=None):
        """Add a TXT record with a DNS update and look it up over RPC.

        :param prefix: host part of the record name.
        :param txt: list of strings sent in the DNS update.
        :param rpc_data: TXT data string the RPC lookup must match.
        :param expected_txt: list of strings the DNS query must return;
            defaults to ``txt``.
        """
        if expected_txt is None:
            expected_txt = txt
        p = self.make_txt_update(prefix, txt)
        (response, response_packet) =\
            self.dns_transaction_udp(p, host=server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, expected_txt)
        self.assertIsNotNone(
            dns_record_match(self.rpc_conn,
                             self.server_ip,
                             self.get_dns_domain(),
                             "%s.%s" % (prefix, self.get_dns_domain()),
                             dnsp.DNS_TYPE_TXT,
                             rpc_data))

    # (prefix, TXT strings, RPC data string) for the padding scenarios.
    _PADDED_CASES = (
        ('pad1textrec', ['"This is a test"', '', ''],
         '"\\"This is a test\\"" "" ""'),
        ('pad2textrec', ['"This is a test"', '', '', 'more text'],
         '"\\"This is a test\\"" "" "" "more text"'),
        ('pad3textrec', ['', '', '"This is a test"'],
         '"" "" "\\"This is a test\\""'),
    )

    def test_update_add_txt_rpc_to_dns(self):
        "test a quoted txt record added over RPC is visible over DNS"
        prefix, txt = 'rpctextrec', ['"This is a test"']
        self._check_rpc_to_dns(prefix, '"\\"This is a test\\""', txt)

    def test_update_add_null_padded_txt_record(self):
        "test adding txt records padded with empty strings works"
        for prefix, txt, rpc_data in self._PADDED_CASES:
            self._check_dns_to_rpc(prefix, txt, rpc_data)

    def test_update_add_padding_rpc_to_dns(self):
        "test padded txt records added over RPC are visible over DNS"
        for prefix, txt, rpc_data in self._PADDED_CASES:
            self._check_rpc_to_dns('rpc' + prefix, rpc_data, txt)

    # Test is incomplete due to strlen against txt records
    def test_update_add_null_char_txt_record(self):
        "test adding txt records containing a NUL byte"
        self._check_dns_to_rpc('nulltextrec', ['NULL\x00BYTE'],
                               '"NULL"', expected_txt=['NULL'])
        self._check_dns_to_rpc('nulltextrec2',
                               ['NULL\x00BYTE', 'NULL\x00BYTE'],
                               '"NULL" "NULL"',
                               expected_txt=['NULL', 'NULL'])

    def test_update_add_null_char_rpc_to_dns(self):
        "test a txt record with a NUL byte added over RPC"
        self._check_rpc_to_dns('rpcnulltextrec', '"NULL\x00BYTE"', ['NULL'])

    def test_update_add_hex_char_txt_record(self):
        "test adding a txt record containing a high byte works"
        prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
        self._check_dns_to_rpc(prefix, txt, '"HIGH\xFFBYTE"')

    def test_update_add_hex_rpc_to_dns(self):
        "test a txt record with a high byte added over RPC"
        prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
        self._check_rpc_to_dns('rpc' + prefix, '"HIGH\xFFBYTE"', txt)

    def test_update_add_slash_txt_record(self):
        "test adding a txt record containing a backslash works"
        prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
        self._check_dns_to_rpc(prefix, txt, '"Th\\\\=is=is a test"')

    # This test fails against Windows as it eliminates slashes in RPC
    # One typical use for a slash is in records like 'var=value' to
    # escape '=' characters.
    def test_update_add_slash_rpc_to_dns(self):
        "test a txt record with a backslash added over RPC"
        prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
        self._check_rpc_to_dns('rpc' + prefix, '"Th\\\\=is=is a test"', txt)

    def test_update_add_two_txt_records(self):
        "test adding two txt records works"
        prefix, txt = 'textrec2', ['"This is a test"',
                                   '"and this is a test, too"']
        self._check_dns_to_rpc(prefix, txt,
                               '"\\"This is a test\\""' +
                               ' "\\"and this is a test, too\\""')

    def test_update_add_two_rpc_to_dns(self):
        "test two txt strings added over RPC are visible over DNS"
        prefix, txt = 'textrec2', ['"This is a test"',
                                   '"and this is a test, too"']
        self._check_rpc_to_dns('rpc' + prefix,
                               '"\\"This is a test\\""' +
                               ' "\\"and this is a test, too\\""',
                               txt)

    def test_update_add_empty_txt_records(self):
        "test adding an empty txt record works"
        prefix, txt = 'emptytextrec', []
        self._check_dns_to_rpc(prefix, txt, '')

    def test_update_add_empty_rpc_to_dns(self):
        "test an empty txt record added over RPC is visible over DNS"
        prefix, txt = 'rpcemptytextrec', []
        self._check_rpc_to_dns(prefix, '', txt)
# Script entry point: run this module's tests with the parsed subunit
# options.
TestProgram(opts=subunitopts, module=__name__)