CVE-2018-14629 dns: CNAME loop prevention using counter
[bbaumbach/samba-autobuild/.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
24 import ldb
25 from ldb import ERR_OPERATIONS_ERROR
26 import os
27 import sys
28 import struct
29 import socket
30 import samba.ndr as ndr
31 from samba import credentials
32 from samba.dcerpc import dns, dnsp, dnsserver
33 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
34 from samba.tests.subunitrun import SubunitOptions, TestProgram
35 from samba import werror, WERRORError
36 from samba.tests.dns_base import DNSTest
37 import samba.getopt as options
38 import optparse
39
40
41 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
42 sambaopts = options.SambaOptions(parser)
43 parser.add_option_group(sambaopts)
44
45 # This timeout only has relevance when testing against Windows
46 # Format errors tend to return patchy responses, so a timeout is needed.
47 parser.add_option("--timeout", type="int", dest="timeout",
48                   help="Specify timeout for DNS requests")
49
50 # use command line creds if available
51 credopts = options.CredentialsOptions(parser)
52 parser.add_option_group(credopts)
53 subunitopts = SubunitOptions(parser)
54 parser.add_option_group(subunitopts)
55
56 opts, args = parser.parse_args()
57
58 lp = sambaopts.get_loadparm()
59 creds = credopts.get_credentials(lp)
60
61 timeout = opts.timeout
62
63 if len(args) < 2:
64     parser.print_usage()
65     sys.exit(1)
66
67 server_name = args[0]
68 server_ip = args[1]
69 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
70
71
72 class TestSimpleQueries(DNSTest):
73     def setUp(self):
74         super(TestSimpleQueries, self).setUp()
75         global server, server_ip, lp, creds, timeout
76         self.server = server_name
77         self.server_ip = server_ip
78         self.lp = lp
79         self.creds = creds
80         self.timeout = timeout
81
82     def test_one_a_query(self):
83         "create a query packet containing one query record"
84         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
85         questions = []
86
87         name = "%s.%s" % (self.server, self.get_dns_domain())
88         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
89         print("asking for ", q.name)
90         questions.append(q)
91
92         self.finish_name_packet(p, questions)
93         (response, response_packet) =\
94             self.dns_transaction_udp(p, host=server_ip)
95         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
96         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
97         self.assertEquals(response.ancount, 1)
98         self.assertEquals(response.answers[0].rdata,
99                           self.server_ip)
100
101     def test_one_SOA_query(self):
102         "create a query packet containing one query record for the SOA"
103         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
104         questions = []
105
106         name = "%s" % (self.get_dns_domain())
107         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
108         print("asking for ", q.name)
109         questions.append(q)
110
111         self.finish_name_packet(p, questions)
112         (response, response_packet) =\
113             self.dns_transaction_udp(p, host=server_ip)
114         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
115         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
116         self.assertEquals(response.ancount, 1)
117         self.assertEquals(
118             response.answers[0].rdata.mname.upper(),
119             ("%s.%s" % (self.server, self.get_dns_domain())).upper())
120
121     def test_one_a_query_tcp(self):
122         "create a query packet containing one query record via TCP"
123         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
124         questions = []
125
126         name = "%s.%s" % (self.server, self.get_dns_domain())
127         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
128         print("asking for ", q.name)
129         questions.append(q)
130
131         self.finish_name_packet(p, questions)
132         (response, response_packet) =\
133             self.dns_transaction_tcp(p, host=server_ip)
134         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
135         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
136         self.assertEquals(response.ancount, 1)
137         self.assertEquals(response.answers[0].rdata,
138                           self.server_ip)
139
140     def test_one_mx_query(self):
141         "create a query packet causing an empty RCODE_OK answer"
142         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
143         questions = []
144
145         name = "%s.%s" % (self.server, self.get_dns_domain())
146         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
147         print("asking for ", q.name)
148         questions.append(q)
149
150         self.finish_name_packet(p, questions)
151         (response, response_packet) =\
152             self.dns_transaction_udp(p, host=server_ip)
153         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
154         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
155         self.assertEquals(response.ancount, 0)
156
157         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
158         questions = []
159
160         name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
161         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
162         print("asking for ", q.name)
163         questions.append(q)
164
165         self.finish_name_packet(p, questions)
166         (response, response_packet) =\
167             self.dns_transaction_udp(p, host=server_ip)
168         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
169         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
170         self.assertEquals(response.ancount, 0)
171
172     def test_two_queries(self):
173         "create a query packet containing two query records"
174         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
175         questions = []
176
177         name = "%s.%s" % (self.server, self.get_dns_domain())
178         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
179         questions.append(q)
180
181         name = "%s.%s" % ('bogusname', self.get_dns_domain())
182         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
183         questions.append(q)
184
185         self.finish_name_packet(p, questions)
186         try:
187             (response, response_packet) =\
188                 self.dns_transaction_udp(p, host=server_ip)
189             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
190         except socket.timeout:
191             # Windows chooses not to respond to incorrectly formatted queries.
192             # Although this appears to be non-deterministic even for the same
193             # request twice, it also appears to be based on a how poorly the
194             # request is formatted.
195             pass
196
197     def test_qtype_all_query(self):
198         "create a QTYPE_ALL query"
199         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
200         questions = []
201
202         name = "%s.%s" % (self.server, self.get_dns_domain())
203         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
204         print("asking for ", q.name)
205         questions.append(q)
206
207         self.finish_name_packet(p, questions)
208         (response, response_packet) =\
209             self.dns_transaction_udp(p, host=server_ip)
210
211         num_answers = 1
212         dc_ipv6 = os.getenv('SERVER_IPV6')
213         if dc_ipv6 is not None:
214             num_answers += 1
215
216         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
217         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
218         self.assertEquals(response.ancount, num_answers)
219         self.assertEquals(response.answers[0].rdata,
220                           self.server_ip)
221         if dc_ipv6 is not None:
222             self.assertEquals(response.answers[1].rdata, dc_ipv6)
223
224     def test_qclass_none_query(self):
225         "create a QCLASS_NONE query"
226         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
227         questions = []
228
229         name = "%s.%s" % (self.server, self.get_dns_domain())
230         q = self.make_name_question(
231             name,
232             dns.DNS_QTYPE_ALL,
233             dns.DNS_QCLASS_NONE)
234         questions.append(q)
235
236         self.finish_name_packet(p, questions)
237         try:
238             (response, response_packet) =\
239                 self.dns_transaction_udp(p, host=server_ip)
240             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
241         except socket.timeout:
242             # Windows chooses not to respond to incorrectly formatted queries.
243             # Although this appears to be non-deterministic even for the same
244             # request twice, it also appears to be based on a how poorly the
245             # request is formatted.
246             pass
247
248     def test_soa_hostname_query(self):
249         "create a SOA query for a hostname"
250         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
251         questions = []
252
253         name = "%s.%s" % (self.server, self.get_dns_domain())
254         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
255         questions.append(q)
256
257         self.finish_name_packet(p, questions)
258         (response, response_packet) =\
259             self.dns_transaction_udp(p, host=server_ip)
260         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
261         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
262         # We don't get SOA records for single hosts
263         self.assertEquals(response.ancount, 0)
264         # But we do respond with an authority section
265         self.assertEqual(response.nscount, 1)
266
267     def test_soa_domain_query(self):
268         "create a SOA query for a domain"
269         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
270         questions = []
271
272         name = self.get_dns_domain()
273         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
274         questions.append(q)
275
276         self.finish_name_packet(p, questions)
277         (response, response_packet) =\
278             self.dns_transaction_udp(p, host=server_ip)
279         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
280         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
281         self.assertEquals(response.ancount, 1)
282         self.assertEquals(response.answers[0].rdata.minimum, 3600)
283
284
285 class TestDNSUpdates(DNSTest):
286     def setUp(self):
287         super(TestDNSUpdates, self).setUp()
288         global server, server_ip, lp, creds, timeout
289         self.server = server_name
290         self.server_ip = server_ip
291         self.lp = lp
292         self.creds = creds
293         self.timeout = timeout
294
295     def test_two_updates(self):
296         "create two update requests"
297         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
298         updates = []
299
300         name = "%s.%s" % (self.server, self.get_dns_domain())
301         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
302         updates.append(u)
303
304         name = self.get_dns_domain()
305         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
306         updates.append(u)
307
308         self.finish_name_packet(p, updates)
309         try:
310             (response, response_packet) =\
311                 self.dns_transaction_udp(p, host=server_ip)
312             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
313         except socket.timeout:
314             # Windows chooses not to respond to incorrectly formatted queries.
315             # Although this appears to be non-deterministic even for the same
316             # request twice, it also appears to be based on a how poorly the
317             # request is formatted.
318             pass
319
320     def test_update_wrong_qclass(self):
321         "create update with DNS_QCLASS_NONE"
322         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
323         updates = []
324
325         name = self.get_dns_domain()
326         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
327         updates.append(u)
328
329         self.finish_name_packet(p, updates)
330         (response, response_packet) =\
331             self.dns_transaction_udp(p, host=server_ip)
332         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
333
334     def test_update_prereq_with_non_null_ttl(self):
335         "test update with a non-null TTL"
336         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
337         updates = []
338
339         name = self.get_dns_domain()
340
341         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
342         updates.append(u)
343         self.finish_name_packet(p, updates)
344
345         prereqs = []
346         r = dns.res_rec()
347         r.name = "%s.%s" % (self.server, self.get_dns_domain())
348         r.rr_type = dns.DNS_QTYPE_TXT
349         r.rr_class = dns.DNS_QCLASS_NONE
350         r.ttl = 1
351         r.length = 0
352         prereqs.append(r)
353
354         p.ancount = len(prereqs)
355         p.answers = prereqs
356
357         try:
358             (response, response_packet) =\
359                 self.dns_transaction_udp(p, host=server_ip)
360             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
361         except socket.timeout:
362             # Windows chooses not to respond to incorrectly formatted queries.
363             # Although this appears to be non-deterministic even for the same
364             # request twice, it also appears to be based on a how poorly the
365             # request is formatted.
366             pass
367
368     def test_update_prereq_with_non_null_length(self):
369         "test update with a non-null length"
370         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
371         updates = []
372
373         name = self.get_dns_domain()
374
375         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
376         updates.append(u)
377         self.finish_name_packet(p, updates)
378
379         prereqs = []
380         r = dns.res_rec()
381         r.name = "%s.%s" % (self.server, self.get_dns_domain())
382         r.rr_type = dns.DNS_QTYPE_TXT
383         r.rr_class = dns.DNS_QCLASS_ANY
384         r.ttl = 0
385         r.length = 1
386         prereqs.append(r)
387
388         p.ancount = len(prereqs)
389         p.answers = prereqs
390
391         (response, response_packet) =\
392             self.dns_transaction_udp(p, host=server_ip)
393         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
394
395     def test_update_prereq_nonexisting_name(self):
396         "test update with a nonexisting name"
397         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
398         updates = []
399
400         name = self.get_dns_domain()
401
402         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
403         updates.append(u)
404         self.finish_name_packet(p, updates)
405
406         prereqs = []
407         r = dns.res_rec()
408         r.name = "idontexist.%s" % self.get_dns_domain()
409         r.rr_type = dns.DNS_QTYPE_TXT
410         r.rr_class = dns.DNS_QCLASS_ANY
411         r.ttl = 0
412         r.length = 0
413         prereqs.append(r)
414
415         p.ancount = len(prereqs)
416         p.answers = prereqs
417
418         (response, response_packet) =\
419             self.dns_transaction_udp(p, host=server_ip)
420         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
421
422     def test_update_add_txt_record(self):
423         "test adding records works"
424         prefix, txt = 'textrec', ['"This is a test"']
425         p = self.make_txt_update(prefix, txt)
426         (response, response_packet) =\
427             self.dns_transaction_udp(p, host=server_ip)
428         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
429         self.check_query_txt(prefix, txt)
430
431     def test_delete_record(self):
432         "Test if deleting records works"
433
434         NAME = "deleterec.%s" % self.get_dns_domain()
435
436         # First, create a record to make sure we have a record to delete.
437         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
438         updates = []
439
440         name = self.get_dns_domain()
441
442         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
443         updates.append(u)
444         self.finish_name_packet(p, updates)
445
446         updates = []
447         r = dns.res_rec()
448         r.name = NAME
449         r.rr_type = dns.DNS_QTYPE_TXT
450         r.rr_class = dns.DNS_QCLASS_IN
451         r.ttl = 900
452         r.length = 0xffff
453         rdata = self.make_txt_record(['"This is a test"'])
454         r.rdata = rdata
455         updates.append(r)
456         p.nscount = len(updates)
457         p.nsrecs = updates
458
459         (response, response_packet) =\
460             self.dns_transaction_udp(p, host=server_ip)
461         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
462
463         # Now check the record is around
464         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
465         questions = []
466         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
467         questions.append(q)
468
469         self.finish_name_packet(p, questions)
470         (response, response_packet) =\
471             self.dns_transaction_udp(p, host=server_ip)
472         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
473
474         # Now delete the record
475         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
476         updates = []
477
478         name = self.get_dns_domain()
479
480         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
481         updates.append(u)
482         self.finish_name_packet(p, updates)
483
484         updates = []
485         r = dns.res_rec()
486         r.name = NAME
487         r.rr_type = dns.DNS_QTYPE_TXT
488         r.rr_class = dns.DNS_QCLASS_NONE
489         r.ttl = 0
490         r.length = 0xffff
491         rdata = self.make_txt_record(['"This is a test"'])
492         r.rdata = rdata
493         updates.append(r)
494         p.nscount = len(updates)
495         p.nsrecs = updates
496
497         (response, response_packet) =\
498             self.dns_transaction_udp(p, host=server_ip)
499         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
500
501         # And finally check it's gone
502         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
503         questions = []
504
505         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
506         questions.append(q)
507
508         self.finish_name_packet(p, questions)
509         (response, response_packet) =\
510             self.dns_transaction_udp(p, host=server_ip)
511         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
512
513     def test_readd_record(self):
514         "Test if adding, deleting and then readding a records works"
515
516         NAME = "readdrec.%s" % self.get_dns_domain()
517
518         # Create the record
519         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
520         updates = []
521
522         name = self.get_dns_domain()
523
524         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
525         updates.append(u)
526         self.finish_name_packet(p, updates)
527
528         updates = []
529         r = dns.res_rec()
530         r.name = NAME
531         r.rr_type = dns.DNS_QTYPE_TXT
532         r.rr_class = dns.DNS_QCLASS_IN
533         r.ttl = 900
534         r.length = 0xffff
535         rdata = self.make_txt_record(['"This is a test"'])
536         r.rdata = rdata
537         updates.append(r)
538         p.nscount = len(updates)
539         p.nsrecs = updates
540
541         (response, response_packet) =\
542             self.dns_transaction_udp(p, host=server_ip)
543         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
544
545         # Now check the record is around
546         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
547         questions = []
548         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
549         questions.append(q)
550
551         self.finish_name_packet(p, questions)
552         (response, response_packet) =\
553             self.dns_transaction_udp(p, host=server_ip)
554         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
555
556         # Now delete the record
557         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
558         updates = []
559
560         name = self.get_dns_domain()
561
562         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
563         updates.append(u)
564         self.finish_name_packet(p, updates)
565
566         updates = []
567         r = dns.res_rec()
568         r.name = NAME
569         r.rr_type = dns.DNS_QTYPE_TXT
570         r.rr_class = dns.DNS_QCLASS_NONE
571         r.ttl = 0
572         r.length = 0xffff
573         rdata = self.make_txt_record(['"This is a test"'])
574         r.rdata = rdata
575         updates.append(r)
576         p.nscount = len(updates)
577         p.nsrecs = updates
578
579         (response, response_packet) =\
580             self.dns_transaction_udp(p, host=server_ip)
581         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
582
583         # check it's gone
584         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
585         questions = []
586
587         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
588         questions.append(q)
589
590         self.finish_name_packet(p, questions)
591         (response, response_packet) =\
592             self.dns_transaction_udp(p, host=server_ip)
593         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
594
595         # recreate the record
596         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
597         updates = []
598
599         name = self.get_dns_domain()
600
601         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
602         updates.append(u)
603         self.finish_name_packet(p, updates)
604
605         updates = []
606         r = dns.res_rec()
607         r.name = NAME
608         r.rr_type = dns.DNS_QTYPE_TXT
609         r.rr_class = dns.DNS_QCLASS_IN
610         r.ttl = 900
611         r.length = 0xffff
612         rdata = self.make_txt_record(['"This is a test"'])
613         r.rdata = rdata
614         updates.append(r)
615         p.nscount = len(updates)
616         p.nsrecs = updates
617
618         (response, response_packet) =\
619             self.dns_transaction_udp(p, host=server_ip)
620         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
621
622         # Now check the record is around
623         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
624         questions = []
625         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
626         questions.append(q)
627
628         self.finish_name_packet(p, questions)
629         (response, response_packet) =\
630             self.dns_transaction_udp(p, host=server_ip)
631         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
632
633     def test_update_add_mx_record(self):
634         "test adding MX records works"
635         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
636         updates = []
637
638         name = self.get_dns_domain()
639
640         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
641         updates.append(u)
642         self.finish_name_packet(p, updates)
643
644         updates = []
645         r = dns.res_rec()
646         r.name = "%s" % self.get_dns_domain()
647         r.rr_type = dns.DNS_QTYPE_MX
648         r.rr_class = dns.DNS_QCLASS_IN
649         r.ttl = 900
650         r.length = 0xffff
651         rdata = dns.mx_record()
652         rdata.preference = 10
653         rdata.exchange = 'mail.%s' % self.get_dns_domain()
654         r.rdata = rdata
655         updates.append(r)
656         p.nscount = len(updates)
657         p.nsrecs = updates
658
659         (response, response_packet) =\
660             self.dns_transaction_udp(p, host=server_ip)
661         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
662
663         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
664         questions = []
665
666         name = "%s" % self.get_dns_domain()
667         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
668         questions.append(q)
669
670         self.finish_name_packet(p, questions)
671         (response, response_packet) =\
672             self.dns_transaction_udp(p, host=server_ip)
673         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
674         self.assertEqual(response.ancount, 1)
675         ans = response.answers[0]
676         self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
677         self.assertEqual(ans.rdata.preference, 10)
678         self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
679
680
681 class TestComplexQueries(DNSTest):
682     def make_dns_update(self, key, value, qtype):
683         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
684
685         name = self.get_dns_domain()
686         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
687         self.finish_name_packet(p, [u])
688
689         r = dns.res_rec()
690         r.name = key
691         r.rr_type = qtype
692         r.rr_class = dns.DNS_QCLASS_IN
693         r.ttl = 900
694         r.length = 0xffff
695         r.rdata = value
696         p.nscount = 1
697         p.nsrecs = [r]
698         (response, response_packet) =\
699             self.dns_transaction_udp(p, host=server_ip)
700         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
701
702     def setUp(self):
703         super(TestComplexQueries, self).setUp()
704
705         global server, server_ip, lp, creds, timeout
706         self.server = server_name
707         self.server_ip = server_ip
708         self.lp = lp
709         self.creds = creds
710         self.timeout = timeout
711
712     def test_one_a_query(self):
713         "create a query packet containing one query record"
714
715         try:
716
717             # Create the record
718             name = "cname_test.%s" % self.get_dns_domain()
719             rdata = "%s.%s" % (self.server, self.get_dns_domain())
720             self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
721
722             p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
723             questions = []
724
725             # Check the record
726             name = "cname_test.%s" % self.get_dns_domain()
727             q = self.make_name_question(name,
728                                         dns.DNS_QTYPE_A,
729                                         dns.DNS_QCLASS_IN)
730             print("asking for ", q.name)
731             questions.append(q)
732
733             self.finish_name_packet(p, questions)
734             (response, response_packet) =\
735                 self.dns_transaction_udp(p, host=self.server_ip)
736             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
737             self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
738             self.assertEquals(response.ancount, 2)
739             self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
740             self.assertEquals(response.answers[0].rdata, "%s.%s" %
741                               (self.server, self.get_dns_domain()))
742             self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
743             self.assertEquals(response.answers[1].rdata,
744                               self.server_ip)
745
746         finally:
747             # Delete the record
748             p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
749             updates = []
750
751             name = self.get_dns_domain()
752
753             u = self.make_name_question(name,
754                                         dns.DNS_QTYPE_SOA,
755                                         dns.DNS_QCLASS_IN)
756             updates.append(u)
757             self.finish_name_packet(p, updates)
758
759             updates = []
760             r = dns.res_rec()
761             r.name = "cname_test.%s" % self.get_dns_domain()
762             r.rr_type = dns.DNS_QTYPE_CNAME
763             r.rr_class = dns.DNS_QCLASS_NONE
764             r.ttl = 0
765             r.length = 0xffff
766             r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
767             updates.append(r)
768             p.nscount = len(updates)
769             p.nsrecs = updates
770
771             (response, response_packet) =\
772                 self.dns_transaction_udp(p, host=self.server_ip)
773             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
774
775     def test_cname_two_chain(self):
776         name0 = "cnamechain0.%s" % self.get_dns_domain()
777         name1 = "cnamechain1.%s" % self.get_dns_domain()
778         name2 = "cnamechain2.%s" % self.get_dns_domain()
779         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
780         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
781         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
782
783         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
784         questions = []
785         q = self.make_name_question(name1, dns.DNS_QTYPE_A,
786                                     dns.DNS_QCLASS_IN)
787         questions.append(q)
788
789         self.finish_name_packet(p, questions)
790         (response, response_packet) =\
791             self.dns_transaction_udp(p, host=server_ip)
792         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
793         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
794         self.assertEquals(response.ancount, 3)
795
796         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
797         self.assertEquals(response.answers[0].name, name1)
798         self.assertEquals(response.answers[0].rdata, name2)
799
800         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
801         self.assertEquals(response.answers[1].name, name2)
802         self.assertEquals(response.answers[1].rdata, name0)
803
804         self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
805         self.assertEquals(response.answers[2].rdata,
806                           self.server_ip)
807
808     def test_invalid_empty_cname(self):
809         name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
810         try:
811             self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
812         except AssertionError:
813             pass
814         else:
815             self.fail("Successfully added empty CNAME, which is invalid.")
816
817     def test_cname_two_chain_not_matching_qtype(self):
818         name0 = "cnamechain0.%s" % self.get_dns_domain()
819         name1 = "cnamechain1.%s" % self.get_dns_domain()
820         name2 = "cnamechain2.%s" % self.get_dns_domain()
821         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
822         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
823         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
824
825         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
826         questions = []
827         q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
828                                     dns.DNS_QCLASS_IN)
829         questions.append(q)
830
831         self.finish_name_packet(p, questions)
832         (response, response_packet) =\
833             self.dns_transaction_udp(p, host=server_ip)
834         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
835         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
836
837         # CNAME should return all intermediate results!
838         # Only the A records exists, not the TXT.
839         self.assertEquals(response.ancount, 2)
840
841         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
842         self.assertEquals(response.answers[0].name, name1)
843         self.assertEquals(response.answers[0].rdata, name2)
844
845         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
846         self.assertEquals(response.answers[1].name, name2)
847         self.assertEquals(response.answers[1].rdata, name0)
848
849     def test_cname_loop(self):
850         cname1 = "cnamelooptestrec." + self.get_dns_domain()
851         cname2 = "cnamelooptestrec2." + self.get_dns_domain()
852         cname3 = "cnamelooptestrec3." + self.get_dns_domain()
853         self.make_dns_update(cname1, cname2, dnsp.DNS_TYPE_CNAME)
854         self.make_dns_update(cname2, cname3, dnsp.DNS_TYPE_CNAME)
855         self.make_dns_update(cname3, cname1, dnsp.DNS_TYPE_CNAME)
856
857         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
858         questions = []
859
860         q = self.make_name_question(cname1,
861                                     dns.DNS_QTYPE_A,
862                                     dns.DNS_QCLASS_IN)
863         questions.append(q)
864         self.finish_name_packet(p, questions)
865
866         (response, response_packet) =\
867             self.dns_transaction_udp(p, host=self.server_ip)
868
869         max_recursion_depth = 20
870         self.assertEquals(len(response.answers), max_recursion_depth)
871
872 class TestInvalidQueries(DNSTest):
873     def setUp(self):
874         super(TestInvalidQueries, self).setUp()
875         global server, server_ip, lp, creds, timeout
876         self.server = server_name
877         self.server_ip = server_ip
878         self.lp = lp
879         self.creds = creds
880         self.timeout = timeout
881
882     def test_one_a_query(self):
883         """send 0 bytes follows by create a query packet
884            containing one query record"""
885
886         s = None
887         try:
888             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
889             s.connect((self.server_ip, 53))
890             s.send("", 0)
891         finally:
892             if s is not None:
893                 s.close()
894
895         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
896         questions = []
897
898         name = "%s.%s" % (self.server, self.get_dns_domain())
899         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
900         print("asking for ", q.name)
901         questions.append(q)
902
903         self.finish_name_packet(p, questions)
904         (response, response_packet) =\
905             self.dns_transaction_udp(p, host=self.server_ip)
906         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
907         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
908         self.assertEquals(response.ancount, 1)
909         self.assertEquals(response.answers[0].rdata,
910                           self.server_ip)
911
912     def test_one_a_reply(self):
913         "send a reply instead of a query"
914         global timeout
915
916         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
917         questions = []
918
919         name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
920         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
921         print("asking for ", q.name)
922         questions.append(q)
923
924         self.finish_name_packet(p, questions)
925         p.operation |= dns.DNS_FLAG_REPLY
926         s = None
927         try:
928             send_packet = ndr.ndr_pack(p)
929             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
930             s.settimeout(timeout)
931             host = self.server_ip
932             s.connect((host, 53))
933             tcp_packet = struct.pack('!H', len(send_packet))
934             tcp_packet += send_packet
935             s.send(tcp_packet, 0)
936             recv_packet = s.recv(0xffff + 2, 0)
937             self.assertEquals(0, len(recv_packet))
938         except socket.timeout:
939             # Windows chooses not to respond to incorrectly formatted queries.
940             # Although this appears to be non-deterministic even for the same
941             # request twice, it also appears to be based on a how poorly the
942             # request is formatted.
943             pass
944         finally:
945             if s is not None:
946                 s.close()
947
948
949 class TestZones(DNSTest):
950     def setUp(self):
951         super(TestZones, self).setUp()
952         global server, server_ip, lp, creds, timeout
953         self.server = server_name
954         self.server_ip = server_ip
955         self.lp = lp
956         self.creds = creds
957         self.timeout = timeout
958
959         self.zone = "test.lan"
960         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
961                                             (self.server_ip),
962                                             self.lp, self.creds)
963
964         self.samdb = SamDB(url="ldap://" + self.server_ip,
965                            lp=self.get_loadparm(),
966                            session_info=system_session(),
967                            credentials=self.creds)
968         self.zone_dn = "DC=" + self.zone +\
969                        ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
970                        str(self.samdb.get_default_basedn())
971
972     def tearDown(self):
973         super(TestZones, self).tearDown()
974
975         try:
976             self.delete_zone(self.zone)
977         except RuntimeError as e:
978             (num, string) = e.args
979             if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
980                 raise
981
982     def create_zone(self, zone, aging_enabled=False):
983         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
984         zone_create.pszZoneName = zone
985         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
986         zone_create.fAging = int(aging_enabled)
987         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
988         zone_create.fDsIntegrated = 1
989         zone_create.fLoadExisting = 1
990         zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
991         try:
992             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
993             self.rpc_conn.DnssrvOperation2(client_version,
994                                            0,
995                                            self.server_ip,
996                                            None,
997                                            0,
998                                            'ZoneCreate',
999                                            dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
1000                                            zone_create)
1001         except WERRORError as e:
1002             self.fail(str(e))
1003
1004     def set_params(self, **kwargs):
1005         zone = kwargs.pop('zone', None)
1006         for key, val in kwargs.items():
1007             name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
1008             name_param.dwParam = val
1009             name_param.pszNodeName = key
1010
1011             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1012             nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
1013             try:
1014                 self.rpc_conn.DnssrvOperation2(client_version,
1015                                                0,
1016                                                self.server,
1017                                                zone,
1018                                                0,
1019                                                'ResetDwordProperty',
1020                                                nap_type,
1021                                                name_param)
1022             except WERRORError as e:
1023                 self.fail(str(e))
1024
1025     def ldap_modify_dnsrecs(self, name, func):
1026         dn = 'DC={0},{1}'.format(name, self.zone_dn)
1027         dns_recs = self.ldap_get_dns_records(name)
1028         for rec in dns_recs:
1029             func(rec)
1030         update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]}
1031         self.samdb.modify(ldb.Message.from_dict(self.samdb,
1032                                                 update_dict,
1033                                                 ldb.FLAG_MOD_REPLACE))
1034
1035     def dns_update_record(self, prefix, txt):
1036         p = self.make_txt_update(prefix, txt, self.zone)
1037         (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
1038         self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
1039         recs = self.ldap_get_dns_records(prefix)
1040         recs = [r for r in recs if r.data.str == txt]
1041         self.assertEqual(len(recs), 1)
1042         return recs[0]
1043
1044     def dns_tombstone(self, prefix, txt, zone):
1045         name = prefix + "." + zone
1046
1047         to = dnsp.DnssrvRpcRecord()
1048         to.dwTimeStamp = 1000
1049         to.wType = dnsp.DNS_TYPE_TOMBSTONE
1050
1051         self.samdb.dns_replace(name, [to])
1052
1053     def ldap_get_records(self, name):
1054         # The use of SCOPE_SUBTREE here avoids raising an exception in the
1055         # 0 results case for a test below.
1056
1057         expr = "(&(objectClass=dnsNode)(name={0}))".format(name)
1058         return self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1059                                  expression=expr, attrs=["*"])
1060
1061     def ldap_get_dns_records(self, name):
1062         records = self.ldap_get_records(name)
1063         return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
1064                 for r in records[0].get('dnsRecord')]
1065
1066     def ldap_get_zone_settings(self):
1067         records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
1068                                     expression="(&(objectClass=dnsZone)" +
1069                                     "(name={0}))".format(self.zone),
1070                                     attrs=["dNSProperty"])
1071         self.assertEqual(len(records), 1)
1072         props = [ndr_unpack(dnsp.DnsProperty, r)
1073                  for r in records[0].get('dNSProperty')]
1074
1075         # We have no choice but to repeat these here.
1076         zone_prop_ids = {0x00: "EMPTY",
1077                          0x01: "TYPE",
1078                          0x02: "ALLOW_UPDATE",
1079                          0x08: "SECURE_TIME",
1080                          0x10: "NOREFRESH_INTERVAL",
1081                          0x11: "SCAVENGING_SERVERS",
1082                          0x12: "AGING_ENABLED_TIME",
1083                          0x20: "REFRESH_INTERVAL",
1084                          0x40: "AGING_STATE",
1085                          0x80: "DELETED_FROM_HOSTNAME",
1086                          0x81: "MASTER_SERVERS",
1087                          0x82: "AUTO_NS_SERVERS",
1088                          0x83: "DCPROMO_CONVERT",
1089                          0x90: "SCAVENGING_SERVERS_DA",
1090                          0x91: "MASTER_SERVERS_DA",
1091                          0x92: "NS_SERVERS_DA",
1092                          0x100: "NODE_DBFLAGS"}
1093         return {zone_prop_ids[p.id].lower(): p.data for p in props}
1094
1095     def set_aging(self, enable=False):
1096         self.create_zone(self.zone, aging_enabled=enable)
1097         self.set_params(NoRefreshInterval=1, RefreshInterval=1,
1098                         Aging=int(bool(enable)), zone=self.zone,
1099                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1100
1101     def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
1102         self.set_aging(enable=True)
1103         settings = self.ldap_get_zone_settings()
1104         self.assertTrue(settings['aging_state'] is not None)
1105         self.assertTrue(settings['aging_state'])
1106
1107         rec = self.dns_update_record('agingtest', ['test txt'])
1108         self.assertNotEqual(rec.dwTimeStamp, 0)
1109
1110     def test_set_aging_disabled(self):
1111         self.set_aging(enable=False)
1112         settings = self.ldap_get_zone_settings()
1113         self.assertTrue(settings['aging_state'] is not None)
1114         self.assertFalse(settings['aging_state'])
1115
1116         rec = self.dns_update_record('agingtest', ['test txt'])
1117         self.assertNotEqual(rec.dwTimeStamp, 0)
1118
1119     def test_aging_update(self, enable=True):
1120         name, txt = 'agingtest', ['test txt']
1121         self.set_aging(enable=True)
1122         before_mod = self.dns_update_record(name, txt)
1123         if not enable:
1124             self.set_params(zone=self.zone, Aging=0)
1125         dec = 2
1126
1127         def mod_ts(rec):
1128             self.assertTrue(rec.dwTimeStamp > 0)
1129             rec.dwTimeStamp -= dec
1130         self.ldap_modify_dnsrecs(name, mod_ts)
1131         after_mod = self.ldap_get_dns_records(name)
1132         self.assertEqual(len(after_mod), 1)
1133         after_mod = after_mod[0]
1134         self.assertEqual(after_mod.dwTimeStamp,
1135                          before_mod.dwTimeStamp - dec)
1136         after_update = self.dns_update_record(name, txt)
1137         after_should_equal = before_mod if enable else after_mod
1138         self.assertEqual(after_should_equal.dwTimeStamp,
1139                          after_update.dwTimeStamp)
1140
1141     def test_aging_update_disabled(self):
1142         self.test_aging_update(enable=False)
1143
1144     def test_aging_refresh(self):
1145         name, txt = 'agingtest', ['test txt']
1146         self.create_zone(self.zone, aging_enabled=True)
1147         interval = 10
1148         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1149                         Aging=1, zone=self.zone,
1150                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1151         before_mod = self.dns_update_record(name, txt)
1152
1153         def mod_ts(rec):
1154             self.assertTrue(rec.dwTimeStamp > 0)
1155             rec.dwTimeStamp -= interval / 2
1156         self.ldap_modify_dnsrecs(name, mod_ts)
1157         update_during_norefresh = self.dns_update_record(name, txt)
1158
1159         def mod_ts(rec):
1160             self.assertTrue(rec.dwTimeStamp > 0)
1161             rec.dwTimeStamp -= interval + interval / 2
1162         self.ldap_modify_dnsrecs(name, mod_ts)
1163         update_during_refresh = self.dns_update_record(name, txt)
1164         self.assertEqual(update_during_norefresh.dwTimeStamp,
1165                          before_mod.dwTimeStamp - interval / 2)
1166         self.assertEqual(update_during_refresh.dwTimeStamp,
1167                          before_mod.dwTimeStamp)
1168
1169     def test_rpc_add_no_timestamp(self):
1170         name, txt = 'agingtest', ['test txt']
1171         self.set_aging(enable=True)
1172         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1173         rec_buf.rec = TXTRecord(txt)
1174         self.rpc_conn.DnssrvUpdateRecord2(
1175             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1176             0,
1177             self.server_ip,
1178             self.zone,
1179             name,
1180             rec_buf,
1181             None)
1182         recs = self.ldap_get_dns_records(name)
1183         self.assertEqual(len(recs), 1)
1184         self.assertEqual(recs[0].dwTimeStamp, 0)
1185
1186     def test_static_record_dynamic_update(self):
1187         name, txt = 'agingtest', ['test txt']
1188         txt2 = ['test txt2']
1189         self.set_aging(enable=True)
1190         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1191         rec_buf.rec = TXTRecord(txt)
1192         self.rpc_conn.DnssrvUpdateRecord2(
1193             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1194             0,
1195             self.server_ip,
1196             self.zone,
1197             name,
1198             rec_buf,
1199             None)
1200
1201         rec2 = self.dns_update_record(name, txt2)
1202         self.assertEqual(rec2.dwTimeStamp, 0)
1203
1204     def test_dynamic_record_static_update(self):
1205         name, txt = 'agingtest', ['test txt']
1206         txt2 = ['test txt2']
1207         txt3 = ['test txt3']
1208         self.set_aging(enable=True)
1209
1210         self.dns_update_record(name, txt)
1211
1212         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1213         rec_buf.rec = TXTRecord(txt2)
1214         self.rpc_conn.DnssrvUpdateRecord2(
1215             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1216             0,
1217             self.server_ip,
1218             self.zone,
1219             name,
1220             rec_buf,
1221             None)
1222
1223         self.dns_update_record(name, txt3)
1224
1225         recs = self.ldap_get_dns_records(name)
1226         # Put in dict because ldap recs might be out of order
1227         recs = {str(r.data.str): r for r in recs}
1228         self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0)
1229         self.assertEqual(recs[str(txt2)].dwTimeStamp, 0)
1230         self.assertEqual(recs[str(txt3)].dwTimeStamp, 0)
1231
1232     def test_dns_tombstone_custom_match_rule(self):
1233         lp = self.get_loadparm()
1234         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1235                            session_info=system_session(),
1236                            credentials=self.creds)
1237
1238         name, txt = 'agingtest', ['test txt']
1239         name2, txt2 = 'agingtest2', ['test txt2']
1240         name3, txt3 = 'agingtest3', ['test txt3']
1241         name4, txt4 = 'agingtest4', ['test txt4']
1242         name5, txt5 = 'agingtest5', ['test txt5']
1243
1244         self.create_zone(self.zone, aging_enabled=True)
1245         interval = 10
1246         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1247                         Aging=1, zone=self.zone,
1248                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1249
1250         self.dns_update_record(name, txt)
1251
1252         self.dns_update_record(name2, txt)
1253         self.dns_update_record(name2, txt2)
1254
1255         self.dns_update_record(name3, txt)
1256         self.dns_update_record(name3, txt2)
1257         last_update = self.dns_update_record(name3, txt3)
1258
1259         # Modify txt1 of the first 2 names
1260         def mod_ts(rec):
1261             if rec.data.str == txt:
1262                 rec.dwTimeStamp -= 2
1263         self.ldap_modify_dnsrecs(name, mod_ts)
1264         self.ldap_modify_dnsrecs(name2, mod_ts)
1265
1266         # create a static dns record.
1267         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1268         rec_buf.rec = TXTRecord(txt4)
1269         self.rpc_conn.DnssrvUpdateRecord2(
1270             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1271             0,
1272             self.server_ip,
1273             self.zone,
1274             name4,
1275             rec_buf,
1276             None)
1277
1278         # Create a tomb stoned record.
1279         self.dns_update_record(name5, txt5)
1280         self.dns_tombstone(name5, txt5, self.zone)
1281
1282         self.ldap_get_dns_records(name3)
1283         expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1284         expr = expr.format(int(last_update.dwTimeStamp) - 1)
1285         try:
1286             res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1287                                     expression=expr, attrs=["*"])
1288         except ldb.LdbError as e:
1289             self.fail(str(e))
1290         updated_names = {str(r.get('name')) for r in res}
1291         self.assertEqual(updated_names, set([name, name2]))
1292
1293     def test_dns_tombstone_custom_match_rule_no_records(self):
1294         lp = self.get_loadparm()
1295         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1296                            session_info=system_session(),
1297                            credentials=self.creds)
1298
1299         self.create_zone(self.zone, aging_enabled=True)
1300         interval = 10
1301         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1302                         Aging=1, zone=self.zone,
1303                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1304
1305         expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1306         expr = expr.format(1)
1307
1308         try:
1309             res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1310                                     expression=expr, attrs=["*"])
1311         except ldb.LdbError as e:
1312             self.fail(str(e))
1313         self.assertEqual(0, len(res))
1314
1315     def test_dns_tombstone_custom_match_rule_fail(self):
1316         self.create_zone(self.zone, aging_enabled=True)
1317         samdb = SamDB(url=lp.samdb_url(),
1318                       lp=lp,
1319                       session_info=system_session(),
1320                       credentials=self.creds)
1321
1322         # Property name in not dnsRecord
1323         expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1324         res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1325                            expression=expr, attrs=["*"])
1326         self.assertEquals(len(res), 0)
1327
1328         # No value for tombstone time
1329         try:
1330             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)"
1331             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1332                                expression=expr, attrs=["*"])
1333             self.assertEquals(len(res), 0)
1334             self.fail("Exception: ldb.ldbError not generated")
1335         except ldb.LdbError as e:
1336             (num, msg) = e.args
1337             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1338
1339         # Tombstone time = -
1340         try:
1341             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)"
1342             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1343                                expression=expr, attrs=["*"])
1344             self.assertEquals(len(res), 0)
1345             self.fail("Exception: ldb.ldbError not generated")
1346         except ldb.LdbError as e:
1347             (num, _) = e.args
1348             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1349
1350         # Tombstone time longer than 64 characters
1351         try:
1352             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={0})"
1353             expr = expr.format("1" * 65)
1354             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1355                                expression=expr, attrs=["*"])
1356             self.assertEquals(len(res), 0)
1357             self.fail("Exception: ldb.ldbError not generated")
1358         except ldb.LdbError as e:
1359             (num, _) = e.args
1360             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1361
1362         # Non numeric Tombstone time
1363         try:
1364             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)"
1365             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1366                                expression=expr, attrs=["*"])
1367             self.assertEquals(len(res), 0)
1368             self.fail("Exception: ldb.ldbError not generated")
1369         except ldb.LdbError as e:
1370             (num, _) = e.args
1371             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1372
1373         # Non system session
1374         try:
1375             db = SamDB(url="ldap://" + self.server_ip,
1376                        lp=self.get_loadparm(),
1377                        credentials=self.creds)
1378
1379             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)"
1380             res = db.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1381                             expression=expr, attrs=["*"])
1382             self.assertEquals(len(res), 0)
1383             self.fail("Exception: ldb.ldbError not generated")
1384         except ldb.LdbError as e:
1385             (num, _) = e.args
1386             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1387
1388     def test_basic_scavenging(self):
1389         lp = self.get_loadparm()
1390         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1391                            session_info=system_session(),
1392                            credentials=self.creds)
1393
1394         self.create_zone(self.zone, aging_enabled=True)
1395         interval = 1
1396         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1397                         zone=self.zone, Aging=1,
1398                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1399         name, txt = 'agingtest', ['test txt']
1400         name2, txt2 = 'agingtest2', ['test txt2']
1401         name3, txt3 = 'agingtest3', ['test txt3']
1402         self.dns_update_record(name, txt)
1403         self.dns_update_record(name2, txt)
1404         self.dns_update_record(name2, txt2)
1405         self.dns_update_record(name3, txt)
1406         self.dns_update_record(name3, txt2)
1407         last_add = self.dns_update_record(name3, txt3)
1408
1409         def mod_ts(rec):
1410             self.assertTrue(rec.dwTimeStamp > 0)
1411             if rec.data.str == txt:
1412                 rec.dwTimeStamp -= interval * 5
1413         self.ldap_modify_dnsrecs(name, mod_ts)
1414         self.ldap_modify_dnsrecs(name2, mod_ts)
1415         self.ldap_modify_dnsrecs(name3, mod_ts)
1416         self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1417         dsdb._scavenge_dns_records(self.samdb)
1418
1419         recs = self.ldap_get_dns_records(name)
1420         self.assertEqual(len(recs), 1)
1421         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1422
1423         recs = self.ldap_get_dns_records(name2)
1424         self.assertEqual(len(recs), 1)
1425         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1426         self.assertEqual(recs[0].data.str, txt2)
1427
1428         recs = self.ldap_get_dns_records(name3)
1429         self.assertEqual(len(recs), 2)
1430         txts = {str(r.data.str) for r in recs}
1431         self.assertEqual(txts, {str(txt2), str(txt3)})
1432         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1433         self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT)
1434
1435         for make_it_work in [False, True]:
1436             inc = -1 if make_it_work else 1
1437
1438             def mod_ts(rec):
1439                 rec.data = (last_add.dwTimeStamp - 24 * 14) + inc
1440             self.ldap_modify_dnsrecs(name, mod_ts)
1441             dsdb._dns_delete_tombstones(self.samdb)
1442             recs = self.ldap_get_records(name)
1443             if make_it_work:
1444                 self.assertEqual(len(recs), 0)
1445             else:
1446                 self.assertEqual(len(recs), 1)
1447
1448     def delete_zone(self, zone):
1449         self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1450                                        0,
1451                                        self.server_ip,
1452                                        zone,
1453                                        0,
1454                                        'DeleteZoneFromDs',
1455                                        dnsserver.DNSSRV_TYPEID_NULL,
1456                                        None)
1457
1458     def test_soa_query(self):
1459         zone = "test.lan"
1460         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1461         questions = []
1462
1463         q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
1464         questions.append(q)
1465         self.finish_name_packet(p, questions)
1466
1467         (response, response_packet) =\
1468             self.dns_transaction_udp(p, host=server_ip)
1469         # Windows returns OK while BIND logically seems to return NXDOMAIN
1470         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1471         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1472         self.assertEquals(response.ancount, 0)
1473
1474         self.create_zone(zone)
1475         (response, response_packet) =\
1476             self.dns_transaction_udp(p, host=server_ip)
1477         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1478         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1479         self.assertEquals(response.ancount, 1)
1480         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
1481
1482         self.delete_zone(zone)
1483         (response, response_packet) =\
1484             self.dns_transaction_udp(p, host=server_ip)
1485         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1486         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1487         self.assertEquals(response.ancount, 0)
1488
1489
1490 class TestRPCRoundtrip(DNSTest):
1491     def setUp(self):
1492         super(TestRPCRoundtrip, self).setUp()
1493         global server, server_ip, lp, creds
1494         self.server = server_name
1495         self.server_ip = server_ip
1496         self.lp = lp
1497         self.creds = creds
1498         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1499                                             (self.server_ip),
1500                                             self.lp,
1501                                             self.creds)
1502
1503     def tearDown(self):
1504         super(TestRPCRoundtrip, self).tearDown()
1505
1506     def rpc_update(self, fqn=None, data=None, wType=None, delete=False):
1507         fqn = fqn or ("rpctestrec." + self.get_dns_domain())
1508
1509         rec = data_to_dns_record(wType, data)
1510         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1511         add_rec_buf.rec = rec
1512
1513         add_arg = add_rec_buf
1514         del_arg = None
1515         if delete:
1516             add_arg = None
1517             del_arg = add_rec_buf
1518
1519         self.rpc_conn.DnssrvUpdateRecord2(
1520             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1521             0,
1522             self.server_ip,
1523             self.get_dns_domain(),
1524             fqn,
1525             add_arg,
1526             del_arg)
1527
1528     def test_rpc_self_referencing_cname(self):
1529         cname = "cnametest2_unqual_rec_loop"
1530         cname_fqn = "%s.%s" % (cname, self.get_dns_domain())
1531
1532         try:
1533             self.rpc_update(fqn=cname, data=cname_fqn,
1534                             wType=dnsp.DNS_TYPE_CNAME, delete=True)
1535         except WERRORError as e:
1536             if e.args[0] != werror.WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST:
1537                 self.fail("RPC DNS gaven wrong error on pre-test cleanup "
1538                           "for self referencing CNAME: %s" % e.args[0])
1539
1540         try:
1541             self.rpc_update(fqn=cname, wType=dnsp.DNS_TYPE_CNAME, data=cname_fqn)
1542         except WERRORError as e:
1543             if e.args[0] != werror.WERR_DNS_ERROR_CNAME_LOOP:
1544                 self.fail("RPC DNS gaven wrong error on insertion of "
1545                           "self referencing CNAME: %s" % e.args[0])
1546             return
1547
1548         self.fail("RPC DNS allowed insertion of self referencing CNAME")
1549
1550     def test_update_add_txt_rpc_to_dns(self):
1551         prefix, txt = 'rpctextrec', ['"This is a test"']
1552
1553         name = "%s.%s" % (prefix, self.get_dns_domain())
1554
1555         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
1556         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1557         add_rec_buf.rec = rec
1558         try:
1559             self.rpc_conn.DnssrvUpdateRecord2(
1560                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1561                 0,
1562                 self.server_ip,
1563                 self.get_dns_domain(),
1564                 name,
1565                 add_rec_buf,
1566                 None)
1567
1568         except WERRORError as e:
1569             self.fail(str(e))
1570
1571         try:
1572             self.check_query_txt(prefix, txt)
1573         finally:
1574             self.rpc_conn.DnssrvUpdateRecord2(
1575                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1576                 0,
1577                 self.server_ip,
1578                 self.get_dns_domain(),
1579                 name,
1580                 None,
1581                 add_rec_buf)
1582
1583     def test_update_add_null_padded_txt_record(self):
1584         "test adding records works"
1585         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1586         p = self.make_txt_update(prefix, txt)
1587         (response, response_packet) =\
1588             self.dns_transaction_udp(p, host=server_ip)
1589         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1590         self.check_query_txt(prefix, txt)
1591         self.assertIsNotNone(
1592             dns_record_match(self.rpc_conn,
1593                              self.server_ip,
1594                              self.get_dns_domain(),
1595                              "%s.%s" % (prefix, self.get_dns_domain()),
1596                              dnsp.DNS_TYPE_TXT,
1597                              '"\\"This is a test\\"" "" ""'))
1598
1599         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1600         p = self.make_txt_update(prefix, txt)
1601         (response, response_packet) =\
1602             self.dns_transaction_udp(p, host=server_ip)
1603         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1604         self.check_query_txt(prefix, txt)
1605         self.assertIsNotNone(
1606             dns_record_match(
1607                 self.rpc_conn,
1608                 self.server_ip,
1609                 self.get_dns_domain(),
1610                 "%s.%s" % (prefix, self.get_dns_domain()),
1611                 dnsp.DNS_TYPE_TXT,
1612                 '"\\"This is a test\\"" "" "" "more text"'))
1613
1614         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1615         p = self.make_txt_update(prefix, txt)
1616         (response, response_packet) =\
1617             self.dns_transaction_udp(p, host=server_ip)
1618         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1619         self.check_query_txt(prefix, txt)
1620         self.assertIsNotNone(
1621             dns_record_match(
1622                 self.rpc_conn,
1623                 self.server_ip,
1624                 self.get_dns_domain(),
1625                 "%s.%s" % (prefix, self.get_dns_domain()),
1626                 dnsp.DNS_TYPE_TXT,
1627                 '"" "" "\\"This is a test\\""'))
1628
1629     def test_update_add_padding_rpc_to_dns(self):
1630         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1631         prefix = 'rpc' + prefix
1632         name = "%s.%s" % (prefix, self.get_dns_domain())
1633
1634         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1635                                  '"\\"This is a test\\"" "" ""')
1636         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1637         add_rec_buf.rec = rec
1638         try:
1639             self.rpc_conn.DnssrvUpdateRecord2(
1640                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1641                 0,
1642                 self.server_ip,
1643                 self.get_dns_domain(),
1644                 name,
1645                 add_rec_buf,
1646                 None)
1647
1648         except WERRORError as e:
1649             self.fail(str(e))
1650
1651         try:
1652             self.check_query_txt(prefix, txt)
1653         finally:
1654             self.rpc_conn.DnssrvUpdateRecord2(
1655                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1656                 0,
1657                 self.server_ip,
1658                 self.get_dns_domain(),
1659                 name,
1660                 None,
1661                 add_rec_buf)
1662
1663         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1664         prefix = 'rpc' + prefix
1665         name = "%s.%s" % (prefix, self.get_dns_domain())
1666
1667         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1668                                  '"\\"This is a test\\"" "" "" "more text"')
1669         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1670         add_rec_buf.rec = rec
1671         try:
1672             self.rpc_conn.DnssrvUpdateRecord2(
1673                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1674                 0,
1675                 self.server_ip,
1676                 self.get_dns_domain(),
1677                 name,
1678                 add_rec_buf,
1679                 None)
1680
1681         except WERRORError as e:
1682             self.fail(str(e))
1683
1684         try:
1685             self.check_query_txt(prefix, txt)
1686         finally:
1687             self.rpc_conn.DnssrvUpdateRecord2(
1688                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1689                 0,
1690                 self.server_ip,
1691                 self.get_dns_domain(),
1692                 name,
1693                 None,
1694                 add_rec_buf)
1695
1696         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1697         prefix = 'rpc' + prefix
1698         name = "%s.%s" % (prefix, self.get_dns_domain())
1699
1700         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1701                                  '"" "" "\\"This is a test\\""')
1702         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1703         add_rec_buf.rec = rec
1704         try:
1705             self.rpc_conn.DnssrvUpdateRecord2(
1706                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1707                 0,
1708                 self.server_ip,
1709                 self.get_dns_domain(),
1710                 name,
1711                 add_rec_buf,
1712                 None)
1713         except WERRORError as e:
1714             self.fail(str(e))
1715
1716         try:
1717             self.check_query_txt(prefix, txt)
1718         finally:
1719             self.rpc_conn.DnssrvUpdateRecord2(
1720                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1721                 0,
1722                 self.server_ip,
1723                 self.get_dns_domain(),
1724                 name,
1725                 None,
1726                 add_rec_buf)
1727
1728     # Test is incomplete due to strlen against txt records
1729     def test_update_add_null_char_txt_record(self):
1730         "test adding records works"
1731         prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1732         p = self.make_txt_update(prefix, txt)
1733         (response, response_packet) =\
1734             self.dns_transaction_udp(p, host=server_ip)
1735         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1736         self.check_query_txt(prefix, ['NULL'])
1737         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1738                                               self.get_dns_domain(),
1739                                               "%s.%s" % (prefix, self.get_dns_domain()),
1740                                               dnsp.DNS_TYPE_TXT, '"NULL"'))
1741
1742         prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1743         p = self.make_txt_update(prefix, txt)
1744         (response, response_packet) =\
1745             self.dns_transaction_udp(p, host=server_ip)
1746         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1747         self.check_query_txt(prefix, ['NULL', 'NULL'])
1748         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1749                                               self.get_dns_domain(),
1750                                               "%s.%s" % (prefix, self.get_dns_domain()),
1751                                               dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1752
1753     def test_update_add_null_char_rpc_to_dns(self):
1754         prefix = 'rpcnulltextrec'
1755         name = "%s.%s" % (prefix, self.get_dns_domain())
1756
1757         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL\x00BYTE"')
1758         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1759         add_rec_buf.rec = rec
1760         try:
1761             self.rpc_conn.DnssrvUpdateRecord2(
1762                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1763                 0,
1764                 self.server_ip,
1765                 self.get_dns_domain(),
1766                 name,
1767                 add_rec_buf,
1768                 None)
1769
1770         except WERRORError as e:
1771             self.fail(str(e))
1772
1773         try:
1774             self.check_query_txt(prefix, ['NULL'])
1775         finally:
1776             self.rpc_conn.DnssrvUpdateRecord2(
1777                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1778                 0,
1779                 self.server_ip,
1780                 self.get_dns_domain(),
1781                 name,
1782                 None,
1783                 add_rec_buf)
1784
1785     def test_update_add_hex_char_txt_record(self):
1786         "test adding records works"
1787         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1788         p = self.make_txt_update(prefix, txt)
1789         (response, response_packet) =\
1790             self.dns_transaction_udp(p, host=server_ip)
1791         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1792         self.check_query_txt(prefix, txt)
1793         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1794                                               self.get_dns_domain(),
1795                                               "%s.%s" % (prefix, self.get_dns_domain()),
1796                                               dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
1797
1798     def test_update_add_hex_rpc_to_dns(self):
1799         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1800         prefix = 'rpc' + prefix
1801         name = "%s.%s" % (prefix, self.get_dns_domain())
1802
1803         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
1804         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1805         add_rec_buf.rec = rec
1806         try:
1807             self.rpc_conn.DnssrvUpdateRecord2(
1808                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1809                 0,
1810                 self.server_ip,
1811                 self.get_dns_domain(),
1812                 name,
1813                 add_rec_buf,
1814                 None)
1815
1816         except WERRORError as e:
1817             self.fail(str(e))
1818
1819         try:
1820             self.check_query_txt(prefix, txt)
1821         finally:
1822             self.rpc_conn.DnssrvUpdateRecord2(
1823                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1824                 0,
1825                 self.server_ip,
1826                 self.get_dns_domain(),
1827                 name,
1828                 None,
1829                 add_rec_buf)
1830
1831     def test_update_add_slash_txt_record(self):
1832         "test adding records works"
1833         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1834         p = self.make_txt_update(prefix, txt)
1835         (response, response_packet) =\
1836             self.dns_transaction_udp(p, host=server_ip)
1837         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1838         self.check_query_txt(prefix, txt)
1839         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1840                                               self.get_dns_domain(),
1841                                               "%s.%s" % (prefix, self.get_dns_domain()),
1842                                               dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
1843
1844     # This test fails against Windows as it eliminates slashes in RPC
1845     # One typical use for a slash is in records like 'var=value' to
1846     # escape '=' characters.
1847     def test_update_add_slash_rpc_to_dns(self):
1848         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1849         prefix = 'rpc' + prefix
1850         name = "%s.%s" % (prefix, self.get_dns_domain())
1851
1852         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
1853         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1854         add_rec_buf.rec = rec
1855         try:
1856             self.rpc_conn.DnssrvUpdateRecord2(
1857                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1858                 0,
1859                 self.server_ip,
1860                 self.get_dns_domain(),
1861                 name,
1862                 add_rec_buf,
1863                 None)
1864
1865         except WERRORError as e:
1866             self.fail(str(e))
1867
1868         try:
1869             self.check_query_txt(prefix, txt)
1870
1871         finally:
1872             self.rpc_conn.DnssrvUpdateRecord2(
1873                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1874                 0,
1875                 self.server_ip,
1876                 self.get_dns_domain(),
1877                 name,
1878                 None,
1879                 add_rec_buf)
1880
1881     def test_update_add_two_txt_records(self):
1882         "test adding two txt records works"
1883         prefix, txt = 'textrec2', ['"This is a test"',
1884                                    '"and this is a test, too"']
1885         p = self.make_txt_update(prefix, txt)
1886         (response, response_packet) =\
1887             self.dns_transaction_udp(p, host=server_ip)
1888         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1889         self.check_query_txt(prefix, txt)
1890         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1891                                               self.get_dns_domain(),
1892                                               "%s.%s" % (prefix, self.get_dns_domain()),
1893                                               dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
1894                                               ' "\\"and this is a test, too\\""'))
1895
1896     def test_update_add_two_rpc_to_dns(self):
1897         prefix, txt = 'textrec2', ['"This is a test"',
1898                                    '"and this is a test, too"']
1899         prefix = 'rpc' + prefix
1900         name = "%s.%s" % (prefix, self.get_dns_domain())
1901
1902         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1903                                  '"\\"This is a test\\""' +
1904                                  ' "\\"and this is a test, too\\""')
1905         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1906         add_rec_buf.rec = rec
1907         try:
1908             self.rpc_conn.DnssrvUpdateRecord2(
1909                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1910                 0,
1911                 self.server_ip,
1912                 self.get_dns_domain(),
1913                 name,
1914                 add_rec_buf,
1915                 None)
1916
1917         except WERRORError as e:
1918             self.fail(str(e))
1919
1920         try:
1921             self.check_query_txt(prefix, txt)
1922         finally:
1923             self.rpc_conn.DnssrvUpdateRecord2(
1924                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1925                 0,
1926                 self.server_ip,
1927                 self.get_dns_domain(),
1928                 name,
1929                 None,
1930                 add_rec_buf)
1931
1932     def test_update_add_empty_txt_records(self):
1933         "test adding two txt records works"
1934         prefix, txt = 'emptytextrec', []
1935         p = self.make_txt_update(prefix, txt)
1936         (response, response_packet) =\
1937             self.dns_transaction_udp(p, host=server_ip)
1938         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1939         self.check_query_txt(prefix, txt)
1940         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1941                                               self.get_dns_domain(),
1942                                               "%s.%s" % (prefix, self.get_dns_domain()),
1943                                               dnsp.DNS_TYPE_TXT, ''))
1944
1945     def test_update_add_empty_rpc_to_dns(self):
1946         prefix, txt = 'rpcemptytextrec', []
1947
1948         name = "%s.%s" % (prefix, self.get_dns_domain())
1949
1950         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
1951         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1952         add_rec_buf.rec = rec
1953         try:
1954             self.rpc_conn.DnssrvUpdateRecord2(
1955                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1956                 0,
1957                 self.server_ip,
1958                 self.get_dns_domain(),
1959                 name,
1960                 add_rec_buf,
1961                 None)
1962         except WERRORError as e:
1963             self.fail(str(e))
1964
1965         try:
1966             self.check_query_txt(prefix, txt)
1967         finally:
1968             self.rpc_conn.DnssrvUpdateRecord2(
1969                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1970                 0,
1971                 self.server_ip,
1972                 self.get_dns_domain(),
1973                 name,
1974                 None,
1975                 add_rec_buf)
1976
1977
1978 TestProgram(module=__name__, opts=subunitopts)