1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
29 import samba.ndr as ndr
30 from samba import credentials
31 from samba.dcerpc import dns, dnsp, dnsserver
32 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
33 from samba.tests.subunitrun import SubunitOptions, TestProgram
34 from samba import werror, WERRORError
35 from samba.tests.dns_base import DNSTest
36 import samba.getopt as options
39 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
40 sambaopts = options.SambaOptions(parser)
41 parser.add_option_group(sambaopts)
43 # This timeout only has relevance when testing against Windows
44 # Format errors tend to return patchy responses, so a timeout is needed.
45 parser.add_option("--timeout", type="int", dest="timeout",
46 help="Specify timeout for DNS requests")
48 # use command line creds if available
49 credopts = options.CredentialsOptions(parser)
50 parser.add_option_group(credopts)
51 subunitopts = SubunitOptions(parser)
52 parser.add_option_group(subunitopts)
54 opts, args = parser.parse_args()
56 lp = sambaopts.get_loadparm()
57 creds = credopts.get_credentials(lp)
59 timeout = opts.timeout
67 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
70 class TestSimpleQueries(DNSTest):
72 super(TestSimpleQueries, self).setUp()
73 global server, server_ip, lp, creds, timeout
74 self.server = server_name
75 self.server_ip = server_ip
78 self.timeout = timeout
80 def test_one_a_query(self):
81 "create a query packet containing one query record"
82 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
85 name = "%s.%s" % (self.server, self.get_dns_domain())
86 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
87 print("asking for ", q.name)
90 self.finish_name_packet(p, questions)
91 (response, response_packet) =\
92 self.dns_transaction_udp(p, host=server_ip)
93 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
94 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
95 self.assertEquals(response.ancount, 1)
96 self.assertEquals(response.answers[0].rdata,
99 def test_one_SOA_query(self):
100 "create a query packet containing one query record for the SOA"
101 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
104 name = "%s" % (self.get_dns_domain())
105 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
106 print("asking for ", q.name)
109 self.finish_name_packet(p, questions)
110 (response, response_packet) =\
111 self.dns_transaction_udp(p, host=server_ip)
112 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
113 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
114 self.assertEquals(response.ancount, 1)
116 response.answers[0].rdata.mname.upper(),
117 ("%s.%s" % (self.server, self.get_dns_domain())).upper())
119 def test_one_a_query_tcp(self):
120 "create a query packet containing one query record via TCP"
121 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
124 name = "%s.%s" % (self.server, self.get_dns_domain())
125 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
126 print("asking for ", q.name)
129 self.finish_name_packet(p, questions)
130 (response, response_packet) =\
131 self.dns_transaction_tcp(p, host=server_ip)
132 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
133 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
134 self.assertEquals(response.ancount, 1)
135 self.assertEquals(response.answers[0].rdata,
138 def test_one_mx_query(self):
139 "create a query packet causing an empty RCODE_OK answer"
140 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
143 name = "%s.%s" % (self.server, self.get_dns_domain())
144 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
145 print("asking for ", q.name)
148 self.finish_name_packet(p, questions)
149 (response, response_packet) =\
150 self.dns_transaction_udp(p, host=server_ip)
151 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
152 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
153 self.assertEquals(response.ancount, 0)
155 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
158 name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
159 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
160 print("asking for ", q.name)
163 self.finish_name_packet(p, questions)
164 (response, response_packet) =\
165 self.dns_transaction_udp(p, host=server_ip)
166 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
167 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
168 self.assertEquals(response.ancount, 0)
170 def test_two_queries(self):
171 "create a query packet containing two query records"
172 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
175 name = "%s.%s" % (self.server, self.get_dns_domain())
176 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
179 name = "%s.%s" % ('bogusname', self.get_dns_domain())
180 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
183 self.finish_name_packet(p, questions)
185 (response, response_packet) =\
186 self.dns_transaction_udp(p, host=server_ip)
187 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
188 except socket.timeout:
189 # Windows chooses not to respond to incorrectly formatted queries.
190 # Although this appears to be non-deterministic even for the same
191 # request twice, it also appears to be based on a how poorly the
192 # request is formatted.
195 def test_qtype_all_query(self):
196 "create a QTYPE_ALL query"
197 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
200 name = "%s.%s" % (self.server, self.get_dns_domain())
201 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
202 print("asking for ", q.name)
205 self.finish_name_packet(p, questions)
206 (response, response_packet) =\
207 self.dns_transaction_udp(p, host=server_ip)
210 dc_ipv6 = os.getenv('SERVER_IPV6')
211 if dc_ipv6 is not None:
214 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
215 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
216 self.assertEquals(response.ancount, num_answers)
217 self.assertEquals(response.answers[0].rdata,
219 if dc_ipv6 is not None:
220 self.assertEquals(response.answers[1].rdata, dc_ipv6)
222 def test_qclass_none_query(self):
223 "create a QCLASS_NONE query"
224 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
227 name = "%s.%s" % (self.server, self.get_dns_domain())
228 q = self.make_name_question(
234 self.finish_name_packet(p, questions)
236 (response, response_packet) =\
237 self.dns_transaction_udp(p, host=server_ip)
238 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
239 except socket.timeout:
240 # Windows chooses not to respond to incorrectly formatted queries.
241 # Although this appears to be non-deterministic even for the same
242 # request twice, it also appears to be based on a how poorly the
243 # request is formatted.
246 def test_soa_hostname_query(self):
247 "create a SOA query for a hostname"
248 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
251 name = "%s.%s" % (self.server, self.get_dns_domain())
252 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
255 self.finish_name_packet(p, questions)
256 (response, response_packet) =\
257 self.dns_transaction_udp(p, host=server_ip)
258 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
259 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
260 # We don't get SOA records for single hosts
261 self.assertEquals(response.ancount, 0)
262 # But we do respond with an authority section
263 self.assertEqual(response.nscount, 1)
265 def test_soa_domain_query(self):
266 "create a SOA query for a domain"
267 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
270 name = self.get_dns_domain()
271 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
274 self.finish_name_packet(p, questions)
275 (response, response_packet) =\
276 self.dns_transaction_udp(p, host=server_ip)
277 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
278 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
279 self.assertEquals(response.ancount, 1)
280 self.assertEquals(response.answers[0].rdata.minimum, 3600)
283 class TestDNSUpdates(DNSTest):
285 super(TestDNSUpdates, self).setUp()
286 global server, server_ip, lp, creds, timeout
287 self.server = server_name
288 self.server_ip = server_ip
291 self.timeout = timeout
293 def test_two_updates(self):
294 "create two update requests"
295 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
298 name = "%s.%s" % (self.server, self.get_dns_domain())
299 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
302 name = self.get_dns_domain()
303 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
306 self.finish_name_packet(p, updates)
308 (response, response_packet) =\
309 self.dns_transaction_udp(p, host=server_ip)
310 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
311 except socket.timeout:
312 # Windows chooses not to respond to incorrectly formatted queries.
313 # Although this appears to be non-deterministic even for the same
314 # request twice, it also appears to be based on a how poorly the
315 # request is formatted.
318 def test_update_wrong_qclass(self):
319 "create update with DNS_QCLASS_NONE"
320 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
323 name = self.get_dns_domain()
324 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
327 self.finish_name_packet(p, updates)
328 (response, response_packet) =\
329 self.dns_transaction_udp(p, host=server_ip)
330 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
332 def test_update_prereq_with_non_null_ttl(self):
333 "test update with a non-null TTL"
334 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
337 name = self.get_dns_domain()
339 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
341 self.finish_name_packet(p, updates)
345 r.name = "%s.%s" % (self.server, self.get_dns_domain())
346 r.rr_type = dns.DNS_QTYPE_TXT
347 r.rr_class = dns.DNS_QCLASS_NONE
352 p.ancount = len(prereqs)
356 (response, response_packet) =\
357 self.dns_transaction_udp(p, host=server_ip)
358 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
359 except socket.timeout:
360 # Windows chooses not to respond to incorrectly formatted queries.
361 # Although this appears to be non-deterministic even for the same
362 # request twice, it also appears to be based on a how poorly the
363 # request is formatted.
366 def test_update_prereq_with_non_null_length(self):
367 "test update with a non-null length"
368 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
371 name = self.get_dns_domain()
373 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
375 self.finish_name_packet(p, updates)
379 r.name = "%s.%s" % (self.server, self.get_dns_domain())
380 r.rr_type = dns.DNS_QTYPE_TXT
381 r.rr_class = dns.DNS_QCLASS_ANY
386 p.ancount = len(prereqs)
389 (response, response_packet) =\
390 self.dns_transaction_udp(p, host=server_ip)
391 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
393 def test_update_prereq_nonexisting_name(self):
394 "test update with a nonexisting name"
395 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
398 name = self.get_dns_domain()
400 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
402 self.finish_name_packet(p, updates)
406 r.name = "idontexist.%s" % self.get_dns_domain()
407 r.rr_type = dns.DNS_QTYPE_TXT
408 r.rr_class = dns.DNS_QCLASS_ANY
413 p.ancount = len(prereqs)
416 (response, response_packet) =\
417 self.dns_transaction_udp(p, host=server_ip)
418 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
420 def test_update_add_txt_record(self):
421 "test adding records works"
422 prefix, txt = 'textrec', ['"This is a test"']
423 p = self.make_txt_update(prefix, txt)
424 (response, response_packet) =\
425 self.dns_transaction_udp(p, host=server_ip)
426 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
427 self.check_query_txt(prefix, txt)
429 def test_delete_record(self):
430 "Test if deleting records works"
432 NAME = "deleterec.%s" % self.get_dns_domain()
434 # First, create a record to make sure we have a record to delete.
435 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
438 name = self.get_dns_domain()
440 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
442 self.finish_name_packet(p, updates)
447 r.rr_type = dns.DNS_QTYPE_TXT
448 r.rr_class = dns.DNS_QCLASS_IN
451 rdata = self.make_txt_record(['"This is a test"'])
454 p.nscount = len(updates)
457 (response, response_packet) =\
458 self.dns_transaction_udp(p, host=server_ip)
459 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
461 # Now check the record is around
462 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
464 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
467 self.finish_name_packet(p, questions)
468 (response, response_packet) =\
469 self.dns_transaction_udp(p, host=server_ip)
470 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
472 # Now delete the record
473 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
476 name = self.get_dns_domain()
478 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
480 self.finish_name_packet(p, updates)
485 r.rr_type = dns.DNS_QTYPE_TXT
486 r.rr_class = dns.DNS_QCLASS_NONE
489 rdata = self.make_txt_record(['"This is a test"'])
492 p.nscount = len(updates)
495 (response, response_packet) =\
496 self.dns_transaction_udp(p, host=server_ip)
497 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
499 # And finally check it's gone
500 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
503 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
506 self.finish_name_packet(p, questions)
507 (response, response_packet) =\
508 self.dns_transaction_udp(p, host=server_ip)
509 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
511 def test_readd_record(self):
512 "Test if adding, deleting and then readding a records works"
514 NAME = "readdrec.%s" % self.get_dns_domain()
517 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
520 name = self.get_dns_domain()
522 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
524 self.finish_name_packet(p, updates)
529 r.rr_type = dns.DNS_QTYPE_TXT
530 r.rr_class = dns.DNS_QCLASS_IN
533 rdata = self.make_txt_record(['"This is a test"'])
536 p.nscount = len(updates)
539 (response, response_packet) =\
540 self.dns_transaction_udp(p, host=server_ip)
541 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
543 # Now check the record is around
544 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
546 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
549 self.finish_name_packet(p, questions)
550 (response, response_packet) =\
551 self.dns_transaction_udp(p, host=server_ip)
552 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
554 # Now delete the record
555 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
558 name = self.get_dns_domain()
560 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
562 self.finish_name_packet(p, updates)
567 r.rr_type = dns.DNS_QTYPE_TXT
568 r.rr_class = dns.DNS_QCLASS_NONE
571 rdata = self.make_txt_record(['"This is a test"'])
574 p.nscount = len(updates)
577 (response, response_packet) =\
578 self.dns_transaction_udp(p, host=server_ip)
579 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
582 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
585 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
588 self.finish_name_packet(p, questions)
589 (response, response_packet) =\
590 self.dns_transaction_udp(p, host=server_ip)
591 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
593 # recreate the record
594 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
597 name = self.get_dns_domain()
599 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
601 self.finish_name_packet(p, updates)
606 r.rr_type = dns.DNS_QTYPE_TXT
607 r.rr_class = dns.DNS_QCLASS_IN
610 rdata = self.make_txt_record(['"This is a test"'])
613 p.nscount = len(updates)
616 (response, response_packet) =\
617 self.dns_transaction_udp(p, host=server_ip)
618 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
620 # Now check the record is around
621 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
623 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
626 self.finish_name_packet(p, questions)
627 (response, response_packet) =\
628 self.dns_transaction_udp(p, host=server_ip)
629 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
631 def test_update_add_mx_record(self):
632 "test adding MX records works"
633 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
636 name = self.get_dns_domain()
638 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
640 self.finish_name_packet(p, updates)
644 r.name = "%s" % self.get_dns_domain()
645 r.rr_type = dns.DNS_QTYPE_MX
646 r.rr_class = dns.DNS_QCLASS_IN
649 rdata = dns.mx_record()
650 rdata.preference = 10
651 rdata.exchange = 'mail.%s' % self.get_dns_domain()
654 p.nscount = len(updates)
657 (response, response_packet) =\
658 self.dns_transaction_udp(p, host=server_ip)
659 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
661 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
664 name = "%s" % self.get_dns_domain()
665 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
668 self.finish_name_packet(p, questions)
669 (response, response_packet) =\
670 self.dns_transaction_udp(p, host=server_ip)
671 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
672 self.assertEqual(response.ancount, 1)
673 ans = response.answers[0]
674 self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
675 self.assertEqual(ans.rdata.preference, 10)
676 self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
679 class TestComplexQueries(DNSTest):
680 def make_dns_update(self, key, value, qtype):
681 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
683 name = self.get_dns_domain()
684 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
685 self.finish_name_packet(p, [u])
690 r.rr_class = dns.DNS_QCLASS_IN
696 (response, response_packet) =\
697 self.dns_transaction_udp(p, host=server_ip)
698 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
701 super(TestComplexQueries, self).setUp()
703 global server, server_ip, lp, creds, timeout
704 self.server = server_name
705 self.server_ip = server_ip
708 self.timeout = timeout
710 def test_one_a_query(self):
711 "create a query packet containing one query record"
716 name = "cname_test.%s" % self.get_dns_domain()
717 rdata = "%s.%s" % (self.server, self.get_dns_domain())
718 self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
720 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
724 name = "cname_test.%s" % self.get_dns_domain()
725 q = self.make_name_question(name,
728 print("asking for ", q.name)
731 self.finish_name_packet(p, questions)
732 (response, response_packet) =\
733 self.dns_transaction_udp(p, host=self.server_ip)
734 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
735 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
736 self.assertEquals(response.ancount, 2)
737 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
738 self.assertEquals(response.answers[0].rdata, "%s.%s" %
739 (self.server, self.get_dns_domain()))
740 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
741 self.assertEquals(response.answers[1].rdata,
746 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
749 name = self.get_dns_domain()
751 u = self.make_name_question(name,
755 self.finish_name_packet(p, updates)
759 r.name = "cname_test.%s" % self.get_dns_domain()
760 r.rr_type = dns.DNS_QTYPE_CNAME
761 r.rr_class = dns.DNS_QCLASS_NONE
764 r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
766 p.nscount = len(updates)
769 (response, response_packet) =\
770 self.dns_transaction_udp(p, host=self.server_ip)
771 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
773 def test_cname_two_chain(self):
774 name0 = "cnamechain0.%s" % self.get_dns_domain()
775 name1 = "cnamechain1.%s" % self.get_dns_domain()
776 name2 = "cnamechain2.%s" % self.get_dns_domain()
777 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
778 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
779 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
781 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
783 q = self.make_name_question(name1, dns.DNS_QTYPE_A,
787 self.finish_name_packet(p, questions)
788 (response, response_packet) =\
789 self.dns_transaction_udp(p, host=server_ip)
790 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
791 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
792 self.assertEquals(response.ancount, 3)
794 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
795 self.assertEquals(response.answers[0].name, name1)
796 self.assertEquals(response.answers[0].rdata, name2)
798 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
799 self.assertEquals(response.answers[1].name, name2)
800 self.assertEquals(response.answers[1].rdata, name0)
802 self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
803 self.assertEquals(response.answers[2].rdata,
806 def test_invalid_empty_cname(self):
807 name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
809 self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
810 except AssertionError:
813 self.fail("Successfully added empty CNAME, which is invalid.")
815 def test_cname_two_chain_not_matching_qtype(self):
816 name0 = "cnamechain0.%s" % self.get_dns_domain()
817 name1 = "cnamechain1.%s" % self.get_dns_domain()
818 name2 = "cnamechain2.%s" % self.get_dns_domain()
819 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
820 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
821 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
823 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
825 q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
829 self.finish_name_packet(p, questions)
830 (response, response_packet) =\
831 self.dns_transaction_udp(p, host=server_ip)
832 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
833 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
835 # CNAME should return all intermediate results!
836 # Only the A records exists, not the TXT.
837 self.assertEquals(response.ancount, 2)
839 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
840 self.assertEquals(response.answers[0].name, name1)
841 self.assertEquals(response.answers[0].rdata, name2)
843 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
844 self.assertEquals(response.answers[1].name, name2)
845 self.assertEquals(response.answers[1].rdata, name0)
848 class TestInvalidQueries(DNSTest):
850 super(TestInvalidQueries, self).setUp()
851 global server, server_ip, lp, creds, timeout
852 self.server = server_name
853 self.server_ip = server_ip
856 self.timeout = timeout
858 def test_one_a_query(self):
859 """send 0 bytes follows by create a query packet
860 containing one query record"""
864 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
865 s.connect((self.server_ip, 53))
871 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
874 name = "%s.%s" % (self.server, self.get_dns_domain())
875 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
876 print("asking for ", q.name)
879 self.finish_name_packet(p, questions)
880 (response, response_packet) =\
881 self.dns_transaction_udp(p, host=self.server_ip)
882 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
883 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
884 self.assertEquals(response.ancount, 1)
885 self.assertEquals(response.answers[0].rdata,
888 def test_one_a_reply(self):
889 "send a reply instead of a query"
892 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
895 name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
896 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
897 print("asking for ", q.name)
900 self.finish_name_packet(p, questions)
901 p.operation |= dns.DNS_FLAG_REPLY
904 send_packet = ndr.ndr_pack(p)
905 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
906 s.settimeout(timeout)
907 host = self.server_ip
908 s.connect((host, 53))
909 tcp_packet = struct.pack('!H', len(send_packet))
910 tcp_packet += send_packet
911 s.send(tcp_packet, 0)
912 recv_packet = s.recv(0xffff + 2, 0)
913 self.assertEquals(0, len(recv_packet))
914 except socket.timeout:
915 # Windows chooses not to respond to incorrectly formatted queries.
916 # Although this appears to be non-deterministic even for the same
917 # request twice, it also appears to be based on a how poorly the
918 # request is formatted.
925 class TestZones(DNSTest):
927 super(TestZones, self).setUp()
928 global server, server_ip, lp, creds, timeout
929 self.server = server_name
930 self.server_ip = server_ip
933 self.timeout = timeout
935 self.zone = "test.lan"
936 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
940 self.samdb = SamDB(url="ldap://" + self.server_ip,
941 lp=self.get_loadparm(),
942 session_info=system_session(),
943 credentials=self.creds)
945 self.zone_dn = "DC=" + self.zone +\
946 ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
947 str(self.samdb.get_default_basedn())
950 super(TestZones, self).tearDown()
953 self.delete_zone(self.zone)
954 except RuntimeError as e:
955 (num, string) = e.args
956 if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
959 def create_zone(self, zone, aging_enabled=False):
960 zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
961 zone_create.pszZoneName = zone
962 zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
963 zone_create.fAging = int(aging_enabled)
964 zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
965 zone_create.fDsIntegrated = 1
966 zone_create.fLoadExisting = 1
967 zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
969 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
970 self.rpc_conn.DnssrvOperation2(client_version,
976 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
978 except WERRORError as e:
981 def set_params(self, **kwargs):
982 zone = kwargs.pop('zone', None)
983 for key, val in kwargs.items():
984 name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
985 name_param.dwParam = val
986 name_param.pszNodeName = key
988 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
989 nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
991 self.rpc_conn.DnssrvOperation2(client_version,
996 'ResetDwordProperty',
999 except WERRORError as e:
1002 def ldap_modify_dnsrecs(self, name, func):
1003 dn = 'DC={},{}'.format(name, self.zone_dn)
1004 dns_recs = self.ldap_get_dns_records(name)
1005 for rec in dns_recs:
1007 update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]}
1008 self.samdb.modify(ldb.Message.from_dict(self.samdb,
1010 ldb.FLAG_MOD_REPLACE))
1012 def dns_update_record(self, prefix, txt):
1013 p = self.make_txt_update(prefix, txt, self.zone)
1014 (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
1015 self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
1016 recs = self.ldap_get_dns_records(prefix)
1017 recs = [r for r in recs if r.data.str == txt]
1018 self.assertEqual(len(recs), 1)
1021 def ldap_get_records(self, name):
1022 # The use of SCOPE_SUBTREE here avoids raising an exception in the
1023 # 0 results case for a test below.
1025 expr = "(&(objectClass=dnsNode)(name={}))".format(name)
1026 return self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1027 expression=expr, attrs=["*"])
1029 def ldap_get_dns_records(self, name):
1030 records = self.ldap_get_records(name)
1031 return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
1032 for r in records[0].get('dnsRecord')]
1034 def ldap_get_zone_settings(self):
1035 records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
1036 expression="(&(objectClass=dnsZone)" +
1037 "(name={}))".format(self.zone),
1038 attrs=["dNSProperty"])
1039 self.assertEqual(len(records), 1)
1040 props = [ndr_unpack(dnsp.DnsProperty, r)
1041 for r in records[0].get('dNSProperty')]
1043 # We have no choice but to repeat these here.
1044 zone_prop_ids = {0x00: "EMPTY",
1046 0x02: "ALLOW_UPDATE",
1047 0x08: "SECURE_TIME",
1048 0x10: "NOREFRESH_INTERVAL",
1049 0x11: "SCAVENGING_SERVERS",
1050 0x12: "AGING_ENABLED_TIME",
1051 0x20: "REFRESH_INTERVAL",
1052 0x40: "AGING_STATE",
1053 0x80: "DELETED_FROM_HOSTNAME",
1054 0x81: "MASTER_SERVERS",
1055 0x82: "AUTO_NS_SERVERS",
1056 0x83: "DCPROMO_CONVERT",
1057 0x90: "SCAVENGING_SERVERS_DA",
1058 0x91: "MASTER_SERVERS_DA",
1059 0x92: "NS_SERVERS_DA",
1060 0x100: "NODE_DBFLAGS"}
1061 return {zone_prop_ids[p.id].lower(): p.data for p in props}
1063 def set_aging(self, enable=False):
1064 self.create_zone(self.zone, aging_enabled=enable)
1065 self.set_params(NoRefreshInterval=1, RefreshInterval=1,
1066 Aging=int(bool(enable)), zone=self.zone,
1067 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1069 def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
1070 self.set_aging(enable=True)
1071 settings = self.ldap_get_zone_settings()
1072 self.assertTrue(settings['aging_state'] is not None)
1073 self.assertTrue(settings['aging_state'])
1075 rec = self.dns_update_record('agingtest', ['test txt'])
1076 self.assertNotEqual(rec.dwTimeStamp, 0)
1078 def test_set_aging_disabled(self):
1079 self.set_aging(enable=False)
1080 settings = self.ldap_get_zone_settings()
1081 self.assertTrue(settings['aging_state'] is not None)
1082 self.assertFalse(settings['aging_state'])
1084 rec = self.dns_update_record('agingtest', ['test txt'])
1085 self.assertNotEqual(rec.dwTimeStamp, 0)
1087 def test_aging_update(self, enable=True):
1088 name, txt = 'agingtest', ['test txt']
1089 self.set_aging(enable=True)
1090 before_mod = self.dns_update_record(name, txt)
1092 self.set_params(zone=self.zone, Aging=0)
1096 self.assertTrue(rec.dwTimeStamp > 0)
1097 rec.dwTimeStamp -= dec
1098 self.ldap_modify_dnsrecs(name, mod_ts)
1099 after_mod = self.ldap_get_dns_records(name)
1100 self.assertEqual(len(after_mod), 1)
1101 after_mod = after_mod[0]
1102 self.assertEqual(after_mod.dwTimeStamp,
1103 before_mod.dwTimeStamp - dec)
1104 after_update = self.dns_update_record(name, txt)
1105 after_should_equal = before_mod if enable else after_mod
1106 self.assertEqual(after_should_equal.dwTimeStamp,
1107 after_update.dwTimeStamp)
1109 def test_aging_update_disabled(self):
1110 self.test_aging_update(enable=False)
1112 def test_aging_refresh(self):
1113 name, txt = 'agingtest', ['test txt']
1114 self.create_zone(self.zone, aging_enabled=True)
1116 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1117 Aging=1, zone=self.zone,
1118 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1119 before_mod = self.dns_update_record(name, txt)
1122 self.assertTrue(rec.dwTimeStamp > 0)
1123 rec.dwTimeStamp -= interval / 2
1124 self.ldap_modify_dnsrecs(name, mod_ts)
1125 update_during_norefresh = self.dns_update_record(name, txt)
1128 self.assertTrue(rec.dwTimeStamp > 0)
1129 rec.dwTimeStamp -= interval + interval / 2
1130 self.ldap_modify_dnsrecs(name, mod_ts)
1131 update_during_refresh = self.dns_update_record(name, txt)
1132 self.assertEqual(update_during_norefresh.dwTimeStamp,
1133 before_mod.dwTimeStamp - interval / 2)
1134 self.assertEqual(update_during_refresh.dwTimeStamp,
1135 before_mod.dwTimeStamp)
1137 def test_rpc_add_no_timestamp(self):
1138 name, txt = 'agingtest', ['test txt']
1139 self.set_aging(enable=True)
1140 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1141 rec_buf.rec = TXTRecord(txt)
1142 self.rpc_conn.DnssrvUpdateRecord2(
1143 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1150 recs = self.ldap_get_dns_records(name)
1151 self.assertEqual(len(recs), 1)
1152 self.assertEqual(recs[0].dwTimeStamp, 0)
1154 def test_static_record_dynamic_update(self):
1155 name, txt = 'agingtest', ['test txt']
1156 txt2 = ['test txt2']
1157 self.set_aging(enable=True)
1158 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1159 rec_buf.rec = TXTRecord(txt)
1160 self.rpc_conn.DnssrvUpdateRecord2(
1161 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1169 rec2 = self.dns_update_record(name, txt2)
1170 self.assertEqual(rec2.dwTimeStamp, 0)
1172 def test_dynamic_record_static_update(self):
1173 name, txt = 'agingtest', ['test txt']
1174 txt2 = ['test txt2']
1175 txt3 = ['test txt3']
1176 self.set_aging(enable=True)
1178 self.dns_update_record(name, txt)
1180 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1181 rec_buf.rec = TXTRecord(txt2)
1182 self.rpc_conn.DnssrvUpdateRecord2(
1183 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1191 self.dns_update_record(name, txt3)
1193 recs = self.ldap_get_dns_records(name)
1194 # Put in dict because ldap recs might be out of order
1195 recs = {str(r.data.str): r for r in recs}
1196 self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0)
1197 self.assertEqual(recs[str(txt2)].dwTimeStamp, 0)
1198 self.assertEqual(recs[str(txt3)].dwTimeStamp, 0)
1200 def test_dns_tombstone_custom_match_rule(self):
1201 lp = self.get_loadparm()
1202 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1203 session_info=system_session(),
1204 credentials=self.creds)
1206 name, txt = 'agingtest', ['test txt']
1207 name2, txt2 = 'agingtest2', ['test txt2']
1208 name3, txt3 = 'agingtest3', ['test txt3']
1209 self.create_zone(self.zone, aging_enabled=True)
1211 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1212 Aging=1, zone=self.zone,
1213 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1215 self.dns_update_record(name, txt),
1217 self.dns_update_record(name2, txt),
1218 self.dns_update_record(name2, txt2),
1220 self.dns_update_record(name3, txt),
1221 self.dns_update_record(name3, txt2),
1222 last_update = self.dns_update_record(name3, txt3)
1224 # Modify txt1 of the first 2 names
1226 if rec.data.str == txt:
1227 rec.dwTimeStamp -= 2
1228 self.ldap_modify_dnsrecs(name, mod_ts)
1229 self.ldap_modify_dnsrecs(name2, mod_ts)
1231 self.ldap_get_dns_records(name3)
1232 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
1233 expr = expr.format(int(last_update.dwTimeStamp) - 1)
1235 res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1236 expression=expr, attrs=["*"])
1237 except ldb.LdbError as e:
1239 updated_names = {str(r.get('name')) for r in res}
1240 self.assertEqual(updated_names, set([name, name2]))
1242 def test_dns_tombstone_custom_match_rule_fail(self):
1243 self.create_zone(self.zone, aging_enabled=True)
1245 # The check here is that this does not blow up on silly input
1246 expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1247 res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1248 expression=expr, attrs=["*"])
1249 self.assertEquals(len(res), 0)
1251 def test_basic_scavenging(self):
1252 lp = self.get_loadparm()
1253 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1254 session_info=system_session(),
1255 credentials=self.creds)
1257 self.create_zone(self.zone, aging_enabled=True)
1259 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1260 zone=self.zone, Aging=1,
1261 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1262 name, txt = 'agingtest', ['test txt']
1263 name2, txt2 = 'agingtest2', ['test txt2']
1264 name3, txt3 = 'agingtest3', ['test txt3']
1265 self.dns_update_record(name, txt)
1266 self.dns_update_record(name2, txt)
1267 self.dns_update_record(name2, txt2)
1268 self.dns_update_record(name3, txt)
1269 self.dns_update_record(name3, txt2)
1270 last_add = self.dns_update_record(name3, txt3)
1273 self.assertTrue(rec.dwTimeStamp > 0)
1274 if rec.data.str == txt:
1275 rec.dwTimeStamp -= interval * 5
1276 self.ldap_modify_dnsrecs(name, mod_ts)
1277 self.ldap_modify_dnsrecs(name2, mod_ts)
1278 self.ldap_modify_dnsrecs(name3, mod_ts)
1279 self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1280 dsdb._scavenge_dns_records(self.samdb)
1282 recs = self.ldap_get_dns_records(name)
1283 self.assertEqual(len(recs), 1)
1284 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1286 recs = self.ldap_get_dns_records(name2)
1287 self.assertEqual(len(recs), 1)
1288 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1289 self.assertEqual(recs[0].data.str, txt2)
1291 recs = self.ldap_get_dns_records(name3)
1292 self.assertEqual(len(recs), 2)
1293 txts = {str(r.data.str) for r in recs}
1294 self.assertEqual(txts, {str(txt2), str(txt3)})
1295 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1296 self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT)
1298 for make_it_work in [False, True]:
1299 inc = -1 if make_it_work else 1
1302 rec.data = (last_add.dwTimeStamp - 24 * 14) + inc
1303 self.ldap_modify_dnsrecs(name, mod_ts)
1304 dsdb._dns_delete_tombstones(self.samdb)
1305 recs = self.ldap_get_records(name)
1307 self.assertEqual(len(recs), 0)
1309 self.assertEqual(len(recs), 1)
1311 def delete_zone(self, zone):
1312 self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1318 dnsserver.DNSSRV_TYPEID_NULL,
1321 def test_soa_query(self):
1323 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1326 q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
1328 self.finish_name_packet(p, questions)
1330 (response, response_packet) =\
1331 self.dns_transaction_udp(p, host=server_ip)
1332 # Windows returns OK while BIND logically seems to return NXDOMAIN
1333 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1334 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1335 self.assertEquals(response.ancount, 0)
1337 self.create_zone(zone)
1338 (response, response_packet) =\
1339 self.dns_transaction_udp(p, host=server_ip)
1340 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1341 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1342 self.assertEquals(response.ancount, 1)
1343 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
1345 self.delete_zone(zone)
1346 (response, response_packet) =\
1347 self.dns_transaction_udp(p, host=server_ip)
1348 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1349 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1350 self.assertEquals(response.ancount, 0)
1353 class TestRPCRoundtrip(DNSTest):
1355 super(TestRPCRoundtrip, self).setUp()
1356 global server, server_ip, lp, creds
1357 self.server = server_name
1358 self.server_ip = server_ip
1361 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1367 super(TestRPCRoundtrip, self).tearDown()
1369 def test_update_add_txt_rpc_to_dns(self):
1370 prefix, txt = 'rpctextrec', ['"This is a test"']
1372 name = "%s.%s" % (prefix, self.get_dns_domain())
1374 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
1375 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1376 add_rec_buf.rec = rec
1378 self.rpc_conn.DnssrvUpdateRecord2(
1379 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1382 self.get_dns_domain(),
1387 except WERRORError as e:
1391 self.check_query_txt(prefix, txt)
1393 self.rpc_conn.DnssrvUpdateRecord2(
1394 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1397 self.get_dns_domain(),
1402 def test_update_add_null_padded_txt_record(self):
1403 "test adding records works"
1404 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1405 p = self.make_txt_update(prefix, txt)
1406 (response, response_packet) =\
1407 self.dns_transaction_udp(p, host=server_ip)
1408 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1409 self.check_query_txt(prefix, txt)
1410 self.assertIsNotNone(
1411 dns_record_match(self.rpc_conn,
1413 self.get_dns_domain(),
1414 "%s.%s" % (prefix, self.get_dns_domain()),
1416 '"\\"This is a test\\"" "" ""'))
1418 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1419 p = self.make_txt_update(prefix, txt)
1420 (response, response_packet) =\
1421 self.dns_transaction_udp(p, host=server_ip)
1422 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1423 self.check_query_txt(prefix, txt)
1424 self.assertIsNotNone(
1428 self.get_dns_domain(),
1429 "%s.%s" % (prefix, self.get_dns_domain()),
1431 '"\\"This is a test\\"" "" "" "more text"'))
1433 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1434 p = self.make_txt_update(prefix, txt)
1435 (response, response_packet) =\
1436 self.dns_transaction_udp(p, host=server_ip)
1437 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1438 self.check_query_txt(prefix, txt)
1439 self.assertIsNotNone(
1443 self.get_dns_domain(),
1444 "%s.%s" % (prefix, self.get_dns_domain()),
1446 '"" "" "\\"This is a test\\""'))
1448 def test_update_add_padding_rpc_to_dns(self):
1449 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1450 prefix = 'rpc' + prefix
1451 name = "%s.%s" % (prefix, self.get_dns_domain())
1453 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1454 '"\\"This is a test\\"" "" ""')
1455 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1456 add_rec_buf.rec = rec
1458 self.rpc_conn.DnssrvUpdateRecord2(
1459 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1462 self.get_dns_domain(),
1467 except WERRORError as e:
1471 self.check_query_txt(prefix, txt)
1473 self.rpc_conn.DnssrvUpdateRecord2(
1474 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1477 self.get_dns_domain(),
1482 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1483 prefix = 'rpc' + prefix
1484 name = "%s.%s" % (prefix, self.get_dns_domain())
1486 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1487 '"\\"This is a test\\"" "" "" "more text"')
1488 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1489 add_rec_buf.rec = rec
1491 self.rpc_conn.DnssrvUpdateRecord2(
1492 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1495 self.get_dns_domain(),
1500 except WERRORError as e:
1504 self.check_query_txt(prefix, txt)
1506 self.rpc_conn.DnssrvUpdateRecord2(
1507 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1510 self.get_dns_domain(),
1515 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1516 prefix = 'rpc' + prefix
1517 name = "%s.%s" % (prefix, self.get_dns_domain())
1519 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1520 '"" "" "\\"This is a test\\""')
1521 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1522 add_rec_buf.rec = rec
1524 self.rpc_conn.DnssrvUpdateRecord2(
1525 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1528 self.get_dns_domain(),
1532 except WERRORError as e:
1536 self.check_query_txt(prefix, txt)
1538 self.rpc_conn.DnssrvUpdateRecord2(
1539 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1542 self.get_dns_domain(),
1547 # Test is incomplete due to strlen against txt records
1548 def test_update_add_null_char_txt_record(self):
1549 "test adding records works"
1550 prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1551 p = self.make_txt_update(prefix, txt)
1552 (response, response_packet) =\
1553 self.dns_transaction_udp(p, host=server_ip)
1554 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1555 self.check_query_txt(prefix, ['NULL'])
1556 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1557 self.get_dns_domain(),
1558 "%s.%s" % (prefix, self.get_dns_domain()),
1559 dnsp.DNS_TYPE_TXT, '"NULL"'))
1561 prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1562 p = self.make_txt_update(prefix, txt)
1563 (response, response_packet) =\
1564 self.dns_transaction_udp(p, host=server_ip)
1565 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1566 self.check_query_txt(prefix, ['NULL', 'NULL'])
1567 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1568 self.get_dns_domain(),
1569 "%s.%s" % (prefix, self.get_dns_domain()),
1570 dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1572 def test_update_add_null_char_rpc_to_dns(self):
1573 prefix = 'rpcnulltextrec'
1574 name = "%s.%s" % (prefix, self.get_dns_domain())
1576 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL\x00BYTE"')
1577 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1578 add_rec_buf.rec = rec
1580 self.rpc_conn.DnssrvUpdateRecord2(
1581 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1584 self.get_dns_domain(),
1589 except WERRORError as e:
1593 self.check_query_txt(prefix, ['NULL'])
1595 self.rpc_conn.DnssrvUpdateRecord2(
1596 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1599 self.get_dns_domain(),
1604 def test_update_add_hex_char_txt_record(self):
1605 "test adding records works"
1606 prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1607 p = self.make_txt_update(prefix, txt)
1608 (response, response_packet) =\
1609 self.dns_transaction_udp(p, host=server_ip)
1610 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1611 self.check_query_txt(prefix, txt)
1612 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1613 self.get_dns_domain(),
1614 "%s.%s" % (prefix, self.get_dns_domain()),
1615 dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
1617 def test_update_add_hex_rpc_to_dns(self):
1618 prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1619 prefix = 'rpc' + prefix
1620 name = "%s.%s" % (prefix, self.get_dns_domain())
1622 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
1623 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1624 add_rec_buf.rec = rec
1626 self.rpc_conn.DnssrvUpdateRecord2(
1627 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1630 self.get_dns_domain(),
1635 except WERRORError as e:
1639 self.check_query_txt(prefix, txt)
1641 self.rpc_conn.DnssrvUpdateRecord2(
1642 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1645 self.get_dns_domain(),
1650 def test_update_add_slash_txt_record(self):
1651 "test adding records works"
1652 prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1653 p = self.make_txt_update(prefix, txt)
1654 (response, response_packet) =\
1655 self.dns_transaction_udp(p, host=server_ip)
1656 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1657 self.check_query_txt(prefix, txt)
1658 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1659 self.get_dns_domain(),
1660 "%s.%s" % (prefix, self.get_dns_domain()),
1661 dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
1663 # This test fails against Windows as it eliminates slashes in RPC
1664 # One typical use for a slash is in records like 'var=value' to
1665 # escape '=' characters.
1666 def test_update_add_slash_rpc_to_dns(self):
1667 prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1668 prefix = 'rpc' + prefix
1669 name = "%s.%s" % (prefix, self.get_dns_domain())
1671 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
1672 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1673 add_rec_buf.rec = rec
1675 self.rpc_conn.DnssrvUpdateRecord2(
1676 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1679 self.get_dns_domain(),
1684 except WERRORError as e:
1688 self.check_query_txt(prefix, txt)
1691 self.rpc_conn.DnssrvUpdateRecord2(
1692 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1695 self.get_dns_domain(),
1700 def test_update_add_two_txt_records(self):
1701 "test adding two txt records works"
1702 prefix, txt = 'textrec2', ['"This is a test"',
1703 '"and this is a test, too"']
1704 p = self.make_txt_update(prefix, txt)
1705 (response, response_packet) =\
1706 self.dns_transaction_udp(p, host=server_ip)
1707 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1708 self.check_query_txt(prefix, txt)
1709 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1710 self.get_dns_domain(),
1711 "%s.%s" % (prefix, self.get_dns_domain()),
1712 dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
1713 ' "\\"and this is a test, too\\""'))
1715 def test_update_add_two_rpc_to_dns(self):
1716 prefix, txt = 'textrec2', ['"This is a test"',
1717 '"and this is a test, too"']
1718 prefix = 'rpc' + prefix
1719 name = "%s.%s" % (prefix, self.get_dns_domain())
1721 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1722 '"\\"This is a test\\""' +
1723 ' "\\"and this is a test, too\\""')
1724 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1725 add_rec_buf.rec = rec
1727 self.rpc_conn.DnssrvUpdateRecord2(
1728 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1731 self.get_dns_domain(),
1736 except WERRORError as e:
1740 self.check_query_txt(prefix, txt)
1742 self.rpc_conn.DnssrvUpdateRecord2(
1743 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1746 self.get_dns_domain(),
1751 def test_update_add_empty_txt_records(self):
1752 "test adding two txt records works"
1753 prefix, txt = 'emptytextrec', []
1754 p = self.make_txt_update(prefix, txt)
1755 (response, response_packet) =\
1756 self.dns_transaction_udp(p, host=server_ip)
1757 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1758 self.check_query_txt(prefix, txt)
1759 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1760 self.get_dns_domain(),
1761 "%s.%s" % (prefix, self.get_dns_domain()),
1762 dnsp.DNS_TYPE_TXT, ''))
1764 def test_update_add_empty_rpc_to_dns(self):
1765 prefix, txt = 'rpcemptytextrec', []
1767 name = "%s.%s" % (prefix, self.get_dns_domain())
1769 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
1770 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1771 add_rec_buf.rec = rec
1773 self.rpc_conn.DnssrvUpdateRecord2(
1774 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1777 self.get_dns_domain(),
1781 except WERRORError as e:
1785 self.check_query_txt(prefix, txt)
1787 self.rpc_conn.DnssrvUpdateRecord2(
1788 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1791 self.get_dns_domain(),
1796 TestProgram(module=__name__, opts=subunitopts)