1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
30 import samba.ndr as ndr
31 from samba import credentials, param
32 from samba.dcerpc import dns, dnsp, dnsserver
33 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
34 from samba.tests.subunitrun import SubunitOptions, TestProgram
35 from samba import werror, WERRORError
36 from samba.tests.dns_base import DNSTest
37 import samba.getopt as options
40 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
41 sambaopts = options.SambaOptions(parser)
42 parser.add_option_group(sambaopts)
44 # This timeout only has relevance when testing against Windows
45 # Format errors tend to return patchy responses, so a timeout is needed.
46 parser.add_option("--timeout", type="int", dest="timeout",
47 help="Specify timeout for DNS requests")
49 # use command line creds if available
50 credopts = options.CredentialsOptions(parser)
51 parser.add_option_group(credopts)
52 subunitopts = SubunitOptions(parser)
53 parser.add_option_group(subunitopts)
55 opts, args = parser.parse_args()
57 lp = sambaopts.get_loadparm()
58 creds = credopts.get_credentials(lp)
60 timeout = opts.timeout
68 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
70 class TestSimpleQueries(DNSTest):
72 super(TestSimpleQueries, self).setUp()
73 global server, server_ip, lp, creds, timeout
74 self.server = server_name
75 self.server_ip = server_ip
78 self.timeout = timeout
80 def test_one_a_query(self):
81 "create a query packet containing one query record"
82 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
85 name = "%s.%s" % (self.server, self.get_dns_domain())
86 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
87 print("asking for ", q.name)
90 self.finish_name_packet(p, questions)
91 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
92 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
93 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
94 self.assertEquals(response.ancount, 1)
95 self.assertEquals(response.answers[0].rdata,
98 def test_one_SOA_query(self):
99 "create a query packet containing one query record for the SOA"
100 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
103 name = "%s" % (self.get_dns_domain())
104 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
105 print("asking for ", q.name)
108 self.finish_name_packet(p, questions)
109 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
110 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
111 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
112 self.assertEquals(response.ancount, 1)
113 self.assertEquals(response.answers[0].rdata.mname.upper(),
114 ("%s.%s" % (self.server, self.get_dns_domain())).upper())
116 def test_one_a_query_tcp(self):
117 "create a query packet containing one query record via TCP"
118 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
121 name = "%s.%s" % (self.server, self.get_dns_domain())
122 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
123 print("asking for ", q.name)
126 self.finish_name_packet(p, questions)
127 (response, response_packet) = self.dns_transaction_tcp(p, host=server_ip)
128 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
129 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
130 self.assertEquals(response.ancount, 1)
131 self.assertEquals(response.answers[0].rdata,
134 def test_one_mx_query(self):
135 "create a query packet causing an empty RCODE_OK answer"
136 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
139 name = "%s.%s" % (self.server, self.get_dns_domain())
140 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
141 print("asking for ", q.name)
144 self.finish_name_packet(p, questions)
145 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
146 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
147 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
148 self.assertEquals(response.ancount, 0)
150 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
153 name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
154 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
155 print("asking for ", q.name)
158 self.finish_name_packet(p, questions)
159 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
160 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
161 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
162 self.assertEquals(response.ancount, 0)
164 def test_two_queries(self):
165 "create a query packet containing two query records"
166 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
169 name = "%s.%s" % (self.server, self.get_dns_domain())
170 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
173 name = "%s.%s" % ('bogusname', self.get_dns_domain())
174 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
177 self.finish_name_packet(p, questions)
179 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
180 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
181 except socket.timeout:
182 # Windows chooses not to respond to incorrectly formatted queries.
183 # Although this appears to be non-deterministic even for the same
184 # request twice, it also appears to be based on a how poorly the
185 # request is formatted.
188 def test_qtype_all_query(self):
189 "create a QTYPE_ALL query"
190 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
193 name = "%s.%s" % (self.server, self.get_dns_domain())
194 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
195 print("asking for ", q.name)
198 self.finish_name_packet(p, questions)
199 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
202 dc_ipv6 = os.getenv('SERVER_IPV6')
203 if dc_ipv6 is not None:
206 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
207 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
208 self.assertEquals(response.ancount, num_answers)
209 self.assertEquals(response.answers[0].rdata,
211 if dc_ipv6 is not None:
212 self.assertEquals(response.answers[1].rdata, dc_ipv6)
214 def test_qclass_none_query(self):
215 "create a QCLASS_NONE query"
216 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
219 name = "%s.%s" % (self.server, self.get_dns_domain())
220 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
223 self.finish_name_packet(p, questions)
225 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
226 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
227 except socket.timeout:
228 # Windows chooses not to respond to incorrectly formatted queries.
229 # Although this appears to be non-deterministic even for the same
230 # request twice, it also appears to be based on a how poorly the
231 # request is formatted.
234 def test_soa_hostname_query(self):
235 "create a SOA query for a hostname"
236 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
239 name = "%s.%s" % (self.server, self.get_dns_domain())
240 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
243 self.finish_name_packet(p, questions)
244 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
245 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
246 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
247 # We don't get SOA records for single hosts
248 self.assertEquals(response.ancount, 0)
249 # But we do respond with an authority section
250 self.assertEqual(response.nscount, 1)
252 def test_soa_domain_query(self):
253 "create a SOA query for a domain"
254 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
257 name = self.get_dns_domain()
258 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
261 self.finish_name_packet(p, questions)
262 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
263 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
264 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
265 self.assertEquals(response.ancount, 1)
266 self.assertEquals(response.answers[0].rdata.minimum, 3600)
269 class TestDNSUpdates(DNSTest):
271 super(TestDNSUpdates, self).setUp()
272 global server, server_ip, lp, creds, timeout
273 self.server = server_name
274 self.server_ip = server_ip
277 self.timeout = timeout
279 def test_two_updates(self):
280 "create two update requests"
281 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
284 name = "%s.%s" % (self.server, self.get_dns_domain())
285 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
288 name = self.get_dns_domain()
289 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
292 self.finish_name_packet(p, updates)
294 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
295 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
296 except socket.timeout:
297 # Windows chooses not to respond to incorrectly formatted queries.
298 # Although this appears to be non-deterministic even for the same
299 # request twice, it also appears to be based on a how poorly the
300 # request is formatted.
303 def test_update_wrong_qclass(self):
304 "create update with DNS_QCLASS_NONE"
305 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
308 name = self.get_dns_domain()
309 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
312 self.finish_name_packet(p, updates)
313 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
314 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
316 def test_update_prereq_with_non_null_ttl(self):
317 "test update with a non-null TTL"
318 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
321 name = self.get_dns_domain()
323 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
325 self.finish_name_packet(p, updates)
329 r.name = "%s.%s" % (self.server, self.get_dns_domain())
330 r.rr_type = dns.DNS_QTYPE_TXT
331 r.rr_class = dns.DNS_QCLASS_NONE
336 p.ancount = len(prereqs)
340 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
341 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
342 except socket.timeout:
343 # Windows chooses not to respond to incorrectly formatted queries.
344 # Although this appears to be non-deterministic even for the same
345 # request twice, it also appears to be based on a how poorly the
346 # request is formatted.
349 def test_update_prereq_with_non_null_length(self):
350 "test update with a non-null length"
351 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
354 name = self.get_dns_domain()
356 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
358 self.finish_name_packet(p, updates)
362 r.name = "%s.%s" % (self.server, self.get_dns_domain())
363 r.rr_type = dns.DNS_QTYPE_TXT
364 r.rr_class = dns.DNS_QCLASS_ANY
369 p.ancount = len(prereqs)
372 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
373 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
375 def test_update_prereq_nonexisting_name(self):
376 "test update with a nonexisting name"
377 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
380 name = self.get_dns_domain()
382 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
384 self.finish_name_packet(p, updates)
388 r.name = "idontexist.%s" % self.get_dns_domain()
389 r.rr_type = dns.DNS_QTYPE_TXT
390 r.rr_class = dns.DNS_QCLASS_ANY
395 p.ancount = len(prereqs)
398 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
399 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
401 def test_update_add_txt_record(self):
402 "test adding records works"
403 prefix, txt = 'textrec', ['"This is a test"']
404 p = self.make_txt_update(prefix, txt)
405 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
406 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
407 self.check_query_txt(prefix, txt)
409 def test_delete_record(self):
410 "Test if deleting records works"
412 NAME = "deleterec.%s" % self.get_dns_domain()
414 # First, create a record to make sure we have a record to delete.
415 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
418 name = self.get_dns_domain()
420 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
422 self.finish_name_packet(p, updates)
427 r.rr_type = dns.DNS_QTYPE_TXT
428 r.rr_class = dns.DNS_QCLASS_IN
431 rdata = self.make_txt_record(['"This is a test"'])
434 p.nscount = len(updates)
437 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
438 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
440 # Now check the record is around
441 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
443 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
446 self.finish_name_packet(p, questions)
447 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
448 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
450 # Now delete the record
451 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
454 name = self.get_dns_domain()
456 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
458 self.finish_name_packet(p, updates)
463 r.rr_type = dns.DNS_QTYPE_TXT
464 r.rr_class = dns.DNS_QCLASS_NONE
467 rdata = self.make_txt_record(['"This is a test"'])
470 p.nscount = len(updates)
473 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
474 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
476 # And finally check it's gone
477 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
480 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
483 self.finish_name_packet(p, questions)
484 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
485 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
487 def test_readd_record(self):
488 "Test if adding, deleting and then readding a records works"
490 NAME = "readdrec.%s" % self.get_dns_domain()
493 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
496 name = self.get_dns_domain()
498 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
500 self.finish_name_packet(p, updates)
505 r.rr_type = dns.DNS_QTYPE_TXT
506 r.rr_class = dns.DNS_QCLASS_IN
509 rdata = self.make_txt_record(['"This is a test"'])
512 p.nscount = len(updates)
515 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
516 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
518 # Now check the record is around
519 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
521 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
524 self.finish_name_packet(p, questions)
525 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
526 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
528 # Now delete the record
529 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
532 name = self.get_dns_domain()
534 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
536 self.finish_name_packet(p, updates)
541 r.rr_type = dns.DNS_QTYPE_TXT
542 r.rr_class = dns.DNS_QCLASS_NONE
545 rdata = self.make_txt_record(['"This is a test"'])
548 p.nscount = len(updates)
551 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
552 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
555 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
558 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
561 self.finish_name_packet(p, questions)
562 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
563 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
565 # recreate the record
566 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
569 name = self.get_dns_domain()
571 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
573 self.finish_name_packet(p, updates)
578 r.rr_type = dns.DNS_QTYPE_TXT
579 r.rr_class = dns.DNS_QCLASS_IN
582 rdata = self.make_txt_record(['"This is a test"'])
585 p.nscount = len(updates)
588 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
589 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
591 # Now check the record is around
592 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
594 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
597 self.finish_name_packet(p, questions)
598 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
599 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
601 def test_update_add_mx_record(self):
602 "test adding MX records works"
603 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
606 name = self.get_dns_domain()
608 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
610 self.finish_name_packet(p, updates)
614 r.name = "%s" % self.get_dns_domain()
615 r.rr_type = dns.DNS_QTYPE_MX
616 r.rr_class = dns.DNS_QCLASS_IN
619 rdata = dns.mx_record()
620 rdata.preference = 10
621 rdata.exchange = 'mail.%s' % self.get_dns_domain()
624 p.nscount = len(updates)
627 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
628 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
630 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
633 name = "%s" % self.get_dns_domain()
634 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
637 self.finish_name_packet(p, questions)
638 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
639 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
640 self.assertEqual(response.ancount, 1)
641 ans = response.answers[0]
642 self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
643 self.assertEqual(ans.rdata.preference, 10)
644 self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
647 class TestComplexQueries(DNSTest):
648 def make_dns_update(self, key, value, qtype):
649 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
651 name = self.get_dns_domain()
652 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
653 self.finish_name_packet(p, [u])
658 r.rr_class = dns.DNS_QCLASS_IN
664 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
665 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
668 super(TestComplexQueries, self).setUp()
670 global server, server_ip, lp, creds, timeout
671 self.server = server_name
672 self.server_ip = server_ip
675 self.timeout = timeout
677 def test_one_a_query(self):
678 "create a query packet containing one query record"
683 name = "cname_test.%s" % self.get_dns_domain()
684 rdata = "%s.%s" % (self.server, self.get_dns_domain())
685 self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
687 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
691 name = "cname_test.%s" % self.get_dns_domain()
692 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
693 print("asking for ", q.name)
696 self.finish_name_packet(p, questions)
697 (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
698 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
699 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
700 self.assertEquals(response.ancount, 2)
701 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
702 self.assertEquals(response.answers[0].rdata, "%s.%s" %
703 (self.server, self.get_dns_domain()))
704 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
705 self.assertEquals(response.answers[1].rdata,
710 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
713 name = self.get_dns_domain()
715 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
717 self.finish_name_packet(p, updates)
721 r.name = "cname_test.%s" % self.get_dns_domain()
722 r.rr_type = dns.DNS_QTYPE_CNAME
723 r.rr_class = dns.DNS_QCLASS_NONE
726 r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
728 p.nscount = len(updates)
731 (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
732 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
734 def test_cname_two_chain(self):
735 name0 = "cnamechain0.%s" % self.get_dns_domain()
736 name1 = "cnamechain1.%s" % self.get_dns_domain()
737 name2 = "cnamechain2.%s" % self.get_dns_domain()
738 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
739 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
740 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
742 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
744 q = self.make_name_question(name1, dns.DNS_QTYPE_A,
748 self.finish_name_packet(p, questions)
749 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
750 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
751 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
752 self.assertEquals(response.ancount, 3)
754 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
755 self.assertEquals(response.answers[0].name, name1)
756 self.assertEquals(response.answers[0].rdata, name2)
758 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
759 self.assertEquals(response.answers[1].name, name2)
760 self.assertEquals(response.answers[1].rdata, name0)
762 self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
763 self.assertEquals(response.answers[2].rdata,
766 def test_invalid_empty_cname(self):
767 name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
769 self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
770 except AssertionError as e:
773 self.fail("Successfully added empty CNAME, which is invalid.")
775 def test_cname_two_chain_not_matching_qtype(self):
776 name0 = "cnamechain0.%s" % self.get_dns_domain()
777 name1 = "cnamechain1.%s" % self.get_dns_domain()
778 name2 = "cnamechain2.%s" % self.get_dns_domain()
779 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
780 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
781 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
783 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
785 q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
789 self.finish_name_packet(p, questions)
790 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
791 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
792 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
794 # CNAME should return all intermediate results!
795 # Only the A records exists, not the TXT.
796 self.assertEquals(response.ancount, 2)
798 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
799 self.assertEquals(response.answers[0].name, name1)
800 self.assertEquals(response.answers[0].rdata, name2)
802 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
803 self.assertEquals(response.answers[1].name, name2)
804 self.assertEquals(response.answers[1].rdata, name0)
806 class TestInvalidQueries(DNSTest):
808 super(TestInvalidQueries, self).setUp()
809 global server, server_ip, lp, creds, timeout
810 self.server = server_name
811 self.server_ip = server_ip
814 self.timeout = timeout
816 def test_one_a_query(self):
817 "send 0 bytes follows by create a query packet containing one query record"
821 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
822 s.connect((self.server_ip, 53))
828 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
831 name = "%s.%s" % (self.server, self.get_dns_domain())
832 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
833 print("asking for ", q.name)
836 self.finish_name_packet(p, questions)
837 (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
838 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
839 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
840 self.assertEquals(response.ancount, 1)
841 self.assertEquals(response.answers[0].rdata,
844 def test_one_a_reply(self):
845 "send a reply instead of a query"
848 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
851 name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
852 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
853 print("asking for ", q.name)
856 self.finish_name_packet(p, questions)
857 p.operation |= dns.DNS_FLAG_REPLY
860 send_packet = ndr.ndr_pack(p)
861 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
862 s.settimeout(timeout)
864 s.connect((host, 53))
865 tcp_packet = struct.pack('!H', len(send_packet))
866 tcp_packet += send_packet
867 s.send(tcp_packet, 0)
868 recv_packet = s.recv(0xffff + 2, 0)
869 self.assertEquals(0, len(recv_packet))
870 except socket.timeout:
871 # Windows chooses not to respond to incorrectly formatted queries.
872 # Although this appears to be non-deterministic even for the same
873 # request twice, it also appears to be based on a how poorly the
874 # request is formatted.
880 class TestZones(DNSTest):
882 super(TestZones, self).setUp()
883 global server, server_ip, lp, creds, timeout
884 self.server = server_name
885 self.server_ip = server_ip
888 self.timeout = timeout
890 self.zone = "test.lan"
891 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %\
895 self.samdb = SamDB(url="ldap://" + self.server_ip,
896 lp = self.get_loadparm(),
897 session_info=system_session(),
898 credentials=self.creds)
900 self.zone_dn = "DC=" + self.zone +\
901 ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
902 str(self.samdb.get_default_basedn())
905 super(TestZones, self).tearDown()
908 self.delete_zone(self.zone)
909 except RuntimeError as e:
910 (num, string) = e.args
911 if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
914 def create_zone(self, zone, aging_enabled=False):
915 zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
916 zone_create.pszZoneName = zone
917 zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
918 zone_create.fAging = int(aging_enabled)
919 zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
920 zone_create.fDsIntegrated = 1
921 zone_create.fLoadExisting = 1
922 zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
924 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
925 self.rpc_conn.DnssrvOperation2(client_version,
931 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
933 except WERRORError as e:
936 def set_params(self, **kwargs):
937 zone = kwargs.pop('zone', None)
938 for key,val in kwargs.items():
939 name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
940 name_param.dwParam = val
941 name_param.pszNodeName = key
943 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
944 nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
946 self.rpc_conn.DnssrvOperation2(client_version, 0, self.server, zone,
947 0, 'ResetDwordProperty', nap_type,
949 except WERRORError as e:
952 def ldap_modify_dnsrecs(self, name, func):
953 dn = 'DC={},{}'.format(name, self.zone_dn)
954 dns_recs = self.ldap_get_dns_records(name)
957 update_dict = {'dn':dn, 'dnsRecord':[ndr_pack(r) for r in dns_recs]}
958 self.samdb.modify(ldb.Message.from_dict(self.samdb,
960 ldb.FLAG_MOD_REPLACE))
962 def dns_update_record(self, prefix, txt):
963 p = self.make_txt_update(prefix, txt, self.zone)
964 (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
965 self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
966 recs = self.ldap_get_dns_records(prefix)
967 recs = [r for r in recs if r.data.str == txt]
968 self.assertEqual(len(recs), 1)
971 def ldap_get_records(self, name):
972 dn = 'DC={},{}'.format(name, self.zone_dn)
973 expr = "(&(objectClass=dnsNode)(name={}))".format(name)
974 return self.samdb.search(base=dn, scope=ldb.SCOPE_SUBTREE,
975 expression=expr, attrs=["*"])
977 def ldap_get_dns_records(self, name):
978 records = self.ldap_get_records(name)
979 return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
980 for r in records[0].get('dnsRecord')]
982 def ldap_get_zone_settings(self):
983 records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
984 expression="(&(objectClass=dnsZone)"+\
985 "(name={}))".format(self.zone),
986 attrs=["dNSProperty"])
987 self.assertEqual(len(records), 1)
988 props = [ndr_unpack(dnsp.DnsProperty, r)
989 for r in records[0].get('dNSProperty')]
991 #We have no choice but to repeat these here.
992 zone_prop_ids = {0x00: "EMPTY",
994 0x02: "ALLOW_UPDATE",
996 0x10: "NOREFRESH_INTERVAL",
997 0x11: "SCAVENGING_SERVERS",
998 0x12: "AGING_ENABLED_TIME",
999 0x20: "REFRESH_INTERVAL",
1000 0x40: "AGING_STATE",
1001 0x80: "DELETED_FROM_HOSTNAME",
1002 0x81: "MASTER_SERVERS",
1003 0x82: "AUTO_NS_SERVERS",
1004 0x83: "DCPROMO_CONVERT",
1005 0x90: "SCAVENGING_SERVERS_DA",
1006 0x91: "MASTER_SERVERS_DA",
1007 0x92: "NS_SERVERS_DA",
1008 0x100: "NODE_DBFLAGS"}
1009 return {zone_prop_ids[p.id].lower(): p.data for p in props}
1011 def set_aging(self, enable=False):
1012 self.create_zone(self.zone, aging_enabled=enable)
1013 self.set_params(NoRefreshInterval=1, RefreshInterval=1,
1014 Aging=int(bool(enable)), zone=self.zone,
1015 AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
1017 def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
1018 self.set_aging(enable=True)
1019 settings = self.ldap_get_zone_settings()
1020 self.assertTrue(settings['aging_state'] is not None)
1021 self.assertTrue(settings['aging_state'])
1023 rec = self.dns_update_record('agingtest', ['test txt'])
1024 self.assertNotEqual(rec.dwTimeStamp, 0)
1026 def test_set_aging_disabled(self):
1027 self.set_aging(enable=False)
1028 settings = self.ldap_get_zone_settings()
1029 self.assertTrue(settings['aging_state'] is not None)
1030 self.assertFalse(settings['aging_state'])
1032 rec = self.dns_update_record('agingtest', ['test txt'])
1033 self.assertNotEqual(rec.dwTimeStamp, 0)
1035 def test_aging_update(self, enable=True):
1036 name, txt = 'agingtest', ['test txt']
1037 self.set_aging(enable=True)
1038 before_mod = self.dns_update_record(name, txt)
1040 self.set_params(zone=self.zone, Aging=0)
1043 self.assertTrue(rec.dwTimeStamp > 0)
1044 rec.dwTimeStamp -= dec
1045 self.ldap_modify_dnsrecs(name, mod_ts)
1046 after_mod = self.ldap_get_dns_records(name)
1047 self.assertEqual(len(after_mod), 1)
1048 after_mod = after_mod[0]
1049 self.assertEqual(after_mod.dwTimeStamp,
1050 before_mod.dwTimeStamp - dec)
1051 after_update = self.dns_update_record(name, txt)
1052 after_should_equal = before_mod if enable else after_mod
1053 self.assertEqual(after_should_equal.dwTimeStamp,
1054 after_update.dwTimeStamp)
1056 def test_aging_update_disabled(self):
1057 self.test_aging_update(enable=False)
1059 def test_aging_refresh(self):
1060 name,txt = 'agingtest', ['test txt']
1061 self.create_zone(self.zone, aging_enabled=True)
1063 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1064 Aging=1, zone=self.zone,
1065 AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
1066 before_mod = self.dns_update_record(name, txt)
1068 self.assertTrue(rec.dwTimeStamp > 0)
1069 rec.dwTimeStamp -= interval/2
1070 self.ldap_modify_dnsrecs(name, mod_ts)
1071 update_during_norefresh = self.dns_update_record(name, txt)
1073 self.assertTrue(rec.dwTimeStamp > 0)
1074 rec.dwTimeStamp -= interval + interval/2
1075 self.ldap_modify_dnsrecs(name, mod_ts)
1076 update_during_refresh = self.dns_update_record(name, txt)
1077 self.assertEqual(update_during_norefresh.dwTimeStamp,
1078 before_mod.dwTimeStamp - interval/2)
1079 self.assertEqual(update_during_refresh.dwTimeStamp,
1080 before_mod.dwTimeStamp)
1082 def test_rpc_add_no_timestamp(self):
1083 name,txt = 'agingtest', ['test txt']
1084 self.set_aging(enable=True)
1085 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1086 rec_buf.rec = TXTRecord(txt)
1087 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1089 self.zone, name, rec_buf, None)
1090 recs = self.ldap_get_dns_records(name)
1091 self.assertEqual(len(recs), 1)
1092 self.assertEqual(recs[0].dwTimeStamp, 0)
1094 def test_basic_scavenging(self):
1095 self.create_zone(self.zone, aging_enabled=True)
1097 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1098 zone=self.zone, Aging=1,
1099 AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
1100 name, txt = 'agingtest', ['test txt']
1101 rec = self.dns_update_record(name,txt)
1102 rec = self.dns_update_record(name+'2',txt)
1104 self.assertTrue(rec.dwTimeStamp > 0)
1105 rec.dwTimeStamp -= interval*5
1106 self.ldap_modify_dnsrecs(name, mod_ts)
1107 self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1108 dsdb._scavenge_dns_records(self.samdb)
1110 recs = self.ldap_get_dns_records(name)
1111 self.assertEqual(len(recs), 1)
1112 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1113 recs = self.ldap_get_dns_records(name+'2')
1114 self.assertEqual(len(recs), 1)
1115 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1117 def delete_zone(self, zone):
1118 self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1124 dnsserver.DNSSRV_TYPEID_NULL,
1127 def test_soa_query(self):
1129 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1132 q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
1134 self.finish_name_packet(p, questions)
1136 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1137 # Windows returns OK while BIND logically seems to return NXDOMAIN
1138 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1139 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1140 self.assertEquals(response.ancount, 0)
1142 self.create_zone(zone)
1143 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1144 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1145 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1146 self.assertEquals(response.ancount, 1)
1147 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
1149 self.delete_zone(zone)
1150 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1151 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1152 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1153 self.assertEquals(response.ancount, 0)
1155 class TestRPCRoundtrip(DNSTest):
1157 super(TestRPCRoundtrip, self).setUp()
1158 global server, server_ip, lp, creds
1159 self.server = server_name
1160 self.server_ip = server_ip
1163 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
1164 self.lp, self.creds)
1167 super(TestRPCRoundtrip, self).tearDown()
1169 def test_update_add_txt_rpc_to_dns(self):
1170 prefix, txt = 'rpctextrec', ['"This is a test"']
1172 name = "%s.%s" % (prefix, self.get_dns_domain())
1174 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
1175 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1176 add_rec_buf.rec = rec
1178 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1179 0, self.server_ip, self.get_dns_domain(),
1180 name, add_rec_buf, None)
1181 except WERRORError as e:
1185 self.check_query_txt(prefix, txt)
1187 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1188 0, self.server_ip, self.get_dns_domain(),
1189 name, None, add_rec_buf)
1191 def test_update_add_null_padded_txt_record(self):
1192 "test adding records works"
1193 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1194 p = self.make_txt_update(prefix, txt)
1195 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1196 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1197 self.check_query_txt(prefix, txt)
1198 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1199 self.get_dns_domain(),
1200 "%s.%s" % (prefix, self.get_dns_domain()),
1201 dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""'))
1203 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1204 p = self.make_txt_update(prefix, txt)
1205 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1206 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1207 self.check_query_txt(prefix, txt)
1208 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1209 self.get_dns_domain(),
1210 "%s.%s" % (prefix, self.get_dns_domain()),
1211 dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"'))
1213 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1214 p = self.make_txt_update(prefix, txt)
1215 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1216 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1217 self.check_query_txt(prefix, txt)
1218 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1219 self.get_dns_domain(),
1220 "%s.%s" % (prefix, self.get_dns_domain()),
1221 dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""'))
1223 def test_update_add_padding_rpc_to_dns(self):
1224 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1225 prefix = 'rpc' + prefix
1226 name = "%s.%s" % (prefix, self.get_dns_domain())
1228 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""')
1229 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1230 add_rec_buf.rec = rec
1232 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1233 0, self.server_ip, self.get_dns_domain(),
1234 name, add_rec_buf, None)
1236 except WERRORError as e:
1240 self.check_query_txt(prefix, txt)
1242 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1243 0, self.server_ip, self.get_dns_domain(),
1244 name, None, add_rec_buf)
1246 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1247 prefix = 'rpc' + prefix
1248 name = "%s.%s" % (prefix, self.get_dns_domain())
1250 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"')
1251 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1252 add_rec_buf.rec = rec
1254 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1255 0, self.server_ip, self.get_dns_domain(),
1256 name, add_rec_buf, None)
1258 except WERRORError as e:
1262 self.check_query_txt(prefix, txt)
1264 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1265 0, self.server_ip, self.get_dns_domain(),
1266 name, None, add_rec_buf)
1268 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1269 prefix = 'rpc' + prefix
1270 name = "%s.%s" % (prefix, self.get_dns_domain())
1272 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""')
1273 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1274 add_rec_buf.rec = rec
1276 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1277 0, self.server_ip, self.get_dns_domain(),
1278 name, add_rec_buf, None)
1279 except WERRORError as e:
1283 self.check_query_txt(prefix, txt)
1285 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1286 0, self.server_ip, self.get_dns_domain(),
1287 name, None, add_rec_buf)
1289 # Test is incomplete due to strlen against txt records
1290 def test_update_add_null_char_txt_record(self):
1291 "test adding records works"
1292 prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1293 p = self.make_txt_update(prefix, txt)
1294 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1295 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1296 self.check_query_txt(prefix, ['NULL'])
1297 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1298 self.get_dns_domain(),
1299 "%s.%s" % (prefix, self.get_dns_domain()),
1300 dnsp.DNS_TYPE_TXT, '"NULL"'))
1302 prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1303 p = self.make_txt_update(prefix, txt)
1304 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1305 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1306 self.check_query_txt(prefix, ['NULL', 'NULL'])
1307 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1308 self.get_dns_domain(),
1309 "%s.%s" % (prefix, self.get_dns_domain()),
1310 dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1312 def test_update_add_null_char_rpc_to_dns(self):
1313 prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1314 prefix = 'rpc' + prefix
1315 name = "%s.%s" % (prefix, self.get_dns_domain())
1317 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL"')
1318 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1319 add_rec_buf.rec = rec
1321 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1322 0, self.server_ip, self.get_dns_domain(),
1323 name, add_rec_buf, None)
1325 except WERRORError as e:
1329 self.check_query_txt(prefix, ['NULL'])
1331 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1332 0, self.server_ip, self.get_dns_domain(),
1333 name, None, add_rec_buf)
1335 def test_update_add_hex_char_txt_record(self):
1336 "test adding records works"
1337 prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1338 p = self.make_txt_update(prefix, txt)
1339 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1340 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1341 self.check_query_txt(prefix, txt)
1342 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1343 self.get_dns_domain(),
1344 "%s.%s" % (prefix, self.get_dns_domain()),
1345 dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
1347 def test_update_add_hex_rpc_to_dns(self):
1348 prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1349 prefix = 'rpc' + prefix
1350 name = "%s.%s" % (prefix, self.get_dns_domain())
1352 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
1353 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1354 add_rec_buf.rec = rec
1356 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1357 0, self.server_ip, self.get_dns_domain(),
1358 name, add_rec_buf, None)
1360 except WERRORError as e:
1364 self.check_query_txt(prefix, txt)
1366 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1367 0, self.server_ip, self.get_dns_domain(),
1368 name, None, add_rec_buf)
1370 def test_update_add_slash_txt_record(self):
1371 "test adding records works"
1372 prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1373 p = self.make_txt_update(prefix, txt)
1374 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1375 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1376 self.check_query_txt(prefix, txt)
1377 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1378 self.get_dns_domain(),
1379 "%s.%s" % (prefix, self.get_dns_domain()),
1380 dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
1382 # This test fails against Windows as it eliminates slashes in RPC
1383 # One typical use for a slash is in records like 'var=value' to
1384 # escape '=' characters.
1385 def test_update_add_slash_rpc_to_dns(self):
1386 prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1387 prefix = 'rpc' + prefix
1388 name = "%s.%s" % (prefix, self.get_dns_domain())
1390 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
1391 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1392 add_rec_buf.rec = rec
1394 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1395 0, self.server_ip, self.get_dns_domain(),
1396 name, add_rec_buf, None)
1398 except WERRORError as e:
1402 self.check_query_txt(prefix, txt)
1405 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1406 0, self.server_ip, self.get_dns_domain(),
1407 name, None, add_rec_buf)
1409 def test_update_add_two_txt_records(self):
1410 "test adding two txt records works"
1411 prefix, txt = 'textrec2', ['"This is a test"',
1412 '"and this is a test, too"']
1413 p = self.make_txt_update(prefix, txt)
1414 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1415 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1416 self.check_query_txt(prefix, txt)
1417 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1418 self.get_dns_domain(),
1419 "%s.%s" % (prefix, self.get_dns_domain()),
1420 dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
1421 ' "\\"and this is a test, too\\""'))
1423 def test_update_add_two_rpc_to_dns(self):
1424 prefix, txt = 'textrec2', ['"This is a test"',
1425 '"and this is a test, too"']
1426 prefix = 'rpc' + prefix
1427 name = "%s.%s" % (prefix, self.get_dns_domain())
1429 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1430 '"\\"This is a test\\""' +
1431 ' "\\"and this is a test, too\\""')
1432 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1433 add_rec_buf.rec = rec
1435 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1436 0, self.server_ip, self.get_dns_domain(),
1437 name, add_rec_buf, None)
1439 except WERRORError as e:
1443 self.check_query_txt(prefix, txt)
1445 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1446 0, self.server_ip, self.get_dns_domain(),
1447 name, None, add_rec_buf)
1449 def test_update_add_empty_txt_records(self):
1450 "test adding two txt records works"
1451 prefix, txt = 'emptytextrec', []
1452 p = self.make_txt_update(prefix, txt)
1453 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1454 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1455 self.check_query_txt(prefix, txt)
1456 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1457 self.get_dns_domain(),
1458 "%s.%s" % (prefix, self.get_dns_domain()),
1459 dnsp.DNS_TYPE_TXT, ''))
1461 def test_update_add_empty_rpc_to_dns(self):
1462 prefix, txt = 'rpcemptytextrec', []
1464 name = "%s.%s" % (prefix, self.get_dns_domain())
1466 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
1467 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1468 add_rec_buf.rec = rec
1470 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1471 0, self.server_ip, self.get_dns_domain(),
1472 name, add_rec_buf, None)
1473 except WERRORError as e:
1477 self.check_query_txt(prefix, txt)
1479 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1480 0, self.server_ip, self.get_dns_domain(),
1481 name, None, add_rec_buf)
1483 TestProgram(module=__name__, opts=subunitopts)