pytests: heed assertEquals deprecation warning en-masse
[gd/samba-autobuild/.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
24 import ldb
25 from ldb import ERR_OPERATIONS_ERROR
26 import os
27 import sys
28 import struct
29 import socket
30 import samba.ndr as ndr
31 from samba import credentials
32 from samba.dcerpc import dns, dnsp, dnsserver
33 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
34 from samba.tests.subunitrun import SubunitOptions, TestProgram
35 from samba import werror, WERRORError
36 from samba.tests.dns_base import DNSTest
37 import samba.getopt as options
38 import optparse
39
40
# Command line: two mandatory positional arguments plus option groups.
parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

# The timeout is only relevant when the tests run against Windows:
# format errors tend to produce patchy responses there, so requests
# have to be allowed to time out.
parser.add_option(
    "--timeout",
    type="int",
    dest="timeout",
    help="Specify timeout for DNS requests")

# Use credentials from the command line when they are available.
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

(opts, args) = parser.parse_args()

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)

timeout = opts.timeout

# Both <server name> and <server ip> must be supplied.
if len(args) < 2:
    parser.print_usage()
    sys.exit(1)

server_name, server_ip = args[0], args[1]
creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
70
71
class TestSimpleQueries(DNSTest):
    """Single-packet DNS queries against the server given on the command line."""

    def setUp(self):
        """Copy the module-level connection settings onto the instance."""
        super(TestSimpleQueries, self).setUp()
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def _server_fqdn(self):
        """Return "<server>.<dns domain>" for the server under test."""
        return "%s.%s" % (self.server, self.get_dns_domain())

    def _run_query(self, questions, tcp=False):
        """Send *questions* in one QUERY packet and return the response.

        :param questions: list of question records to put in the packet
        :param tcp: when true use dns_transaction_tcp, otherwise
            dns_transaction_udp
        :return: the first element of the transaction's result tuple
        """
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        self.finish_name_packet(p, questions)
        if tcp:
            transact = self.dns_transaction_tcp
        else:
            transact = self.dns_transaction_udp
        (response, response_packet) = transact(p, host=self.server_ip)
        return response

    def test_one_a_query(self):
        """create a query packet containing one query record"""
        q = self.make_name_question(self._server_fqdn(),
                                    dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        response = self._run_query([q])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_SOA_query(self):
        """create a query packet containing one query record for the SOA"""
        q = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_SOA,
                                    dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        response = self._run_query([q])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        # The comparison is case-insensitive: both sides are upper-cased.
        self.assertEqual(response.answers[0].rdata.mname.upper(),
                         self._server_fqdn().upper())

    def test_one_a_query_tcp(self):
        """create a query packet containing one query record via TCP"""
        q = self.make_name_question(self._server_fqdn(),
                                    dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        response = self._run_query([q], tcp=True)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_mx_query(self):
        """create a query packet causing an empty RCODE_OK answer"""
        # An existing name without MX data: OK with an empty answer section.
        q = self.make_name_question(self._server_fqdn(),
                                    dns.DNS_QTYPE_MX,
                                    dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        response = self._run_query([q])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

        # A name that does not exist at all: NXDOMAIN, still no answers.
        name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(name,
                                    dns.DNS_QTYPE_MX,
                                    dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        response = self._run_query([q])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

    def test_two_queries(self):
        """create a query packet containing two query records"""
        questions = []

        q = self.make_name_question(self._server_fqdn(),
                                    dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN)
        questions.append(q)

        name = "%s.%s" % ('bogusname', self.get_dns_domain())
        q = self.make_name_question(name,
                                    dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN)
        questions.append(q)

        try:
            response = self._run_query(questions)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted
            # queries.  Although this appears to be non-deterministic even
            # for the same request twice, it also appears to be based on
            # how poorly the request is formatted.
            pass

    def test_qtype_all_query(self):
        """create a QTYPE_ALL query"""
        q = self.make_name_question(self._server_fqdn(),
                                    dns.DNS_QTYPE_ALL,
                                    dns.DNS_QCLASS_IN)
        print("asking for ", q.name)

        response = self._run_query([q])

        # One answer for the IPv4 address, plus one more when the
        # environment advertises an IPv6 address for the server.
        num_answers = 1
        dc_ipv6 = os.getenv('SERVER_IPV6')
        if dc_ipv6 is not None:
            num_answers += 1

        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, num_answers)
        self.assertEqual(response.answers[0].rdata, self.server_ip)
        if dc_ipv6 is not None:
            self.assertEqual(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        """create a QCLASS_NONE query"""
        q = self.make_name_question(self._server_fqdn(),
                                    dns.DNS_QTYPE_ALL,
                                    dns.DNS_QCLASS_NONE)

        try:
            response = self._run_query([q])
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted
            # queries.  Although this appears to be non-deterministic even
            # for the same request twice, it also appears to be based on
            # how poorly the request is formatted.
            pass

    def test_soa_hostname_query(self):
        """create a SOA query for a hostname"""
        q = self.make_name_question(self._server_fqdn(),
                                    dns.DNS_QTYPE_SOA,
                                    dns.DNS_QCLASS_IN)

        response = self._run_query([q])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        # We don't get SOA records for single hosts
        self.assertEqual(response.ancount, 0)
        # But we do respond with an authority section
        self.assertEqual(response.nscount, 1)

    def test_soa_unknown_hostname_query(self):
        """create a SOA query for an unknown hostname"""
        name = "foobar.%s" % (self.get_dns_domain())
        q = self.make_name_question(name,
                                    dns.DNS_QTYPE_SOA,
                                    dns.DNS_QCLASS_IN)

        response = self._run_query([q])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        # No SOA answer is expected for the unknown name
        self.assertEqual(response.ancount, 0)
        # But we do respond with an authority section
        self.assertEqual(response.nscount, 1)

    def test_soa_domain_query(self):
        """create a SOA query for a domain"""
        q = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_SOA,
                                    dns.DNS_QCLASS_IN)

        response = self._run_query([q])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.minimum, 3600)
302
303
class TestDNSUpdates(DNSTest):
    """Dynamic update requests sent to the server given on the command line."""

    # Text carried by every TXT record the add/delete tests create.
    _TXT_STRINGS = ['"This is a test"']

    def setUp(self):
        """Copy the module-level connection settings onto the instance."""
        super(TestDNSUpdates, self).setUp()
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def _send(self, p):
        """Send packet *p* over UDP to the server and return the response."""
        (response, response_packet) = self.dns_transaction_udp(
            p, host=self.server_ip)
        return response

    def _soa_update_packet(self):
        """Return an UPDATE packet whose only question is the domain's SOA."""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        name = self.get_dns_domain()
        u = self.make_name_question(name,
                                    dns.DNS_QTYPE_SOA,
                                    dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [u])
        return p

    def _attach_txt_prereq(self, p, name, rr_class, ttl, length):
        """Store a single TXT prerequisite record in *p*'s answer slots.

        :param p: update packet to modify in place
        :param name: owner name of the prerequisite record
        :param rr_class: class of the prerequisite (a dns.DNS_QCLASS_* value)
        :param ttl: TTL field of the prerequisite
        :param length: length field of the prerequisite
        """
        r = dns.res_rec()
        r.name = name
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = length
        p.ancount = 1
        p.answers = [r]

    def _update_txt_record(self, name, rr_class, ttl):
        """Send an update carrying one test TXT record and require RCODE_OK.

        The tests use class IN with a non-zero TTL to add the record and
        class NONE with TTL 0 to delete it again.

        :param name: owner name of the TXT record
        :param rr_class: class of the record (a dns.DNS_QCLASS_* value)
        :param ttl: TTL of the record
        """
        p = self._soa_update_packet()

        r = dns.res_rec()
        r.name = name
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = 0xffff
        r.rdata = self.make_txt_record(list(self._TXT_STRINGS))
        p.nscount = 1
        p.nsrecs = [r]

        response = self._send(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def _assert_txt_query_rcode(self, name, rcode):
        """Query *name* for TXT and require the response code *rcode*."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name,
                                    dns.DNS_QTYPE_TXT,
                                    dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        response = self._send(p)
        self.assert_dns_rcode_equals(response, rcode)

    def test_two_updates(self):
        """create two update requests"""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = "%s.%s" % (self.server, self.get_dns_domain())
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        self.finish_name_packet(p, updates)
        try:
            response = self._send(p)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted
            # queries.  Although this appears to be non-deterministic even
            # for the same request twice, it also appears to be based on
            # how poorly the request is formatted.
            pass

    def test_update_wrong_qclass(self):
        """create update with DNS_QCLASS_NONE"""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)

        name = self.get_dns_domain()
        u = self.make_name_question(name,
                                    dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_NONE)
        self.finish_name_packet(p, [u])

        response = self._send(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        """test update with a non-null TTL"""
        p = self._soa_update_packet()
        self._attach_txt_prereq(
            p,
            "%s.%s" % (self.server, self.get_dns_domain()),
            dns.DNS_QCLASS_NONE,
            ttl=1,
            length=0)

        try:
            response = self._send(p)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted
            # queries.  Although this appears to be non-deterministic even
            # for the same request twice, it also appears to be based on
            # how poorly the request is formatted.
            pass

    def test_update_prereq_with_non_null_length(self):
        """test update with a non-null length"""
        p = self._soa_update_packet()
        self._attach_txt_prereq(
            p,
            "%s.%s" % (self.server, self.get_dns_domain()),
            dns.DNS_QCLASS_ANY,
            ttl=0,
            length=1)

        response = self._send(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_prereq_nonexisting_name(self):
        """test update with a nonexisting name"""
        p = self._soa_update_packet()
        self._attach_txt_prereq(
            p,
            "idontexist.%s" % self.get_dns_domain(),
            dns.DNS_QCLASS_ANY,
            ttl=0,
            length=0)

        response = self._send(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        """test adding records works"""
        prefix, txt = 'textrec', ['"This is a test"']
        p = self.make_txt_update(prefix, txt)
        response = self._send(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, txt)

    def test_delete_record(self):
        """Test if deleting records works"""
        record_name = "deleterec.%s" % self.get_dns_domain()

        # First, create a record to make sure we have a record to delete.
        self._update_txt_record(record_name, dns.DNS_QCLASS_IN, 900)

        # Now check the record is around
        self._assert_txt_query_rcode(record_name, dns.DNS_RCODE_OK)

        # Now delete the record
        self._update_txt_record(record_name, dns.DNS_QCLASS_NONE, 0)

        # And finally check it's gone
        self._assert_txt_query_rcode(record_name, dns.DNS_RCODE_NXDOMAIN)

    def test_readd_record(self):
        """Test if adding, deleting and then readding a records works"""
        record_name = "readdrec.%s" % self.get_dns_domain()

        # Create the record
        self._update_txt_record(record_name, dns.DNS_QCLASS_IN, 900)

        # Now check the record is around
        self._assert_txt_query_rcode(record_name, dns.DNS_RCODE_OK)

        # Now delete the record
        self._update_txt_record(record_name, dns.DNS_QCLASS_NONE, 0)

        # check it's gone
        self._assert_txt_query_rcode(record_name, dns.DNS_RCODE_NXDOMAIN)

        # recreate the record
        self._update_txt_record(record_name, dns.DNS_QCLASS_IN, 900)

        # Now check the record is around
        self._assert_txt_query_rcode(record_name, dns.DNS_RCODE_OK)

    def test_update_add_mx_record(self):
        """test adding MX records works"""
        p = self._soa_update_packet()

        r = dns.res_rec()
        r.name = "%s" % self.get_dns_domain()
        r.rr_type = dns.DNS_QTYPE_MX
        r.rr_class = dns.DNS_QCLASS_IN
        r.ttl = 900
        r.length = 0xffff
        rdata = dns.mx_record()
        rdata.preference = 10
        rdata.exchange = 'mail.%s' % self.get_dns_domain()
        r.rdata = rdata
        p.nscount = 1
        p.nsrecs = [r]

        response = self._send(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Read the record back and compare it with what was sent.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        name = "%s" % self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])

        response = self._send(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        ans = response.answers[0]
        self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
        self.assertEqual(ans.rdata.preference, 10)
        self.assertEqual(ans.rdata.exchange,
                         'mail.%s' % self.get_dns_domain())
698
699
700 class TestComplexQueries(DNSTest):
701     def make_dns_update(self, key, value, qtype):
702         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
703
704         name = self.get_dns_domain()
705         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
706         self.finish_name_packet(p, [u])
707
708         r = dns.res_rec()
709         r.name = key
710         r.rr_type = qtype
711         r.rr_class = dns.DNS_QCLASS_IN
712         r.ttl = 900
713         r.length = 0xffff
714         r.rdata = value
715         p.nscount = 1
716         p.nsrecs = [r]
717         (response, response_packet) =\
718             self.dns_transaction_udp(p, host=server_ip)
719         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
720
721     def setUp(self):
722         super(TestComplexQueries, self).setUp()
723
724         global server, server_ip, lp, creds, timeout
725         self.server = server_name
726         self.server_ip = server_ip
727         self.lp = lp
728         self.creds = creds
729         self.timeout = timeout
730
731     def test_one_a_query(self):
732         "create a query packet containing one query record"
733
734         try:
735
736             # Create the record
737             name = "cname_test.%s" % self.get_dns_domain()
738             rdata = "%s.%s" % (self.server, self.get_dns_domain())
739             self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
740
741             p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
742             questions = []
743
744             # Check the record
745             name = "cname_test.%s" % self.get_dns_domain()
746             q = self.make_name_question(name,
747                                         dns.DNS_QTYPE_A,
748                                         dns.DNS_QCLASS_IN)
749             print("asking for ", q.name)
750             questions.append(q)
751
752             self.finish_name_packet(p, questions)
753             (response, response_packet) =\
754                 self.dns_transaction_udp(p, host=self.server_ip)
755             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
756             self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
757             self.assertEqual(response.ancount, 2)
758             self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
759             self.assertEqual(response.answers[0].rdata, "%s.%s" %
760                               (self.server, self.get_dns_domain()))
761             self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_A)
762             self.assertEqual(response.answers[1].rdata,
763                               self.server_ip)
764
765         finally:
766             # Delete the record
767             p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
768             updates = []
769
770             name = self.get_dns_domain()
771
772             u = self.make_name_question(name,
773                                         dns.DNS_QTYPE_SOA,
774                                         dns.DNS_QCLASS_IN)
775             updates.append(u)
776             self.finish_name_packet(p, updates)
777
778             updates = []
779             r = dns.res_rec()
780             r.name = "cname_test.%s" % self.get_dns_domain()
781             r.rr_type = dns.DNS_QTYPE_CNAME
782             r.rr_class = dns.DNS_QCLASS_NONE
783             r.ttl = 0
784             r.length = 0xffff
785             r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
786             updates.append(r)
787             p.nscount = len(updates)
788             p.nsrecs = updates
789
790             (response, response_packet) =\
791                 self.dns_transaction_udp(p, host=self.server_ip)
792             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
793
794     def test_cname_two_chain(self):
795         name0 = "cnamechain0.%s" % self.get_dns_domain()
796         name1 = "cnamechain1.%s" % self.get_dns_domain()
797         name2 = "cnamechain2.%s" % self.get_dns_domain()
798         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
799         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
800         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
801
802         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
803         questions = []
804         q = self.make_name_question(name1, dns.DNS_QTYPE_A,
805                                     dns.DNS_QCLASS_IN)
806         questions.append(q)
807
808         self.finish_name_packet(p, questions)
809         (response, response_packet) =\
810             self.dns_transaction_udp(p, host=server_ip)
811         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
812         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
813         self.assertEqual(response.ancount, 3)
814
815         self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
816         self.assertEqual(response.answers[0].name, name1)
817         self.assertEqual(response.answers[0].rdata, name2)
818
819         self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
820         self.assertEqual(response.answers[1].name, name2)
821         self.assertEqual(response.answers[1].rdata, name0)
822
823         self.assertEqual(response.answers[2].rr_type, dns.DNS_QTYPE_A)
824         self.assertEqual(response.answers[2].rdata,
825                           self.server_ip)
826
827     def test_invalid_empty_cname(self):
828         name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
829         try:
830             self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
831         except AssertionError:
832             pass
833         else:
834             self.fail("Successfully added empty CNAME, which is invalid.")
835
836     def test_cname_two_chain_not_matching_qtype(self):
837         name0 = "cnamechain0.%s" % self.get_dns_domain()
838         name1 = "cnamechain1.%s" % self.get_dns_domain()
839         name2 = "cnamechain2.%s" % self.get_dns_domain()
840         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
841         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
842         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
843
844         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
845         questions = []
846         q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
847                                     dns.DNS_QCLASS_IN)
848         questions.append(q)
849
850         self.finish_name_packet(p, questions)
851         (response, response_packet) =\
852             self.dns_transaction_udp(p, host=server_ip)
853         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
854         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
855
856         # CNAME should return all intermediate results!
857         # Only the A records exists, not the TXT.
858         self.assertEqual(response.ancount, 2)
859
860         self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
861         self.assertEqual(response.answers[0].name, name1)
862         self.assertEqual(response.answers[0].rdata, name2)
863
864         self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
865         self.assertEqual(response.answers[1].name, name2)
866         self.assertEqual(response.answers[1].rdata, name0)
867
868     def test_cname_loop(self):
869         cname1 = "cnamelooptestrec." + self.get_dns_domain()
870         cname2 = "cnamelooptestrec2." + self.get_dns_domain()
871         cname3 = "cnamelooptestrec3." + self.get_dns_domain()
872         self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
873         self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
874         self.make_dns_update(cname3, cname1, dnsp.DNS_TYPE_CNAME)
875
876         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
877         questions = []
878
879         q = self.make_name_question(cname1,
880                                     dns.DNS_QTYPE_A,
881                                     dns.DNS_QCLASS_IN)
882         questions.append(q)
883         self.finish_name_packet(p, questions)
884
885         (response, response_packet) =\
886             self.dns_transaction_udp(p, host=self.server_ip)
887
888         max_recursion_depth = 20
889         self.assertEqual(len(response.answers), max_recursion_depth)
890
891     # Make sure cname limit doesn't count other records.  This is a generic
892     # test called in tests below
893     def max_rec_test(self, rtype, rec_gen):
894         name = "limittestrec{0}.{1}".format(rtype, self.get_dns_domain())
895         limit = 20
896         num_recs_to_enter = limit + 5
897
898         for i in range(1, num_recs_to_enter+1):
899             ip = rec_gen(i)
900             self.make_dns_update(name, ip, rtype)
901
902         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
903         questions = []
904
905         q = self.make_name_question(name,
906                                     rtype,
907                                     dns.DNS_QCLASS_IN)
908         questions.append(q)
909         self.finish_name_packet(p, questions)
910
911         (response, response_packet) =\
912             self.dns_transaction_udp(p, host=self.server_ip)
913
914         self.assertEqual(len(response.answers), num_recs_to_enter)
915
916     def test_record_limit_A(self):
917         def ip4_gen(i):
918             return "127.0.0." + str(i)
919         self.max_rec_test(rtype=dns.DNS_QTYPE_A, rec_gen=ip4_gen)
920
921     def test_record_limit_AAAA(self):
922         def ip6_gen(i):
923             return "AAAA:0:0:0:0:0:0:" + str(i)
924         self.max_rec_test(rtype=dns.DNS_QTYPE_AAAA, rec_gen=ip6_gen)
925
926     def test_record_limit_SRV(self):
927         def srv_gen(i):
928             rec = dns.srv_record()
929             rec.priority = 1
930             rec.weight = 1
931             rec.port = 92
932             rec.target = "srvtestrec" + str(i)
933             return rec
934         self.max_rec_test(rtype=dns.DNS_QTYPE_SRV, rec_gen=srv_gen)
935
936     # Same as test_record_limit_A but with a preceding CNAME follow
937     def test_cname_limit(self):
938         cname1 = "cnamelimittestrec." + self.get_dns_domain()
939         cname2 = "cnamelimittestrec2." + self.get_dns_domain()
940         cname3 = "cnamelimittestrec3." + self.get_dns_domain()
941         ip_prefix = '127.0.0.'
942         limit = 20
943         num_recs_to_enter = limit + 5
944
945         self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
946         self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
947         num_arecs_to_enter = num_recs_to_enter - 2
948         for i in range(1, num_arecs_to_enter+1):
949             ip = ip_prefix + str(i)
950             self.make_dns_update(cname3, ip, dns.DNS_QTYPE_A)
951
952         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
953         questions = []
954
955         q = self.make_name_question(cname1,
956                                     dns.DNS_QTYPE_A,
957                                     dns.DNS_QCLASS_IN)
958         questions.append(q)
959         self.finish_name_packet(p, questions)
960
961         (response, response_packet) =\
962             self.dns_transaction_udp(p, host=self.server_ip)
963
964         self.assertEqual(len(response.answers), num_recs_to_enter)
965
966     # ANY query on cname record shouldn't follow the link
967     def test_cname_any_query(self):
968         cname1 = "cnameanytestrec." + self.get_dns_domain()
969         cname2 = "cnameanytestrec2." + self.get_dns_domain()
970         cname3 = "cnameanytestrec3." + self.get_dns_domain()
971
972         self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
973         self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
974
975         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
976         questions = []
977
978         q = self.make_name_question(cname1,
979                                     dns.DNS_QTYPE_ALL,
980                                     dns.DNS_QCLASS_IN)
981         questions.append(q)
982         self.finish_name_packet(p, questions)
983
984         (response, response_packet) =\
985             self.dns_transaction_udp(p, host=self.server_ip)
986
987         self.assertEqual(len(response.answers), 1)
988         self.assertEqual(response.answers[0].name, cname1)
989         self.assertEqual(response.answers[0].rdata, cname2)
990
991
class TestInvalidQueries(DNSTest):
    """Send unusual traffic to the DNS server and check how it reacts."""

    def setUp(self):
        """Copy the module-level test configuration onto the instance."""
        super(TestInvalidQueries, self).setUp()
        # These module-level names are only read, so no ``global``
        # declaration is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def test_one_a_query(self):
        """Send a zero-byte datagram, then a query with one A question.

        The A query for the server's own name must still be answered with
        exactly one record holding the server's IP address.
        """
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((self.server_ip, 53))
            s.send(b"", 0)
        finally:
            if s is not None:
                s.close()

        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)
        questions.append(q)

        self.finish_name_packet(p, questions)
        (response, response_packet) =\
            self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         self.server_ip)

    def test_one_a_reply(self):
        """Send a reply instead of a query over TCP.

        The packet is a normal A question with the reply flag set.  The
        test passes if the receive call returns no data, or if it times
        out.
        """
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)
        questions.append(q)

        self.finish_name_packet(p, questions)
        p.operation |= dns.DNS_FLAG_REPLY
        s = None
        try:
            send_packet = ndr.ndr_pack(p)
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            # Use the timeout stored by setUp, like the other settings.
            s.settimeout(self.timeout)
            host = self.server_ip
            s.connect((host, 53))
            # DNS over TCP prefixes the message with its 16-bit length.
            tcp_packet = struct.pack('!H', len(send_packet))
            tcp_packet += send_packet
            # sendall() keeps writing until the whole buffer is out;
            # send() may stop after a partial write on a stream socket.
            s.sendall(tcp_packet)
            recv_packet = s.recv(0xffff + 2, 0)
            self.assertEqual(0, len(recv_packet))
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass
        finally:
            if s is not None:
                s.close()
1067
1068
1069 class TestZones(DNSTest):
1070     def setUp(self):
1071         super(TestZones, self).setUp()
1072         global server, server_ip, lp, creds, timeout
1073         self.server = server_name
1074         self.server_ip = server_ip
1075         self.lp = lp
1076         self.creds = creds
1077         self.timeout = timeout
1078
1079         self.zone = "test.lan"
1080         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1081                                             (self.server_ip),
1082                                             self.lp, self.creds)
1083
1084         self.samdb = SamDB(url="ldap://" + self.server_ip,
1085                            lp=self.get_loadparm(),
1086                            session_info=system_session(),
1087                            credentials=self.creds)
1088         self.zone_dn = "DC=" + self.zone +\
1089                        ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
1090                        str(self.samdb.get_default_basedn())
1091
1092     def tearDown(self):
1093         super(TestZones, self).tearDown()
1094
1095         try:
1096             self.delete_zone(self.zone)
1097         except RuntimeError as e:
1098             (num, string) = e.args
1099             if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
1100                 raise
1101
1102     def make_zone_obj(self, zone, aging_enabled=False):
1103         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
1104         zone_create.pszZoneName = zone
1105         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
1106         zone_create.fAging = int(aging_enabled)
1107         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
1108         zone_create.fDsIntegrated = 1
1109         zone_create.fLoadExisting = 1
1110         zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
1111         return zone_create
1112
1113     def create_zone(self, zone, aging_enabled=False):
1114         zone_create = self.make_zone_obj(zone, aging_enabled)
1115         try:
1116             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1117             self.rpc_conn.DnssrvOperation2(client_version,
1118                                            0,
1119                                            self.server_ip,
1120                                            None,
1121                                            0,
1122                                            'ZoneCreate',
1123                                            dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
1124                                            zone_create)
1125         except WERRORError as e:
1126             self.fail(e)
1127
1128     def set_params(self, **kwargs):
1129         zone = kwargs.pop('zone', None)
1130         for key, val in kwargs.items():
1131             name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
1132             name_param.dwParam = val
1133             name_param.pszNodeName = key
1134
1135             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1136             nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
1137             try:
1138                 self.rpc_conn.DnssrvOperation2(client_version,
1139                                                0,
1140                                                self.server,
1141                                                zone,
1142                                                0,
1143                                                'ResetDwordProperty',
1144                                                nap_type,
1145                                                name_param)
1146             except WERRORError as e:
1147                 self.fail(str(e))
1148
1149     def ldap_modify_dnsrecs(self, name, func):
1150         dn = 'DC={0},{1}'.format(name, self.zone_dn)
1151         dns_recs = self.ldap_get_dns_records(name)
1152         for rec in dns_recs:
1153             func(rec)
1154         update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]}
1155         self.samdb.modify(ldb.Message.from_dict(self.samdb,
1156                                                 update_dict,
1157                                                 ldb.FLAG_MOD_REPLACE))
1158
1159     def dns_update_record(self, prefix, txt):
1160         p = self.make_txt_update(prefix, txt, self.zone)
1161         (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
1162         self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
1163         recs = self.ldap_get_dns_records(prefix)
1164         recs = [r for r in recs if r.data.str == txt]
1165         self.assertEqual(len(recs), 1)
1166         return recs[0]
1167
1168     def dns_tombstone(self, prefix, txt, zone):
1169         name = prefix + "." + zone
1170
1171         to = dnsp.DnssrvRpcRecord()
1172         to.dwTimeStamp = 1000
1173         to.wType = dnsp.DNS_TYPE_TOMBSTONE
1174
1175         self.samdb.dns_replace(name, [to])
1176
1177     def ldap_get_records(self, name):
1178         # The use of SCOPE_SUBTREE here avoids raising an exception in the
1179         # 0 results case for a test below.
1180
1181         expr = "(&(objectClass=dnsNode)(name={0}))".format(name)
1182         return self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1183                                  expression=expr, attrs=["*"])
1184
1185     def ldap_get_dns_records(self, name):
1186         records = self.ldap_get_records(name)
1187         return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
1188                 for r in records[0].get('dnsRecord')]
1189
1190     def ldap_get_zone_settings(self):
1191         records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
1192                                     expression="(&(objectClass=dnsZone)" +
1193                                     "(name={0}))".format(self.zone),
1194                                     attrs=["dNSProperty"])
1195         self.assertEqual(len(records), 1)
1196         props = [ndr_unpack(dnsp.DnsProperty, r)
1197                  for r in records[0].get('dNSProperty')]
1198
1199         # We have no choice but to repeat these here.
1200         zone_prop_ids = {0x00: "EMPTY",
1201                          0x01: "TYPE",
1202                          0x02: "ALLOW_UPDATE",
1203                          0x08: "SECURE_TIME",
1204                          0x10: "NOREFRESH_INTERVAL",
1205                          0x11: "SCAVENGING_SERVERS",
1206                          0x12: "AGING_ENABLED_TIME",
1207                          0x20: "REFRESH_INTERVAL",
1208                          0x40: "AGING_STATE",
1209                          0x80: "DELETED_FROM_HOSTNAME",
1210                          0x81: "MASTER_SERVERS",
1211                          0x82: "AUTO_NS_SERVERS",
1212                          0x83: "DCPROMO_CONVERT",
1213                          0x90: "SCAVENGING_SERVERS_DA",
1214                          0x91: "MASTER_SERVERS_DA",
1215                          0x92: "NS_SERVERS_DA",
1216                          0x100: "NODE_DBFLAGS"}
1217         return {zone_prop_ids[p.id].lower(): p.data for p in props}
1218
1219     def set_aging(self, enable=False):
1220         self.create_zone(self.zone, aging_enabled=enable)
1221         self.set_params(NoRefreshInterval=1, RefreshInterval=1,
1222                         Aging=int(bool(enable)), zone=self.zone,
1223                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1224
1225     def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
1226         self.set_aging(enable=True)
1227         settings = self.ldap_get_zone_settings()
1228         self.assertTrue(settings['aging_state'] is not None)
1229         self.assertTrue(settings['aging_state'])
1230
1231         rec = self.dns_update_record('agingtest', ['test txt'])
1232         self.assertNotEqual(rec.dwTimeStamp, 0)
1233
1234     def test_set_aging_disabled(self):
1235         self.set_aging(enable=False)
1236         settings = self.ldap_get_zone_settings()
1237         self.assertTrue(settings['aging_state'] is not None)
1238         self.assertFalse(settings['aging_state'])
1239
1240         rec = self.dns_update_record('agingtest', ['test txt'])
1241         self.assertNotEqual(rec.dwTimeStamp, 0)
1242
1243     def test_aging_update(self, enable=True):
1244         name, txt = 'agingtest', ['test txt']
1245         self.set_aging(enable=True)
1246         before_mod = self.dns_update_record(name, txt)
1247         if not enable:
1248             self.set_params(zone=self.zone, Aging=0)
1249         dec = 2
1250
1251         def mod_ts(rec):
1252             self.assertTrue(rec.dwTimeStamp > 0)
1253             rec.dwTimeStamp -= dec
1254         self.ldap_modify_dnsrecs(name, mod_ts)
1255         after_mod = self.ldap_get_dns_records(name)
1256         self.assertEqual(len(after_mod), 1)
1257         after_mod = after_mod[0]
1258         self.assertEqual(after_mod.dwTimeStamp,
1259                          before_mod.dwTimeStamp - dec)
1260         after_update = self.dns_update_record(name, txt)
1261         after_should_equal = before_mod if enable else after_mod
1262         self.assertEqual(after_should_equal.dwTimeStamp,
1263                          after_update.dwTimeStamp)
1264
1265     def test_aging_update_disabled(self):
1266         self.test_aging_update(enable=False)
1267
1268     def test_aging_refresh(self):
1269         name, txt = 'agingtest', ['test txt']
1270         self.create_zone(self.zone, aging_enabled=True)
1271         interval = 10
1272         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1273                         Aging=1, zone=self.zone,
1274                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1275         before_mod = self.dns_update_record(name, txt)
1276
1277         def mod_ts(rec):
1278             self.assertTrue(rec.dwTimeStamp > 0)
1279             rec.dwTimeStamp -= interval // 2
1280         self.ldap_modify_dnsrecs(name, mod_ts)
1281         update_during_norefresh = self.dns_update_record(name, txt)
1282
1283         def mod_ts(rec):
1284             self.assertTrue(rec.dwTimeStamp > 0)
1285             rec.dwTimeStamp -= interval + interval // 2
1286         self.ldap_modify_dnsrecs(name, mod_ts)
1287         update_during_refresh = self.dns_update_record(name, txt)
1288         self.assertEqual(update_during_norefresh.dwTimeStamp,
1289                          before_mod.dwTimeStamp - interval / 2)
1290         self.assertEqual(update_during_refresh.dwTimeStamp,
1291                          before_mod.dwTimeStamp)
1292
1293     def test_rpc_add_no_timestamp(self):
1294         name, txt = 'agingtest', ['test txt']
1295         self.set_aging(enable=True)
1296         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1297         rec_buf.rec = TXTRecord(txt)
1298         self.rpc_conn.DnssrvUpdateRecord2(
1299             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1300             0,
1301             self.server_ip,
1302             self.zone,
1303             name,
1304             rec_buf,
1305             None)
1306         recs = self.ldap_get_dns_records(name)
1307         self.assertEqual(len(recs), 1)
1308         self.assertEqual(recs[0].dwTimeStamp, 0)
1309
1310     def test_static_record_dynamic_update(self):
1311         name, txt = 'agingtest', ['test txt']
1312         txt2 = ['test txt2']
1313         self.set_aging(enable=True)
1314         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1315         rec_buf.rec = TXTRecord(txt)
1316         self.rpc_conn.DnssrvUpdateRecord2(
1317             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1318             0,
1319             self.server_ip,
1320             self.zone,
1321             name,
1322             rec_buf,
1323             None)
1324
1325         rec2 = self.dns_update_record(name, txt2)
1326         self.assertEqual(rec2.dwTimeStamp, 0)
1327
1328     def test_dynamic_record_static_update(self):
1329         name, txt = 'agingtest', ['test txt']
1330         txt2 = ['test txt2']
1331         txt3 = ['test txt3']
1332         self.set_aging(enable=True)
1333
1334         self.dns_update_record(name, txt)
1335
1336         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1337         rec_buf.rec = TXTRecord(txt2)
1338         self.rpc_conn.DnssrvUpdateRecord2(
1339             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1340             0,
1341             self.server_ip,
1342             self.zone,
1343             name,
1344             rec_buf,
1345             None)
1346
1347         self.dns_update_record(name, txt3)
1348
1349         recs = self.ldap_get_dns_records(name)
1350         # Put in dict because ldap recs might be out of order
1351         recs = {str(r.data.str): r for r in recs}
1352         self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0)
1353         self.assertEqual(recs[str(txt2)].dwTimeStamp, 0)
1354         self.assertEqual(recs[str(txt3)].dwTimeStamp, 0)
1355
1356     def test_dns_tombstone_custom_match_rule(self):
1357         lp = self.get_loadparm()
1358         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1359                            session_info=system_session(),
1360                            credentials=self.creds)
1361
1362         name, txt = 'agingtest', ['test txt']
1363         name2, txt2 = 'agingtest2', ['test txt2']
1364         name3, txt3 = 'agingtest3', ['test txt3']
1365         name4, txt4 = 'agingtest4', ['test txt4']
1366         name5, txt5 = 'agingtest5', ['test txt5']
1367
1368         self.create_zone(self.zone, aging_enabled=True)
1369         interval = 10
1370         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1371                         Aging=1, zone=self.zone,
1372                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1373
1374         self.dns_update_record(name, txt)
1375
1376         self.dns_update_record(name2, txt)
1377         self.dns_update_record(name2, txt2)
1378
1379         self.dns_update_record(name3, txt)
1380         self.dns_update_record(name3, txt2)
1381         last_update = self.dns_update_record(name3, txt3)
1382
1383         # Modify txt1 of the first 2 names
1384         def mod_ts(rec):
1385             if rec.data.str == txt:
1386                 rec.dwTimeStamp -= 2
1387         self.ldap_modify_dnsrecs(name, mod_ts)
1388         self.ldap_modify_dnsrecs(name2, mod_ts)
1389
1390         # create a static dns record.
1391         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1392         rec_buf.rec = TXTRecord(txt4)
1393         self.rpc_conn.DnssrvUpdateRecord2(
1394             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1395             0,
1396             self.server_ip,
1397             self.zone,
1398             name4,
1399             rec_buf,
1400             None)
1401
1402         # Create a tomb stoned record.
1403         self.dns_update_record(name5, txt5)
1404         self.dns_tombstone(name5, txt5, self.zone)
1405
1406         self.ldap_get_dns_records(name3)
1407         expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1408         expr = expr.format(int(last_update.dwTimeStamp) - 1)
1409         try:
1410             res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1411                                     expression=expr, attrs=["*"])
1412         except ldb.LdbError as e:
1413             self.fail(str(e))
1414         updated_names = {str(r.get('name')) for r in res}
1415         self.assertEqual(updated_names, set([name, name2]))
1416
1417     def test_dns_tombstone_custom_match_rule_no_records(self):
1418         lp = self.get_loadparm()
1419         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1420                            session_info=system_session(),
1421                            credentials=self.creds)
1422
1423         self.create_zone(self.zone, aging_enabled=True)
1424         interval = 10
1425         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1426                         Aging=1, zone=self.zone,
1427                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1428
1429         expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1430         expr = expr.format(1)
1431
1432         try:
1433             res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1434                                     expression=expr, attrs=["*"])
1435         except ldb.LdbError as e:
1436             self.fail(str(e))
1437         self.assertEqual(0, len(res))
1438
1439     def test_dns_tombstone_custom_match_rule_fail(self):
1440         self.create_zone(self.zone, aging_enabled=True)
1441         samdb = SamDB(url=lp.samdb_url(),
1442                       lp=lp,
1443                       session_info=system_session(),
1444                       credentials=self.creds)
1445
1446         # Property name in not dnsRecord
1447         expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1448         res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1449                            expression=expr, attrs=["*"])
1450         self.assertEqual(len(res), 0)
1451
1452         # No value for tombstone time
1453         try:
1454             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)"
1455             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1456                                expression=expr, attrs=["*"])
1457             self.assertEqual(len(res), 0)
1458             self.fail("Exception: ldb.ldbError not generated")
1459         except ldb.LdbError as e:
1460             (num, msg) = e.args
1461             self.assertEqual(num, ERR_OPERATIONS_ERROR)
1462
1463         # Tombstone time = -
1464         try:
1465             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)"
1466             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1467                                expression=expr, attrs=["*"])
1468             self.assertEqual(len(res), 0)
1469             self.fail("Exception: ldb.ldbError not generated")
1470         except ldb.LdbError as e:
1471             (num, _) = e.args
1472             self.assertEqual(num, ERR_OPERATIONS_ERROR)
1473
1474         # Tombstone time longer than 64 characters
1475         try:
1476             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1477             expr = expr.format("1" * 65)
1478             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1479                                expression=expr, attrs=["*"])
1480             self.assertEqual(len(res), 0)
1481             self.fail("Exception: ldb.ldbError not generated")
1482         except ldb.LdbError as e:
1483             (num, _) = e.args
1484             self.assertEqual(num, ERR_OPERATIONS_ERROR)
1485
1486         # Non numeric Tombstone time
1487         try:
1488             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)"
1489             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1490                                expression=expr, attrs=["*"])
1491             self.assertEqual(len(res), 0)
1492             self.fail("Exception: ldb.ldbError not generated")
1493         except ldb.LdbError as e:
1494             (num, _) = e.args
1495             self.assertEqual(num, ERR_OPERATIONS_ERROR)
1496
1497         # Non system session
1498         try:
1499             db = SamDB(url="ldap://" + self.server_ip,
1500                        lp=self.get_loadparm(),
1501                        credentials=self.creds)
1502
1503             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)"
1504             res = db.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1505                             expression=expr, attrs=["*"])
1506             self.assertEqual(len(res), 0)
1507             self.fail("Exception: ldb.ldbError not generated")
1508         except ldb.LdbError as e:
1509             (num, _) = e.args
1510             self.assertEqual(num, ERR_OPERATIONS_ERROR)
1511
1512     def test_basic_scavenging(self):
1513         lp = self.get_loadparm()
1514         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1515                            session_info=system_session(),
1516                            credentials=self.creds)
1517
1518         self.create_zone(self.zone, aging_enabled=True)
1519         interval = 1
1520         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1521                         zone=self.zone, Aging=1,
1522                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1523         name, txt = 'agingtest', ['test txt']
1524         name2, txt2 = 'agingtest2', ['test txt2']
1525         name3, txt3 = 'agingtest3', ['test txt3']
1526         name4, txt4 = 'agingtest4', ['test txt4']
1527         name5, txt5 = 'agingtest5', ['test txt5']
1528         self.dns_update_record(name, txt)
1529         self.dns_update_record(name2, txt)
1530         self.dns_update_record(name2, txt2)
1531         self.dns_update_record(name3, txt)
1532         self.dns_update_record(name3, txt2)
1533
1534         # Create a tomb stoned record.
1535         self.dns_update_record(name4, txt4)
1536         self.dns_tombstone(name4, txt4, self.zone)
1537         records = self.ldap_get_records(name4)
1538         self.assertTrue("dNSTombstoned" in records[0])
1539         self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")
1540
1541         # Create an un-tombstoned record, with dnsTombstoned: FALSE
1542         self.dns_update_record(name5, txt5)
1543         self.dns_tombstone(name5, txt5, self.zone)
1544         self.dns_update_record(name5, txt5)
1545         records = self.ldap_get_records(name5)
1546         self.assertTrue("dNSTombstoned" in records[0])
1547         self.assertEqual(records[0]["dNSTombstoned"][0], b"FALSE")
1548
1549         last_add = self.dns_update_record(name3, txt3)
1550
1551         def mod_ts(rec):
1552             self.assertTrue(rec.dwTimeStamp > 0)
1553             if rec.data.str == txt:
1554                 rec.dwTimeStamp -= interval * 5
1555
1556         def mod_ts_all(rec):
1557             rec.dwTimeStamp -= interval * 5
1558         self.ldap_modify_dnsrecs(name, mod_ts)
1559         self.ldap_modify_dnsrecs(name2, mod_ts)
1560         self.ldap_modify_dnsrecs(name3, mod_ts)
1561         self.ldap_modify_dnsrecs(name5, mod_ts_all)
1562         self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1563         dsdb._scavenge_dns_records(self.samdb)
1564
1565         recs = self.ldap_get_dns_records(name)
1566         self.assertEqual(len(recs), 1)
1567         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1568         records = self.ldap_get_records(name)
1569         self.assertTrue("dNSTombstoned" in records[0])
1570         self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")
1571
1572         recs = self.ldap_get_dns_records(name2)
1573         self.assertEqual(len(recs), 1)
1574         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1575         self.assertEqual(recs[0].data.str, txt2)
1576
1577         recs = self.ldap_get_dns_records(name3)
1578         self.assertEqual(len(recs), 2)
1579         txts = {str(r.data.str) for r in recs}
1580         self.assertEqual(txts, {str(txt2), str(txt3)})
1581         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1582         self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT)
1583
1584         recs = self.ldap_get_dns_records(name4)
1585         self.assertEqual(len(recs), 1)
1586         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1587         records = self.ldap_get_records(name4)
1588         self.assertTrue("dNSTombstoned" in records[0])
1589         self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")
1590
1591         recs = self.ldap_get_dns_records(name5)
1592         self.assertEqual(len(recs), 1)
1593         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1594         records = self.ldap_get_records(name5)
1595         self.assertTrue("dNSTombstoned" in records[0])
1596         self.assertEqual(records[0]["dNSTombstoned"][0], b"TRUE")
1597
1598         for make_it_work in [False, True]:
1599             inc = -1 if make_it_work else 1
1600
1601             def mod_ts(rec):
1602                 rec.data = (last_add.dwTimeStamp - 24 * 14) + inc
1603             self.ldap_modify_dnsrecs(name, mod_ts)
1604             dsdb._dns_delete_tombstones(self.samdb)
1605             recs = self.ldap_get_records(name)
1606             if make_it_work:
1607                 self.assertEqual(len(recs), 0)
1608             else:
1609                 self.assertEqual(len(recs), 1)
1610
1611     def test_fully_qualified_zone(self):
1612
1613         def create_zone_expect_exists(zone):
1614             try:
1615                 zone_create = self.make_zone_obj(zone)
1616                 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1617                 zc_type = dnsserver.DNSSRV_TYPEID_ZONE_CREATE
1618                 self.rpc_conn.DnssrvOperation2(client_version,
1619                                                0,
1620                                                self.server_ip,
1621                                                None,
1622                                                0,
1623                                                'ZoneCreate',
1624                                                zc_type,
1625                                                zone_create)
1626             except WERRORError as e:
1627                 enum, _ = e.args
1628                 if enum != werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS:
1629                     self.fail(e)
1630                 return
1631             self.fail("Zone {} should already exist".format(zone))
1632
1633         # Create unqualified, then check creating qualified fails.
1634         self.create_zone(self.zone)
1635         create_zone_expect_exists(self.zone + '.')
1636
1637         # Same again, but the other way around.
1638         self.create_zone(self.zone + '2.')
1639         create_zone_expect_exists(self.zone + '2')
1640
1641         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1642         request_filter = dnsserver.DNS_ZONE_REQUEST_PRIMARY
1643         tid = dnsserver.DNSSRV_TYPEID_DWORD
1644         typeid, res = self.rpc_conn.DnssrvComplexOperation2(client_version,
1645                                                             0,
1646                                                             self.server_ip,
1647                                                             None,
1648                                                             'EnumZones',
1649                                                             tid,
1650                                                             request_filter)
1651
1652         self.delete_zone(self.zone)
1653         self.delete_zone(self.zone + '2')
1654
1655         # Two zones should've been created, neither of them fully qualified.
1656         zones_we_just_made = []
1657         zones = [str(z.pszZoneName) for z in res.ZoneArray]
1658         for zone in zones:
1659             if zone.startswith(self.zone):
1660                 zones_we_just_made.append(zone)
1661         self.assertEqual(len(zones_we_just_made), 2)
1662         self.assertEqual(set(zones_we_just_made), {self.zone + '2', self.zone})
1663
1664     def delete_zone(self, zone):
1665         self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1666                                        0,
1667                                        self.server_ip,
1668                                        zone,
1669                                        0,
1670                                        'DeleteZoneFromDs',
1671                                        dnsserver.DNSSRV_TYPEID_NULL,
1672                                        None)
1673
1674     def test_soa_query(self):
1675         zone = "test.lan"
1676         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1677         questions = []
1678
1679         q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
1680         questions.append(q)
1681         self.finish_name_packet(p, questions)
1682
1683         (response, response_packet) =\
1684             self.dns_transaction_udp(p, host=server_ip)
1685         # Windows returns OK while BIND logically seems to return NXDOMAIN
1686         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1687         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1688         self.assertEqual(response.ancount, 0)
1689
1690         self.create_zone(zone)
1691         (response, response_packet) =\
1692             self.dns_transaction_udp(p, host=server_ip)
1693         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1694         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1695         self.assertEqual(response.ancount, 1)
1696         self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
1697
1698         self.delete_zone(zone)
1699         (response, response_packet) =\
1700             self.dns_transaction_udp(p, host=server_ip)
1701         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1702         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1703         self.assertEqual(response.ancount, 0)
1704
1705
class TestRPCRoundtrip(DNSTest):
    """Round-trip tests between DNS dynamic update and the dnsserver RPC pipe.

    Two directions are covered for TXT records:

    * "txt_record" tests add the record with a DNS update, query it back
      over DNS and then look it up over RPC with ``dns_record_match``;
    * "rpc_to_dns" tests add the record over RPC, query it back over DNS
      and finally delete it over RPC again.
    """

    # (prefix, TXT strings, RPC string form) for records that contain
    # empty strings at different positions.
    _PADDED_TXT_CASES = (
        ('pad1textrec', ['"This is a test"', '', ''],
         '"\\"This is a test\\"" "" ""'),
        ('pad2textrec', ['"This is a test"', '', '', 'more text'],
         '"\\"This is a test\\"" "" "" "more text"'),
        ('pad3textrec', ['', '', '"This is a test"'],
         '"" "" "\\"This is a test\\""'),
    )

    def setUp(self):
        """Store the module-level connection settings and open the RPC pipe."""
        super(TestRPCRoundtrip, self).setUp()
        # server_name, server_ip, lp and creds are module-level names that
        # are only read here, so no ``global`` declaration is required.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
                                            (self.server_ip),
                                            self.lp,
                                            self.creds)

    def tearDown(self):
        super(TestRPCRoundtrip, self).tearDown()

    def rpc_update(self, fqn=None, data=None, wType=None, delete=False):
        """Add or delete one record through DnssrvUpdateRecord2.

        :param fqn: node name; defaults to "rpctestrec.<dns domain>"
        :param data: record data handed to data_to_dns_record()
        :param wType: record type handed to data_to_dns_record()
        :param delete: when true the record is passed as the record to
            delete instead of the record to add
        """
        fqn = fqn or ("rpctestrec." + self.get_dns_domain())

        rec = data_to_dns_record(wType, data)
        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
        add_rec_buf.rec = rec

        add_arg = add_rec_buf
        del_arg = None
        if delete:
            add_arg = None
            del_arg = add_rec_buf

        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,
            self.server_ip,
            self.get_dns_domain(),
            fqn,
            add_arg,
            del_arg)

    def _dns_add_txt_and_match(self, prefix, txt, rpc_data, expected_txt=None):
        """Add a TXT record by DNS update and verify it over DNS and RPC.

        :param prefix: host part of the record name
        :param txt: list of TXT strings sent in the DNS update
        :param rpc_data: string form of the record expected to be found
            by dns_record_match()
        :param expected_txt: TXT strings expected from the DNS query;
            defaults to ``txt``
        """
        if expected_txt is None:
            expected_txt = txt

        p = self.make_txt_update(prefix, txt)
        # self.server_ip is assigned from the module-level server_ip in
        # setUp(), so this addresses the same host as before.
        (response, response_packet) =\
            self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, expected_txt)
        self.assertIsNotNone(
            dns_record_match(self.rpc_conn,
                             self.server_ip,
                             self.get_dns_domain(),
                             "%s.%s" % (prefix, self.get_dns_domain()),
                             dnsp.DNS_TYPE_TXT,
                             rpc_data))

    def _rpc_add_txt_and_query(self, prefix, rpc_data, expected_txt):
        """Add a TXT record over RPC, query it over DNS, then delete it.

        :param prefix: host part of the record name
        :param rpc_data: string form of the record given to
            data_to_dns_record()
        :param expected_txt: TXT strings expected from the DNS query
        """
        name = "%s.%s" % (prefix, self.get_dns_domain())

        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, rpc_data)
        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
        add_rec_buf.rec = rec
        try:
            self.rpc_conn.DnssrvUpdateRecord2(
                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                0,
                self.server_ip,
                self.get_dns_domain(),
                name,
                add_rec_buf,
                None)
        except WERRORError as e:
            self.fail(str(e))

        try:
            self.check_query_txt(prefix, expected_txt)
        finally:
            # Remove the record again even when the DNS check failed.
            self.rpc_conn.DnssrvUpdateRecord2(
                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                0,
                self.server_ip,
                self.get_dns_domain(),
                name,
                None,
                add_rec_buf)

    def test_rpc_self_referencing_cname(self):
        """RPC must refuse a CNAME that points at itself."""
        cname = "cnametest2_unqual_rec_loop"
        cname_fqn = "%s.%s" % (cname, self.get_dns_domain())

        # Pre-test cleanup: a missing record is the only tolerated error.
        try:
            self.rpc_update(fqn=cname, data=cname_fqn,
                            wType=dnsp.DNS_TYPE_CNAME, delete=True)
        except WERRORError as e:
            if e.args[0] != werror.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST:
                self.fail("RPC DNS gaven wrong error on pre-test cleanup "
                          "for self referencing CNAME: %s" % e.args[0])

        try:
            self.rpc_update(fqn=cname, wType=dnsp.DNS_TYPE_CNAME, data=cname_fqn)
        except WERRORError as e:
            if e.args[0] != werror.WERR_DNS_ERROR_CNAME_LOOP:
                self.fail("RPC DNS gaven wrong error on insertion of "
                          "self referencing CNAME: %s" % e.args[0])
            return

        self.fail("RPC DNS allowed insertion of self referencing CNAME")

    def test_update_add_txt_rpc_to_dns(self):
        """A quoted TXT string added over RPC is returned by DNS."""
        self._rpc_add_txt_and_query('rpctextrec',
                                    '"\\"This is a test\\""',
                                    ['"This is a test"'])

    def test_update_add_null_padded_txt_record(self):
        """TXT records with empty strings survive DNS update -> RPC."""
        for prefix, txt, rpc_data in self._PADDED_TXT_CASES:
            self._dns_add_txt_and_match(prefix, list(txt), rpc_data)

    def test_update_add_padding_rpc_to_dns(self):
        """TXT records with empty strings survive RPC -> DNS."""
        for prefix, txt, rpc_data in self._PADDED_TXT_CASES:
            self._rpc_add_txt_and_query('rpc' + prefix, rpc_data, list(txt))

    # Test is incomplete due to strlen against txt records
    def test_update_add_null_char_txt_record(self):
        """TXT strings sent with an embedded NUL are read back cut at it."""
        self._dns_add_txt_and_match('nulltextrec',
                                    ['NULL\x00BYTE'],
                                    '"NULL"',
                                    expected_txt=['NULL'])

        self._dns_add_txt_and_match('nulltextrec2',
                                    ['NULL\x00BYTE', 'NULL\x00BYTE'],
                                    '"NULL" "NULL"',
                                    expected_txt=['NULL', 'NULL'])

    def test_update_add_null_char_rpc_to_dns(self):
        """A NUL-containing TXT string added over RPC reads back cut at it."""
        self._rpc_add_txt_and_query('rpcnulltextrec',
                                    '"NULL\x00BYTE"',
                                    ['NULL'])

    def test_update_add_hex_char_txt_record(self):
        """A TXT string with a 0xFF character survives DNS update -> RPC."""
        self._dns_add_txt_and_match('hextextrec',
                                    ['HIGH\xFFBYTE'],
                                    '"HIGH\xFFBYTE"')

    def test_update_add_hex_rpc_to_dns(self):
        """A TXT string with a 0xFF character survives RPC -> DNS."""
        self._rpc_add_txt_and_query('rpchextextrec',
                                    '"HIGH\xFFBYTE"',
                                    ['HIGH\xFFBYTE'])

    def test_update_add_slash_txt_record(self):
        """A TXT string with a backslash survives DNS update -> RPC."""
        self._dns_add_txt_and_match('slashtextrec',
                                    ['Th\\=is=is a test'],
                                    '"Th\\\\=is=is a test"')

    # This test fails against Windows as it eliminates slashes in RPC
    # One typical use for a slash is in records like 'var=value' to
    # escape '=' characters.
    def test_update_add_slash_rpc_to_dns(self):
        """A TXT string with a backslash survives RPC -> DNS."""
        self._rpc_add_txt_and_query('rpcslashtextrec',
                                    '"Th\\\\=is=is a test"',
                                    ['Th\\=is=is a test'])

    def test_update_add_two_txt_records(self):
        """test adding two txt records works"""
        self._dns_add_txt_and_match('textrec2',
                                    ['"This is a test"',
                                     '"and this is a test, too"'],
                                    '"\\"This is a test\\""' +
                                    ' "\\"and this is a test, too\\""')

    def test_update_add_two_rpc_to_dns(self):
        """A two-string TXT record added over RPC is returned by DNS."""
        self._rpc_add_txt_and_query('rpctextrec2',
                                    '"\\"This is a test\\""' +
                                    ' "\\"and this is a test, too\\""',
                                    ['"This is a test"',
                                     '"and this is a test, too"'])

    def test_update_add_empty_txt_records(self):
        """test adding an empty txt record works"""
        self._dns_add_txt_and_match('emptytextrec', [], '')

    def test_update_add_empty_rpc_to_dns(self):
        """An empty TXT record added over RPC is returned by DNS."""
        self._rpc_add_txt_and_query('rpcemptytextrec', '', [])
2192
2193
# Run the tests defined in this module through the TestProgram imported
# from samba.tests.subunitrun.
# NOTE(review): subunitopts is defined earlier in the file, presumably
# from SubunitOptions(parser) - confirm.
TestProgram(module=__name__, opts=subunitopts)