1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
25 from ldb import ERR_OPERATIONS_ERROR
30 import samba.ndr as ndr
31 from samba import credentials
32 from samba.dcerpc import dns, dnsp, dnsserver
33 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
34 from samba.tests.subunitrun import SubunitOptions, TestProgram
35 from samba import werror, WERRORError
36 from samba.tests.dns_base import DNSTest
37 import samba.getopt as options
41 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
42 sambaopts = options.SambaOptions(parser)
43 parser.add_option_group(sambaopts)
45 # This timeout only has relevance when testing against Windows
46 # Format errors tend to return patchy responses, so a timeout is needed.
47 parser.add_option("--timeout", type="int", dest="timeout",
48 help="Specify timeout for DNS requests")
50 # use command line creds if available
51 credopts = options.CredentialsOptions(parser)
52 parser.add_option_group(credopts)
53 subunitopts = SubunitOptions(parser)
54 parser.add_option_group(subunitopts)
56 opts, args = parser.parse_args()
58 lp = sambaopts.get_loadparm()
59 creds = credopts.get_credentials(lp)
61 timeout = opts.timeout
69 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
72 class TestSimpleQueries(DNSTest):
74 super(TestSimpleQueries, self).setUp()
75 global server, server_ip, lp, creds, timeout
76 self.server = server_name
77 self.server_ip = server_ip
80 self.timeout = timeout
82 def test_one_a_query(self):
83 "create a query packet containing one query record"
84 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
87 name = "%s.%s" % (self.server, self.get_dns_domain())
88 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
89 print("asking for ", q.name)
92 self.finish_name_packet(p, questions)
93 (response, response_packet) =\
94 self.dns_transaction_udp(p, host=server_ip)
95 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
96 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
97 self.assertEqual(response.ancount, 1)
98 self.assertEqual(response.answers[0].rdata,
101 def test_one_SOA_query(self):
102 "create a query packet containing one query record for the SOA"
103 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
106 name = "%s" % (self.get_dns_domain())
107 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
108 print("asking for ", q.name)
111 self.finish_name_packet(p, questions)
112 (response, response_packet) =\
113 self.dns_transaction_udp(p, host=server_ip)
114 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
115 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
116 self.assertEqual(response.ancount, 1)
118 response.answers[0].rdata.mname.upper(),
119 ("%s.%s" % (self.server, self.get_dns_domain())).upper())
121 def test_one_a_query_tcp(self):
122 "create a query packet containing one query record via TCP"
123 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
126 name = "%s.%s" % (self.server, self.get_dns_domain())
127 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
128 print("asking for ", q.name)
131 self.finish_name_packet(p, questions)
132 (response, response_packet) =\
133 self.dns_transaction_tcp(p, host=server_ip)
134 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
135 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
136 self.assertEqual(response.ancount, 1)
137 self.assertEqual(response.answers[0].rdata,
140 def test_one_mx_query(self):
141 "create a query packet causing an empty RCODE_OK answer"
142 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
145 name = "%s.%s" % (self.server, self.get_dns_domain())
146 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
147 print("asking for ", q.name)
150 self.finish_name_packet(p, questions)
151 (response, response_packet) =\
152 self.dns_transaction_udp(p, host=server_ip)
153 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
154 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
155 self.assertEqual(response.ancount, 0)
157 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
160 name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
161 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
162 print("asking for ", q.name)
165 self.finish_name_packet(p, questions)
166 (response, response_packet) =\
167 self.dns_transaction_udp(p, host=server_ip)
168 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
169 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
170 self.assertEqual(response.ancount, 0)
172 def test_two_queries(self):
173 "create a query packet containing two query records"
174 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
177 name = "%s.%s" % (self.server, self.get_dns_domain())
178 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
181 name = "%s.%s" % ('bogusname', self.get_dns_domain())
182 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
185 self.finish_name_packet(p, questions)
187 (response, response_packet) =\
188 self.dns_transaction_udp(p, host=server_ip)
189 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
190 except socket.timeout:
191 # Windows chooses not to respond to incorrectly formatted queries.
192 # Although this appears to be non-deterministic even for the same
193 # request twice, it also appears to be based on a how poorly the
194 # request is formatted.
197 def test_qtype_all_query(self):
198 "create a QTYPE_ALL query"
199 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
202 name = "%s.%s" % (self.server, self.get_dns_domain())
203 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
204 print("asking for ", q.name)
207 self.finish_name_packet(p, questions)
208 (response, response_packet) =\
209 self.dns_transaction_udp(p, host=server_ip)
212 dc_ipv6 = os.getenv('SERVER_IPV6')
213 if dc_ipv6 is not None:
216 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
217 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
218 self.assertEqual(response.ancount, num_answers)
219 self.assertEqual(response.answers[0].rdata,
221 if dc_ipv6 is not None:
222 self.assertEqual(response.answers[1].rdata, dc_ipv6)
224 def test_qclass_none_query(self):
225 "create a QCLASS_NONE query"
226 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
229 name = "%s.%s" % (self.server, self.get_dns_domain())
230 q = self.make_name_question(
236 self.finish_name_packet(p, questions)
238 (response, response_packet) =\
239 self.dns_transaction_udp(p, host=server_ip)
240 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
241 except socket.timeout:
242 # Windows chooses not to respond to incorrectly formatted queries.
243 # Although this appears to be non-deterministic even for the same
244 # request twice, it also appears to be based on a how poorly the
245 # request is formatted.
248 def test_soa_hostname_query(self):
249 "create a SOA query for a hostname"
250 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
253 name = "%s.%s" % (self.server, self.get_dns_domain())
254 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
257 self.finish_name_packet(p, questions)
258 (response, response_packet) =\
259 self.dns_transaction_udp(p, host=server_ip)
260 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
261 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
262 # We don't get SOA records for single hosts
263 self.assertEqual(response.ancount, 0)
264 # But we do respond with an authority section
265 self.assertEqual(response.nscount, 1)
267 def test_soa_unknown_hostname_query(self):
268 "create a SOA query for an unknown hostname"
269 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
272 name = "foobar.%s" % (self.get_dns_domain())
273 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
276 self.finish_name_packet(p, questions)
277 (response, response_packet) =\
278 self.dns_transaction_udp(p, host=server_ip)
279 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
280 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
281 # We don't get SOA records for single hosts
282 self.assertEqual(response.ancount, 0)
283 # But we do respond with an authority section
284 self.assertEqual(response.nscount, 1)
286 def test_soa_domain_query(self):
287 "create a SOA query for a domain"
288 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
291 name = self.get_dns_domain()
292 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
295 self.finish_name_packet(p, questions)
296 (response, response_packet) =\
297 self.dns_transaction_udp(p, host=server_ip)
298 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
299 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
300 self.assertEqual(response.ancount, 1)
301 self.assertEqual(response.answers[0].rdata.minimum, 3600)
304 class TestDNSUpdates(DNSTest):
306 super(TestDNSUpdates, self).setUp()
307 global server, server_ip, lp, creds, timeout
308 self.server = server_name
309 self.server_ip = server_ip
312 self.timeout = timeout
314 def test_two_updates(self):
315 "create two update requests"
316 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
319 name = "%s.%s" % (self.server, self.get_dns_domain())
320 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
323 name = self.get_dns_domain()
324 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
327 self.finish_name_packet(p, updates)
329 (response, response_packet) =\
330 self.dns_transaction_udp(p, host=server_ip)
331 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
332 except socket.timeout:
333 # Windows chooses not to respond to incorrectly formatted queries.
334 # Although this appears to be non-deterministic even for the same
335 # request twice, it also appears to be based on a how poorly the
336 # request is formatted.
339 def test_update_wrong_qclass(self):
340 "create update with DNS_QCLASS_NONE"
341 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
344 name = self.get_dns_domain()
345 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
348 self.finish_name_packet(p, updates)
349 (response, response_packet) =\
350 self.dns_transaction_udp(p, host=server_ip)
351 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
353 def test_update_prereq_with_non_null_ttl(self):
354 "test update with a non-null TTL"
355 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
358 name = self.get_dns_domain()
360 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
362 self.finish_name_packet(p, updates)
366 r.name = "%s.%s" % (self.server, self.get_dns_domain())
367 r.rr_type = dns.DNS_QTYPE_TXT
368 r.rr_class = dns.DNS_QCLASS_NONE
373 p.ancount = len(prereqs)
377 (response, response_packet) =\
378 self.dns_transaction_udp(p, host=server_ip)
379 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
380 except socket.timeout:
381 # Windows chooses not to respond to incorrectly formatted queries.
382 # Although this appears to be non-deterministic even for the same
383 # request twice, it also appears to be based on a how poorly the
384 # request is formatted.
387 def test_update_prereq_with_non_null_length(self):
388 "test update with a non-null length"
389 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
392 name = self.get_dns_domain()
394 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
396 self.finish_name_packet(p, updates)
400 r.name = "%s.%s" % (self.server, self.get_dns_domain())
401 r.rr_type = dns.DNS_QTYPE_TXT
402 r.rr_class = dns.DNS_QCLASS_ANY
407 p.ancount = len(prereqs)
410 (response, response_packet) =\
411 self.dns_transaction_udp(p, host=server_ip)
412 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
414 def test_update_prereq_nonexisting_name(self):
415 "test update with a nonexisting name"
416 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
419 name = self.get_dns_domain()
421 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
423 self.finish_name_packet(p, updates)
427 r.name = "idontexist.%s" % self.get_dns_domain()
428 r.rr_type = dns.DNS_QTYPE_TXT
429 r.rr_class = dns.DNS_QCLASS_ANY
434 p.ancount = len(prereqs)
437 (response, response_packet) =\
438 self.dns_transaction_udp(p, host=server_ip)
439 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
441 def test_update_add_txt_record(self):
442 "test adding records works"
443 prefix, txt = 'textrec', ['"This is a test"']
444 p = self.make_txt_update(prefix, txt)
445 (response, response_packet) =\
446 self.dns_transaction_udp(p, host=server_ip)
447 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
448 self.check_query_txt(prefix, txt)
450 def test_delete_record(self):
451 "Test if deleting records works"
453 NAME = "deleterec.%s" % self.get_dns_domain()
455 # First, create a record to make sure we have a record to delete.
456 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
459 name = self.get_dns_domain()
461 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
463 self.finish_name_packet(p, updates)
468 r.rr_type = dns.DNS_QTYPE_TXT
469 r.rr_class = dns.DNS_QCLASS_IN
472 rdata = self.make_txt_record(['"This is a test"'])
475 p.nscount = len(updates)
478 (response, response_packet) =\
479 self.dns_transaction_udp(p, host=server_ip)
480 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
482 # Now check the record is around
483 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
485 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
488 self.finish_name_packet(p, questions)
489 (response, response_packet) =\
490 self.dns_transaction_udp(p, host=server_ip)
491 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
493 # Now delete the record
494 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
497 name = self.get_dns_domain()
499 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
501 self.finish_name_packet(p, updates)
506 r.rr_type = dns.DNS_QTYPE_TXT
507 r.rr_class = dns.DNS_QCLASS_NONE
510 rdata = self.make_txt_record(['"This is a test"'])
513 p.nscount = len(updates)
516 (response, response_packet) =\
517 self.dns_transaction_udp(p, host=server_ip)
518 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
520 # And finally check it's gone
521 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
524 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
527 self.finish_name_packet(p, questions)
528 (response, response_packet) =\
529 self.dns_transaction_udp(p, host=server_ip)
530 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
532 def test_readd_record(self):
533 "Test if adding, deleting and then readding a records works"
535 NAME = "readdrec.%s" % self.get_dns_domain()
538 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
541 name = self.get_dns_domain()
543 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
545 self.finish_name_packet(p, updates)
550 r.rr_type = dns.DNS_QTYPE_TXT
551 r.rr_class = dns.DNS_QCLASS_IN
554 rdata = self.make_txt_record(['"This is a test"'])
557 p.nscount = len(updates)
560 (response, response_packet) =\
561 self.dns_transaction_udp(p, host=server_ip)
562 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
564 # Now check the record is around
565 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
567 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
570 self.finish_name_packet(p, questions)
571 (response, response_packet) =\
572 self.dns_transaction_udp(p, host=server_ip)
573 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
575 # Now delete the record
576 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
579 name = self.get_dns_domain()
581 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
583 self.finish_name_packet(p, updates)
588 r.rr_type = dns.DNS_QTYPE_TXT
589 r.rr_class = dns.DNS_QCLASS_NONE
592 rdata = self.make_txt_record(['"This is a test"'])
595 p.nscount = len(updates)
598 (response, response_packet) =\
599 self.dns_transaction_udp(p, host=server_ip)
600 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
603 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
606 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
609 self.finish_name_packet(p, questions)
610 (response, response_packet) =\
611 self.dns_transaction_udp(p, host=server_ip)
612 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
614 # recreate the record
615 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
618 name = self.get_dns_domain()
620 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
622 self.finish_name_packet(p, updates)
627 r.rr_type = dns.DNS_QTYPE_TXT
628 r.rr_class = dns.DNS_QCLASS_IN
631 rdata = self.make_txt_record(['"This is a test"'])
634 p.nscount = len(updates)
637 (response, response_packet) =\
638 self.dns_transaction_udp(p, host=server_ip)
639 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
641 # Now check the record is around
642 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
644 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
647 self.finish_name_packet(p, questions)
648 (response, response_packet) =\
649 self.dns_transaction_udp(p, host=server_ip)
650 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
652 def test_update_add_mx_record(self):
653 "test adding MX records works"
654 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
657 name = self.get_dns_domain()
659 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
661 self.finish_name_packet(p, updates)
665 r.name = "%s" % self.get_dns_domain()
666 r.rr_type = dns.DNS_QTYPE_MX
667 r.rr_class = dns.DNS_QCLASS_IN
670 rdata = dns.mx_record()
671 rdata.preference = 10
672 rdata.exchange = 'mail.%s' % self.get_dns_domain()
675 p.nscount = len(updates)
678 (response, response_packet) =\
679 self.dns_transaction_udp(p, host=server_ip)
680 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
682 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
685 name = "%s" % self.get_dns_domain()
686 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
689 self.finish_name_packet(p, questions)
690 (response, response_packet) =\
691 self.dns_transaction_udp(p, host=server_ip)
692 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
693 self.assertEqual(response.ancount, 1)
694 ans = response.answers[0]
695 self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
696 self.assertEqual(ans.rdata.preference, 10)
697 self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
700 class TestComplexQueries(DNSTest):
701 def make_dns_update(self, key, value, qtype):
702 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
704 name = self.get_dns_domain()
705 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
706 self.finish_name_packet(p, [u])
711 r.rr_class = dns.DNS_QCLASS_IN
717 (response, response_packet) =\
718 self.dns_transaction_udp(p, host=server_ip)
719 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
722 super(TestComplexQueries, self).setUp()
724 global server, server_ip, lp, creds, timeout
725 self.server = server_name
726 self.server_ip = server_ip
729 self.timeout = timeout
731 def test_one_a_query(self):
732 "create a query packet containing one query record"
737 name = "cname_test.%s" % self.get_dns_domain()
738 rdata = "%s.%s" % (self.server, self.get_dns_domain())
739 self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
741 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
745 name = "cname_test.%s" % self.get_dns_domain()
746 q = self.make_name_question(name,
749 print("asking for ", q.name)
752 self.finish_name_packet(p, questions)
753 (response, response_packet) =\
754 self.dns_transaction_udp(p, host=self.server_ip)
755 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
756 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
757 self.assertEqual(response.ancount, 2)
758 self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
759 self.assertEqual(response.answers[0].rdata, "%s.%s" %
760 (self.server, self.get_dns_domain()))
761 self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_A)
762 self.assertEqual(response.answers[1].rdata,
767 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
770 name = self.get_dns_domain()
772 u = self.make_name_question(name,
776 self.finish_name_packet(p, updates)
780 r.name = "cname_test.%s" % self.get_dns_domain()
781 r.rr_type = dns.DNS_QTYPE_CNAME
782 r.rr_class = dns.DNS_QCLASS_NONE
785 r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
787 p.nscount = len(updates)
790 (response, response_packet) =\
791 self.dns_transaction_udp(p, host=self.server_ip)
792 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
794 def test_cname_two_chain(self):
795 name0 = "cnamechain0.%s" % self.get_dns_domain()
796 name1 = "cnamechain1.%s" % self.get_dns_domain()
797 name2 = "cnamechain2.%s" % self.get_dns_domain()
798 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
799 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
800 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
802 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
804 q = self.make_name_question(name1, dns.DNS_QTYPE_A,
808 self.finish_name_packet(p, questions)
809 (response, response_packet) =\
810 self.dns_transaction_udp(p, host=server_ip)
811 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
812 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
813 self.assertEqual(response.ancount, 3)
815 self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
816 self.assertEqual(response.answers[0].name, name1)
817 self.assertEqual(response.answers[0].rdata, name2)
819 self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
820 self.assertEqual(response.answers[1].name, name2)
821 self.assertEqual(response.answers[1].rdata, name0)
823 self.assertEqual(response.answers[2].rr_type, dns.DNS_QTYPE_A)
824 self.assertEqual(response.answers[2].rdata,
827 def test_invalid_empty_cname(self):
828 name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
830 self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
831 except AssertionError:
834 self.fail("Successfully added empty CNAME, which is invalid.")
836 def test_cname_two_chain_not_matching_qtype(self):
837 name0 = "cnamechain0.%s" % self.get_dns_domain()
838 name1 = "cnamechain1.%s" % self.get_dns_domain()
839 name2 = "cnamechain2.%s" % self.get_dns_domain()
840 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
841 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
842 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
844 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
846 q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
850 self.finish_name_packet(p, questions)
851 (response, response_packet) =\
852 self.dns_transaction_udp(p, host=server_ip)
853 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
854 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
856 # CNAME should return all intermediate results!
857 # Only the A records exists, not the TXT.
858 self.assertEqual(response.ancount, 2)
860 self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
861 self.assertEqual(response.answers[0].name, name1)
862 self.assertEqual(response.answers[0].rdata, name2)
864 self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
865 self.assertEqual(response.answers[1].name, name2)
866 self.assertEqual(response.answers[1].rdata, name0)
868 def test_cname_loop(self):
869 cname1 = "cnamelooptestrec." + self.get_dns_domain()
870 cname2 = "cnamelooptestrec2." + self.get_dns_domain()
871 cname3 = "cnamelooptestrec3." + self.get_dns_domain()
872 self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
873 self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
874 self.make_dns_update(cname3, cname1, dnsp.DNS_TYPE_CNAME)
876 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
879 q = self.make_name_question(cname1,
883 self.finish_name_packet(p, questions)
885 (response, response_packet) =\
886 self.dns_transaction_udp(p, host=self.server_ip)
888 max_recursion_depth = 20
889 self.assertEqual(len(response.answers), max_recursion_depth)
891 # Make sure cname limit doesn't count other records. This is a generic
892 # test called in tests below
893 def max_rec_test(self, rtype, rec_gen):
894 name = "limittestrec{0}.{1}".format(rtype, self.get_dns_domain())
896 num_recs_to_enter = limit + 5
898 for i in range(1, num_recs_to_enter+1):
900 self.make_dns_update(name, ip, rtype)
902 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
905 q = self.make_name_question(name,
909 self.finish_name_packet(p, questions)
911 (response, response_packet) =\
912 self.dns_transaction_udp(p, host=self.server_ip)
914 self.assertEqual(len(response.answers), num_recs_to_enter)
916 def test_record_limit_A(self):
918 return "127.0.0." + str(i)
919 self.max_rec_test(rtype=dns.DNS_QTYPE_A, rec_gen=ip4_gen)
921 def test_record_limit_AAAA(self):
923 return "AAAA:0:0:0:0:0:0:" + str(i)
924 self.max_rec_test(rtype=dns.DNS_QTYPE_AAAA, rec_gen=ip6_gen)
926 def test_record_limit_SRV(self):
928 rec = dns.srv_record()
932 rec.target = "srvtestrec" + str(i)
934 self.max_rec_test(rtype=dns.DNS_QTYPE_SRV, rec_gen=srv_gen)
936 # Same as test_record_limit_A but with a preceding CNAME follow
937 def test_cname_limit(self):
938 cname1 = "cnamelimittestrec." + self.get_dns_domain()
939 cname2 = "cnamelimittestrec2." + self.get_dns_domain()
940 cname3 = "cnamelimittestrec3." + self.get_dns_domain()
941 ip_prefix = '127.0.0.'
943 num_recs_to_enter = limit + 5
945 self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
946 self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
947 num_arecs_to_enter = num_recs_to_enter - 2
948 for i in range(1, num_arecs_to_enter+1):
949 ip = ip_prefix + str(i)
950 self.make_dns_update(cname3, ip, dns.DNS_QTYPE_A)
952 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
955 q = self.make_name_question(cname1,
959 self.finish_name_packet(p, questions)
961 (response, response_packet) =\
962 self.dns_transaction_udp(p, host=self.server_ip)
964 self.assertEqual(len(response.answers), num_recs_to_enter)
966 # ANY query on cname record shouldn't follow the link
967 def test_cname_any_query(self):
968 cname1 = "cnameanytestrec." + self.get_dns_domain()
969 cname2 = "cnameanytestrec2." + self.get_dns_domain()
970 cname3 = "cnameanytestrec3." + self.get_dns_domain()
972 self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
973 self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
975 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
978 q = self.make_name_question(cname1,
982 self.finish_name_packet(p, questions)
984 (response, response_packet) =\
985 self.dns_transaction_udp(p, host=self.server_ip)
987 self.assertEqual(len(response.answers), 1)
988 self.assertEqual(response.answers[0].name, cname1)
989 self.assertEqual(response.answers[0].rdata, cname2)
992 class TestInvalidQueries(DNSTest):
994 super(TestInvalidQueries, self).setUp()
995 global server, server_ip, lp, creds, timeout
996 self.server = server_name
997 self.server_ip = server_ip
1000 self.timeout = timeout
1002 def test_one_a_query(self):
1003 """send 0 bytes follows by create a query packet
1004 containing one query record"""
1008 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
1009 s.connect((self.server_ip, 53))
1015 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1018 name = "%s.%s" % (self.server, self.get_dns_domain())
1019 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
1020 print("asking for ", q.name)
1023 self.finish_name_packet(p, questions)
1024 (response, response_packet) =\
1025 self.dns_transaction_udp(p, host=self.server_ip)
1026 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1027 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1028 self.assertEqual(response.ancount, 1)
1029 self.assertEqual(response.answers[0].rdata,
1032 def test_one_a_reply(self):
1033 "send a reply instead of a query"
1036 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1039 name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
1040 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
1041 print("asking for ", q.name)
1044 self.finish_name_packet(p, questions)
1045 p.operation |= dns.DNS_FLAG_REPLY
1048 send_packet = ndr.ndr_pack(p)
1049 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
1050 s.settimeout(timeout)
1051 host = self.server_ip
1052 s.connect((host, 53))
1053 tcp_packet = struct.pack('!H', len(send_packet))
1054 tcp_packet += send_packet
1055 s.send(tcp_packet, 0)
1056 recv_packet = s.recv(0xffff + 2, 0)
1057 self.assertEqual(0, len(recv_packet))
1058 except socket.timeout:
1059 # Windows chooses not to respond to incorrectly formatted queries.
1060 # Although this appears to be non-deterministic even for the same
1061 # request twice, it also appears to be based on a how poorly the
1062 # request is formatted.
1069 class TestZones(DNSTest):
1071 super(TestZones, self).setUp()
1072 global server, server_ip, lp, creds, timeout
1073 self.server = server_name
1074 self.server_ip = server_ip
1077 self.timeout = timeout
1079 self.zone = "test.lan"
1080 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1082 self.lp, self.creds)
1084 self.samdb = SamDB(url="ldap://" + self.server_ip,
1085 lp=self.get_loadparm(),
1086 session_info=system_session(),
1087 credentials=self.creds)
1088 self.zone_dn = "DC=" + self.zone +\
1089 ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
1090 str(self.samdb.get_default_basedn())
1093 super(TestZones, self).tearDown()
1096 self.delete_zone(self.zone)
1097 except RuntimeError as e:
1098 (num, string) = e.args
1099 if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
1102 def make_zone_obj(self, zone, aging_enabled=False):
1103 zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
1104 zone_create.pszZoneName = zone
1105 zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
1106 zone_create.fAging = int(aging_enabled)
1107 zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
1108 zone_create.fDsIntegrated = 1
1109 zone_create.fLoadExisting = 1
1110 zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
1113 def create_zone(self, zone, aging_enabled=False):
1114 zone_create = self.make_zone_obj(zone, aging_enabled)
1116 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1117 self.rpc_conn.DnssrvOperation2(client_version,
1123 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
1125 except WERRORError as e:
1128 def set_params(self, **kwargs):
1129 zone = kwargs.pop('zone', None)
1130 for key, val in kwargs.items():
1131 name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
1132 name_param.dwParam = val
1133 name_param.pszNodeName = key
1135 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1136 nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
1138 self.rpc_conn.DnssrvOperation2(client_version,
1143 'ResetDwordProperty',
1146 except WERRORError as e:
1149 def ldap_modify_dnsrecs(self, name, func):
1150 dn = 'DC={0},{1}'.format(name, self.zone_dn)
1151 dns_recs = self.ldap_get_dns_records(name)
1152 for rec in dns_recs:
1154 update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]}
1155 self.samdb.modify(ldb.Message.from_dict(self.samdb,
1157 ldb.FLAG_MOD_REPLACE))
1159 def dns_update_record(self, prefix, txt):
1160 p = self.make_txt_update(prefix, txt, self.zone)
1161 (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
1162 self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
1163 recs = self.ldap_get_dns_records(prefix)
1164 recs = [r for r in recs if r.data.str == txt]
1165 self.assertEqual(len(recs), 1)
1168 def dns_tombstone(self, prefix, txt, zone):
1169 name = prefix + "." + zone
1171 to = dnsp.DnssrvRpcRecord()
1172 to.dwTimeStamp = 1000
1173 to.wType = dnsp.DNS_TYPE_TOMBSTONE
1175 self.samdb.dns_replace(name, [to])
1177 def ldap_get_records(self, name):
1178 # The use of SCOPE_SUBTREE here avoids raising an exception in the
1179 # 0 results case for a test below.
1181 expr = "(&(objectClass=dnsNode)(name={0}))".format(name)
1182 return self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1183 expression=expr, attrs=["*"])
1185 def ldap_get_dns_records(self, name):
1186 records = self.ldap_get_records(name)
1187 return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
1188 for r in records[0].get('dnsRecord')]
1190 def ldap_get_zone_settings(self):
1191 records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
1192 expression="(&(objectClass=dnsZone)" +
1193 "(name={0}))".format(self.zone),
1194 attrs=["dNSProperty"])
1195 self.assertEqual(len(records), 1)
1196 props = [ndr_unpack(dnsp.DnsProperty, r)
1197 for r in records[0].get('dNSProperty')]
1199 # We have no choice but to repeat these here.
1200 zone_prop_ids = {0x00: "EMPTY",
1202 0x02: "ALLOW_UPDATE",
1203 0x08: "SECURE_TIME",
1204 0x10: "NOREFRESH_INTERVAL",
1205 0x11: "SCAVENGING_SERVERS",
1206 0x12: "AGING_ENABLED_TIME",
1207 0x20: "REFRESH_INTERVAL",
1208 0x40: "AGING_STATE",
1209 0x80: "DELETED_FROM_HOSTNAME",
1210 0x81: "MASTER_SERVERS",
1211 0x82: "AUTO_NS_SERVERS",
1212 0x83: "DCPROMO_CONVERT",
1213 0x90: "SCAVENGING_SERVERS_DA",
1214 0x91: "MASTER_SERVERS_DA",
1215 0x92: "NS_SERVERS_DA",
1216 0x100: "NODE_DBFLAGS"}
1217 return {zone_prop_ids[p.id].lower(): p.data for p in props}
1219 def set_aging(self, enable=False):
1220 self.create_zone(self.zone, aging_enabled=enable)
1221 self.set_params(NoRefreshInterval=1, RefreshInterval=1,
1222 Aging=int(bool(enable)), zone=self.zone,
1223 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1225 def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
1226 self.set_aging(enable=True)
1227 settings = self.ldap_get_zone_settings()
1228 self.assertTrue(settings['aging_state'] is not None)
1229 self.assertTrue(settings['aging_state'])
1231 rec = self.dns_update_record('agingtest', ['test txt'])
1232 self.assertNotEqual(rec.dwTimeStamp, 0)
1234 def test_set_aging_disabled(self):
1235 self.set_aging(enable=False)
1236 settings = self.ldap_get_zone_settings()
1237 self.assertTrue(settings['aging_state'] is not None)
1238 self.assertFalse(settings['aging_state'])
1240 rec = self.dns_update_record('agingtest', ['test txt'])
1241 self.assertNotEqual(rec.dwTimeStamp, 0)
1243 def test_aging_update(self, enable=True):
1244 name, txt = 'agingtest', ['test txt']
1245 self.set_aging(enable=True)
1246 before_mod = self.dns_update_record(name, txt)
1248 self.set_params(zone=self.zone, Aging=0)
1252 self.assertTrue(rec.dwTimeStamp > 0)
1253 rec.dwTimeStamp -= dec
1254 self.ldap_modify_dnsrecs(name, mod_ts)
1255 after_mod = self.ldap_get_dns_records(name)
1256 self.assertEqual(len(after_mod), 1)
1257 after_mod = after_mod[0]
1258 self.assertEqual(after_mod.dwTimeStamp,
1259 before_mod.dwTimeStamp - dec)
1260 after_update = self.dns_update_record(name, txt)
1261 after_should_equal = before_mod if enable else after_mod
1262 self.assertEqual(after_should_equal.dwTimeStamp,
1263 after_update.dwTimeStamp)
1265 def test_aging_update_disabled(self):
1266 self.test_aging_update(enable=False)
1268 def test_aging_refresh(self):
1269 name, txt = 'agingtest', ['test txt']
1270 self.create_zone(self.zone, aging_enabled=True)
1272 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1273 Aging=1, zone=self.zone,
1274 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1275 before_mod = self.dns_update_record(name, txt)
1278 self.assertTrue(rec.dwTimeStamp > 0)
1279 rec.dwTimeStamp -= interval // 2
1280 self.ldap_modify_dnsrecs(name, mod_ts)
1281 update_during_norefresh = self.dns_update_record(name, txt)
1284 self.assertTrue(rec.dwTimeStamp > 0)
1285 rec.dwTimeStamp -= interval + interval // 2
1286 self.ldap_modify_dnsrecs(name, mod_ts)
1287 update_during_refresh = self.dns_update_record(name, txt)
1288 self.assertEqual(update_during_norefresh.dwTimeStamp,
1289 before_mod.dwTimeStamp - interval / 2)
1290 self.assertEqual(update_during_refresh.dwTimeStamp,
1291 before_mod.dwTimeStamp)
1293 def test_rpc_add_no_timestamp(self):
1294 name, txt = 'agingtest', ['test txt']
1295 self.set_aging(enable=True)
1296 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1297 rec_buf.rec = TXTRecord(txt)
1298 self.rpc_conn.DnssrvUpdateRecord2(
1299 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1306 recs = self.ldap_get_dns_records(name)
1307 self.assertEqual(len(recs), 1)
1308 self.assertEqual(recs[0].dwTimeStamp, 0)
1310 def test_static_record_dynamic_update(self):
1311 name, txt = 'agingtest', ['test txt']
1312 txt2 = ['test txt2']
1313 self.set_aging(enable=True)
1314 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1315 rec_buf.rec = TXTRecord(txt)
1316 self.rpc_conn.DnssrvUpdateRecord2(
1317 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1325 rec2 = self.dns_update_record(name, txt2)
1326 self.assertEqual(rec2.dwTimeStamp, 0)
1328 def test_dynamic_record_static_update(self):
1329 name, txt = 'agingtest', ['test txt']
1330 txt2 = ['test txt2']
1331 txt3 = ['test txt3']
1332 self.set_aging(enable=True)
1334 self.dns_update_record(name, txt)
1336 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1337 rec_buf.rec = TXTRecord(txt2)
1338 self.rpc_conn.DnssrvUpdateRecord2(
1339 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1347 self.dns_update_record(name, txt3)
1349 recs = self.ldap_get_dns_records(name)
1350 # Put in dict because ldap recs might be out of order
1351 recs = {str(r.data.str): r for r in recs}
1352 self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0)
1353 self.assertEqual(recs[str(txt2)].dwTimeStamp, 0)
1354 self.assertEqual(recs[str(txt3)].dwTimeStamp, 0)
1356 def test_dns_tombstone_custom_match_rule(self):
1357 lp = self.get_loadparm()
1358 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1359 session_info=system_session(),
1360 credentials=self.creds)
1362 name, txt = 'agingtest', ['test txt']
1363 name2, txt2 = 'agingtest2', ['test txt2']
1364 name3, txt3 = 'agingtest3', ['test txt3']
1365 name4, txt4 = 'agingtest4', ['test txt4']
1366 name5, txt5 = 'agingtest5', ['test txt5']
1368 self.create_zone(self.zone, aging_enabled=True)
1370 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1371 Aging=1, zone=self.zone,
1372 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1374 self.dns_update_record(name, txt)
1376 self.dns_update_record(name2, txt)
1377 self.dns_update_record(name2, txt2)
1379 self.dns_update_record(name3, txt)
1380 self.dns_update_record(name3, txt2)
1381 last_update = self.dns_update_record(name3, txt3)
1383 # Modify txt1 of the first 2 names
1385 if rec.data.str == txt:
1386 rec.dwTimeStamp -= 2
1387 self.ldap_modify_dnsrecs(name, mod_ts)
1388 self.ldap_modify_dnsrecs(name2, mod_ts)
1390 # create a static dns record.
1391 rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1392 rec_buf.rec = TXTRecord(txt4)
1393 self.rpc_conn.DnssrvUpdateRecord2(
1394 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1402 # Create a tomb stoned record.
1403 self.dns_update_record(name5, txt5)
1404 self.dns_tombstone(name5, txt5, self.zone)
1406 self.ldap_get_dns_records(name3)
1407 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1408 expr = expr.format(int(last_update.dwTimeStamp) - 1)
1410 res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1411 expression=expr, attrs=["*"])
1412 except ldb.LdbError as e:
1414 updated_names = {str(r.get('name')) for r in res}
1415 self.assertEqual(updated_names, set([name, name2]))
1417 def test_dns_tombstone_custom_match_rule_no_records(self):
1418 lp = self.get_loadparm()
1419 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1420 session_info=system_session(),
1421 credentials=self.creds)
1423 self.create_zone(self.zone, aging_enabled=True)
1425 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1426 Aging=1, zone=self.zone,
1427 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1429 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1430 expr = expr.format(1)
1433 res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1434 expression=expr, attrs=["*"])
1435 except ldb.LdbError as e:
1437 self.assertEqual(0, len(res))
1439 def test_dns_tombstone_custom_match_rule_fail(self):
1440 self.create_zone(self.zone, aging_enabled=True)
1441 samdb = SamDB(url=lp.samdb_url(),
1443 session_info=system_session(),
1444 credentials=self.creds)
1446 # Property name in not dnsRecord
1447 expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1448 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1449 expression=expr, attrs=["*"])
1450 self.assertEqual(len(res), 0)
1452 # No value for tombstone time
1454 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)"
1455 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1456 expression=expr, attrs=["*"])
1457 self.assertEqual(len(res), 0)
1458 self.fail("Exception: ldb.ldbError not generated")
1459 except ldb.LdbError as e:
1461 self.assertEqual(num, ERR_OPERATIONS_ERROR)
1463 # Tombstone time = -
1465 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)"
1466 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1467 expression=expr, attrs=["*"])
1468 self.assertEqual(len(res), 0)
1469 self.fail("Exception: ldb.ldbError not generated")
1470 except ldb.LdbError as e:
1472 self.assertEqual(num, ERR_OPERATIONS_ERROR)
1474 # Tombstone time longer than 64 characters
1476 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1477 expr = expr.format("1" * 65)
1478 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1479 expression=expr, attrs=["*"])
1480 self.assertEqual(len(res), 0)
1481 self.fail("Exception: ldb.ldbError not generated")
1482 except ldb.LdbError as e:
1484 self.assertEqual(num, ERR_OPERATIONS_ERROR)
1486 # Non numeric Tombstone time
1488 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)"
1489 res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1490 expression=expr, attrs=["*"])
1491 self.assertEqual(len(res), 0)
1492 self.fail("Exception: ldb.ldbError not generated")
1493 except ldb.LdbError as e:
1495 self.assertEqual(num, ERR_OPERATIONS_ERROR)
1497 # Non system session
1499 db = SamDB(url="ldap://" + self.server_ip,
1500 lp=self.get_loadparm(),
1501 credentials=self.creds)
1503 expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)"
1504 res = db.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1505 expression=expr, attrs=["*"])
1506 self.assertEqual(len(res), 0)
1507 self.fail("Exception: ldb.ldbError not generated")
1508 except ldb.LdbError as e:
1510 self.assertEqual(num, ERR_OPERATIONS_ERROR)
1512 def test_basic_scavenging(self):
1513 lp = self.get_loadparm()
1514 self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1515 session_info=system_session(),
1516 credentials=self.creds)
1518 self.create_zone(self.zone, aging_enabled=True)
1520 self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1521 zone=self.zone, Aging=1,
1522 AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1523 name, txt = 'agingtest', ['test txt']
1524 name2, txt2 = 'agingtest2', ['test txt2']
1525 name3, txt3 = 'agingtest3', ['test txt3']
1526 name4, txt4 = 'agingtest4', ['test txt4']
1527 name5, txt5 = 'agingtest5', ['test txt5']
1528 self.dns_update_record(name, txt)
1529 self.dns_update_record(name2, txt)
1530 self.dns_update_record(name2, txt2)
1531 self.dns_update_record(name3, txt)
1532 self.dns_update_record(name3, txt2)
1534 # Create a tomb stoned record.
1535 self.dns_update_record(name4, txt4)
1536 self.dns_tombstone(name4, txt4, self.zone)
1537 records = self.ldap_get_records(name4)
1538 self.assertTrue("dNSTombstoned" in records[0])
1539 self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")
1541 # Create an un-tombstoned record, with dnsTombstoned: FALSE
1542 self.dns_update_record(name5, txt5)
1543 self.dns_tombstone(name5, txt5, self.zone)
1544 self.dns_update_record(name5, txt5)
1545 records = self.ldap_get_records(name5)
1546 self.assertTrue("dNSTombstoned" in records[0])
1547 self.assertEqual(records[0]["dNSTombstoned"][0], b"FALSE")
1549 last_add = self.dns_update_record(name3, txt3)
1552 self.assertTrue(rec.dwTimeStamp > 0)
1553 if rec.data.str == txt:
1554 rec.dwTimeStamp -= interval * 5
1556 def mod_ts_all(rec):
1557 rec.dwTimeStamp -= interval * 5
1558 self.ldap_modify_dnsrecs(name, mod_ts)
1559 self.ldap_modify_dnsrecs(name2, mod_ts)
1560 self.ldap_modify_dnsrecs(name3, mod_ts)
1561 self.ldap_modify_dnsrecs(name5, mod_ts_all)
1562 self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1563 dsdb._scavenge_dns_records(self.samdb)
1565 recs = self.ldap_get_dns_records(name)
1566 self.assertEqual(len(recs), 1)
1567 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1568 records = self.ldap_get_records(name)
1569 self.assertTrue("dNSTombstoned" in records[0])
1570 self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")
1572 recs = self.ldap_get_dns_records(name2)
1573 self.assertEqual(len(recs), 1)
1574 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1575 self.assertEqual(recs[0].data.str, txt2)
1577 recs = self.ldap_get_dns_records(name3)
1578 self.assertEqual(len(recs), 2)
1579 txts = {str(r.data.str) for r in recs}
1580 self.assertEqual(txts, {str(txt2), str(txt3)})
1581 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1582 self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT)
1584 recs = self.ldap_get_dns_records(name4)
1585 self.assertEqual(len(recs), 1)
1586 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1587 records = self.ldap_get_records(name4)
1588 self.assertTrue("dNSTombstoned" in records[0])
1589 self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")
1591 recs = self.ldap_get_dns_records(name5)
1592 self.assertEqual(len(recs), 1)
1593 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1594 records = self.ldap_get_records(name5)
1595 self.assertTrue("dNSTombstoned" in records[0])
1596 self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")
1598 for make_it_work in [False, True]:
1599 inc = -1 if make_it_work else 1
1602 rec.data = (last_add.dwTimeStamp - 24 * 14) + inc
1603 self.ldap_modify_dnsrecs(name, mod_ts)
1604 dsdb._dns_delete_tombstones(self.samdb)
1605 recs = self.ldap_get_records(name)
1607 self.assertEqual(len(recs), 0)
1609 self.assertEqual(len(recs), 1)
1611 def test_fully_qualified_zone(self):
1613 def create_zone_expect_exists(zone):
1615 zone_create = self.make_zone_obj(zone)
1616 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1617 zc_type = dnsserver.DNSSRV_TYPEID_ZONE_CREATE
1618 self.rpc_conn.DnssrvOperation2(client_version,
1626 except WERRORError as e:
1628 if enum != werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS:
1631 self.fail("Zone {} should already exist".format(zone))
1633 # Create unqualified, then check creating qualified fails.
1634 self.create_zone(self.zone)
1635 create_zone_expect_exists(self.zone + '.')
1637 # Same again, but the other way around.
1638 self.create_zone(self.zone + '2.')
1639 create_zone_expect_exists(self.zone + '2')
1641 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1642 request_filter = dnsserver.DNS_ZONE_REQUEST_PRIMARY
1643 tid = dnsserver.DNSSRV_TYPEID_DWORD
1644 typeid, res = self.rpc_conn.DnssrvComplexOperation2(client_version,
1652 self.delete_zone(self.zone)
1653 self.delete_zone(self.zone + '2')
1655 # Two zones should've been created, neither of them fully qualified.
1656 zones_we_just_made = []
1657 zones = [str(z.pszZoneName) for z in res.ZoneArray]
1659 if zone.startswith(self.zone):
1660 zones_we_just_made.append(zone)
1661 self.assertEqual(len(zones_we_just_made), 2)
1662 self.assertEqual(set(zones_we_just_made), {self.zone + '2', self.zone})
1664 def delete_zone(self, zone):
1665 self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1671 dnsserver.DNSSRV_TYPEID_NULL,
1674 def test_soa_query(self):
1676 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1679 q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
1681 self.finish_name_packet(p, questions)
1683 (response, response_packet) =\
1684 self.dns_transaction_udp(p, host=server_ip)
1685 # Windows returns OK while BIND logically seems to return NXDOMAIN
1686 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1687 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1688 self.assertEqual(response.ancount, 0)
1690 self.create_zone(zone)
1691 (response, response_packet) =\
1692 self.dns_transaction_udp(p, host=server_ip)
1693 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1694 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1695 self.assertEqual(response.ancount, 1)
1696 self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
1698 self.delete_zone(zone)
1699 (response, response_packet) =\
1700 self.dns_transaction_udp(p, host=server_ip)
1701 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1702 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1703 self.assertEqual(response.ancount, 0)
1706 class TestRPCRoundtrip(DNSTest):
1708 super(TestRPCRoundtrip, self).setUp()
1709 global server, server_ip, lp, creds
1710 self.server = server_name
1711 self.server_ip = server_ip
1714 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1720 super(TestRPCRoundtrip, self).tearDown()
1722 def rpc_update(self, fqn=None, data=None, wType=None, delete=False):
1723 fqn = fqn or ("rpctestrec." + self.get_dns_domain())
1725 rec = data_to_dns_record(wType, data)
1726 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1727 add_rec_buf.rec = rec
1729 add_arg = add_rec_buf
1733 del_arg = add_rec_buf
1735 self.rpc_conn.DnssrvUpdateRecord2(
1736 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1739 self.get_dns_domain(),
1744 def test_rpc_self_referencing_cname(self):
1745 cname = "cnametest2_unqual_rec_loop"
1746 cname_fqn = "%s.%s" % (cname, self.get_dns_domain())
1749 self.rpc_update(fqn=cname, data=cname_fqn,
1750 wType=dnsp.DNS_TYPE_CNAME, delete=True)
1751 except WERRORError as e:
1752 if e.args[0] != werror.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST:
1753 self.fail("RPC DNS gaven wrong error on pre-test cleanup "
1754 "for self referencing CNAME: %s" % e.args[0])
1757 self.rpc_update(fqn=cname, wType=dnsp.DNS_TYPE_CNAME, data=cname_fqn)
1758 except WERRORError as e:
1759 if e.args[0] != werror.WERR_DNS_ERROR_CNAME_LOOP:
1760 self.fail("RPC DNS gaven wrong error on insertion of "
1761 "self referencing CNAME: %s" % e.args[0])
1764 self.fail("RPC DNS allowed insertion of self referencing CNAME")
1766 def test_update_add_txt_rpc_to_dns(self):
1767 prefix, txt = 'rpctextrec', ['"This is a test"']
1769 name = "%s.%s" % (prefix, self.get_dns_domain())
1771 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
1772 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1773 add_rec_buf.rec = rec
1775 self.rpc_conn.DnssrvUpdateRecord2(
1776 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1779 self.get_dns_domain(),
1784 except WERRORError as e:
1788 self.check_query_txt(prefix, txt)
1790 self.rpc_conn.DnssrvUpdateRecord2(
1791 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1794 self.get_dns_domain(),
1799 def test_update_add_null_padded_txt_record(self):
1800 "test adding records works"
1801 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1802 p = self.make_txt_update(prefix, txt)
1803 (response, response_packet) =\
1804 self.dns_transaction_udp(p, host=server_ip)
1805 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1806 self.check_query_txt(prefix, txt)
1807 self.assertIsNotNone(
1808 dns_record_match(self.rpc_conn,
1810 self.get_dns_domain(),
1811 "%s.%s" % (prefix, self.get_dns_domain()),
1813 '"\\"This is a test\\"" "" ""'))
1815 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1816 p = self.make_txt_update(prefix, txt)
1817 (response, response_packet) =\
1818 self.dns_transaction_udp(p, host=server_ip)
1819 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1820 self.check_query_txt(prefix, txt)
1821 self.assertIsNotNone(
1825 self.get_dns_domain(),
1826 "%s.%s" % (prefix, self.get_dns_domain()),
1828 '"\\"This is a test\\"" "" "" "more text"'))
1830 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1831 p = self.make_txt_update(prefix, txt)
1832 (response, response_packet) =\
1833 self.dns_transaction_udp(p, host=server_ip)
1834 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1835 self.check_query_txt(prefix, txt)
1836 self.assertIsNotNone(
1840 self.get_dns_domain(),
1841 "%s.%s" % (prefix, self.get_dns_domain()),
1843 '"" "" "\\"This is a test\\""'))
1845 def test_update_add_padding_rpc_to_dns(self):
1846 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1847 prefix = 'rpc' + prefix
1848 name = "%s.%s" % (prefix, self.get_dns_domain())
1850 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1851 '"\\"This is a test\\"" "" ""')
1852 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1853 add_rec_buf.rec = rec
1855 self.rpc_conn.DnssrvUpdateRecord2(
1856 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1859 self.get_dns_domain(),
1864 except WERRORError as e:
1868 self.check_query_txt(prefix, txt)
1870 self.rpc_conn.DnssrvUpdateRecord2(
1871 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1874 self.get_dns_domain(),
1879 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1880 prefix = 'rpc' + prefix
1881 name = "%s.%s" % (prefix, self.get_dns_domain())
1883 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1884 '"\\"This is a test\\"" "" "" "more text"')
1885 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1886 add_rec_buf.rec = rec
1888 self.rpc_conn.DnssrvUpdateRecord2(
1889 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1892 self.get_dns_domain(),
1897 except WERRORError as e:
1901 self.check_query_txt(prefix, txt)
1903 self.rpc_conn.DnssrvUpdateRecord2(
1904 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1907 self.get_dns_domain(),
1912 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1913 prefix = 'rpc' + prefix
1914 name = "%s.%s" % (prefix, self.get_dns_domain())
1916 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1917 '"" "" "\\"This is a test\\""')
1918 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1919 add_rec_buf.rec = rec
1921 self.rpc_conn.DnssrvUpdateRecord2(
1922 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1925 self.get_dns_domain(),
1929 except WERRORError as e:
1933 self.check_query_txt(prefix, txt)
1935 self.rpc_conn.DnssrvUpdateRecord2(
1936 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1939 self.get_dns_domain(),
1944 # Test is incomplete due to strlen against txt records
1945 def test_update_add_null_char_txt_record(self):
1946 "test adding records works"
1947 prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1948 p = self.make_txt_update(prefix, txt)
1949 (response, response_packet) =\
1950 self.dns_transaction_udp(p, host=server_ip)
1951 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1952 self.check_query_txt(prefix, ['NULL'])
1953 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1954 self.get_dns_domain(),
1955 "%s.%s" % (prefix, self.get_dns_domain()),
1956 dnsp.DNS_TYPE_TXT, '"NULL"'))
1958 prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1959 p = self.make_txt_update(prefix, txt)
1960 (response, response_packet) =\
1961 self.dns_transaction_udp(p, host=server_ip)
1962 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1963 self.check_query_txt(prefix, ['NULL', 'NULL'])
1964 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1965 self.get_dns_domain(),
1966 "%s.%s" % (prefix, self.get_dns_domain()),
1967 dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1969 def test_update_add_null_char_rpc_to_dns(self):
1970 prefix = 'rpcnulltextrec'
1971 name = "%s.%s" % (prefix, self.get_dns_domain())
1973 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL\x00BYTE"')
1974 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1975 add_rec_buf.rec = rec
1977 self.rpc_conn.DnssrvUpdateRecord2(
1978 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1981 self.get_dns_domain(),
1986 except WERRORError as e:
1990 self.check_query_txt(prefix, ['NULL'])
1992 self.rpc_conn.DnssrvUpdateRecord2(
1993 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1996 self.get_dns_domain(),
2001 def test_update_add_hex_char_txt_record(self):
2002 "test adding records works"
2003 prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
2004 p = self.make_txt_update(prefix, txt)
2005 (response, response_packet) =\
2006 self.dns_transaction_udp(p, host=server_ip)
2007 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
2008 self.check_query_txt(prefix, txt)
2009 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
2010 self.get_dns_domain(),
2011 "%s.%s" % (prefix, self.get_dns_domain()),
2012 dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
2014 def test_update_add_hex_rpc_to_dns(self):
2015 prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
2016 prefix = 'rpc' + prefix
2017 name = "%s.%s" % (prefix, self.get_dns_domain())
2019 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
2020 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
2021 add_rec_buf.rec = rec
2023 self.rpc_conn.DnssrvUpdateRecord2(
2024 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2027 self.get_dns_domain(),
2032 except WERRORError as e:
2036 self.check_query_txt(prefix, txt)
2038 self.rpc_conn.DnssrvUpdateRecord2(
2039 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2042 self.get_dns_domain(),
2047 def test_update_add_slash_txt_record(self):
2048 "test adding records works"
2049 prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
2050 p = self.make_txt_update(prefix, txt)
2051 (response, response_packet) =\
2052 self.dns_transaction_udp(p, host=server_ip)
2053 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
2054 self.check_query_txt(prefix, txt)
2055 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
2056 self.get_dns_domain(),
2057 "%s.%s" % (prefix, self.get_dns_domain()),
2058 dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
2060 # This test fails against Windows as it eliminates slashes in RPC
2061 # One typical use for a slash is in records like 'var=value' to
2062 # escape '=' characters.
2063 def test_update_add_slash_rpc_to_dns(self):
2064 prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
2065 prefix = 'rpc' + prefix
2066 name = "%s.%s" % (prefix, self.get_dns_domain())
2068 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
2069 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
2070 add_rec_buf.rec = rec
2072 self.rpc_conn.DnssrvUpdateRecord2(
2073 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2076 self.get_dns_domain(),
2081 except WERRORError as e:
2085 self.check_query_txt(prefix, txt)
2088 self.rpc_conn.DnssrvUpdateRecord2(
2089 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2092 self.get_dns_domain(),
2097 def test_update_add_two_txt_records(self):
2098 "test adding two txt records works"
2099 prefix, txt = 'textrec2', ['"This is a test"',
2100 '"and this is a test, too"']
2101 p = self.make_txt_update(prefix, txt)
2102 (response, response_packet) =\
2103 self.dns_transaction_udp(p, host=server_ip)
2104 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
2105 self.check_query_txt(prefix, txt)
2106 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
2107 self.get_dns_domain(),
2108 "%s.%s" % (prefix, self.get_dns_domain()),
2109 dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
2110 ' "\\"and this is a test, too\\""'))
2112 def test_update_add_two_rpc_to_dns(self):
2113 prefix, txt = 'textrec2', ['"This is a test"',
2114 '"and this is a test, too"']
2115 prefix = 'rpc' + prefix
2116 name = "%s.%s" % (prefix, self.get_dns_domain())
2118 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
2119 '"\\"This is a test\\""' +
2120 ' "\\"and this is a test, too\\""')
2121 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
2122 add_rec_buf.rec = rec
2124 self.rpc_conn.DnssrvUpdateRecord2(
2125 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2128 self.get_dns_domain(),
2133 except WERRORError as e:
2137 self.check_query_txt(prefix, txt)
2139 self.rpc_conn.DnssrvUpdateRecord2(
2140 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2143 self.get_dns_domain(),
2148 def test_update_add_empty_txt_records(self):
2149 "test adding two txt records works"
2150 prefix, txt = 'emptytextrec', []
2151 p = self.make_txt_update(prefix, txt)
2152 (response, response_packet) =\
2153 self.dns_transaction_udp(p, host=server_ip)
2154 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
2155 self.check_query_txt(prefix, txt)
2156 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
2157 self.get_dns_domain(),
2158 "%s.%s" % (prefix, self.get_dns_domain()),
2159 dnsp.DNS_TYPE_TXT, ''))
2161 def test_update_add_empty_rpc_to_dns(self):
2162 prefix, txt = 'rpcemptytextrec', []
2164 name = "%s.%s" % (prefix, self.get_dns_domain())
2166 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
2167 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
2168 add_rec_buf.rec = rec
2170 self.rpc_conn.DnssrvUpdateRecord2(
2171 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2174 self.get_dns_domain(),
2178 except WERRORError as e:
2182 self.check_query_txt(prefix, txt)
2184 self.rpc_conn.DnssrvUpdateRecord2(
2185 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2188 self.get_dns_domain(),
2194 TestProgram(module=__name__, opts=subunitopts)