1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
25 from ldb import ERR_OPERATIONS_ERROR
30 import samba.ndr as ndr
31 from samba import credentials
32 from samba.dcerpc import dns, dnsp, dnsserver
33 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
34 from samba.tests.subunitrun import SubunitOptions, TestProgram
35 from samba import werror, WERRORError
36 from samba.tests.dns_base import DNSTest
37 import samba.getopt as options
41 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
42 sambaopts = options.SambaOptions(parser)
43 parser.add_option_group(sambaopts)
45 # This timeout only has relevance when testing against Windows
46 # Format errors tend to return patchy responses, so a timeout is needed.
47 parser.add_option("--timeout", type="int", dest="timeout",
48 help="Specify timeout for DNS requests")
50 # use command line creds if available
51 credopts = options.CredentialsOptions(parser)
52 parser.add_option_group(credopts)
53 subunitopts = SubunitOptions(parser)
54 parser.add_option_group(subunitopts)
56 opts, args = parser.parse_args()
58 lp = sambaopts.get_loadparm()
59 creds = credopts.get_credentials(lp)
61 timeout = opts.timeout
69 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
72 class TestSimpleQueries(DNSTest):
74 super(TestSimpleQueries, self).setUp()
75 global server, server_ip, lp, creds, timeout
76 self.server = server_name
77 self.server_ip = server_ip
80 self.timeout = timeout
82 def test_one_a_query(self):
83 "create a query packet containing one query record"
84 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
87 name = "%s.%s" % (self.server, self.get_dns_domain())
88 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
89 print("asking for ", q.name)
92 self.finish_name_packet(p, questions)
93 (response, response_packet) =\
94 self.dns_transaction_udp(p, host=server_ip)
95 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
96 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
97 self.assertEquals(response.ancount, 1)
98 self.assertEquals(response.answers[0].rdata,
101 def test_one_SOA_query(self):
102 "create a query packet containing one query record for the SOA"
103 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
106 name = "%s" % (self.get_dns_domain())
107 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
108 print("asking for ", q.name)
111 self.finish_name_packet(p, questions)
112 (response, response_packet) =\
113 self.dns_transaction_udp(p, host=server_ip)
114 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
115 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
116 self.assertEquals(response.ancount, 1)
118 response.answers[0].rdata.mname.upper(),
119 ("%s.%s" % (self.server, self.get_dns_domain())).upper())
121 def test_one_a_query_tcp(self):
122 "create a query packet containing one query record via TCP"
123 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
126 name = "%s.%s" % (self.server, self.get_dns_domain())
127 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
128 print("asking for ", q.name)
131 self.finish_name_packet(p, questions)
132 (response, response_packet) =\
133 self.dns_transaction_tcp(p, host=server_ip)
134 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
135 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
136 self.assertEquals(response.ancount, 1)
137 self.assertEquals(response.answers[0].rdata,
140 def test_one_mx_query(self):
141 "create a query packet causing an empty RCODE_OK answer"
142 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
145 name = "%s.%s" % (self.server, self.get_dns_domain())
146 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
147 print("asking for ", q.name)
150 self.finish_name_packet(p, questions)
151 (response, response_packet) =\
152 self.dns_transaction_udp(p, host=server_ip)
153 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
154 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
155 self.assertEquals(response.ancount, 0)
157 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
160 name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
161 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
162 print("asking for ", q.name)
165 self.finish_name_packet(p, questions)
166 (response, response_packet) =\
167 self.dns_transaction_udp(p, host=server_ip)
168 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
169 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
170 self.assertEquals(response.ancount, 0)
172 def test_two_queries(self):
173 "create a query packet containing two query records"
174 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
177 name = "%s.%s" % (self.server, self.get_dns_domain())
178 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
181 name = "%s.%s" % ('bogusname', self.get_dns_domain())
182 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
185 self.finish_name_packet(p, questions)
187 (response, response_packet) =\
188 self.dns_transaction_udp(p, host=server_ip)
189 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
190 except socket.timeout:
191 # Windows chooses not to respond to incorrectly formatted queries.
192 # Although this appears to be non-deterministic even for the same
193 # request twice, it also appears to be based on a how poorly the
194 # request is formatted.
197 def test_qtype_all_query(self):
198 "create a QTYPE_ALL query"
199 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
202 name = "%s.%s" % (self.server, self.get_dns_domain())
203 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
204 print("asking for ", q.name)
207 self.finish_name_packet(p, questions)
208 (response, response_packet) =\
209 self.dns_transaction_udp(p, host=server_ip)
212 dc_ipv6 = os.getenv('SERVER_IPV6')
213 if dc_ipv6 is not None:
216 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
217 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
218 self.assertEquals(response.ancount, num_answers)
219 self.assertEquals(response.answers[0].rdata,
221 if dc_ipv6 is not None:
222 self.assertEquals(response.answers[1].rdata, dc_ipv6)
224 def test_qclass_none_query(self):
225 "create a QCLASS_NONE query"
226 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
229 name = "%s.%s" % (self.server, self.get_dns_domain())
230 q = self.make_name_question(
236 self.finish_name_packet(p, questions)
238 (response, response_packet) =\
239 self.dns_transaction_udp(p, host=server_ip)
240 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
241 except socket.timeout:
242 # Windows chooses not to respond to incorrectly formatted queries.
243 # Although this appears to be non-deterministic even for the same
244 # request twice, it also appears to be based on a how poorly the
245 # request is formatted.
248 def test_soa_hostname_query(self):
249 "create a SOA query for a hostname"
250 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
253 name = "%s.%s" % (self.server, self.get_dns_domain())
254 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
257 self.finish_name_packet(p, questions)
258 (response, response_packet) =\
259 self.dns_transaction_udp(p, host=server_ip)
260 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
261 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
262 # We don't get SOA records for single hosts
263 self.assertEquals(response.ancount, 0)
264 # But we do respond with an authority section
265 self.assertEqual(response.nscount, 1)
267 def test_soa_domain_query(self):
268 "create a SOA query for a domain"
269 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
272 name = self.get_dns_domain()
273 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
276 self.finish_name_packet(p, questions)
277 (response, response_packet) =\
278 self.dns_transaction_udp(p, host=server_ip)
279 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
280 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
281 self.assertEquals(response.ancount, 1)
282 self.assertEquals(response.answers[0].rdata.minimum, 3600)
285 class TestDNSUpdates(DNSTest):
287 super(TestDNSUpdates, self).setUp()
288 global server, server_ip, lp, creds, timeout
289 self.server = server_name
290 self.server_ip = server_ip
293 self.timeout = timeout
295 def test_two_updates(self):
296 "create two update requests"
297 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
300 name = "%s.%s" % (self.server, self.get_dns_domain())
301 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
304 name = self.get_dns_domain()
305 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
308 self.finish_name_packet(p, updates)
310 (response, response_packet) =\
311 self.dns_transaction_udp(p, host=server_ip)
312 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
313 except socket.timeout:
314 # Windows chooses not to respond to incorrectly formatted queries.
315 # Although this appears to be non-deterministic even for the same
316 # request twice, it also appears to be based on a how poorly the
317 # request is formatted.
320 def test_update_wrong_qclass(self):
321 "create update with DNS_QCLASS_NONE"
322 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
325 name = self.get_dns_domain()
326 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
329 self.finish_name_packet(p, updates)
330 (response, response_packet) =\
331 self.dns_transaction_udp(p, host=server_ip)
332 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
334 def test_update_prereq_with_non_null_ttl(self):
335 "test update with a non-null TTL"
336 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
339 name = self.get_dns_domain()
341 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
343 self.finish_name_packet(p, updates)
347 r.name = "%s.%s" % (self.server, self.get_dns_domain())
348 r.rr_type = dns.DNS_QTYPE_TXT
349 r.rr_class = dns.DNS_QCLASS_NONE
354 p.ancount = len(prereqs)
358 (response, response_packet) =\
359 self.dns_transaction_udp(p, host=server_ip)
360 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
361 except socket.timeout:
362 # Windows chooses not to respond to incorrectly formatted queries.
363 # Although this appears to be non-deterministic even for the same
364 # request twice, it also appears to be based on a how poorly the
365 # request is formatted.
368 def test_update_prereq_with_non_null_length(self):
369 "test update with a non-null length"
370 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
373 name = self.get_dns_domain()
375 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
377 self.finish_name_packet(p, updates)
381 r.name = "%s.%s" % (self.server, self.get_dns_domain())
382 r.rr_type = dns.DNS_QTYPE_TXT
383 r.rr_class = dns.DNS_QCLASS_ANY
388 p.ancount = len(prereqs)
391 (response, response_packet) =\
392 self.dns_transaction_udp(p, host=server_ip)
393 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
395 def test_update_prereq_nonexisting_name(self):
396 "test update with a nonexisting name"
397 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
400 name = self.get_dns_domain()
402 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
404 self.finish_name_packet(p, updates)
408 r.name = "idontexist.%s" % self.get_dns_domain()
409 r.rr_type = dns.DNS_QTYPE_TXT
410 r.rr_class = dns.DNS_QCLASS_ANY
415 p.ancount = len(prereqs)
418 (response, response_packet) =\
419 self.dns_transaction_udp(p, host=server_ip)
420 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
422 def test_update_add_txt_record(self):
423 "test adding records works"
424 prefix, txt = 'textrec', ['"This is a test"']
425 p = self.make_txt_update(prefix, txt)
426 (response, response_packet) =\
427 self.dns_transaction_udp(p, host=server_ip)
428 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
429 self.check_query_txt(prefix, txt)
431 def test_delete_record(self):
432 "Test if deleting records works"
434 NAME = "deleterec.%s" % self.get_dns_domain()
436 # First, create a record to make sure we have a record to delete.
437 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
440 name = self.get_dns_domain()
442 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
444 self.finish_name_packet(p, updates)
449 r.rr_type = dns.DNS_QTYPE_TXT
450 r.rr_class = dns.DNS_QCLASS_IN
453 rdata = self.make_txt_record(['"This is a test"'])
456 p.nscount = len(updates)
459 (response, response_packet) =\
460 self.dns_transaction_udp(p, host=server_ip)
461 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
463 # Now check the record is around
464 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
466 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
469 self.finish_name_packet(p, questions)
470 (response, response_packet) =\
471 self.dns_transaction_udp(p, host=server_ip)
472 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
474 # Now delete the record
475 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
478 name = self.get_dns_domain()
480 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
482 self.finish_name_packet(p, updates)
487 r.rr_type = dns.DNS_QTYPE_TXT
488 r.rr_class = dns.DNS_QCLASS_NONE
491 rdata = self.make_txt_record(['"This is a test"'])
494 p.nscount = len(updates)
497 (response, response_packet) =\
498 self.dns_transaction_udp(p, host=server_ip)
499 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
501 # And finally check it's gone
502 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
505 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
508 self.finish_name_packet(p, questions)
509 (response, response_packet) =\
510 self.dns_transaction_udp(p, host=server_ip)
511 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
513 def test_readd_record(self):
514 "Test if adding, deleting and then readding a records works"
516 NAME = "readdrec.%s" % self.get_dns_domain()
519 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
522 name = self.get_dns_domain()
524 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
526 self.finish_name_packet(p, updates)
531 r.rr_type = dns.DNS_QTYPE_TXT
532 r.rr_class = dns.DNS_QCLASS_IN
535 rdata = self.make_txt_record(['"This is a test"'])
538 p.nscount = len(updates)
541 (response, response_packet) =\
542 self.dns_transaction_udp(p, host=server_ip)
543 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
545 # Now check the record is around
546 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
548 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
551 self.finish_name_packet(p, questions)
552 (response, response_packet) =\
553 self.dns_transaction_udp(p, host=server_ip)
554 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
556 # Now delete the record
557 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
560 name = self.get_dns_domain()
562 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
564 self.finish_name_packet(p, updates)
569 r.rr_type = dns.DNS_QTYPE_TXT
570 r.rr_class = dns.DNS_QCLASS_NONE
573 rdata = self.make_txt_record(['"This is a test"'])
576 p.nscount = len(updates)
579 (response, response_packet) =\
580 self.dns_transaction_udp(p, host=server_ip)
581 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
584 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
587 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
590 self.finish_name_packet(p, questions)
591 (response, response_packet) =\
592 self.dns_transaction_udp(p, host=server_ip)
593 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
595 # recreate the record
596 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
599 name = self.get_dns_domain()
601 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
603 self.finish_name_packet(p, updates)
608 r.rr_type = dns.DNS_QTYPE_TXT
609 r.rr_class = dns.DNS_QCLASS_IN
612 rdata = self.make_txt_record(['"This is a test"'])
615 p.nscount = len(updates)
618 (response, response_packet) =\
619 self.dns_transaction_udp(p, host=server_ip)
620 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
622 # Now check the record is around
623 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
625 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
628 self.finish_name_packet(p, questions)
629 (response, response_packet) =\
630 self.dns_transaction_udp(p, host=server_ip)
631 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
633 def test_update_add_mx_record(self):
634 "test adding MX records works"
635 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
638 name = self.get_dns_domain()
640 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
642 self.finish_name_packet(p, updates)
646 r.name = "%s" % self.get_dns_domain()
647 r.rr_type = dns.DNS_QTYPE_MX
648 r.rr_class = dns.DNS_QCLASS_IN
651 rdata = dns.mx_record()
652 rdata.preference = 10
653 rdata.exchange = 'mail.%s' % self.get_dns_domain()
656 p.nscount = len(updates)
659 (response, response_packet) =\
660 self.dns_transaction_udp(p, host=server_ip)
661 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
663 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
666 name = "%s" % self.get_dns_domain()
667 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
670 self.finish_name_packet(p, questions)
671 (response, response_packet) =\
672 self.dns_transaction_udp(p, host=server_ip)
673 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
674 self.assertEqual(response.ancount, 1)
675 ans = response.answers[0]
676 self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
677 self.assertEqual(ans.rdata.preference, 10)
678 self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
681 class TestComplexQueries(DNSTest):
682 def make_dns_update(self, key, value, qtype):
683 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
685 name = self.get_dns_domain()
686 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
687 self.finish_name_packet(p, [u])
692 r.rr_class = dns.DNS_QCLASS_IN
698 (response, response_packet) =\
699 self.dns_transaction_udp(p, host=server_ip)
700 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
703 super(TestComplexQueries, self).setUp()
705 global server, server_ip, lp, creds, timeout
706 self.server = server_name
707 self.server_ip = server_ip
710 self.timeout = timeout
712 def test_one_a_query(self):
713 "create a query packet containing one query record"
718 name = "cname_test.%s" % self.get_dns_domain()
719 rdata = "%s.%s" % (self.server, self.get_dns_domain())
720 self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
722 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
726 name = "cname_test.%s" % self.get_dns_domain()
727 q = self.make_name_question(name,
730 print("asking for ", q.name)
733 self.finish_name_packet(p, questions)
734 (response, response_packet) =\
735 self.dns_transaction_udp(p, host=self.server_ip)
736 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
737 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
738 self.assertEquals(response.ancount, 2)
739 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
740 self.assertEquals(response.answers[0].rdata, "%s.%s" %
741 (self.server, self.get_dns_domain()))
742 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
743 self.assertEquals(response.answers[1].rdata,
748 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
751 name = self.get_dns_domain()
753 u = self.make_name_question(name,
757 self.finish_name_packet(p, updates)
761 r.name = "cname_test.%s" % self.get_dns_domain()
762 r.rr_type = dns.DNS_QTYPE_CNAME
763 r.rr_class = dns.DNS_QCLASS_NONE
766 r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
768 p.nscount = len(updates)
771 (response, response_packet) =\
772 self.dns_transaction_udp(p, host=self.server_ip)
773 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
775 def test_cname_two_chain(self):
776 name0 = "cnamechain0.%s" % self.get_dns_domain()
777 name1 = "cnamechain1.%s" % self.get_dns_domain()
778 name2 = "cnamechain2.%s" % self.get_dns_domain()
779 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
780 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
781 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
783 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
785 q = self.make_name_question(name1, dns.DNS_QTYPE_A,
789 self.finish_name_packet(p, questions)
790 (response, response_packet) =\
791 self.dns_transaction_udp(p, host=server_ip)
792 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
793 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
794 self.assertEquals(response.ancount, 3)
796 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
797 self.assertEquals(response.answers[0].name, name1)
798 self.assertEquals(response.answers[0].rdata, name2)
800 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
801 self.assertEquals(response.answers[1].name, name2)
802 self.assertEquals(response.answers[1].rdata, name0)
804 self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
805 self.assertEquals(response.answers[2].rdata,
808 def test_invalid_empty_cname(self):
809 name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
811 self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
812 except AssertionError:
815 self.fail("Successfully added empty CNAME, which is invalid.")
817 def test_cname_two_chain_not_matching_qtype(self):
818 name0 = "cnamechain0.%s" % self.get_dns_domain()
819 name1 = "cnamechain1.%s" % self.get_dns_domain()
820 name2 = "cnamechain2.%s" % self.get_dns_domain()
821 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
822 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
823 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
825 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
827 q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
831 self.finish_name_packet(p, questions)
832 (response, response_packet) =\
833 self.dns_transaction_udp(p, host=server_ip)
834 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
835 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
837 # CNAME should return all intermediate results!
838 # Only the A records exists, not the TXT.
839 self.assertEquals(response.ancount, 2)
841 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
842 self.assertEquals(response.answers[0].name, name1)
843 self.assertEquals(response.answers[0].rdata, name2)
845 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
846 self.assertEquals(response.answers[1].name, name2)
847 self.assertEquals(response.answers[1].rdata, name0)
849 def test_cname_loop(self):
850 cname1 = "cnamelooptestrec." + self.get_dns_domain()
851 cname2 = "cnamelooptestrec2." + self.get_dns_domain()
852 cname3 = "cnamelooptestrec3." + self.get_dns_domain()
853 self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
854 self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
855 self.make_dns_update(cname3, cname1, dnsp.DNS_TYPE_CNAME)
857 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
860 q = self.make_name_question(cname1,
864 self.finish_name_packet(p, questions)
866 (response, response_packet) =\
867 self.dns_transaction_udp(p, host=self.server_ip)
869 max_recursion_depth = 20
870 self.assertEquals(len(response.answers), max_recursion_depth)
872 # Make sure cname limit doesn't count other records. This is a generic
873 # test called in tests below
874 def max_rec_test(self, rtype, rec_gen):
875 name = "limittestrec{0}.{1}".format(rtype, self.get_dns_domain())
877 num_recs_to_enter = limit + 5
879 for i in range(1, num_recs_to_enter+1):
881 self.make_dns_update(name, ip, rtype)
883 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
886 q = self.make_name_question(name,
890 self.finish_name_packet(p, questions)
892 (response, response_packet) =\
893 self.dns_transaction_udp(p, host=self.server_ip)
895 self.assertEqual(len(response.answers), num_recs_to_enter)
897 def test_record_limit_A(self):
899 return "127.0.0." + str(i)
900 self.max_rec_test(rtype=dns.DNS_QTYPE_A, rec_gen=ip4_gen)
902 def test_record_limit_AAAA(self):
904 return "AAAA:0:0:0:0:0:0:" + str(i)
905 self.max_rec_test(rtype=dns.DNS_QTYPE_AAAA, rec_gen=ip6_gen)
907 def test_record_limit_SRV(self):
909 rec = dns.srv_record()
913 rec.target = "srvtestrec" + str(i)
915 self.max_rec_test(rtype=dns.DNS_QTYPE_SRV, rec_gen=srv_gen)
917 # Same as test_record_limit_A but with a preceding CNAME follow
918 def test_cname_limit(self):
919 cname1 = "cnamelimittestrec." + self.get_dns_domain()
920 cname2 = "cnamelimittestrec2." + self.get_dns_domain()
921 cname3 = "cnamelimittestrec3." + self.get_dns_domain()
922 ip_prefix = '127.0.0.'
924 num_recs_to_enter = limit + 5
926 self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
927 self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
928 num_arecs_to_enter = num_recs_to_enter - 2
929 for i in range(1, num_arecs_to_enter+1):
930 ip = ip_prefix + str(i)
931 self.make_dns_update(cname3, ip, dns.DNS_QTYPE_A)
933 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
936 q = self.make_name_question(cname1,
940 self.finish_name_packet(p, questions)
942 (response, response_packet) =\
943 self.dns_transaction_udp(p, host=self.server_ip)
945 self.assertEqual(len(response.answers), num_recs_to_enter)
947 # ANY query on cname record shouldn't follow the link
948 def test_cname_any_query(self):
949 cname1 = "cnameanytestrec." + self.get_dns_domain()
950 cname2 = "cnameanytestrec2." + self.get_dns_domain()
951 cname3 = "cnameanytestrec3." + self.get_dns_domain()
953 self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
954 self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
956 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
959 q = self.make_name_question(cname1,
963 self.finish_name_packet(p, questions)
965 (response, response_packet) =\
966 self.dns_transaction_udp(p, host=self.server_ip)
968 self.assertEqual(len(response.answers), 1)
969 self.assertEqual(response.answers[0].name, cname1)
970 self.assertEqual(response.answers[0].rdata, cname2)
973 class TestInvalidQueries(DNSTest):
975 super(TestInvalidQueries, self).setUp()
976 global server, server_ip, lp, creds, timeout
977 self.server = server_name
978 self.server_ip = server_ip
981 self.timeout = timeout
983 def test_one_a_query(self):
984 """send 0 bytes follows by create a query packet
985 containing one query record"""
989 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
990 s.connect((self.server_ip, 53))
996 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
999 name = "%s.%s" % (self.server, self.get_dns_domain())
1000 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
1001 print("asking for ", q.name)
1004 self.finish_name_packet(p, questions)
1005 (response, response_packet) =\
1006 self.dns_transaction_udp(p, host=self.server_ip)
1007 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1008 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1009 self.assertEquals(response.ancount, 1)
1010 self.assertEquals(response.answers[0].rdata,
1013 def test_one_a_reply(self):
1014 "send a reply instead of a query"
1017 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1020 name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
1021 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
1022 print("asking for ", q.name)
1025 self.finish_name_packet(p, questions)
1026 p.operation |= dns.DNS_FLAG_REPLY
1029 send_packet = ndr.ndr_pack(p)
1030 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
1031 s.settimeout(timeout)
1032 host = self.server_ip
1033 s.connect((host, 53))
1034 tcp_packet = struct.pack('!H', len(send_packet))
1035 tcp_packet += send_packet
1036 s.send(tcp_packet, 0)
1037 recv_packet = s.recv(0xffff + 2, 0)
1038 self.assertEquals(0, len(recv_packet))
1039 except socket.timeout:
1040 # Windows chooses not to respond to incorrectly formatted queries.
1041 # Although this appears to be non-deterministic even for the same
1042 # request twice, it also appears to be based on a how poorly the
1043 # request is formatted.
1050 class TestZones(DNSTest):
1052 super(TestZones, self).setUp()
1053 global server, server_ip, lp, creds, timeout
1054 self.server = server_name
1055 self.server_ip = server_ip
1058 self.timeout = timeout
1060 self.zone = "test.lan"
1061 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1063 self.lp, self.creds)
1065 self.samdb = SamDB(url="ldap://" + self.server_ip,
1066 lp=self.get_loadparm(),
1067 session_info=system_session(),
1068 credentials=self.creds)
1069 self.zone_dn = "DC=" + self.zone +\
1070 ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
1071 str(self.samdb.get_default_basedn())
1074 super(TestZones, self).tearDown()
1077 self.delete_zone(self.zone)
1078 except RuntimeError as e:
1079 (num, string) = e.args
1080 if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
1083 def create_zone(self, zone, aging_enabled=False):
1084 zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
1085 zone_create.pszZoneName = zone
1086 zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
1087 zone_create.fAging = int(aging_enabled)
1088 zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
1089 zone_create.fDsIntegrated = 1
1090 zone_create.fLoadExisting = 1
1091 zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
1093 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1094 self.rpc_conn.DnssrvOperation2(client_version,
1100 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
1102 except WERRORError as e:
1105 def set_params(self, **kwargs):
1106 zone = kwargs.pop('zone', None)
1107 for key, val in kwargs.items():
1108 name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
1109 name_param.dwParam = val
1110 name_param.pszNodeName = key
1112 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1113 nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
1115 self.rpc_conn.DnssrvOperation2(client_version,
1120 'ResetDwordProperty',
1123 except WERRORError as e:
1126 def ldap_modify_dnsrecs(self, name, func):
1127 dn = 'DC={0},{1}'.format(name, self.zone_dn)
1128 dns_recs = self.ldap_get_dns_records(name)
1129 for rec in dns_recs:
1131 update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]}
1132 self.samdb.modify(ldb.Message.from_dict(self.samdb,
1134 ldb.FLAG_MOD_REPLACE))
1136 def dns_update_record(self, prefix, txt):
1137 p = self.make_txt_update(prefix, txt, self.zone)
1138 (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
1139 self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
1140 recs = self.ldap_get_dns_records(prefix)
1141 recs = [r for r in recs if r.data.str == txt]
1142 self.assertEqual(len(recs), 1)
1145 def dns_tombstone(self, prefix, txt, zone):
1146 name = prefix + "." + zone
1148 to = dnsp.DnssrvRpcRecord()
1149 to.dwTimeStamp = 1000
1150 to.wType = dnsp.DNS_TYPE_TOMBSTONE
1152 self.samdb.dns_replace(name, [to])
1154 def ldap_get_records(self, name):
1155 # The use of SCOPE_SUBTREE here avoids raising an exception in the
1156 # 0 results case for a test below.
1158 expr = "(&(objectClass=dnsNode)(name={0}))".format(name)
1159 return self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1160 expression=expr, attrs=["*"])
1162 def ldap_get_dns_records(self, name):
1163 records = self.ldap_get_records(name)
1164 return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
1165 for r in records[0].get('dnsRecord')]
1167 def ldap_get_zone_settings(self):
1168 records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
1169 expression="(&(objectClass=dnsZone)" +
1170 "(name={0}))".format(self.zone),
1171 attrs=["dNSProperty"])
1172 self.assertEqual(len(records), 1)
1173 props = [ndr_unpack(dnsp.DnsProperty, r)
1174 for r in records[0].get('dNSProperty')]
1176 # We have no choice but to repeat these here.
1177 zone_prop_ids = {0x00: "EMPTY",
1179 0x02: "ALLOW_UPDATE",
1180 0x08: "SECURE_TIME",
1181 0x10: "NOREFRESH_INTERVAL",
1182 0x11: "SCAVENGING_SERVERS",
1183 0x12: "AGING_ENABLED_TIME",
1184 0x20: "REFRESH_INTERVAL",
1185 0x40: "AGING_STATE",
1186 0x80: "DELETED_FROM_HOSTNAME",
1187 0x81: "MASTER_SERVERS",
1188 0x82: "AUTO_NS_SERVERS",
1189 0x83: "DCPROMO_CONVERT",
1190 0x90: "SCAVENGING_SERVERS_DA",
1191 0x91: "MASTER_SERVERS_DA",
1192 0x92: "NS_SERVERS_DA",
1193 0x100: "NODE_DBFLAGS"}
1194 return {zone_prop_ids[p.id].lower(): p.data for p in props}
1196 def set_aging(self, enable=False):
1197 self.create_zone(self.zone, aging_enabled=enable)
1198 self.set_params(NoRefreshInterval=1, RefreshInterval=1,
1199 Aging=int(bool(enable)), zone=self.zone,
1200 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1202 def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
1203 self.set_aging(enable=True)
1204 settings = self.ldap_get_zone_settings()
1205 self.assertTrue(settings['aging_state'] is not None)
1206 self.assertTrue(settings['aging_state'])
1208 rec = self.dns_update_record('agingtest', ['test txt'])
1209 self.assertNotEqual(rec.dwTimeStamp, 0)
1211 def test_set_aging_disabled(self):
1212 self.set_aging(enable=False)
1213 settings = self.ldap_get_zone_settings()
1214 self.assertTrue(settings['aging_state'] is not None)
1215 self.assertFalse(settings['aging_state'])
1217 rec = self.dns_update_record('agingtest', ['test txt'])
1218 self.assertNotEqual(rec.dwTimeStamp, 0)
1220 def test_aging_update(self, enable=True):
1221 name, txt = 'agingtest', ['test txt']
1222 self.set_aging(enable=True)
1223 before_mod = self.dns_update_record(name, txt)
1225 self.set_params(zone=self.zone, Aging=0)
1229 self.assertTrue(rec.dwTimeStamp > 0)
1230 rec.dwTimeStamp -= dec
1231 self.ldap_modify_dnsrecs(name, mod_ts)
1232 after_mod = self.ldap_get_dns_records(name)
1233 self.assertEqual(len(after_mod), 1)
1234 after_mod = after_mod[0]
1235 self.assertEqual(after_mod.dwTimeStamp,
1236 before_mod.dwTimeStamp - dec)
1237 after_update = self.dns_update_record(name, txt)
1238 after_should_equal = before_mod if enable else after_mod
1239 self.assertEqual(after_should_equal.dwTimeStamp,
1240 after_update.dwTimeStamp)
1242 def test_aging_update_disabled(self):
1243 self.test_aging_update(enable=False)
1245 def test_aging_refresh(self):
1246 name, txt = 'agingtest', ['test txt']
1247 self.create_zone(self.zone, aging_enabled=True)
1249 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1250 Aging=1, zone=self.zone,
1251 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1252 before_mod = self.dns_update_record(name, txt)
1255 self.assertTrue(rec.dwTimeStamp > 0)
1256 rec.dwTimeStamp -= interval / 2
1257 self.ldap_modify_dnsrecs(name, mod_ts)
1258 update_during_norefresh = self.dns_update_record(name, txt)
1261 self.assertTrue(rec.dwTimeStamp > 0)
1262 rec.dwTimeStamp -= interval + interval / 2
1263 self.ldap_modify_dnsrecs(name, mod_ts)
1264 update_during_refresh = self.dns_update_record(name, txt)
1265 self.assertEqual(update_during_norefresh.dwTimeStamp,
1266 before_mod.dwTimeStamp - interval / 2)
1267 self.assertEqual(update_during_refresh.dwTimeStamp,
1268 before_mod.dwTimeStamp)
1270 def test_rpc_add_no_timestamp(self):
1271 name, txt = 'agingtest', ['test txt']
1272 self.set_aging(enable=True)
1273 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1274 rec_buf.rec = TXTRecord(txt)
1275 self.rpc_conn.DnssrvUpdateRecord2(
1276 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1283 recs = self.ldap_get_dns_records(name)
1284 self.assertEqual(len(recs), 1)
1285 self.assertEqual(recs[0].dwTimeStamp, 0)
1287 def test_static_record_dynamic_update(self):
1288 name, txt = 'agingtest', ['test txt']
1289 txt2 = ['test txt2']
1290 self.set_aging(enable=True)
1291 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1292 rec_buf.rec = TXTRecord(txt)
1293 self.rpc_conn.DnssrvUpdateRecord2(
1294 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1302 rec2 = self.dns_update_record(name, txt2)
1303 self.assertEqual(rec2.dwTimeStamp, 0)
1305 def test_dynamic_record_static_update(self):
1306 name, txt = 'agingtest', ['test txt']
1307 txt2 = ['test txt2']
1308 txt3 = ['test txt3']
1309 self.set_aging(enable=True)
1311 self.dns_update_record(name, txt)
1313 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1314 rec_buf.rec = TXTRecord(txt2)
1315 self.rpc_conn.DnssrvUpdateRecord2(
1316 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1324 self.dns_update_record(name, txt3)
1326 recs = self.ldap_get_dns_records(name)
1327 # Put in dict because ldap recs might be out of order
1328 recs = {str(r.data.str): r for r in recs}
1329 self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0)
1330 self.assertEqual(recs[str(txt2)].dwTimeStamp, 0)
1331 self.assertEqual(recs[str(txt3)].dwTimeStamp, 0)
1333 def test_dns_tombstone_custom_match_rule(self):
1334 lp = self.get_loadparm()
1335 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1336 session_info=system_session(),
1337 credentials=self.creds)
1339 name, txt = 'agingtest', ['test txt']
1340 name2, txt2 = 'agingtest2', ['test txt2']
1341 name3, txt3 = 'agingtest3', ['test txt3']
1342 name4, txt4 = 'agingtest4', ['test txt4']
1343 name5, txt5 = 'agingtest5', ['test txt5']
1345 self.create_zone(self.zone, aging_enabled=True)
1347 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1348 Aging=1, zone=self.zone,
1349 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1351 self.dns_update_record(name, txt)
1353 self.dns_update_record(name2, txt)
1354 self.dns_update_record(name2, txt2)
1356 self.dns_update_record(name3, txt)
1357 self.dns_update_record(name3, txt2)
1358 last_update = self.dns_update_record(name3, txt3)
1360 # Modify txt1 of the first 2 names
1362 if rec.data.str == txt:
1363 rec.dwTimeStamp -= 2
1364 self.ldap_modify_dnsrecs(name, mod_ts)
1365 self.ldap_modify_dnsrecs(name2, mod_ts)
1367 # create a static dns record.
1368 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1369 rec_buf.rec = TXTRecord(txt4)
1370 self.rpc_conn.DnssrvUpdateRecord2(
1371 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1379 # Create a tomb stoned record.
1380 self.dns_update_record(name5, txt5)
1381 self.dns_tombstone(name5, txt5, self.zone)
1383 self.ldap_get_dns_records(name3)
1384 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1385 expr = expr.format(int(last_update.dwTimeStamp) - 1)
1387 res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1388 expression=expr, attrs=["*"])
1389 except ldb.LdbError as e:
1391 updated_names = {str(r.get('name')) for r in res}
1392 self.assertEqual(updated_names, set([name, name2]))
1394 def test_dns_tombstone_custom_match_rule_no_records(self):
1395 lp = self.get_loadparm()
1396 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1397 session_info=system_session(),
1398 credentials=self.creds)
1400 self.create_zone(self.zone, aging_enabled=True)
1402 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1403 Aging=1, zone=self.zone,
1404 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1406 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1407 expr = expr.format(1)
1410 res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1411 expression=expr, attrs=["*"])
1412 except ldb.LdbError as e:
1414 self.assertEqual(0, len(res))
1416 def test_dns_tombstone_custom_match_rule_fail(self):
1417 self.create_zone(self.zone, aging_enabled=True)
1418 samdb = SamDB(url=lp.samdb_url(),
1420 session_info=system_session(),
1421 credentials=self.creds)
1423 # Property name in not dnsRecord
1424 expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1425 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1426 expression=expr, attrs=["*"])
1427 self.assertEquals(len(res), 0)
1429 # No value for tombstone time
1431 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)"
1432 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1433 expression=expr, attrs=["*"])
1434 self.assertEquals(len(res), 0)
1435 self.fail("Exception: ldb.ldbError not generated")
1436 except ldb.LdbError as e:
1438 self.assertEquals(num, ERR_OPERATIONS_ERROR)
1440 # Tombstone time = -
1442 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)"
1443 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1444 expression=expr, attrs=["*"])
1445 self.assertEquals(len(res), 0)
1446 self.fail("Exception: ldb.ldbError not generated")
1447 except ldb.LdbError as e:
1449 self.assertEquals(num, ERR_OPERATIONS_ERROR)
1451 # Tombstone time longer than 64 characters
1453 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1454 expr = expr.format("1" * 65)
1455 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1456 expression=expr, attrs=["*"])
1457 self.assertEquals(len(res), 0)
1458 self.fail("Exception: ldb.ldbError not generated")
1459 except ldb.LdbError as e:
1461 self.assertEquals(num, ERR_OPERATIONS_ERROR)
1463 # Non numeric Tombstone time
1465 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)"
1466 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1467 expression=expr, attrs=["*"])
1468 self.assertEquals(len(res), 0)
1469 self.fail("Exception: ldb.ldbError not generated")
1470 except ldb.LdbError as e:
1472 self.assertEquals(num, ERR_OPERATIONS_ERROR)
1474 # Non system session
1476 db = SamDB(url="ldap://" + self.server_ip,
1477 lp=self.get_loadparm(),
1478 credentials=self.creds)
1480 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)"
1481 res = db.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1482 expression=expr, attrs=["*"])
1483 self.assertEquals(len(res), 0)
1484 self.fail("Exception: ldb.ldbError not generated")
1485 except ldb.LdbError as e:
1487 self.assertEquals(num, ERR_OPERATIONS_ERROR)
1489 def test_basic_scavenging(self):
1490 lp = self.get_loadparm()
1491 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1492 session_info=system_session(),
1493 credentials=self.creds)
1495 self.create_zone(self.zone, aging_enabled=True)
1497 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1498 zone=self.zone, Aging=1,
1499 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1500 name, txt = 'agingtest', ['test txt']
1501 name2, txt2 = 'agingtest2', ['test txt2']
1502 name3, txt3 = 'agingtest3', ['test txt3']
1503 self.dns_update_record(name, txt)
1504 self.dns_update_record(name2, txt)
1505 self.dns_update_record(name2, txt2)
1506 self.dns_update_record(name3, txt)
1507 self.dns_update_record(name3, txt2)
1508 last_add = self.dns_update_record(name3, txt3)
1511 self.assertTrue(rec.dwTimeStamp > 0)
1512 if rec.data.str == txt:
1513 rec.dwTimeStamp -= interval * 5
1514 self.ldap_modify_dnsrecs(name, mod_ts)
1515 self.ldap_modify_dnsrecs(name2, mod_ts)
1516 self.ldap_modify_dnsrecs(name3, mod_ts)
1517 self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1518 dsdb._scavenge_dns_records(self.samdb)
1520 recs = self.ldap_get_dns_records(name)
1521 self.assertEqual(len(recs), 1)
1522 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1524 recs = self.ldap_get_dns_records(name2)
1525 self.assertEqual(len(recs), 1)
1526 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1527 self.assertEqual(recs[0].data.str, txt2)
1529 recs = self.ldap_get_dns_records(name3)
1530 self.assertEqual(len(recs), 2)
1531 txts = {str(r.data.str) for r in recs}
1532 self.assertEqual(txts, {str(txt2), str(txt3)})
1533 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1534 self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT)
1536 for make_it_work in [False, True]:
1537 inc = -1 if make_it_work else 1
1540 rec.data = (last_add.dwTimeStamp - 24 * 14) + inc
1541 self.ldap_modify_dnsrecs(name, mod_ts)
1542 dsdb._dns_delete_tombstones(self.samdb)
1543 recs = self.ldap_get_records(name)
1545 self.assertEqual(len(recs), 0)
1547 self.assertEqual(len(recs), 1)
1549 def delete_zone(self, zone):
1550 self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1556 dnsserver.DNSSRV_TYPEID_NULL,
1559 def test_soa_query(self):
1561 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1564 q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
1566 self.finish_name_packet(p, questions)
1568 (response, response_packet) =\
1569 self.dns_transaction_udp(p, host=server_ip)
1570 # Windows returns OK while BIND logically seems to return NXDOMAIN
1571 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1572 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1573 self.assertEquals(response.ancount, 0)
1575 self.create_zone(zone)
1576 (response, response_packet) =\
1577 self.dns_transaction_udp(p, host=server_ip)
1578 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1579 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1580 self.assertEquals(response.ancount, 1)
1581 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
1583 self.delete_zone(zone)
1584 (response, response_packet) =\
1585 self.dns_transaction_udp(p, host=server_ip)
1586 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1587 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1588 self.assertEquals(response.ancount, 0)
1591 class TestRPCRoundtrip(DNSTest):
1593 super(TestRPCRoundtrip, self).setUp()
1594 global server, server_ip, lp, creds
1595 self.server = server_name
1596 self.server_ip = server_ip
1599 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1605 super(TestRPCRoundtrip, self).tearDown()
1607 def rpc_update(self, fqn=None, data=None, wType=None, delete=False):
1608 fqn = fqn or ("rpctestrec." + self.get_dns_domain())
1610 rec = data_to_dns_record(wType, data)
1611 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1612 add_rec_buf.rec = rec
1614 add_arg = add_rec_buf
1618 del_arg = add_rec_buf
1620 self.rpc_conn.DnssrvUpdateRecord2(
1621 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1624 self.get_dns_domain(),
1629 def test_rpc_self_referencing_cname(self):
1630 cname = "cnametest2_unqual_rec_loop"
1631 cname_fqn = "%s.%s" % (cname, self.get_dns_domain())
1634 self.rpc_update(fqn=cname, data=cname_fqn,
1635 wType=dnsp.DNS_TYPE_CNAME, delete=True)
1636 except WERRORError as e:
1637 if e.args[0] != werror.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST:
1638 self.fail("RPC DNS gaven wrong error on pre-test cleanup "
1639 "for self referencing CNAME: %s" % e.args[0])
1642 self.rpc_update(fqn=cname, wType=dnsp.DNS_TYPE_CNAME, data=cname_fqn)
1643 except WERRORError as e:
1644 if e.args[0] != werror.WERR_DNS_ERROR_CNAME_LOOP:
1645 self.fail("RPC DNS gaven wrong error on insertion of "
1646 "self referencing CNAME: %s" % e.args[0])
1649 self.fail("RPC DNS allowed insertion of self referencing CNAME")
1651 def test_update_add_txt_rpc_to_dns(self):
1652 prefix, txt = 'rpctextrec', ['"This is a test"']
1654 name = "%s.%s" % (prefix, self.get_dns_domain())
1656 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
1657 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1658 add_rec_buf.rec = rec
1660 self.rpc_conn.DnssrvUpdateRecord2(
1661 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1664 self.get_dns_domain(),
1669 except WERRORError as e:
1673 self.check_query_txt(prefix, txt)
1675 self.rpc_conn.DnssrvUpdateRecord2(
1676 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1679 self.get_dns_domain(),
1684 def test_update_add_null_padded_txt_record(self):
1685 "test adding records works"
1686 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1687 p = self.make_txt_update(prefix, txt)
1688 (response, response_packet) =\
1689 self.dns_transaction_udp(p, host=server_ip)
1690 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1691 self.check_query_txt(prefix, txt)
1692 self.assertIsNotNone(
1693 dns_record_match(self.rpc_conn,
1695 self.get_dns_domain(),
1696 "%s.%s" % (prefix, self.get_dns_domain()),
1698 '"\\"This is a test\\"" "" ""'))
1700 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1701 p = self.make_txt_update(prefix, txt)
1702 (response, response_packet) =\
1703 self.dns_transaction_udp(p, host=server_ip)
1704 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1705 self.check_query_txt(prefix, txt)
1706 self.assertIsNotNone(
1710 self.get_dns_domain(),
1711 "%s.%s" % (prefix, self.get_dns_domain()),
1713 '"\\"This is a test\\"" "" "" "more text"'))
1715 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1716 p = self.make_txt_update(prefix, txt)
1717 (response, response_packet) =\
1718 self.dns_transaction_udp(p, host=server_ip)
1719 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1720 self.check_query_txt(prefix, txt)
1721 self.assertIsNotNone(
1725 self.get_dns_domain(),
1726 "%s.%s" % (prefix, self.get_dns_domain()),
1728 '"" "" "\\"This is a test\\""'))
1730 def test_update_add_padding_rpc_to_dns(self):
1731 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1732 prefix = 'rpc' + prefix
1733 name = "%s.%s" % (prefix, self.get_dns_domain())
1735 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1736 '"\\"This is a test\\"" "" ""')
1737 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1738 add_rec_buf.rec = rec
1740 self.rpc_conn.DnssrvUpdateRecord2(
1741 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1744 self.get_dns_domain(),
1749 except WERRORError as e:
1753 self.check_query_txt(prefix, txt)
1755 self.rpc_conn.DnssrvUpdateRecord2(
1756 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1759 self.get_dns_domain(),
1764 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1765 prefix = 'rpc' + prefix
1766 name = "%s.%s" % (prefix, self.get_dns_domain())
1768 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1769 '"\\"This is a test\\"" "" "" "more text"')
1770 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1771 add_rec_buf.rec = rec
1773 self.rpc_conn.DnssrvUpdateRecord2(
1774 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1777 self.get_dns_domain(),
1782 except WERRORError as e:
1786 self.check_query_txt(prefix, txt)
1788 self.rpc_conn.DnssrvUpdateRecord2(
1789 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1792 self.get_dns_domain(),
1797 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1798 prefix = 'rpc' + prefix
1799 name = "%s.%s" % (prefix, self.get_dns_domain())
1801 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1802 '"" "" "\\"This is a test\\""')
1803 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1804 add_rec_buf.rec = rec
1806 self.rpc_conn.DnssrvUpdateRecord2(
1807 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1810 self.get_dns_domain(),
1814 except WERRORError as e:
1818 self.check_query_txt(prefix, txt)
1820 self.rpc_conn.DnssrvUpdateRecord2(
1821 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1824 self.get_dns_domain(),
1829 # Test is incomplete due to strlen against txt records
1830 def test_update_add_null_char_txt_record(self):
1831 "test adding records works"
1832 prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1833 p = self.make_txt_update(prefix, txt)
1834 (response, response_packet) =\
1835 self.dns_transaction_udp(p, host=server_ip)
1836 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1837 self.check_query_txt(prefix, ['NULL'])
1838 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1839 self.get_dns_domain(),
1840 "%s.%s" % (prefix, self.get_dns_domain()),
1841 dnsp.DNS_TYPE_TXT, '"NULL"'))
1843 prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1844 p = self.make_txt_update(prefix, txt)
1845 (response, response_packet) =\
1846 self.dns_transaction_udp(p, host=server_ip)
1847 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1848 self.check_query_txt(prefix, ['NULL', 'NULL'])
1849 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1850 self.get_dns_domain(),
1851 "%s.%s" % (prefix, self.get_dns_domain()),
1852 dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1854 def test_update_add_null_char_rpc_to_dns(self):
1855 prefix = 'rpcnulltextrec'
1856 name = "%s.%s" % (prefix, self.get_dns_domain())
1858 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL\x00BYTE"')
1859 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1860 add_rec_buf.rec = rec
1862 self.rpc_conn.DnssrvUpdateRecord2(
1863 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1866 self.get_dns_domain(),
1871 except WERRORError as e:
1875 self.check_query_txt(prefix, ['NULL'])
1877 self.rpc_conn.DnssrvUpdateRecord2(
1878 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1881 self.get_dns_domain(),
1886 def test_update_add_hex_char_txt_record(self):
1887 "test adding records works"
1888 prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1889 p = self.make_txt_update(prefix, txt)
1890 (response, response_packet) =\
1891 self.dns_transaction_udp(p, host=server_ip)
1892 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1893 self.check_query_txt(prefix, txt)
1894 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1895 self.get_dns_domain(),
1896 "%s.%s" % (prefix, self.get_dns_domain()),
1897 dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
1899 def test_update_add_hex_rpc_to_dns(self):
1900 prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1901 prefix = 'rpc' + prefix
1902 name = "%s.%s" % (prefix, self.get_dns_domain())
1904 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
1905 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1906 add_rec_buf.rec = rec
1908 self.rpc_conn.DnssrvUpdateRecord2(
1909 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1912 self.get_dns_domain(),
1917 except WERRORError as e:
1921 self.check_query_txt(prefix, txt)
1923 self.rpc_conn.DnssrvUpdateRecord2(
1924 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1927 self.get_dns_domain(),
1932 def test_update_add_slash_txt_record(self):
1933 "test adding records works"
1934 prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1935 p = self.make_txt_update(prefix, txt)
1936 (response, response_packet) =\
1937 self.dns_transaction_udp(p, host=server_ip)
1938 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1939 self.check_query_txt(prefix, txt)
1940 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1941 self.get_dns_domain(),
1942 "%s.%s" % (prefix, self.get_dns_domain()),
1943 dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
1945 # This test fails against Windows as it eliminates slashes in RPC
1946 # One typical use for a slash is in records like 'var=value' to
1947 # escape '=' characters.
1948 def test_update_add_slash_rpc_to_dns(self):
1949 prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1950 prefix = 'rpc' + prefix
1951 name = "%s.%s" % (prefix, self.get_dns_domain())
1953 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
1954 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1955 add_rec_buf.rec = rec
1957 self.rpc_conn.DnssrvUpdateRecord2(
1958 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1961 self.get_dns_domain(),
1966 except WERRORError as e:
1970 self.check_query_txt(prefix, txt)
1973 self.rpc_conn.DnssrvUpdateRecord2(
1974 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1977 self.get_dns_domain(),
1982 def test_update_add_two_txt_records(self):
1983 "test adding two txt records works"
1984 prefix, txt = 'textrec2', ['"This is a test"',
1985 '"and this is a test, too"']
1986 p = self.make_txt_update(prefix, txt)
1987 (response, response_packet) =\
1988 self.dns_transaction_udp(p, host=server_ip)
1989 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1990 self.check_query_txt(prefix, txt)
1991 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1992 self.get_dns_domain(),
1993 "%s.%s" % (prefix, self.get_dns_domain()),
1994 dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
1995 ' "\\"and this is a test, too\\""'))
1997 def test_update_add_two_rpc_to_dns(self):
1998 prefix, txt = 'textrec2', ['"This is a test"',
1999 '"and this is a test, too"']
2000 prefix = 'rpc' + prefix
2001 name = "%s.%s" % (prefix, self.get_dns_domain())
2003 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
2004 '"\\"This is a test\\""' +
2005 ' "\\"and this is a test, too\\""')
2006 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
2007 add_rec_buf.rec = rec
2009 self.rpc_conn.DnssrvUpdateRecord2(
2010 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2013 self.get_dns_domain(),
2018 except WERRORError as e:
2022 self.check_query_txt(prefix, txt)
2024 self.rpc_conn.DnssrvUpdateRecord2(
2025 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2028 self.get_dns_domain(),
2033 def test_update_add_empty_txt_records(self):
2034 "test adding two txt records works"
2035 prefix, txt = 'emptytextrec', []
2036 p = self.make_txt_update(prefix, txt)
2037 (response, response_packet) =\
2038 self.dns_transaction_udp(p, host=server_ip)
2039 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
2040 self.check_query_txt(prefix, txt)
2041 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
2042 self.get_dns_domain(),
2043 "%s.%s" % (prefix, self.get_dns_domain()),
2044 dnsp.DNS_TYPE_TXT, ''))
2046 def test_update_add_empty_rpc_to_dns(self):
2047 prefix, txt = 'rpcemptytextrec', []
2049 name = "%s.%s" % (prefix, self.get_dns_domain())
2051 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
2052 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
2053 add_rec_buf.rec = rec
2055 self.rpc_conn.DnssrvUpdateRecord2(
2056 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2059 self.get_dns_domain(),
2063 except WERRORError as e:
2067 self.check_query_txt(prefix, txt)
2069 self.rpc_conn.DnssrvUpdateRecord2(
2070 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2073 self.get_dns_domain(),
2079 TestProgram(module=__name__, opts=subunitopts)