dns: custom match rule for DNS records to be tombstoned
[samba.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
24 import ldb
25 import os
26 import sys
27 import struct
28 import random
29 import socket
30 import samba.ndr as ndr
31 from samba import credentials, param
32 from samba.dcerpc import dns, dnsp, dnsserver
33 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
34 from samba.tests.subunitrun import SubunitOptions, TestProgram
35 from samba import werror, WERRORError
36 from samba.tests.dns_base import DNSTest
37 import samba.getopt as options
38 import optparse
39
40 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
41 sambaopts = options.SambaOptions(parser)
42 parser.add_option_group(sambaopts)
43
44 # This timeout only has relevance when testing against Windows
45 # Format errors tend to return patchy responses, so a timeout is needed.
46 parser.add_option("--timeout", type="int", dest="timeout",
47                   help="Specify timeout for DNS requests")
48
49 # use command line creds if available
50 credopts = options.CredentialsOptions(parser)
51 parser.add_option_group(credopts)
52 subunitopts = SubunitOptions(parser)
53 parser.add_option_group(subunitopts)
54
55 opts, args = parser.parse_args()
56
57 lp = sambaopts.get_loadparm()
58 creds = credopts.get_credentials(lp)
59
60 timeout = opts.timeout
61
62 if len(args) < 2:
63     parser.print_usage()
64     sys.exit(1)
65
66 server_name = args[0]
67 server_ip = args[1]
68 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
69
70 class TestSimpleQueries(DNSTest):
71     def setUp(self):
72         super(TestSimpleQueries, self).setUp()
73         global server, server_ip, lp, creds, timeout
74         self.server = server_name
75         self.server_ip = server_ip
76         self.lp = lp
77         self.creds = creds
78         self.timeout = timeout
79
80     def test_one_a_query(self):
81         "create a query packet containing one query record"
82         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
83         questions = []
84
85         name = "%s.%s" % (self.server, self.get_dns_domain())
86         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
87         print("asking for ", q.name)
88         questions.append(q)
89
90         self.finish_name_packet(p, questions)
91         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
92         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
93         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
94         self.assertEquals(response.ancount, 1)
95         self.assertEquals(response.answers[0].rdata,
96                           self.server_ip)
97
98     def test_one_SOA_query(self):
99         "create a query packet containing one query record for the SOA"
100         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
101         questions = []
102
103         name = "%s" % (self.get_dns_domain())
104         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
105         print("asking for ", q.name)
106         questions.append(q)
107
108         self.finish_name_packet(p, questions)
109         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
110         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
111         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
112         self.assertEquals(response.ancount, 1)
113         self.assertEquals(response.answers[0].rdata.mname.upper(),
114                           ("%s.%s" % (self.server, self.get_dns_domain())).upper())
115
116     def test_one_a_query_tcp(self):
117         "create a query packet containing one query record via TCP"
118         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
119         questions = []
120
121         name = "%s.%s" % (self.server, self.get_dns_domain())
122         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
123         print("asking for ", q.name)
124         questions.append(q)
125
126         self.finish_name_packet(p, questions)
127         (response, response_packet) = self.dns_transaction_tcp(p, host=server_ip)
128         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
129         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
130         self.assertEquals(response.ancount, 1)
131         self.assertEquals(response.answers[0].rdata,
132                           self.server_ip)
133
134     def test_one_mx_query(self):
135         "create a query packet causing an empty RCODE_OK answer"
136         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
137         questions = []
138
139         name = "%s.%s" % (self.server, self.get_dns_domain())
140         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
141         print("asking for ", q.name)
142         questions.append(q)
143
144         self.finish_name_packet(p, questions)
145         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
146         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
147         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
148         self.assertEquals(response.ancount, 0)
149
150         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
151         questions = []
152
153         name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
154         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
155         print("asking for ", q.name)
156         questions.append(q)
157
158         self.finish_name_packet(p, questions)
159         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
160         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
161         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
162         self.assertEquals(response.ancount, 0)
163
164     def test_two_queries(self):
165         "create a query packet containing two query records"
166         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
167         questions = []
168
169         name = "%s.%s" % (self.server, self.get_dns_domain())
170         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
171         questions.append(q)
172
173         name = "%s.%s" % ('bogusname', self.get_dns_domain())
174         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
175         questions.append(q)
176
177         self.finish_name_packet(p, questions)
178         try:
179             (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
180             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
181         except socket.timeout:
182             # Windows chooses not to respond to incorrectly formatted queries.
183             # Although this appears to be non-deterministic even for the same
184             # request twice, it also appears to be based on a how poorly the
185             # request is formatted.
186             pass
187
188     def test_qtype_all_query(self):
189         "create a QTYPE_ALL query"
190         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
191         questions = []
192
193         name = "%s.%s" % (self.server, self.get_dns_domain())
194         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
195         print("asking for ", q.name)
196         questions.append(q)
197
198         self.finish_name_packet(p, questions)
199         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
200
201         num_answers = 1
202         dc_ipv6 = os.getenv('SERVER_IPV6')
203         if dc_ipv6 is not None:
204             num_answers += 1
205
206         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
207         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
208         self.assertEquals(response.ancount, num_answers)
209         self.assertEquals(response.answers[0].rdata,
210                           self.server_ip)
211         if dc_ipv6 is not None:
212             self.assertEquals(response.answers[1].rdata, dc_ipv6)
213
214     def test_qclass_none_query(self):
215         "create a QCLASS_NONE query"
216         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
217         questions = []
218
219         name = "%s.%s" % (self.server, self.get_dns_domain())
220         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
221         questions.append(q)
222
223         self.finish_name_packet(p, questions)
224         try:
225             (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
226             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
227         except socket.timeout:
228             # Windows chooses not to respond to incorrectly formatted queries.
229             # Although this appears to be non-deterministic even for the same
230             # request twice, it also appears to be based on a how poorly the
231             # request is formatted.
232             pass
233
234     def test_soa_hostname_query(self):
235         "create a SOA query for a hostname"
236         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
237         questions = []
238
239         name = "%s.%s" % (self.server, self.get_dns_domain())
240         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
241         questions.append(q)
242
243         self.finish_name_packet(p, questions)
244         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
245         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
246         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
247         # We don't get SOA records for single hosts
248         self.assertEquals(response.ancount, 0)
249         # But we do respond with an authority section
250         self.assertEqual(response.nscount, 1)
251
252     def test_soa_domain_query(self):
253         "create a SOA query for a domain"
254         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
255         questions = []
256
257         name = self.get_dns_domain()
258         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
259         questions.append(q)
260
261         self.finish_name_packet(p, questions)
262         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
263         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
264         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
265         self.assertEquals(response.ancount, 1)
266         self.assertEquals(response.answers[0].rdata.minimum, 3600)
267
268
269 class TestDNSUpdates(DNSTest):
270     def setUp(self):
271         super(TestDNSUpdates, self).setUp()
272         global server, server_ip, lp, creds, timeout
273         self.server = server_name
274         self.server_ip = server_ip
275         self.lp = lp
276         self.creds = creds
277         self.timeout = timeout
278
279     def test_two_updates(self):
280         "create two update requests"
281         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
282         updates = []
283
284         name = "%s.%s" % (self.server, self.get_dns_domain())
285         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
286         updates.append(u)
287
288         name = self.get_dns_domain()
289         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
290         updates.append(u)
291
292         self.finish_name_packet(p, updates)
293         try:
294             (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
295             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
296         except socket.timeout:
297             # Windows chooses not to respond to incorrectly formatted queries.
298             # Although this appears to be non-deterministic even for the same
299             # request twice, it also appears to be based on a how poorly the
300             # request is formatted.
301             pass
302
303     def test_update_wrong_qclass(self):
304         "create update with DNS_QCLASS_NONE"
305         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
306         updates = []
307
308         name = self.get_dns_domain()
309         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
310         updates.append(u)
311
312         self.finish_name_packet(p, updates)
313         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
314         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
315
316     def test_update_prereq_with_non_null_ttl(self):
317         "test update with a non-null TTL"
318         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
319         updates = []
320
321         name = self.get_dns_domain()
322
323         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
324         updates.append(u)
325         self.finish_name_packet(p, updates)
326
327         prereqs = []
328         r = dns.res_rec()
329         r.name = "%s.%s" % (self.server, self.get_dns_domain())
330         r.rr_type = dns.DNS_QTYPE_TXT
331         r.rr_class = dns.DNS_QCLASS_NONE
332         r.ttl = 1
333         r.length = 0
334         prereqs.append(r)
335
336         p.ancount = len(prereqs)
337         p.answers = prereqs
338
339         try:
340             (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
341             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
342         except socket.timeout:
343             # Windows chooses not to respond to incorrectly formatted queries.
344             # Although this appears to be non-deterministic even for the same
345             # request twice, it also appears to be based on a how poorly the
346             # request is formatted.
347             pass
348
349     def test_update_prereq_with_non_null_length(self):
350         "test update with a non-null length"
351         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
352         updates = []
353
354         name = self.get_dns_domain()
355
356         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
357         updates.append(u)
358         self.finish_name_packet(p, updates)
359
360         prereqs = []
361         r = dns.res_rec()
362         r.name = "%s.%s" % (self.server, self.get_dns_domain())
363         r.rr_type = dns.DNS_QTYPE_TXT
364         r.rr_class = dns.DNS_QCLASS_ANY
365         r.ttl = 0
366         r.length = 1
367         prereqs.append(r)
368
369         p.ancount = len(prereqs)
370         p.answers = prereqs
371
372         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
373         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
374
375     def test_update_prereq_nonexisting_name(self):
376         "test update with a nonexisting name"
377         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
378         updates = []
379
380         name = self.get_dns_domain()
381
382         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
383         updates.append(u)
384         self.finish_name_packet(p, updates)
385
386         prereqs = []
387         r = dns.res_rec()
388         r.name = "idontexist.%s" % self.get_dns_domain()
389         r.rr_type = dns.DNS_QTYPE_TXT
390         r.rr_class = dns.DNS_QCLASS_ANY
391         r.ttl = 0
392         r.length = 0
393         prereqs.append(r)
394
395         p.ancount = len(prereqs)
396         p.answers = prereqs
397
398         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
399         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
400
401     def test_update_add_txt_record(self):
402         "test adding records works"
403         prefix, txt = 'textrec', ['"This is a test"']
404         p = self.make_txt_update(prefix, txt)
405         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
406         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
407         self.check_query_txt(prefix, txt)
408
409     def test_delete_record(self):
410         "Test if deleting records works"
411
412         NAME = "deleterec.%s" % self.get_dns_domain()
413
414         # First, create a record to make sure we have a record to delete.
415         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
416         updates = []
417
418         name = self.get_dns_domain()
419
420         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
421         updates.append(u)
422         self.finish_name_packet(p, updates)
423
424         updates = []
425         r = dns.res_rec()
426         r.name = NAME
427         r.rr_type = dns.DNS_QTYPE_TXT
428         r.rr_class = dns.DNS_QCLASS_IN
429         r.ttl = 900
430         r.length = 0xffff
431         rdata = self.make_txt_record(['"This is a test"'])
432         r.rdata = rdata
433         updates.append(r)
434         p.nscount = len(updates)
435         p.nsrecs = updates
436
437         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
438         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
439
440         # Now check the record is around
441         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
442         questions = []
443         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
444         questions.append(q)
445
446         self.finish_name_packet(p, questions)
447         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
448         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
449
450         # Now delete the record
451         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
452         updates = []
453
454         name = self.get_dns_domain()
455
456         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
457         updates.append(u)
458         self.finish_name_packet(p, updates)
459
460         updates = []
461         r = dns.res_rec()
462         r.name = NAME
463         r.rr_type = dns.DNS_QTYPE_TXT
464         r.rr_class = dns.DNS_QCLASS_NONE
465         r.ttl = 0
466         r.length = 0xffff
467         rdata = self.make_txt_record(['"This is a test"'])
468         r.rdata = rdata
469         updates.append(r)
470         p.nscount = len(updates)
471         p.nsrecs = updates
472
473         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
474         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
475
476         # And finally check it's gone
477         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
478         questions = []
479
480         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
481         questions.append(q)
482
483         self.finish_name_packet(p, questions)
484         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
485         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
486
487     def test_readd_record(self):
488         "Test if adding, deleting and then readding a records works"
489
490         NAME = "readdrec.%s" % self.get_dns_domain()
491
492         # Create the record
493         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
494         updates = []
495
496         name = self.get_dns_domain()
497
498         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
499         updates.append(u)
500         self.finish_name_packet(p, updates)
501
502         updates = []
503         r = dns.res_rec()
504         r.name = NAME
505         r.rr_type = dns.DNS_QTYPE_TXT
506         r.rr_class = dns.DNS_QCLASS_IN
507         r.ttl = 900
508         r.length = 0xffff
509         rdata = self.make_txt_record(['"This is a test"'])
510         r.rdata = rdata
511         updates.append(r)
512         p.nscount = len(updates)
513         p.nsrecs = updates
514
515         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
516         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
517
518         # Now check the record is around
519         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
520         questions = []
521         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
522         questions.append(q)
523
524         self.finish_name_packet(p, questions)
525         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
526         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
527
528         # Now delete the record
529         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
530         updates = []
531
532         name = self.get_dns_domain()
533
534         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
535         updates.append(u)
536         self.finish_name_packet(p, updates)
537
538         updates = []
539         r = dns.res_rec()
540         r.name = NAME
541         r.rr_type = dns.DNS_QTYPE_TXT
542         r.rr_class = dns.DNS_QCLASS_NONE
543         r.ttl = 0
544         r.length = 0xffff
545         rdata = self.make_txt_record(['"This is a test"'])
546         r.rdata = rdata
547         updates.append(r)
548         p.nscount = len(updates)
549         p.nsrecs = updates
550
551         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
552         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
553
554         # check it's gone
555         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
556         questions = []
557
558         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
559         questions.append(q)
560
561         self.finish_name_packet(p, questions)
562         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
563         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
564
565         # recreate the record
566         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
567         updates = []
568
569         name = self.get_dns_domain()
570
571         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
572         updates.append(u)
573         self.finish_name_packet(p, updates)
574
575         updates = []
576         r = dns.res_rec()
577         r.name = NAME
578         r.rr_type = dns.DNS_QTYPE_TXT
579         r.rr_class = dns.DNS_QCLASS_IN
580         r.ttl = 900
581         r.length = 0xffff
582         rdata = self.make_txt_record(['"This is a test"'])
583         r.rdata = rdata
584         updates.append(r)
585         p.nscount = len(updates)
586         p.nsrecs = updates
587
588         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
589         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
590
591         # Now check the record is around
592         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
593         questions = []
594         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
595         questions.append(q)
596
597         self.finish_name_packet(p, questions)
598         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
599         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
600
601     def test_update_add_mx_record(self):
602         "test adding MX records works"
603         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
604         updates = []
605
606         name = self.get_dns_domain()
607
608         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
609         updates.append(u)
610         self.finish_name_packet(p, updates)
611
612         updates = []
613         r = dns.res_rec()
614         r.name = "%s" % self.get_dns_domain()
615         r.rr_type = dns.DNS_QTYPE_MX
616         r.rr_class = dns.DNS_QCLASS_IN
617         r.ttl = 900
618         r.length = 0xffff
619         rdata = dns.mx_record()
620         rdata.preference = 10
621         rdata.exchange = 'mail.%s' % self.get_dns_domain()
622         r.rdata = rdata
623         updates.append(r)
624         p.nscount = len(updates)
625         p.nsrecs = updates
626
627         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
628         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
629
630         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
631         questions = []
632
633         name = "%s" % self.get_dns_domain()
634         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
635         questions.append(q)
636
637         self.finish_name_packet(p, questions)
638         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
639         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
640         self.assertEqual(response.ancount, 1)
641         ans = response.answers[0]
642         self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
643         self.assertEqual(ans.rdata.preference, 10)
644         self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
645
646
647 class TestComplexQueries(DNSTest):
648     def make_dns_update(self, key, value, qtype):
649         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
650
651         name = self.get_dns_domain()
652         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
653         self.finish_name_packet(p, [u])
654
655         r = dns.res_rec()
656         r.name = key
657         r.rr_type = qtype
658         r.rr_class = dns.DNS_QCLASS_IN
659         r.ttl = 900
660         r.length = 0xffff
661         r.rdata = value
662         p.nscount = 1
663         p.nsrecs = [r]
664         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
665         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
666
667     def setUp(self):
668         super(TestComplexQueries, self).setUp()
669
670         global server, server_ip, lp, creds, timeout
671         self.server = server_name
672         self.server_ip = server_ip
673         self.lp = lp
674         self.creds = creds
675         self.timeout = timeout
676
677     def test_one_a_query(self):
678         "create a query packet containing one query record"
679
680         try:
681
682             # Create the record
683             name = "cname_test.%s" % self.get_dns_domain()
684             rdata = "%s.%s" % (self.server, self.get_dns_domain())
685             self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
686
687             p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
688             questions = []
689
690             # Check the record
691             name = "cname_test.%s" % self.get_dns_domain()
692             q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
693             print("asking for ", q.name)
694             questions.append(q)
695
696             self.finish_name_packet(p, questions)
697             (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
698             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
699             self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
700             self.assertEquals(response.ancount, 2)
701             self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
702             self.assertEquals(response.answers[0].rdata, "%s.%s" %
703                               (self.server, self.get_dns_domain()))
704             self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
705             self.assertEquals(response.answers[1].rdata,
706                               self.server_ip)
707
708         finally:
709             # Delete the record
710             p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
711             updates = []
712
713             name = self.get_dns_domain()
714
715             u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
716             updates.append(u)
717             self.finish_name_packet(p, updates)
718
719             updates = []
720             r = dns.res_rec()
721             r.name = "cname_test.%s" % self.get_dns_domain()
722             r.rr_type = dns.DNS_QTYPE_CNAME
723             r.rr_class = dns.DNS_QCLASS_NONE
724             r.ttl = 0
725             r.length = 0xffff
726             r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
727             updates.append(r)
728             p.nscount = len(updates)
729             p.nsrecs = updates
730
731             (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
732             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
733
734     def test_cname_two_chain(self):
735         name0 = "cnamechain0.%s" % self.get_dns_domain()
736         name1 = "cnamechain1.%s" % self.get_dns_domain()
737         name2 = "cnamechain2.%s" % self.get_dns_domain()
738         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
739         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
740         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
741
742         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
743         questions = []
744         q = self.make_name_question(name1, dns.DNS_QTYPE_A,
745                                     dns.DNS_QCLASS_IN)
746         questions.append(q)
747
748         self.finish_name_packet(p, questions)
749         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
750         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
751         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
752         self.assertEquals(response.ancount, 3)
753
754         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
755         self.assertEquals(response.answers[0].name, name1)
756         self.assertEquals(response.answers[0].rdata, name2)
757
758         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
759         self.assertEquals(response.answers[1].name, name2)
760         self.assertEquals(response.answers[1].rdata, name0)
761
762         self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
763         self.assertEquals(response.answers[2].rdata,
764                           self.server_ip)
765
766     def test_invalid_empty_cname(self):
767         name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
768         try:
769             self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
770         except AssertionError as e:
771             pass
772         else:
773             self.fail("Successfully added empty CNAME, which is invalid.")
774
775     def test_cname_two_chain_not_matching_qtype(self):
776         name0 = "cnamechain0.%s" % self.get_dns_domain()
777         name1 = "cnamechain1.%s" % self.get_dns_domain()
778         name2 = "cnamechain2.%s" % self.get_dns_domain()
779         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
780         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
781         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
782
783         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
784         questions = []
785         q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
786                                     dns.DNS_QCLASS_IN)
787         questions.append(q)
788
789         self.finish_name_packet(p, questions)
790         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
791         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
792         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
793
794         # CNAME should return all intermediate results!
795         # Only the A records exists, not the TXT.
796         self.assertEquals(response.ancount, 2)
797
798         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
799         self.assertEquals(response.answers[0].name, name1)
800         self.assertEquals(response.answers[0].rdata, name2)
801
802         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
803         self.assertEquals(response.answers[1].name, name2)
804         self.assertEquals(response.answers[1].rdata, name0)
805
806 class TestInvalidQueries(DNSTest):
807     def setUp(self):
808         super(TestInvalidQueries, self).setUp()
809         global server, server_ip, lp, creds, timeout
810         self.server = server_name
811         self.server_ip = server_ip
812         self.lp = lp
813         self.creds = creds
814         self.timeout = timeout
815
816     def test_one_a_query(self):
817         "send 0 bytes follows by create a query packet containing one query record"
818
819         s = None
820         try:
821             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
822             s.connect((self.server_ip, 53))
823             s.send("", 0)
824         finally:
825             if s is not None:
826                 s.close()
827
828         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
829         questions = []
830
831         name = "%s.%s" % (self.server, self.get_dns_domain())
832         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
833         print("asking for ", q.name)
834         questions.append(q)
835
836         self.finish_name_packet(p, questions)
837         (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
838         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
839         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
840         self.assertEquals(response.ancount, 1)
841         self.assertEquals(response.answers[0].rdata,
842                           self.server_ip)
843
844     def test_one_a_reply(self):
845         "send a reply instead of a query"
846         global timeout
847
848         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
849         questions = []
850
851         name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
852         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
853         print("asking for ", q.name)
854         questions.append(q)
855
856         self.finish_name_packet(p, questions)
857         p.operation |= dns.DNS_FLAG_REPLY
858         s = None
859         try:
860             send_packet = ndr.ndr_pack(p)
861             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
862             s.settimeout(timeout)
863             host=self.server_ip
864             s.connect((host, 53))
865             tcp_packet = struct.pack('!H', len(send_packet))
866             tcp_packet += send_packet
867             s.send(tcp_packet, 0)
868             recv_packet = s.recv(0xffff + 2, 0)
869             self.assertEquals(0, len(recv_packet))
870         except socket.timeout:
871             # Windows chooses not to respond to incorrectly formatted queries.
872             # Although this appears to be non-deterministic even for the same
873             # request twice, it also appears to be based on a how poorly the
874             # request is formatted.
875             pass
876         finally:
877             if s is not None:
878                 s.close()
879
880 class TestZones(DNSTest):
881     def setUp(self):
882         super(TestZones, self).setUp()
883         global server, server_ip, lp, creds, timeout
884         self.server = server_name
885         self.server_ip = server_ip
886         self.lp = lp
887         self.creds = creds
888         self.timeout = timeout
889
890         self.zone = "test.lan"
891         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %\
892                                             (self.server_ip),
893                                             self.lp, self.creds)
894
895         self.samdb = SamDB(url="ldap://" + self.server_ip,
896                            lp = self.get_loadparm(),
897                            session_info=system_session(),
898                            credentials=self.creds)
899
900         self.zone_dn = "DC=" + self.zone +\
901                        ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
902                        str(self.samdb.get_default_basedn())
903
904     def tearDown(self):
905         super(TestZones, self).tearDown()
906
907         try:
908             self.delete_zone(self.zone)
909         except RuntimeError as e:
910             (num, string) = e.args
911             if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
912                 raise
913
914     def create_zone(self, zone, aging_enabled=False):
915         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
916         zone_create.pszZoneName = zone
917         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
918         zone_create.fAging = int(aging_enabled)
919         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
920         zone_create.fDsIntegrated = 1
921         zone_create.fLoadExisting = 1
922         zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
923         try:
924             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
925             self.rpc_conn.DnssrvOperation2(client_version,
926                                            0,
927                                            self.server_ip,
928                                            None,
929                                            0,
930                                            'ZoneCreate',
931                                            dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
932                                            zone_create)
933         except WERRORError as e:
934             self.fail(str(e))
935
936     def set_params(self, **kwargs):
937         zone = kwargs.pop('zone', None)
938         for key,val in kwargs.items():
939             name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
940             name_param.dwParam = val
941             name_param.pszNodeName = key
942
943             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
944             nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
945             try:
946                 self.rpc_conn.DnssrvOperation2(client_version, 0, self.server, zone,
947                                                0, 'ResetDwordProperty', nap_type,
948                                                name_param)
949             except WERRORError as e:
950                 self.fail(str(e))
951
952     def ldap_modify_dnsrecs(self, name, func):
953         dn = 'DC={},{}'.format(name, self.zone_dn)
954         dns_recs = self.ldap_get_dns_records(name)
955         for rec in dns_recs:
956             func(rec)
957         update_dict = {'dn':dn, 'dnsRecord':[ndr_pack(r) for r in dns_recs]}
958         self.samdb.modify(ldb.Message.from_dict(self.samdb,
959                                                 update_dict,
960                                                 ldb.FLAG_MOD_REPLACE))
961
962     def dns_update_record(self, prefix, txt):
963         p = self.make_txt_update(prefix, txt, self.zone)
964         (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
965         self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
966         recs = self.ldap_get_dns_records(prefix)
967         recs = [r for r in recs if r.data.str == txt]
968         self.assertEqual(len(recs), 1)
969         return recs[0]
970
971     def ldap_get_records(self, name):
972         dn = 'DC={},{}'.format(name, self.zone_dn)
973         expr = "(&(objectClass=dnsNode)(name={}))".format(name)
974         return self.samdb.search(base=dn, scope=ldb.SCOPE_SUBTREE,
975                                  expression=expr, attrs=["*"])
976
977     def ldap_get_dns_records(self, name):
978         records = self.ldap_get_records(name)
979         return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
980                 for r in records[0].get('dnsRecord')]
981
982     def ldap_get_zone_settings(self):
983         records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
984                    expression="(&(objectClass=dnsZone)"+\
985                                 "(name={}))".format(self.zone),
986                                     attrs=["dNSProperty"])
987         self.assertEqual(len(records), 1)
988         props = [ndr_unpack(dnsp.DnsProperty, r)
989                  for r in records[0].get('dNSProperty')]
990
991         #We have no choice but to repeat these here.
992         zone_prop_ids = {0x00: "EMPTY",
993                          0x01: "TYPE",
994                          0x02: "ALLOW_UPDATE",
995                          0x08: "SECURE_TIME",
996                          0x10: "NOREFRESH_INTERVAL",
997                          0x11: "SCAVENGING_SERVERS",
998                          0x12: "AGING_ENABLED_TIME",
999                          0x20: "REFRESH_INTERVAL",
1000                          0x40: "AGING_STATE",
1001                          0x80: "DELETED_FROM_HOSTNAME",
1002                          0x81: "MASTER_SERVERS",
1003                          0x82: "AUTO_NS_SERVERS",
1004                          0x83: "DCPROMO_CONVERT",
1005                          0x90: "SCAVENGING_SERVERS_DA",
1006                          0x91: "MASTER_SERVERS_DA",
1007                          0x92: "NS_SERVERS_DA",
1008                          0x100: "NODE_DBFLAGS"}
1009         return {zone_prop_ids[p.id].lower(): p.data for p in props}
1010
1011     def set_aging(self, enable=False):
1012         self.create_zone(self.zone, aging_enabled=enable)
1013         self.set_params(NoRefreshInterval=1, RefreshInterval=1,
1014                         Aging=int(bool(enable)), zone=self.zone,
1015                         AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
1016
1017     def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
1018         self.set_aging(enable=True)
1019         settings = self.ldap_get_zone_settings()
1020         self.assertTrue(settings['aging_state'] is not None)
1021         self.assertTrue(settings['aging_state'])
1022
1023         rec = self.dns_update_record('agingtest', ['test txt'])
1024         self.assertNotEqual(rec.dwTimeStamp, 0)
1025
1026     def test_set_aging_disabled(self):
1027         self.set_aging(enable=False)
1028         settings = self.ldap_get_zone_settings()
1029         self.assertTrue(settings['aging_state'] is not None)
1030         self.assertFalse(settings['aging_state'])
1031
1032         rec = self.dns_update_record('agingtest', ['test txt'])
1033         self.assertNotEqual(rec.dwTimeStamp, 0)
1034
1035     def test_aging_update(self, enable=True):
1036         name, txt = 'agingtest', ['test txt']
1037         self.set_aging(enable=True)
1038         before_mod = self.dns_update_record(name, txt)
1039         if not enable:
1040             self.set_params(zone=self.zone, Aging=0)
1041         dec = 2
1042         def mod_ts(rec):
1043             self.assertTrue(rec.dwTimeStamp > 0)
1044             rec.dwTimeStamp -= dec
1045         self.ldap_modify_dnsrecs(name, mod_ts)
1046         after_mod = self.ldap_get_dns_records(name)
1047         self.assertEqual(len(after_mod), 1)
1048         after_mod = after_mod[0]
1049         self.assertEqual(after_mod.dwTimeStamp,
1050                          before_mod.dwTimeStamp - dec)
1051         after_update = self.dns_update_record(name, txt)
1052         after_should_equal = before_mod if enable else after_mod
1053         self.assertEqual(after_should_equal.dwTimeStamp,
1054                          after_update.dwTimeStamp)
1055
1056     def test_aging_update_disabled(self):
1057         self.test_aging_update(enable=False)
1058
1059     def test_aging_refresh(self):
1060         name,txt = 'agingtest', ['test txt']
1061         self.create_zone(self.zone, aging_enabled=True)
1062         interval = 10
1063         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1064                         Aging=1, zone=self.zone,
1065                         AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
1066         before_mod = self.dns_update_record(name, txt)
1067         def mod_ts(rec):
1068             self.assertTrue(rec.dwTimeStamp > 0)
1069             rec.dwTimeStamp -= interval/2
1070         self.ldap_modify_dnsrecs(name, mod_ts)
1071         update_during_norefresh = self.dns_update_record(name, txt)
1072         def mod_ts(rec):
1073             self.assertTrue(rec.dwTimeStamp > 0)
1074             rec.dwTimeStamp -= interval + interval/2
1075         self.ldap_modify_dnsrecs(name, mod_ts)
1076         update_during_refresh = self.dns_update_record(name, txt)
1077         self.assertEqual(update_during_norefresh.dwTimeStamp,
1078                          before_mod.dwTimeStamp - interval/2)
1079         self.assertEqual(update_during_refresh.dwTimeStamp,
1080                          before_mod.dwTimeStamp)
1081
1082     def test_rpc_add_no_timestamp(self):
1083         name,txt = 'agingtest', ['test txt']
1084         self.set_aging(enable=True)
1085         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1086         rec_buf.rec = TXTRecord(txt)
1087         self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1088                                           0, self.server_ip,
1089                                           self.zone, name, rec_buf, None)
1090         recs = self.ldap_get_dns_records(name)
1091         self.assertEqual(len(recs), 1)
1092         self.assertEqual(recs[0].dwTimeStamp, 0)
1093
1094     def test_dns_tombstone_custom_match_rule(self):
1095         name,txt = 'agingtest', ['test txt']
1096         name2,txt2 = 'agingtest2', ['test txt2']
1097         name3,txt3 = 'agingtest3', ['test txt3']
1098         self.create_zone(self.zone, aging_enabled=True)
1099         interval = 10
1100         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1101                         Aging=1, zone=self.zone,
1102                         AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
1103
1104         self.dns_update_record(name, txt),
1105
1106         self.dns_update_record(name2, txt),
1107         self.dns_update_record(name2, txt2),
1108
1109         self.dns_update_record(name3, txt),
1110         self.dns_update_record(name3, txt2),
1111         last_update = self.dns_update_record(name3, txt3)
1112
1113         # Modify txt1 of the first 2 names
1114         def mod_ts(rec):
1115             if rec.data.str == txt:
1116                 rec.dwTimeStamp -= 2
1117         self.ldap_modify_dnsrecs(name, mod_ts)
1118         self.ldap_modify_dnsrecs(name2, mod_ts)
1119
1120         recs = self.ldap_get_dns_records(name3)
1121         expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
1122         expr = expr.format(int(last_update.dwTimeStamp)-1)
1123         try:
1124             res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1125                                     expression=expr, attrs=["*"])
1126         except ldb.LdbError as e:
1127             self.fail(str(e))
1128         updated_names = {str(r.get('name')) for r in res}
1129         self.assertEqual(updated_names, set([name, name2]))
1130
1131     def test_dns_tombstone_custom_match_rule_fail(self):
1132         self.create_zone(self.zone, aging_enabled=True)
1133
1134         # The check here is that this does not blow up on silly input
1135         expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1136         res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1137                                 expression=expr, attrs=["*"])
1138         self.assertEquals(len(res), 0)
1139
1140     def test_basic_scavenging(self):
1141         self.create_zone(self.zone, aging_enabled=True)
1142         interval = 1
1143         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1144                         zone=self.zone, Aging=1,
1145                         AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
1146         name, txt = 'agingtest', ['test txt']
1147         rec = self.dns_update_record(name,txt)
1148         rec = self.dns_update_record(name+'2',txt)
1149         def mod_ts(rec):
1150             self.assertTrue(rec.dwTimeStamp > 0)
1151             rec.dwTimeStamp -= interval*5
1152         self.ldap_modify_dnsrecs(name, mod_ts)
1153         self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1154         dsdb._scavenge_dns_records(self.samdb)
1155
1156         recs = self.ldap_get_dns_records(name)
1157         self.assertEqual(len(recs), 1)
1158         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1159         recs = self.ldap_get_dns_records(name+'2')
1160         self.assertEqual(len(recs), 1)
1161         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1162
1163     def delete_zone(self, zone):
1164         self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1165                                        0,
1166                                        self.server_ip,
1167                                        zone,
1168                                        0,
1169                                        'DeleteZoneFromDs',
1170                                        dnsserver.DNSSRV_TYPEID_NULL,
1171                                        None)
1172
1173     def test_soa_query(self):
1174         zone = "test.lan"
1175         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1176         questions = []
1177
1178         q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
1179         questions.append(q)
1180         self.finish_name_packet(p, questions)
1181
1182         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1183         # Windows returns OK while BIND logically seems to return NXDOMAIN
1184         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1185         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1186         self.assertEquals(response.ancount, 0)
1187
1188         self.create_zone(zone)
1189         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1190         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1191         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1192         self.assertEquals(response.ancount, 1)
1193         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
1194
1195         self.delete_zone(zone)
1196         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1197         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1198         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1199         self.assertEquals(response.ancount, 0)
1200
1201 class TestRPCRoundtrip(DNSTest):
1202     def setUp(self):
1203         super(TestRPCRoundtrip, self).setUp()
1204         global server, server_ip, lp, creds
1205         self.server = server_name
1206         self.server_ip = server_ip
1207         self.lp = lp
1208         self.creds = creds
1209         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
1210                                             self.lp, self.creds)
1211
1212     def tearDown(self):
1213         super(TestRPCRoundtrip, self).tearDown()
1214
1215     def test_update_add_txt_rpc_to_dns(self):
1216         prefix, txt = 'rpctextrec', ['"This is a test"']
1217
1218         name = "%s.%s" % (prefix, self.get_dns_domain())
1219
1220         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
1221         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1222         add_rec_buf.rec = rec
1223         try:
1224             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1225                                      0, self.server_ip, self.get_dns_domain(),
1226                                      name, add_rec_buf, None)
1227         except WERRORError as e:
1228             self.fail(str(e))
1229
1230         try:
1231             self.check_query_txt(prefix, txt)
1232         finally:
1233             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1234                                               0, self.server_ip, self.get_dns_domain(),
1235                                               name, None, add_rec_buf)
1236
1237     def test_update_add_null_padded_txt_record(self):
1238         "test adding records works"
1239         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1240         p = self.make_txt_update(prefix, txt)
1241         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1242         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1243         self.check_query_txt(prefix, txt)
1244         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1245                              self.get_dns_domain(),
1246                              "%s.%s" % (prefix, self.get_dns_domain()),
1247                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""'))
1248
1249         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1250         p = self.make_txt_update(prefix, txt)
1251         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1252         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1253         self.check_query_txt(prefix, txt)
1254         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1255                              self.get_dns_domain(),
1256                              "%s.%s" % (prefix, self.get_dns_domain()),
1257                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"'))
1258
1259         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1260         p = self.make_txt_update(prefix, txt)
1261         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1262         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1263         self.check_query_txt(prefix, txt)
1264         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1265                              self.get_dns_domain(),
1266                              "%s.%s" % (prefix, self.get_dns_domain()),
1267                              dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""'))
1268
1269     def test_update_add_padding_rpc_to_dns(self):
1270         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1271         prefix = 'rpc' + prefix
1272         name = "%s.%s" % (prefix, self.get_dns_domain())
1273
1274         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""')
1275         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1276         add_rec_buf.rec = rec
1277         try:
1278             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1279                                      0, self.server_ip, self.get_dns_domain(),
1280                                      name, add_rec_buf, None)
1281
1282         except WERRORError as e:
1283             self.fail(str(e))
1284
1285         try:
1286             self.check_query_txt(prefix, txt)
1287         finally:
1288             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1289                                               0, self.server_ip, self.get_dns_domain(),
1290                                               name, None, add_rec_buf)
1291
1292         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1293         prefix = 'rpc' + prefix
1294         name = "%s.%s" % (prefix, self.get_dns_domain())
1295
1296         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"')
1297         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1298         add_rec_buf.rec = rec
1299         try:
1300             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1301                                      0, self.server_ip, self.get_dns_domain(),
1302                                      name, add_rec_buf, None)
1303
1304         except WERRORError as e:
1305             self.fail(str(e))
1306
1307         try:
1308             self.check_query_txt(prefix, txt)
1309         finally:
1310             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1311                                               0, self.server_ip, self.get_dns_domain(),
1312                                               name, None, add_rec_buf)
1313
1314         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1315         prefix = 'rpc' + prefix
1316         name = "%s.%s" % (prefix, self.get_dns_domain())
1317
1318         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""')
1319         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1320         add_rec_buf.rec = rec
1321         try:
1322             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1323                                      0, self.server_ip, self.get_dns_domain(),
1324                                      name, add_rec_buf, None)
1325         except WERRORError as e:
1326             self.fail(str(e))
1327
1328         try:
1329             self.check_query_txt(prefix, txt)
1330         finally:
1331             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1332                                               0, self.server_ip, self.get_dns_domain(),
1333                                               name, None, add_rec_buf)
1334
1335     # Test is incomplete due to strlen against txt records
1336     def test_update_add_null_char_txt_record(self):
1337         "test adding records works"
1338         prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1339         p = self.make_txt_update(prefix, txt)
1340         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1341         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1342         self.check_query_txt(prefix, ['NULL'])
1343         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1344                              self.get_dns_domain(),
1345                              "%s.%s" % (prefix, self.get_dns_domain()),
1346                              dnsp.DNS_TYPE_TXT, '"NULL"'))
1347
1348         prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1349         p = self.make_txt_update(prefix, txt)
1350         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1351         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1352         self.check_query_txt(prefix, ['NULL', 'NULL'])
1353         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1354                              self.get_dns_domain(),
1355                              "%s.%s" % (prefix, self.get_dns_domain()),
1356                              dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1357
1358     def test_update_add_null_char_rpc_to_dns(self):
1359         prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1360         prefix = 'rpc' + prefix
1361         name = "%s.%s" % (prefix, self.get_dns_domain())
1362
1363         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL"')
1364         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1365         add_rec_buf.rec = rec
1366         try:
1367             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1368                                      0, self.server_ip, self.get_dns_domain(),
1369                                      name, add_rec_buf, None)
1370
1371         except WERRORError as e:
1372             self.fail(str(e))
1373
1374         try:
1375            self.check_query_txt(prefix, ['NULL'])
1376         finally:
1377             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1378                                               0, self.server_ip, self.get_dns_domain(),
1379                                               name, None, add_rec_buf)
1380
1381     def test_update_add_hex_char_txt_record(self):
1382         "test adding records works"
1383         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1384         p = self.make_txt_update(prefix, txt)
1385         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1386         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1387         self.check_query_txt(prefix, txt)
1388         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1389                              self.get_dns_domain(),
1390                              "%s.%s" % (prefix, self.get_dns_domain()),
1391                              dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
1392
1393     def test_update_add_hex_rpc_to_dns(self):
1394         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1395         prefix = 'rpc' + prefix
1396         name = "%s.%s" % (prefix, self.get_dns_domain())
1397
1398         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
1399         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1400         add_rec_buf.rec = rec
1401         try:
1402             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1403                                      0, self.server_ip, self.get_dns_domain(),
1404                                      name, add_rec_buf, None)
1405
1406         except WERRORError as e:
1407             self.fail(str(e))
1408
1409         try:
1410            self.check_query_txt(prefix, txt)
1411         finally:
1412             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1413                                               0, self.server_ip, self.get_dns_domain(),
1414                                               name, None, add_rec_buf)
1415
1416     def test_update_add_slash_txt_record(self):
1417         "test adding records works"
1418         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1419         p = self.make_txt_update(prefix, txt)
1420         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1421         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1422         self.check_query_txt(prefix, txt)
1423         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1424                              self.get_dns_domain(),
1425                              "%s.%s" % (prefix, self.get_dns_domain()),
1426                              dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
1427
1428     # This test fails against Windows as it eliminates slashes in RPC
1429     # One typical use for a slash is in records like 'var=value' to
1430     # escape '=' characters.
1431     def test_update_add_slash_rpc_to_dns(self):
1432         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1433         prefix = 'rpc' + prefix
1434         name = "%s.%s" % (prefix, self.get_dns_domain())
1435
1436         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
1437         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1438         add_rec_buf.rec = rec
1439         try:
1440             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1441                                      0, self.server_ip, self.get_dns_domain(),
1442                                      name, add_rec_buf, None)
1443
1444         except WERRORError as e:
1445             self.fail(str(e))
1446
1447         try:
1448             self.check_query_txt(prefix, txt)
1449
1450         finally:
1451             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1452                                               0, self.server_ip, self.get_dns_domain(),
1453                                               name, None, add_rec_buf)
1454
1455     def test_update_add_two_txt_records(self):
1456         "test adding two txt records works"
1457         prefix, txt = 'textrec2', ['"This is a test"',
1458                                    '"and this is a test, too"']
1459         p = self.make_txt_update(prefix, txt)
1460         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1461         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1462         self.check_query_txt(prefix, txt)
1463         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1464                              self.get_dns_domain(),
1465                              "%s.%s" % (prefix, self.get_dns_domain()),
1466                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
1467                              ' "\\"and this is a test, too\\""'))
1468
1469     def test_update_add_two_rpc_to_dns(self):
1470         prefix, txt = 'textrec2', ['"This is a test"',
1471                                    '"and this is a test, too"']
1472         prefix = 'rpc' + prefix
1473         name = "%s.%s" % (prefix, self.get_dns_domain())
1474
1475         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1476                                 '"\\"This is a test\\""' +
1477                                 ' "\\"and this is a test, too\\""')
1478         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1479         add_rec_buf.rec = rec
1480         try:
1481             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1482                                      0, self.server_ip, self.get_dns_domain(),
1483                                      name, add_rec_buf, None)
1484
1485         except WERRORError as e:
1486             self.fail(str(e))
1487
1488         try:
1489             self.check_query_txt(prefix, txt)
1490         finally:
1491             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1492                                               0, self.server_ip, self.get_dns_domain(),
1493                                               name, None, add_rec_buf)
1494
1495     def test_update_add_empty_txt_records(self):
1496         "test adding two txt records works"
1497         prefix, txt = 'emptytextrec', []
1498         p = self.make_txt_update(prefix, txt)
1499         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1500         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1501         self.check_query_txt(prefix, txt)
1502         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1503                              self.get_dns_domain(),
1504                              "%s.%s" % (prefix, self.get_dns_domain()),
1505                              dnsp.DNS_TYPE_TXT, ''))
1506
1507     def test_update_add_empty_rpc_to_dns(self):
1508         prefix, txt = 'rpcemptytextrec', []
1509
1510         name = "%s.%s" % (prefix, self.get_dns_domain())
1511
1512         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
1513         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1514         add_rec_buf.rec = rec
1515         try:
1516             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1517                                      0, self.server_ip, self.get_dns_domain(),
1518                                      name, add_rec_buf, None)
1519         except WERRORError as e:
1520             self.fail(str(e))
1521
1522         try:
1523             self.check_query_txt(prefix, txt)
1524         finally:
1525             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1526                                               0, self.server_ip, self.get_dns_domain(),
1527                                               name, None, add_rec_buf)
1528
1529 TestProgram(module=__name__, opts=subunitopts)