CVE-2018-14629: Tests to expose regression from dns cname loop fix
[sfrench/samba-autobuild/.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
24 import ldb
25 from ldb import ERR_OPERATIONS_ERROR
26 import os
27 import sys
28 import struct
29 import socket
30 import samba.ndr as ndr
31 from samba import credentials
32 from samba.dcerpc import dns, dnsp, dnsserver
33 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
34 from samba.tests.subunitrun import SubunitOptions, TestProgram
35 from samba import werror, WERRORError
36 from samba.tests.dns_base import DNSTest
37 import samba.getopt as options
38 import optparse
39
40
41 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
42 sambaopts = options.SambaOptions(parser)
43 parser.add_option_group(sambaopts)
44
45 # This timeout only has relevance when testing against Windows
46 # Format errors tend to return patchy responses, so a timeout is needed.
47 parser.add_option("--timeout", type="int", dest="timeout",
48                   help="Specify timeout for DNS requests")
49
50 # use command line creds if available
51 credopts = options.CredentialsOptions(parser)
52 parser.add_option_group(credopts)
53 subunitopts = SubunitOptions(parser)
54 parser.add_option_group(subunitopts)
55
56 opts, args = parser.parse_args()
57
58 lp = sambaopts.get_loadparm()
59 creds = credopts.get_credentials(lp)
60
61 timeout = opts.timeout
62
63 if len(args) < 2:
64     parser.print_usage()
65     sys.exit(1)
66
67 server_name = args[0]
68 server_ip = args[1]
69 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
70
71
72 class TestSimpleQueries(DNSTest):
73     def setUp(self):
74         super(TestSimpleQueries, self).setUp()
75         global server, server_ip, lp, creds, timeout
76         self.server = server_name
77         self.server_ip = server_ip
78         self.lp = lp
79         self.creds = creds
80         self.timeout = timeout
81
82     def test_one_a_query(self):
83         "create a query packet containing one query record"
84         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
85         questions = []
86
87         name = "%s.%s" % (self.server, self.get_dns_domain())
88         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
89         print("asking for ", q.name)
90         questions.append(q)
91
92         self.finish_name_packet(p, questions)
93         (response, response_packet) =\
94             self.dns_transaction_udp(p, host=server_ip)
95         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
96         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
97         self.assertEquals(response.ancount, 1)
98         self.assertEquals(response.answers[0].rdata,
99                           self.server_ip)
100
101     def test_one_SOA_query(self):
102         "create a query packet containing one query record for the SOA"
103         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
104         questions = []
105
106         name = "%s" % (self.get_dns_domain())
107         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
108         print("asking for ", q.name)
109         questions.append(q)
110
111         self.finish_name_packet(p, questions)
112         (response, response_packet) =\
113             self.dns_transaction_udp(p, host=server_ip)
114         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
115         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
116         self.assertEquals(response.ancount, 1)
117         self.assertEquals(
118             response.answers[0].rdata.mname.upper(),
119             ("%s.%s" % (self.server, self.get_dns_domain())).upper())
120
121     def test_one_a_query_tcp(self):
122         "create a query packet containing one query record via TCP"
123         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
124         questions = []
125
126         name = "%s.%s" % (self.server, self.get_dns_domain())
127         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
128         print("asking for ", q.name)
129         questions.append(q)
130
131         self.finish_name_packet(p, questions)
132         (response, response_packet) =\
133             self.dns_transaction_tcp(p, host=server_ip)
134         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
135         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
136         self.assertEquals(response.ancount, 1)
137         self.assertEquals(response.answers[0].rdata,
138                           self.server_ip)
139
140     def test_one_mx_query(self):
141         "create a query packet causing an empty RCODE_OK answer"
142         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
143         questions = []
144
145         name = "%s.%s" % (self.server, self.get_dns_domain())
146         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
147         print("asking for ", q.name)
148         questions.append(q)
149
150         self.finish_name_packet(p, questions)
151         (response, response_packet) =\
152             self.dns_transaction_udp(p, host=server_ip)
153         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
154         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
155         self.assertEquals(response.ancount, 0)
156
157         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
158         questions = []
159
160         name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
161         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
162         print("asking for ", q.name)
163         questions.append(q)
164
165         self.finish_name_packet(p, questions)
166         (response, response_packet) =\
167             self.dns_transaction_udp(p, host=server_ip)
168         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
169         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
170         self.assertEquals(response.ancount, 0)
171
172     def test_two_queries(self):
173         "create a query packet containing two query records"
174         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
175         questions = []
176
177         name = "%s.%s" % (self.server, self.get_dns_domain())
178         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
179         questions.append(q)
180
181         name = "%s.%s" % ('bogusname', self.get_dns_domain())
182         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
183         questions.append(q)
184
185         self.finish_name_packet(p, questions)
186         try:
187             (response, response_packet) =\
188                 self.dns_transaction_udp(p, host=server_ip)
189             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
190         except socket.timeout:
191             # Windows chooses not to respond to incorrectly formatted queries.
192             # Although this appears to be non-deterministic even for the same
193             # request twice, it also appears to be based on a how poorly the
194             # request is formatted.
195             pass
196
197     def test_qtype_all_query(self):
198         "create a QTYPE_ALL query"
199         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
200         questions = []
201
202         name = "%s.%s" % (self.server, self.get_dns_domain())
203         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
204         print("asking for ", q.name)
205         questions.append(q)
206
207         self.finish_name_packet(p, questions)
208         (response, response_packet) =\
209             self.dns_transaction_udp(p, host=server_ip)
210
211         num_answers = 1
212         dc_ipv6 = os.getenv('SERVER_IPV6')
213         if dc_ipv6 is not None:
214             num_answers += 1
215
216         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
217         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
218         self.assertEquals(response.ancount, num_answers)
219         self.assertEquals(response.answers[0].rdata,
220                           self.server_ip)
221         if dc_ipv6 is not None:
222             self.assertEquals(response.answers[1].rdata, dc_ipv6)
223
224     def test_qclass_none_query(self):
225         "create a QCLASS_NONE query"
226         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
227         questions = []
228
229         name = "%s.%s" % (self.server, self.get_dns_domain())
230         q = self.make_name_question(
231             name,
232             dns.DNS_QTYPE_ALL,
233             dns.DNS_QCLASS_NONE)
234         questions.append(q)
235
236         self.finish_name_packet(p, questions)
237         try:
238             (response, response_packet) =\
239                 self.dns_transaction_udp(p, host=server_ip)
240             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
241         except socket.timeout:
242             # Windows chooses not to respond to incorrectly formatted queries.
243             # Although this appears to be non-deterministic even for the same
244             # request twice, it also appears to be based on a how poorly the
245             # request is formatted.
246             pass
247
248     def test_soa_hostname_query(self):
249         "create a SOA query for a hostname"
250         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
251         questions = []
252
253         name = "%s.%s" % (self.server, self.get_dns_domain())
254         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
255         questions.append(q)
256
257         self.finish_name_packet(p, questions)
258         (response, response_packet) =\
259             self.dns_transaction_udp(p, host=server_ip)
260         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
261         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
262         # We don't get SOA records for single hosts
263         self.assertEquals(response.ancount, 0)
264         # But we do respond with an authority section
265         self.assertEqual(response.nscount, 1)
266
267     def test_soa_domain_query(self):
268         "create a SOA query for a domain"
269         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
270         questions = []
271
272         name = self.get_dns_domain()
273         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
274         questions.append(q)
275
276         self.finish_name_packet(p, questions)
277         (response, response_packet) =\
278             self.dns_transaction_udp(p, host=server_ip)
279         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
280         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
281         self.assertEquals(response.ancount, 1)
282         self.assertEquals(response.answers[0].rdata.minimum, 3600)
283
284
285 class TestDNSUpdates(DNSTest):
286     def setUp(self):
287         super(TestDNSUpdates, self).setUp()
288         global server, server_ip, lp, creds, timeout
289         self.server = server_name
290         self.server_ip = server_ip
291         self.lp = lp
292         self.creds = creds
293         self.timeout = timeout
294
295     def test_two_updates(self):
296         "create two update requests"
297         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
298         updates = []
299
300         name = "%s.%s" % (self.server, self.get_dns_domain())
301         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
302         updates.append(u)
303
304         name = self.get_dns_domain()
305         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
306         updates.append(u)
307
308         self.finish_name_packet(p, updates)
309         try:
310             (response, response_packet) =\
311                 self.dns_transaction_udp(p, host=server_ip)
312             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
313         except socket.timeout:
314             # Windows chooses not to respond to incorrectly formatted queries.
315             # Although this appears to be non-deterministic even for the same
316             # request twice, it also appears to be based on a how poorly the
317             # request is formatted.
318             pass
319
320     def test_update_wrong_qclass(self):
321         "create update with DNS_QCLASS_NONE"
322         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
323         updates = []
324
325         name = self.get_dns_domain()
326         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
327         updates.append(u)
328
329         self.finish_name_packet(p, updates)
330         (response, response_packet) =\
331             self.dns_transaction_udp(p, host=server_ip)
332         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
333
334     def test_update_prereq_with_non_null_ttl(self):
335         "test update with a non-null TTL"
336         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
337         updates = []
338
339         name = self.get_dns_domain()
340
341         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
342         updates.append(u)
343         self.finish_name_packet(p, updates)
344
345         prereqs = []
346         r = dns.res_rec()
347         r.name = "%s.%s" % (self.server, self.get_dns_domain())
348         r.rr_type = dns.DNS_QTYPE_TXT
349         r.rr_class = dns.DNS_QCLASS_NONE
350         r.ttl = 1
351         r.length = 0
352         prereqs.append(r)
353
354         p.ancount = len(prereqs)
355         p.answers = prereqs
356
357         try:
358             (response, response_packet) =\
359                 self.dns_transaction_udp(p, host=server_ip)
360             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
361         except socket.timeout:
362             # Windows chooses not to respond to incorrectly formatted queries.
363             # Although this appears to be non-deterministic even for the same
364             # request twice, it also appears to be based on a how poorly the
365             # request is formatted.
366             pass
367
368     def test_update_prereq_with_non_null_length(self):
369         "test update with a non-null length"
370         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
371         updates = []
372
373         name = self.get_dns_domain()
374
375         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
376         updates.append(u)
377         self.finish_name_packet(p, updates)
378
379         prereqs = []
380         r = dns.res_rec()
381         r.name = "%s.%s" % (self.server, self.get_dns_domain())
382         r.rr_type = dns.DNS_QTYPE_TXT
383         r.rr_class = dns.DNS_QCLASS_ANY
384         r.ttl = 0
385         r.length = 1
386         prereqs.append(r)
387
388         p.ancount = len(prereqs)
389         p.answers = prereqs
390
391         (response, response_packet) =\
392             self.dns_transaction_udp(p, host=server_ip)
393         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
394
395     def test_update_prereq_nonexisting_name(self):
396         "test update with a nonexisting name"
397         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
398         updates = []
399
400         name = self.get_dns_domain()
401
402         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
403         updates.append(u)
404         self.finish_name_packet(p, updates)
405
406         prereqs = []
407         r = dns.res_rec()
408         r.name = "idontexist.%s" % self.get_dns_domain()
409         r.rr_type = dns.DNS_QTYPE_TXT
410         r.rr_class = dns.DNS_QCLASS_ANY
411         r.ttl = 0
412         r.length = 0
413         prereqs.append(r)
414
415         p.ancount = len(prereqs)
416         p.answers = prereqs
417
418         (response, response_packet) =\
419             self.dns_transaction_udp(p, host=server_ip)
420         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
421
422     def test_update_add_txt_record(self):
423         "test adding records works"
424         prefix, txt = 'textrec', ['"This is a test"']
425         p = self.make_txt_update(prefix, txt)
426         (response, response_packet) =\
427             self.dns_transaction_udp(p, host=server_ip)
428         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
429         self.check_query_txt(prefix, txt)
430
431     def test_delete_record(self):
432         "Test if deleting records works"
433
434         NAME = "deleterec.%s" % self.get_dns_domain()
435
436         # First, create a record to make sure we have a record to delete.
437         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
438         updates = []
439
440         name = self.get_dns_domain()
441
442         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
443         updates.append(u)
444         self.finish_name_packet(p, updates)
445
446         updates = []
447         r = dns.res_rec()
448         r.name = NAME
449         r.rr_type = dns.DNS_QTYPE_TXT
450         r.rr_class = dns.DNS_QCLASS_IN
451         r.ttl = 900
452         r.length = 0xffff
453         rdata = self.make_txt_record(['"This is a test"'])
454         r.rdata = rdata
455         updates.append(r)
456         p.nscount = len(updates)
457         p.nsrecs = updates
458
459         (response, response_packet) =\
460             self.dns_transaction_udp(p, host=server_ip)
461         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
462
463         # Now check the record is around
464         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
465         questions = []
466         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
467         questions.append(q)
468
469         self.finish_name_packet(p, questions)
470         (response, response_packet) =\
471             self.dns_transaction_udp(p, host=server_ip)
472         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
473
474         # Now delete the record
475         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
476         updates = []
477
478         name = self.get_dns_domain()
479
480         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
481         updates.append(u)
482         self.finish_name_packet(p, updates)
483
484         updates = []
485         r = dns.res_rec()
486         r.name = NAME
487         r.rr_type = dns.DNS_QTYPE_TXT
488         r.rr_class = dns.DNS_QCLASS_NONE
489         r.ttl = 0
490         r.length = 0xffff
491         rdata = self.make_txt_record(['"This is a test"'])
492         r.rdata = rdata
493         updates.append(r)
494         p.nscount = len(updates)
495         p.nsrecs = updates
496
497         (response, response_packet) =\
498             self.dns_transaction_udp(p, host=server_ip)
499         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
500
501         # And finally check it's gone
502         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
503         questions = []
504
505         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
506         questions.append(q)
507
508         self.finish_name_packet(p, questions)
509         (response, response_packet) =\
510             self.dns_transaction_udp(p, host=server_ip)
511         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
512
513     def test_readd_record(self):
514         "Test if adding, deleting and then readding a records works"
515
516         NAME = "readdrec.%s" % self.get_dns_domain()
517
518         # Create the record
519         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
520         updates = []
521
522         name = self.get_dns_domain()
523
524         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
525         updates.append(u)
526         self.finish_name_packet(p, updates)
527
528         updates = []
529         r = dns.res_rec()
530         r.name = NAME
531         r.rr_type = dns.DNS_QTYPE_TXT
532         r.rr_class = dns.DNS_QCLASS_IN
533         r.ttl = 900
534         r.length = 0xffff
535         rdata = self.make_txt_record(['"This is a test"'])
536         r.rdata = rdata
537         updates.append(r)
538         p.nscount = len(updates)
539         p.nsrecs = updates
540
541         (response, response_packet) =\
542             self.dns_transaction_udp(p, host=server_ip)
543         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
544
545         # Now check the record is around
546         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
547         questions = []
548         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
549         questions.append(q)
550
551         self.finish_name_packet(p, questions)
552         (response, response_packet) =\
553             self.dns_transaction_udp(p, host=server_ip)
554         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
555
556         # Now delete the record
557         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
558         updates = []
559
560         name = self.get_dns_domain()
561
562         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
563         updates.append(u)
564         self.finish_name_packet(p, updates)
565
566         updates = []
567         r = dns.res_rec()
568         r.name = NAME
569         r.rr_type = dns.DNS_QTYPE_TXT
570         r.rr_class = dns.DNS_QCLASS_NONE
571         r.ttl = 0
572         r.length = 0xffff
573         rdata = self.make_txt_record(['"This is a test"'])
574         r.rdata = rdata
575         updates.append(r)
576         p.nscount = len(updates)
577         p.nsrecs = updates
578
579         (response, response_packet) =\
580             self.dns_transaction_udp(p, host=server_ip)
581         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
582
583         # check it's gone
584         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
585         questions = []
586
587         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
588         questions.append(q)
589
590         self.finish_name_packet(p, questions)
591         (response, response_packet) =\
592             self.dns_transaction_udp(p, host=server_ip)
593         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
594
595         # recreate the record
596         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
597         updates = []
598
599         name = self.get_dns_domain()
600
601         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
602         updates.append(u)
603         self.finish_name_packet(p, updates)
604
605         updates = []
606         r = dns.res_rec()
607         r.name = NAME
608         r.rr_type = dns.DNS_QTYPE_TXT
609         r.rr_class = dns.DNS_QCLASS_IN
610         r.ttl = 900
611         r.length = 0xffff
612         rdata = self.make_txt_record(['"This is a test"'])
613         r.rdata = rdata
614         updates.append(r)
615         p.nscount = len(updates)
616         p.nsrecs = updates
617
618         (response, response_packet) =\
619             self.dns_transaction_udp(p, host=server_ip)
620         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
621
622         # Now check the record is around
623         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
624         questions = []
625         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
626         questions.append(q)
627
628         self.finish_name_packet(p, questions)
629         (response, response_packet) =\
630             self.dns_transaction_udp(p, host=server_ip)
631         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
632
633     def test_update_add_mx_record(self):
634         "test adding MX records works"
635         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
636         updates = []
637
638         name = self.get_dns_domain()
639
640         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
641         updates.append(u)
642         self.finish_name_packet(p, updates)
643
644         updates = []
645         r = dns.res_rec()
646         r.name = "%s" % self.get_dns_domain()
647         r.rr_type = dns.DNS_QTYPE_MX
648         r.rr_class = dns.DNS_QCLASS_IN
649         r.ttl = 900
650         r.length = 0xffff
651         rdata = dns.mx_record()
652         rdata.preference = 10
653         rdata.exchange = 'mail.%s' % self.get_dns_domain()
654         r.rdata = rdata
655         updates.append(r)
656         p.nscount = len(updates)
657         p.nsrecs = updates
658
659         (response, response_packet) =\
660             self.dns_transaction_udp(p, host=server_ip)
661         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
662
663         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
664         questions = []
665
666         name = "%s" % self.get_dns_domain()
667         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
668         questions.append(q)
669
670         self.finish_name_packet(p, questions)
671         (response, response_packet) =\
672             self.dns_transaction_udp(p, host=server_ip)
673         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
674         self.assertEqual(response.ancount, 1)
675         ans = response.answers[0]
676         self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
677         self.assertEqual(ans.rdata.preference, 10)
678         self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
679
680
681 class TestComplexQueries(DNSTest):
682     def make_dns_update(self, key, value, qtype):
683         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
684
685         name = self.get_dns_domain()
686         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
687         self.finish_name_packet(p, [u])
688
689         r = dns.res_rec()
690         r.name = key
691         r.rr_type = qtype
692         r.rr_class = dns.DNS_QCLASS_IN
693         r.ttl = 900
694         r.length = 0xffff
695         r.rdata = value
696         p.nscount = 1
697         p.nsrecs = [r]
698         (response, response_packet) =\
699             self.dns_transaction_udp(p, host=server_ip)
700         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
701
702     def setUp(self):
703         super(TestComplexQueries, self).setUp()
704
705         global server, server_ip, lp, creds, timeout
706         self.server = server_name
707         self.server_ip = server_ip
708         self.lp = lp
709         self.creds = creds
710         self.timeout = timeout
711
712     def test_one_a_query(self):
713         "create a query packet containing one query record"
714
715         try:
716
717             # Create the record
718             name = "cname_test.%s" % self.get_dns_domain()
719             rdata = "%s.%s" % (self.server, self.get_dns_domain())
720             self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
721
722             p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
723             questions = []
724
725             # Check the record
726             name = "cname_test.%s" % self.get_dns_domain()
727             q = self.make_name_question(name,
728                                         dns.DNS_QTYPE_A,
729                                         dns.DNS_QCLASS_IN)
730             print("asking for ", q.name)
731             questions.append(q)
732
733             self.finish_name_packet(p, questions)
734             (response, response_packet) =\
735                 self.dns_transaction_udp(p, host=self.server_ip)
736             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
737             self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
738             self.assertEquals(response.ancount, 2)
739             self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
740             self.assertEquals(response.answers[0].rdata, "%s.%s" %
741                               (self.server, self.get_dns_domain()))
742             self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
743             self.assertEquals(response.answers[1].rdata,
744                               self.server_ip)
745
746         finally:
747             # Delete the record
748             p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
749             updates = []
750
751             name = self.get_dns_domain()
752
753             u = self.make_name_question(name,
754                                         dns.DNS_QTYPE_SOA,
755                                         dns.DNS_QCLASS_IN)
756             updates.append(u)
757             self.finish_name_packet(p, updates)
758
759             updates = []
760             r = dns.res_rec()
761             r.name = "cname_test.%s" % self.get_dns_domain()
762             r.rr_type = dns.DNS_QTYPE_CNAME
763             r.rr_class = dns.DNS_QCLASS_NONE
764             r.ttl = 0
765             r.length = 0xffff
766             r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
767             updates.append(r)
768             p.nscount = len(updates)
769             p.nsrecs = updates
770
771             (response, response_packet) =\
772                 self.dns_transaction_udp(p, host=self.server_ip)
773             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
774
775     def test_cname_two_chain(self):
776         name0 = "cnamechain0.%s" % self.get_dns_domain()
777         name1 = "cnamechain1.%s" % self.get_dns_domain()
778         name2 = "cnamechain2.%s" % self.get_dns_domain()
779         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
780         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
781         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
782
783         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
784         questions = []
785         q = self.make_name_question(name1, dns.DNS_QTYPE_A,
786                                     dns.DNS_QCLASS_IN)
787         questions.append(q)
788
789         self.finish_name_packet(p, questions)
790         (response, response_packet) =\
791             self.dns_transaction_udp(p, host=server_ip)
792         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
793         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
794         self.assertEquals(response.ancount, 3)
795
796         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
797         self.assertEquals(response.answers[0].name, name1)
798         self.assertEquals(response.answers[0].rdata, name2)
799
800         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
801         self.assertEquals(response.answers[1].name, name2)
802         self.assertEquals(response.answers[1].rdata, name0)
803
804         self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
805         self.assertEquals(response.answers[2].rdata,
806                           self.server_ip)
807
808     def test_invalid_empty_cname(self):
809         name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
810         try:
811             self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
812         except AssertionError:
813             pass
814         else:
815             self.fail("Successfully added empty CNAME, which is invalid.")
816
817     def test_cname_two_chain_not_matching_qtype(self):
818         name0 = "cnamechain0.%s" % self.get_dns_domain()
819         name1 = "cnamechain1.%s" % self.get_dns_domain()
820         name2 = "cnamechain2.%s" % self.get_dns_domain()
821         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
822         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
823         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
824
825         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
826         questions = []
827         q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
828                                     dns.DNS_QCLASS_IN)
829         questions.append(q)
830
831         self.finish_name_packet(p, questions)
832         (response, response_packet) =\
833             self.dns_transaction_udp(p, host=server_ip)
834         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
835         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
836
837         # CNAME should return all intermediate results!
838         # Only the A records exists, not the TXT.
839         self.assertEquals(response.ancount, 2)
840
841         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
842         self.assertEquals(response.answers[0].name, name1)
843         self.assertEquals(response.answers[0].rdata, name2)
844
845         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
846         self.assertEquals(response.answers[1].name, name2)
847         self.assertEquals(response.answers[1].rdata, name0)
848
849     def test_cname_loop(self):
850         cname1 = "cnamelooptestrec." + self.get_dns_domain()
851         cname2 = "cnamelooptestrec2." + self.get_dns_domain()
852         cname3 = "cnamelooptestrec3." + self.get_dns_domain()
853         self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
854         self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
855         self.make_dns_update(cname3, cname1, dnsp.DNS_TYPE_CNAME)
856
857         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
858         questions = []
859
860         q = self.make_name_question(cname1,
861                                     dns.DNS_QTYPE_A,
862                                     dns.DNS_QCLASS_IN)
863         questions.append(q)
864         self.finish_name_packet(p, questions)
865
866         (response, response_packet) =\
867             self.dns_transaction_udp(p, host=self.server_ip)
868
869         max_recursion_depth = 20
870         self.assertEquals(len(response.answers), max_recursion_depth)
871
872     # Make sure cname limit doesn't count other records.  This is a generic
873     # test called in tests below
874     def max_rec_test(self, rtype, rec_gen):
875         name = "limittestrec{0}.{1}".format(rtype, self.get_dns_domain())
876         limit = 20
877         num_recs_to_enter = limit + 5
878
879         for i in range(1, num_recs_to_enter+1):
880             ip = rec_gen(i)
881             self.make_dns_update(name, ip, rtype)
882
883         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
884         questions = []
885
886         q = self.make_name_question(name,
887                                     rtype,
888                                     dns.DNS_QCLASS_IN)
889         questions.append(q)
890         self.finish_name_packet(p, questions)
891
892         (response, response_packet) =\
893             self.dns_transaction_udp(p, host=self.server_ip)
894
895         self.assertEqual(len(response.answers), num_recs_to_enter)
896
897     def test_record_limit_A(self):
898         def ip4_gen(i):
899             return "127.0.0." + str(i)
900         self.max_rec_test(rtype=dns.DNS_QTYPE_A, rec_gen=ip4_gen)
901
902     def test_record_limit_AAAA(self):
903         def ip6_gen(i):
904             return "AAAA:0:0:0:0:0:0:" + str(i)
905         self.max_rec_test(rtype=dns.DNS_QTYPE_AAAA, rec_gen=ip6_gen)
906
907     def test_record_limit_SRV(self):
908         def srv_gen(i):
909             rec = dns.srv_record()
910             rec.priority = 1
911             rec.weight = 1
912             rec.port = 92
913             rec.target = "srvtestrec" + str(i)
914             return rec
915         self.max_rec_test(rtype=dns.DNS_QTYPE_SRV, rec_gen=srv_gen)
916
917     # Same as test_record_limit_A but with a preceding CNAME follow
918     def test_cname_limit(self):
919         cname1 = "cnamelimittestrec." + self.get_dns_domain()
920         cname2 = "cnamelimittestrec2." + self.get_dns_domain()
921         cname3 = "cnamelimittestrec3." + self.get_dns_domain()
922         ip_prefix = '127.0.0.'
923         limit = 20
924         num_recs_to_enter = limit + 5
925
926         self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
927         self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
928         num_arecs_to_enter = num_recs_to_enter - 2
929         for i in range(1, num_arecs_to_enter+1):
930             ip = ip_prefix + str(i)
931             self.make_dns_update(cname3, ip, dns.DNS_QTYPE_A)
932
933         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
934         questions = []
935
936         q = self.make_name_question(cname1,
937                                     dns.DNS_QTYPE_A,
938                                     dns.DNS_QCLASS_IN)
939         questions.append(q)
940         self.finish_name_packet(p, questions)
941
942         (response, response_packet) =\
943             self.dns_transaction_udp(p, host=self.server_ip)
944
945         self.assertEqual(len(response.answers), num_recs_to_enter)
946
947     # ANY query on cname record shouldn't follow the link
948     def test_cname_any_query(self):
949         cname1 = "cnameanytestrec." + self.get_dns_domain()
950         cname2 = "cnameanytestrec2." + self.get_dns_domain()
951         cname3 = "cnameanytestrec3." + self.get_dns_domain()
952
953         self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
954         self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
955
956         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
957         questions = []
958
959         q = self.make_name_question(cname1,
960                                     dns.DNS_QTYPE_ALL,
961                                     dns.DNS_QCLASS_IN)
962         questions.append(q)
963         self.finish_name_packet(p, questions)
964
965         (response, response_packet) =\
966             self.dns_transaction_udp(p, host=self.server_ip)
967
968         self.assertEqual(len(response.answers), 1)
969         self.assertEqual(response.answers[0].name, cname1)
970         self.assertEqual(response.answers[0].rdata, cname2)
971
972
973 class TestInvalidQueries(DNSTest):
974     def setUp(self):
975         super(TestInvalidQueries, self).setUp()
976         global server, server_ip, lp, creds, timeout
977         self.server = server_name
978         self.server_ip = server_ip
979         self.lp = lp
980         self.creds = creds
981         self.timeout = timeout
982
983     def test_one_a_query(self):
984         """send 0 bytes follows by create a query packet
985            containing one query record"""
986
987         s = None
988         try:
989             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
990             s.connect((self.server_ip, 53))
991             s.send("", 0)
992         finally:
993             if s is not None:
994                 s.close()
995
996         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
997         questions = []
998
999         name = "%s.%s" % (self.server, self.get_dns_domain())
1000         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
1001         print("asking for ", q.name)
1002         questions.append(q)
1003
1004         self.finish_name_packet(p, questions)
1005         (response, response_packet) =\
1006             self.dns_transaction_udp(p, host=self.server_ip)
1007         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1008         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1009         self.assertEquals(response.ancount, 1)
1010         self.assertEquals(response.answers[0].rdata,
1011                           self.server_ip)
1012
1013     def test_one_a_reply(self):
1014         "send a reply instead of a query"
1015         global timeout
1016
1017         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1018         questions = []
1019
1020         name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
1021         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
1022         print("asking for ", q.name)
1023         questions.append(q)
1024
1025         self.finish_name_packet(p, questions)
1026         p.operation |= dns.DNS_FLAG_REPLY
1027         s = None
1028         try:
1029             send_packet = ndr.ndr_pack(p)
1030             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
1031             s.settimeout(timeout)
1032             host = self.server_ip
1033             s.connect((host, 53))
1034             tcp_packet = struct.pack('!H', len(send_packet))
1035             tcp_packet += send_packet
1036             s.send(tcp_packet, 0)
1037             recv_packet = s.recv(0xffff + 2, 0)
1038             self.assertEquals(0, len(recv_packet))
1039         except socket.timeout:
1040             # Windows chooses not to respond to incorrectly formatted queries.
1041             # Although this appears to be non-deterministic even for the same
1042             # request twice, it also appears to be based on a how poorly the
1043             # request is formatted.
1044             pass
1045         finally:
1046             if s is not None:
1047                 s.close()
1048
1049
1050 class TestZones(DNSTest):
1051     def setUp(self):
1052         super(TestZones, self).setUp()
1053         global server, server_ip, lp, creds, timeout
1054         self.server = server_name
1055         self.server_ip = server_ip
1056         self.lp = lp
1057         self.creds = creds
1058         self.timeout = timeout
1059
1060         self.zone = "test.lan"
1061         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1062                                             (self.server_ip),
1063                                             self.lp, self.creds)
1064
1065         self.samdb = SamDB(url="ldap://" + self.server_ip,
1066                            lp=self.get_loadparm(),
1067                            session_info=system_session(),
1068                            credentials=self.creds)
1069         self.zone_dn = "DC=" + self.zone +\
1070                        ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
1071                        str(self.samdb.get_default_basedn())
1072
1073     def tearDown(self):
1074         super(TestZones, self).tearDown()
1075
1076         try:
1077             self.delete_zone(self.zone)
1078         except RuntimeError as e:
1079             (num, string) = e.args
1080             if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
1081                 raise
1082
1083     def create_zone(self, zone, aging_enabled=False):
1084         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
1085         zone_create.pszZoneName = zone
1086         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
1087         zone_create.fAging = int(aging_enabled)
1088         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
1089         zone_create.fDsIntegrated = 1
1090         zone_create.fLoadExisting = 1
1091         zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
1092         try:
1093             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1094             self.rpc_conn.DnssrvOperation2(client_version,
1095                                            0,
1096                                            self.server_ip,
1097                                            None,
1098                                            0,
1099                                            'ZoneCreate',
1100                                            dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
1101                                            zone_create)
1102         except WERRORError as e:
1103             self.fail(str(e))
1104
1105     def set_params(self, **kwargs):
1106         zone = kwargs.pop('zone', None)
1107         for key, val in kwargs.items():
1108             name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
1109             name_param.dwParam = val
1110             name_param.pszNodeName = key
1111
1112             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1113             nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
1114             try:
1115                 self.rpc_conn.DnssrvOperation2(client_version,
1116                                                0,
1117                                                self.server,
1118                                                zone,
1119                                                0,
1120                                                'ResetDwordProperty',
1121                                                nap_type,
1122                                                name_param)
1123             except WERRORError as e:
1124                 self.fail(str(e))
1125
1126     def ldap_modify_dnsrecs(self, name, func):
1127         dn = 'DC={0},{1}'.format(name, self.zone_dn)
1128         dns_recs = self.ldap_get_dns_records(name)
1129         for rec in dns_recs:
1130             func(rec)
1131         update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]}
1132         self.samdb.modify(ldb.Message.from_dict(self.samdb,
1133                                                 update_dict,
1134                                                 ldb.FLAG_MOD_REPLACE))
1135
1136     def dns_update_record(self, prefix, txt):
1137         p = self.make_txt_update(prefix, txt, self.zone)
1138         (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
1139         self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
1140         recs = self.ldap_get_dns_records(prefix)
1141         recs = [r for r in recs if r.data.str == txt]
1142         self.assertEqual(len(recs), 1)
1143         return recs[0]
1144
1145     def dns_tombstone(self, prefix, txt, zone):
1146         name = prefix + "." + zone
1147
1148         to = dnsp.DnssrvRpcRecord()
1149         to.dwTimeStamp = 1000
1150         to.wType = dnsp.DNS_TYPE_TOMBSTONE
1151
1152         self.samdb.dns_replace(name, [to])
1153
1154     def ldap_get_records(self, name):
1155         # The use of SCOPE_SUBTREE here avoids raising an exception in the
1156         # 0 results case for a test below.
1157
1158         expr = "(&(objectClass=dnsNode)(name={0}))".format(name)
1159         return self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1160                                  expression=expr, attrs=["*"])
1161
1162     def ldap_get_dns_records(self, name):
1163         records = self.ldap_get_records(name)
1164         return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
1165                 for r in records[0].get('dnsRecord')]
1166
1167     def ldap_get_zone_settings(self):
1168         records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
1169                                     expression="(&(objectClass=dnsZone)" +
1170                                     "(name={0}))".format(self.zone),
1171                                     attrs=["dNSProperty"])
1172         self.assertEqual(len(records), 1)
1173         props = [ndr_unpack(dnsp.DnsProperty, r)
1174                  for r in records[0].get('dNSProperty')]
1175
1176         # We have no choice but to repeat these here.
1177         zone_prop_ids = {0x00: "EMPTY",
1178                          0x01: "TYPE",
1179                          0x02: "ALLOW_UPDATE",
1180                          0x08: "SECURE_TIME",
1181                          0x10: "NOREFRESH_INTERVAL",
1182                          0x11: "SCAVENGING_SERVERS",
1183                          0x12: "AGING_ENABLED_TIME",
1184                          0x20: "REFRESH_INTERVAL",
1185                          0x40: "AGING_STATE",
1186                          0x80: "DELETED_FROM_HOSTNAME",
1187                          0x81: "MASTER_SERVERS",
1188                          0x82: "AUTO_NS_SERVERS",
1189                          0x83: "DCPROMO_CONVERT",
1190                          0x90: "SCAVENGING_SERVERS_DA",
1191                          0x91: "MASTER_SERVERS_DA",
1192                          0x92: "NS_SERVERS_DA",
1193                          0x100: "NODE_DBFLAGS"}
1194         return {zone_prop_ids[p.id].lower(): p.data for p in props}
1195
1196     def set_aging(self, enable=False):
1197         self.create_zone(self.zone, aging_enabled=enable)
1198         self.set_params(NoRefreshInterval=1, RefreshInterval=1,
1199                         Aging=int(bool(enable)), zone=self.zone,
1200                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1201
1202     def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
1203         self.set_aging(enable=True)
1204         settings = self.ldap_get_zone_settings()
1205         self.assertTrue(settings['aging_state'] is not None)
1206         self.assertTrue(settings['aging_state'])
1207
1208         rec = self.dns_update_record('agingtest', ['test txt'])
1209         self.assertNotEqual(rec.dwTimeStamp, 0)
1210
1211     def test_set_aging_disabled(self):
1212         self.set_aging(enable=False)
1213         settings = self.ldap_get_zone_settings()
1214         self.assertTrue(settings['aging_state'] is not None)
1215         self.assertFalse(settings['aging_state'])
1216
1217         rec = self.dns_update_record('agingtest', ['test txt'])
1218         self.assertNotEqual(rec.dwTimeStamp, 0)
1219
1220     def test_aging_update(self, enable=True):
1221         name, txt = 'agingtest', ['test txt']
1222         self.set_aging(enable=True)
1223         before_mod = self.dns_update_record(name, txt)
1224         if not enable:
1225             self.set_params(zone=self.zone, Aging=0)
1226         dec = 2
1227
1228         def mod_ts(rec):
1229             self.assertTrue(rec.dwTimeStamp > 0)
1230             rec.dwTimeStamp -= dec
1231         self.ldap_modify_dnsrecs(name, mod_ts)
1232         after_mod = self.ldap_get_dns_records(name)
1233         self.assertEqual(len(after_mod), 1)
1234         after_mod = after_mod[0]
1235         self.assertEqual(after_mod.dwTimeStamp,
1236                          before_mod.dwTimeStamp - dec)
1237         after_update = self.dns_update_record(name, txt)
1238         after_should_equal = before_mod if enable else after_mod
1239         self.assertEqual(after_should_equal.dwTimeStamp,
1240                          after_update.dwTimeStamp)
1241
1242     def test_aging_update_disabled(self):
1243         self.test_aging_update(enable=False)
1244
1245     def test_aging_refresh(self):
1246         name, txt = 'agingtest', ['test txt']
1247         self.create_zone(self.zone, aging_enabled=True)
1248         interval = 10
1249         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1250                         Aging=1, zone=self.zone,
1251                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1252         before_mod = self.dns_update_record(name, txt)
1253
1254         def mod_ts(rec):
1255             self.assertTrue(rec.dwTimeStamp > 0)
1256             rec.dwTimeStamp -= interval / 2
1257         self.ldap_modify_dnsrecs(name, mod_ts)
1258         update_during_norefresh = self.dns_update_record(name, txt)
1259
1260         def mod_ts(rec):
1261             self.assertTrue(rec.dwTimeStamp > 0)
1262             rec.dwTimeStamp -= interval + interval / 2
1263         self.ldap_modify_dnsrecs(name, mod_ts)
1264         update_during_refresh = self.dns_update_record(name, txt)
1265         self.assertEqual(update_during_norefresh.dwTimeStamp,
1266                          before_mod.dwTimeStamp - interval / 2)
1267         self.assertEqual(update_during_refresh.dwTimeStamp,
1268                          before_mod.dwTimeStamp)
1269
1270     def test_rpc_add_no_timestamp(self):
1271         name, txt = 'agingtest', ['test txt']
1272         self.set_aging(enable=True)
1273         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1274         rec_buf.rec = TXTRecord(txt)
1275         self.rpc_conn.DnssrvUpdateRecord2(
1276             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1277             0,
1278             self.server_ip,
1279             self.zone,
1280             name,
1281             rec_buf,
1282             None)
1283         recs = self.ldap_get_dns_records(name)
1284         self.assertEqual(len(recs), 1)
1285         self.assertEqual(recs[0].dwTimeStamp, 0)
1286
1287     def test_static_record_dynamic_update(self):
1288         name, txt = 'agingtest', ['test txt']
1289         txt2 = ['test txt2']
1290         self.set_aging(enable=True)
1291         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1292         rec_buf.rec = TXTRecord(txt)
1293         self.rpc_conn.DnssrvUpdateRecord2(
1294             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1295             0,
1296             self.server_ip,
1297             self.zone,
1298             name,
1299             rec_buf,
1300             None)
1301
1302         rec2 = self.dns_update_record(name, txt2)
1303         self.assertEqual(rec2.dwTimeStamp, 0)
1304
1305     def test_dynamic_record_static_update(self):
1306         name, txt = 'agingtest', ['test txt']
1307         txt2 = ['test txt2']
1308         txt3 = ['test txt3']
1309         self.set_aging(enable=True)
1310
1311         self.dns_update_record(name, txt)
1312
1313         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1314         rec_buf.rec = TXTRecord(txt2)
1315         self.rpc_conn.DnssrvUpdateRecord2(
1316             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1317             0,
1318             self.server_ip,
1319             self.zone,
1320             name,
1321             rec_buf,
1322             None)
1323
1324         self.dns_update_record(name, txt3)
1325
1326         recs = self.ldap_get_dns_records(name)
1327         # Put in dict because ldap recs might be out of order
1328         recs = {str(r.data.str): r for r in recs}
1329         self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0)
1330         self.assertEqual(recs[str(txt2)].dwTimeStamp, 0)
1331         self.assertEqual(recs[str(txt3)].dwTimeStamp, 0)
1332
1333     def test_dns_tombstone_custom_match_rule(self):
1334         lp = self.get_loadparm()
1335         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1336                            session_info=system_session(),
1337                            credentials=self.creds)
1338
1339         name, txt = 'agingtest', ['test txt']
1340         name2, txt2 = 'agingtest2', ['test txt2']
1341         name3, txt3 = 'agingtest3', ['test txt3']
1342         name4, txt4 = 'agingtest4', ['test txt4']
1343         name5, txt5 = 'agingtest5', ['test txt5']
1344
1345         self.create_zone(self.zone, aging_enabled=True)
1346         interval = 10
1347         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1348                         Aging=1, zone=self.zone,
1349                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1350
1351         self.dns_update_record(name, txt)
1352
1353         self.dns_update_record(name2, txt)
1354         self.dns_update_record(name2, txt2)
1355
1356         self.dns_update_record(name3, txt)
1357         self.dns_update_record(name3, txt2)
1358         last_update = self.dns_update_record(name3, txt3)
1359
1360         # Modify txt1 of the first 2 names
1361         def mod_ts(rec):
1362             if rec.data.str == txt:
1363                 rec.dwTimeStamp -= 2
1364         self.ldap_modify_dnsrecs(name, mod_ts)
1365         self.ldap_modify_dnsrecs(name2, mod_ts)
1366
1367         # create a static dns record.
1368         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1369         rec_buf.rec = TXTRecord(txt4)
1370         self.rpc_conn.DnssrvUpdateRecord2(
1371             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1372             0,
1373             self.server_ip,
1374             self.zone,
1375             name4,
1376             rec_buf,
1377             None)
1378
1379         # Create a tomb stoned record.
1380         self.dns_update_record(name5, txt5)
1381         self.dns_tombstone(name5, txt5, self.zone)
1382
1383         self.ldap_get_dns_records(name3)
1384         expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1385         expr = expr.format(int(last_update.dwTimeStamp) - 1)
1386         try:
1387             res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1388                                     expression=expr, attrs=["*"])
1389         except ldb.LdbError as e:
1390             self.fail(str(e))
1391         updated_names = {str(r.get('name')) for r in res}
1392         self.assertEqual(updated_names, set([name, name2]))
1393
1394     def test_dns_tombstone_custom_match_rule_no_records(self):
1395         lp = self.get_loadparm()
1396         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1397                            session_info=system_session(),
1398                            credentials=self.creds)
1399
1400         self.create_zone(self.zone, aging_enabled=True)
1401         interval = 10
1402         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1403                         Aging=1, zone=self.zone,
1404                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1405
1406         expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1407         expr = expr.format(1)
1408
1409         try:
1410             res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1411                                     expression=expr, attrs=["*"])
1412         except ldb.LdbError as e:
1413             self.fail(str(e))
1414         self.assertEqual(0, len(res))
1415
1416     def test_dns_tombstone_custom_match_rule_fail(self):
1417         self.create_zone(self.zone, aging_enabled=True)
1418         samdb = SamDB(url=lp.samdb_url(),
1419                       lp=lp,
1420                       session_info=system_session(),
1421                       credentials=self.creds)
1422
1423         # Property name in not dnsRecord
1424         expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1425         res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1426                            expression=expr, attrs=["*"])
1427         self.assertEquals(len(res), 0)
1428
1429         # No value for tombstone time
1430         try:
1431             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)"
1432             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1433                                expression=expr, attrs=["*"])
1434             self.assertEquals(len(res), 0)
1435             self.fail("Exception: ldb.ldbError not generated")
1436         except ldb.LdbError as e:
1437             (num, msg) = e.args
1438             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1439
1440         # Tombstone time = -
1441         try:
1442             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)"
1443             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1444                                expression=expr, attrs=["*"])
1445             self.assertEquals(len(res), 0)
1446             self.fail("Exception: ldb.ldbError not generated")
1447         except ldb.LdbError as e:
1448             (num, _) = e.args
1449             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1450
1451         # Tombstone time longer than 64 characters
1452         try:
1453             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1454             expr = expr.format("1" * 65)
1455             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1456                                expression=expr, attrs=["*"])
1457             self.assertEquals(len(res), 0)
1458             self.fail("Exception: ldb.ldbError not generated")
1459         except ldb.LdbError as e:
1460             (num, _) = e.args
1461             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1462
1463         # Non numeric Tombstone time
1464         try:
1465             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)"
1466             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1467                                expression=expr, attrs=["*"])
1468             self.assertEquals(len(res), 0)
1469             self.fail("Exception: ldb.ldbError not generated")
1470         except ldb.LdbError as e:
1471             (num, _) = e.args
1472             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1473
1474         # Non system session
1475         try:
1476             db = SamDB(url="ldap://" + self.server_ip,
1477                        lp=self.get_loadparm(),
1478                        credentials=self.creds)
1479
1480             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)"
1481             res = db.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1482                             expression=expr, attrs=["*"])
1483             self.assertEquals(len(res), 0)
1484             self.fail("Exception: ldb.ldbError not generated")
1485         except ldb.LdbError as e:
1486             (num, _) = e.args
1487             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1488
1489     def test_basic_scavenging(self):
1490         lp = self.get_loadparm()
1491         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1492                            session_info=system_session(),
1493                            credentials=self.creds)
1494
1495         self.create_zone(self.zone, aging_enabled=True)
1496         interval = 1
1497         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1498                         zone=self.zone, Aging=1,
1499                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1500         name, txt = 'agingtest', ['test txt']
1501         name2, txt2 = 'agingtest2', ['test txt2']
1502         name3, txt3 = 'agingtest3', ['test txt3']
1503         self.dns_update_record(name, txt)
1504         self.dns_update_record(name2, txt)
1505         self.dns_update_record(name2, txt2)
1506         self.dns_update_record(name3, txt)
1507         self.dns_update_record(name3, txt2)
1508         last_add = self.dns_update_record(name3, txt3)
1509
1510         def mod_ts(rec):
1511             self.assertTrue(rec.dwTimeStamp > 0)
1512             if rec.data.str == txt:
1513                 rec.dwTimeStamp -= interval * 5
1514         self.ldap_modify_dnsrecs(name, mod_ts)
1515         self.ldap_modify_dnsrecs(name2, mod_ts)
1516         self.ldap_modify_dnsrecs(name3, mod_ts)
1517         self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1518         dsdb._scavenge_dns_records(self.samdb)
1519
1520         recs = self.ldap_get_dns_records(name)
1521         self.assertEqual(len(recs), 1)
1522         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1523
1524         recs = self.ldap_get_dns_records(name2)
1525         self.assertEqual(len(recs), 1)
1526         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1527         self.assertEqual(recs[0].data.str, txt2)
1528
1529         recs = self.ldap_get_dns_records(name3)
1530         self.assertEqual(len(recs), 2)
1531         txts = {str(r.data.str) for r in recs}
1532         self.assertEqual(txts, {str(txt2), str(txt3)})
1533         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1534         self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT)
1535
1536         for make_it_work in [False, True]:
1537             inc = -1 if make_it_work else 1
1538
1539             def mod_ts(rec):
1540                 rec.data = (last_add.dwTimeStamp - 24 * 14) + inc
1541             self.ldap_modify_dnsrecs(name, mod_ts)
1542             dsdb._dns_delete_tombstones(self.samdb)
1543             recs = self.ldap_get_records(name)
1544             if make_it_work:
1545                 self.assertEqual(len(recs), 0)
1546             else:
1547                 self.assertEqual(len(recs), 1)
1548
1549     def delete_zone(self, zone):
1550         self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1551                                        0,
1552                                        self.server_ip,
1553                                        zone,
1554                                        0,
1555                                        'DeleteZoneFromDs',
1556                                        dnsserver.DNSSRV_TYPEID_NULL,
1557                                        None)
1558
1559     def test_soa_query(self):
1560         zone = "test.lan"
1561         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1562         questions = []
1563
1564         q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
1565         questions.append(q)
1566         self.finish_name_packet(p, questions)
1567
1568         (response, response_packet) =\
1569             self.dns_transaction_udp(p, host=server_ip)
1570         # Windows returns OK while BIND logically seems to return NXDOMAIN
1571         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1572         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1573         self.assertEquals(response.ancount, 0)
1574
1575         self.create_zone(zone)
1576         (response, response_packet) =\
1577             self.dns_transaction_udp(p, host=server_ip)
1578         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1579         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1580         self.assertEquals(response.ancount, 1)
1581         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
1582
1583         self.delete_zone(zone)
1584         (response, response_packet) =\
1585             self.dns_transaction_udp(p, host=server_ip)
1586         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1587         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1588         self.assertEquals(response.ancount, 0)
1589
1590
1591 class TestRPCRoundtrip(DNSTest):
1592     def setUp(self):
1593         super(TestRPCRoundtrip, self).setUp()
1594         global server, server_ip, lp, creds
1595         self.server = server_name
1596         self.server_ip = server_ip
1597         self.lp = lp
1598         self.creds = creds
1599         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1600                                             (self.server_ip),
1601                                             self.lp,
1602                                             self.creds)
1603
1604     def tearDown(self):
1605         super(TestRPCRoundtrip, self).tearDown()
1606
1607     def rpc_update(self, fqn=None, data=None, wType=None, delete=False):
1608         fqn = fqn or ("rpctestrec." + self.get_dns_domain())
1609
1610         rec = data_to_dns_record(wType, data)
1611         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1612         add_rec_buf.rec = rec
1613
1614         add_arg = add_rec_buf
1615         del_arg = None
1616         if delete:
1617             add_arg = None
1618             del_arg = add_rec_buf
1619
1620         self.rpc_conn.DnssrvUpdateRecord2(
1621             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1622             0,
1623             self.server_ip,
1624             self.get_dns_domain(),
1625             fqn,
1626             add_arg,
1627             del_arg)
1628
1629     def test_rpc_self_referencing_cname(self):
1630         cname = "cnametest2_unqual_rec_loop"
1631         cname_fqn = "%s.%s" % (cname, self.get_dns_domain())
1632
1633         try:
1634             self.rpc_update(fqn=cname, data=cname_fqn,
1635                             wType=dnsp.DNS_TYPE_CNAME, delete=True)
1636         except WERRORError as e:
1637             if e.args[0] != werror.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST:
1638                 self.fail("RPC DNS gaven wrong error on pre-test cleanup "
1639                           "for self referencing CNAME: %s" % e.args[0])
1640
1641         try:
1642             self.rpc_update(fqn=cname, wType=dnsp.DNS_TYPE_CNAME, data=cname_fqn)
1643         except WERRORError as e:
1644             if e.args[0] != werror.WERR_DNS_ERROR_CNAME_LOOP:
1645                 self.fail("RPC DNS gaven wrong error on insertion of "
1646                           "self referencing CNAME: %s" % e.args[0])
1647             return
1648
1649         self.fail("RPC DNS allowed insertion of self referencing CNAME")
1650
1651     def test_update_add_txt_rpc_to_dns(self):
1652         prefix, txt = 'rpctextrec', ['"This is a test"']
1653
1654         name = "%s.%s" % (prefix, self.get_dns_domain())
1655
1656         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
1657         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1658         add_rec_buf.rec = rec
1659         try:
1660             self.rpc_conn.DnssrvUpdateRecord2(
1661                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1662                 0,
1663                 self.server_ip,
1664                 self.get_dns_domain(),
1665                 name,
1666                 add_rec_buf,
1667                 None)
1668
1669         except WERRORError as e:
1670             self.fail(str(e))
1671
1672         try:
1673             self.check_query_txt(prefix, txt)
1674         finally:
1675             self.rpc_conn.DnssrvUpdateRecord2(
1676                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1677                 0,
1678                 self.server_ip,
1679                 self.get_dns_domain(),
1680                 name,
1681                 None,
1682                 add_rec_buf)
1683
1684     def test_update_add_null_padded_txt_record(self):
1685         "test adding records works"
1686         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1687         p = self.make_txt_update(prefix, txt)
1688         (response, response_packet) =\
1689             self.dns_transaction_udp(p, host=server_ip)
1690         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1691         self.check_query_txt(prefix, txt)
1692         self.assertIsNotNone(
1693             dns_record_match(self.rpc_conn,
1694                              self.server_ip,
1695                              self.get_dns_domain(),
1696                              "%s.%s" % (prefix, self.get_dns_domain()),
1697                              dnsp.DNS_TYPE_TXT,
1698                              '"\\"This is a test\\"" "" ""'))
1699
1700         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1701         p = self.make_txt_update(prefix, txt)
1702         (response, response_packet) =\
1703             self.dns_transaction_udp(p, host=server_ip)
1704         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1705         self.check_query_txt(prefix, txt)
1706         self.assertIsNotNone(
1707             dns_record_match(
1708                 self.rpc_conn,
1709                 self.server_ip,
1710                 self.get_dns_domain(),
1711                 "%s.%s" % (prefix, self.get_dns_domain()),
1712                 dnsp.DNS_TYPE_TXT,
1713                 '"\\"This is a test\\"" "" "" "more text"'))
1714
1715         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1716         p = self.make_txt_update(prefix, txt)
1717         (response, response_packet) =\
1718             self.dns_transaction_udp(p, host=server_ip)
1719         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1720         self.check_query_txt(prefix, txt)
1721         self.assertIsNotNone(
1722             dns_record_match(
1723                 self.rpc_conn,
1724                 self.server_ip,
1725                 self.get_dns_domain(),
1726                 "%s.%s" % (prefix, self.get_dns_domain()),
1727                 dnsp.DNS_TYPE_TXT,
1728                 '"" "" "\\"This is a test\\""'))
1729
1730     def test_update_add_padding_rpc_to_dns(self):
1731         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1732         prefix = 'rpc' + prefix
1733         name = "%s.%s" % (prefix, self.get_dns_domain())
1734
1735         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1736                                  '"\\"This is a test\\"" "" ""')
1737         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1738         add_rec_buf.rec = rec
1739         try:
1740             self.rpc_conn.DnssrvUpdateRecord2(
1741                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1742                 0,
1743                 self.server_ip,
1744                 self.get_dns_domain(),
1745                 name,
1746                 add_rec_buf,
1747                 None)
1748
1749         except WERRORError as e:
1750             self.fail(str(e))
1751
1752         try:
1753             self.check_query_txt(prefix, txt)
1754         finally:
1755             self.rpc_conn.DnssrvUpdateRecord2(
1756                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1757                 0,
1758                 self.server_ip,
1759                 self.get_dns_domain(),
1760                 name,
1761                 None,
1762                 add_rec_buf)
1763
1764         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1765         prefix = 'rpc' + prefix
1766         name = "%s.%s" % (prefix, self.get_dns_domain())
1767
1768         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1769                                  '"\\"This is a test\\"" "" "" "more text"')
1770         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1771         add_rec_buf.rec = rec
1772         try:
1773             self.rpc_conn.DnssrvUpdateRecord2(
1774                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1775                 0,
1776                 self.server_ip,
1777                 self.get_dns_domain(),
1778                 name,
1779                 add_rec_buf,
1780                 None)
1781
1782         except WERRORError as e:
1783             self.fail(str(e))
1784
1785         try:
1786             self.check_query_txt(prefix, txt)
1787         finally:
1788             self.rpc_conn.DnssrvUpdateRecord2(
1789                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1790                 0,
1791                 self.server_ip,
1792                 self.get_dns_domain(),
1793                 name,
1794                 None,
1795                 add_rec_buf)
1796
1797         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1798         prefix = 'rpc' + prefix
1799         name = "%s.%s" % (prefix, self.get_dns_domain())
1800
1801         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1802                                  '"" "" "\\"This is a test\\""')
1803         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1804         add_rec_buf.rec = rec
1805         try:
1806             self.rpc_conn.DnssrvUpdateRecord2(
1807                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1808                 0,
1809                 self.server_ip,
1810                 self.get_dns_domain(),
1811                 name,
1812                 add_rec_buf,
1813                 None)
1814         except WERRORError as e:
1815             self.fail(str(e))
1816
1817         try:
1818             self.check_query_txt(prefix, txt)
1819         finally:
1820             self.rpc_conn.DnssrvUpdateRecord2(
1821                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1822                 0,
1823                 self.server_ip,
1824                 self.get_dns_domain(),
1825                 name,
1826                 None,
1827                 add_rec_buf)
1828
1829     # Test is incomplete due to strlen against txt records
1830     def test_update_add_null_char_txt_record(self):
1831         "test adding records works"
1832         prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1833         p = self.make_txt_update(prefix, txt)
1834         (response, response_packet) =\
1835             self.dns_transaction_udp(p, host=server_ip)
1836         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1837         self.check_query_txt(prefix, ['NULL'])
1838         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1839                                               self.get_dns_domain(),
1840                                               "%s.%s" % (prefix, self.get_dns_domain()),
1841                                               dnsp.DNS_TYPE_TXT, '"NULL"'))
1842
1843         prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1844         p = self.make_txt_update(prefix, txt)
1845         (response, response_packet) =\
1846             self.dns_transaction_udp(p, host=server_ip)
1847         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1848         self.check_query_txt(prefix, ['NULL', 'NULL'])
1849         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1850                                               self.get_dns_domain(),
1851                                               "%s.%s" % (prefix, self.get_dns_domain()),
1852                                               dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1853
1854     def test_update_add_null_char_rpc_to_dns(self):
1855         prefix = 'rpcnulltextrec'
1856         name = "%s.%s" % (prefix, self.get_dns_domain())
1857
1858         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL\x00BYTE"')
1859         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1860         add_rec_buf.rec = rec
1861         try:
1862             self.rpc_conn.DnssrvUpdateRecord2(
1863                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1864                 0,
1865                 self.server_ip,
1866                 self.get_dns_domain(),
1867                 name,
1868                 add_rec_buf,
1869                 None)
1870
1871         except WERRORError as e:
1872             self.fail(str(e))
1873
1874         try:
1875             self.check_query_txt(prefix, ['NULL'])
1876         finally:
1877             self.rpc_conn.DnssrvUpdateRecord2(
1878                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1879                 0,
1880                 self.server_ip,
1881                 self.get_dns_domain(),
1882                 name,
1883                 None,
1884                 add_rec_buf)
1885
1886     def test_update_add_hex_char_txt_record(self):
1887         "test adding records works"
1888         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1889         p = self.make_txt_update(prefix, txt)
1890         (response, response_packet) =\
1891             self.dns_transaction_udp(p, host=server_ip)
1892         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1893         self.check_query_txt(prefix, txt)
1894         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1895                                               self.get_dns_domain(),
1896                                               "%s.%s" % (prefix, self.get_dns_domain()),
1897                                               dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
1898
1899     def test_update_add_hex_rpc_to_dns(self):
1900         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1901         prefix = 'rpc' + prefix
1902         name = "%s.%s" % (prefix, self.get_dns_domain())
1903
1904         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
1905         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1906         add_rec_buf.rec = rec
1907         try:
1908             self.rpc_conn.DnssrvUpdateRecord2(
1909                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1910                 0,
1911                 self.server_ip,
1912                 self.get_dns_domain(),
1913                 name,
1914                 add_rec_buf,
1915                 None)
1916
1917         except WERRORError as e:
1918             self.fail(str(e))
1919
1920         try:
1921             self.check_query_txt(prefix, txt)
1922         finally:
1923             self.rpc_conn.DnssrvUpdateRecord2(
1924                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1925                 0,
1926                 self.server_ip,
1927                 self.get_dns_domain(),
1928                 name,
1929                 None,
1930                 add_rec_buf)
1931
1932     def test_update_add_slash_txt_record(self):
1933         "test adding records works"
1934         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1935         p = self.make_txt_update(prefix, txt)
1936         (response, response_packet) =\
1937             self.dns_transaction_udp(p, host=server_ip)
1938         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1939         self.check_query_txt(prefix, txt)
1940         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1941                                               self.get_dns_domain(),
1942                                               "%s.%s" % (prefix, self.get_dns_domain()),
1943                                               dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
1944
1945     # This test fails against Windows as it eliminates slashes in RPC
1946     # One typical use for a slash is in records like 'var=value' to
1947     # escape '=' characters.
1948     def test_update_add_slash_rpc_to_dns(self):
1949         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1950         prefix = 'rpc' + prefix
1951         name = "%s.%s" % (prefix, self.get_dns_domain())
1952
1953         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
1954         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1955         add_rec_buf.rec = rec
1956         try:
1957             self.rpc_conn.DnssrvUpdateRecord2(
1958                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1959                 0,
1960                 self.server_ip,
1961                 self.get_dns_domain(),
1962                 name,
1963                 add_rec_buf,
1964                 None)
1965
1966         except WERRORError as e:
1967             self.fail(str(e))
1968
1969         try:
1970             self.check_query_txt(prefix, txt)
1971
1972         finally:
1973             self.rpc_conn.DnssrvUpdateRecord2(
1974                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1975                 0,
1976                 self.server_ip,
1977                 self.get_dns_domain(),
1978                 name,
1979                 None,
1980                 add_rec_buf)
1981
1982     def test_update_add_two_txt_records(self):
1983         "test adding two txt records works"
1984         prefix, txt = 'textrec2', ['"This is a test"',
1985                                    '"and this is a test, too"']
1986         p = self.make_txt_update(prefix, txt)
1987         (response, response_packet) =\
1988             self.dns_transaction_udp(p, host=server_ip)
1989         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1990         self.check_query_txt(prefix, txt)
1991         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1992                                               self.get_dns_domain(),
1993                                               "%s.%s" % (prefix, self.get_dns_domain()),
1994                                               dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
1995                                               ' "\\"and this is a test, too\\""'))
1996
1997     def test_update_add_two_rpc_to_dns(self):
1998         prefix, txt = 'textrec2', ['"This is a test"',
1999                                    '"and this is a test, too"']
2000         prefix = 'rpc' + prefix
2001         name = "%s.%s" % (prefix, self.get_dns_domain())
2002
2003         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
2004                                  '"\\"This is a test\\""' +
2005                                  ' "\\"and this is a test, too\\""')
2006         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
2007         add_rec_buf.rec = rec
2008         try:
2009             self.rpc_conn.DnssrvUpdateRecord2(
2010                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2011                 0,
2012                 self.server_ip,
2013                 self.get_dns_domain(),
2014                 name,
2015                 add_rec_buf,
2016                 None)
2017
2018         except WERRORError as e:
2019             self.fail(str(e))
2020
2021         try:
2022             self.check_query_txt(prefix, txt)
2023         finally:
2024             self.rpc_conn.DnssrvUpdateRecord2(
2025                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2026                 0,
2027                 self.server_ip,
2028                 self.get_dns_domain(),
2029                 name,
2030                 None,
2031                 add_rec_buf)
2032
2033     def test_update_add_empty_txt_records(self):
2034         "test adding two txt records works"
2035         prefix, txt = 'emptytextrec', []
2036         p = self.make_txt_update(prefix, txt)
2037         (response, response_packet) =\
2038             self.dns_transaction_udp(p, host=server_ip)
2039         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
2040         self.check_query_txt(prefix, txt)
2041         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
2042                                               self.get_dns_domain(),
2043                                               "%s.%s" % (prefix, self.get_dns_domain()),
2044                                               dnsp.DNS_TYPE_TXT, ''))
2045
2046     def test_update_add_empty_rpc_to_dns(self):
2047         prefix, txt = 'rpcemptytextrec', []
2048
2049         name = "%s.%s" % (prefix, self.get_dns_domain())
2050
2051         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
2052         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
2053         add_rec_buf.rec = rec
2054         try:
2055             self.rpc_conn.DnssrvUpdateRecord2(
2056                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2057                 0,
2058                 self.server_ip,
2059                 self.get_dns_domain(),
2060                 name,
2061                 add_rec_buf,
2062                 None)
2063         except WERRORError as e:
2064             self.fail(str(e))
2065
2066         try:
2067             self.check_query_txt(prefix, txt)
2068         finally:
2069             self.rpc_conn.DnssrvUpdateRecord2(
2070                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2071                 0,
2072                 self.server_ip,
2073                 self.get_dns_domain(),
2074                 name,
2075                 None,
2076                 add_rec_buf)
2077
2078
2079 TestProgram(module=__name__, opts=subunitopts)