800ce576dec9f2fb6195bfbcbca535e3a7b112ab
[samba.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
24 import ldb
25 import os
26 import sys
27 import struct
28 import random
29 import socket
30 import samba.ndr as ndr
31 from samba import credentials, param
32 from samba.dcerpc import dns, dnsp, dnsserver
33 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
34 from samba.tests.subunitrun import SubunitOptions, TestProgram
35 from samba import werror, WERRORError
36 from samba.tests.dns_base import DNSTest
37 import samba.getopt as options
38 import optparse
39
40 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
41 sambaopts = options.SambaOptions(parser)
42 parser.add_option_group(sambaopts)
43
44 # This timeout only has relevance when testing against Windows
45 # Format errors tend to return patchy responses, so a timeout is needed.
46 parser.add_option("--timeout", type="int", dest="timeout",
47                   help="Specify timeout for DNS requests")
48
49 # use command line creds if available
50 credopts = options.CredentialsOptions(parser)
51 parser.add_option_group(credopts)
52 subunitopts = SubunitOptions(parser)
53 parser.add_option_group(subunitopts)
54
55 opts, args = parser.parse_args()
56
57 lp = sambaopts.get_loadparm()
58 creds = credopts.get_credentials(lp)
59
60 timeout = opts.timeout
61
62 if len(args) < 2:
63     parser.print_usage()
64     sys.exit(1)
65
66 server_name = args[0]
67 server_ip = args[1]
68 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
69
70 class TestSimpleQueries(DNSTest):
71     def setUp(self):
72         super(TestSimpleQueries, self).setUp()
73         global server, server_ip, lp, creds, timeout
74         self.server = server_name
75         self.server_ip = server_ip
76         self.lp = lp
77         self.creds = creds
78         self.timeout = timeout
79
80     def test_one_a_query(self):
81         "create a query packet containing one query record"
82         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
83         questions = []
84
85         name = "%s.%s" % (self.server, self.get_dns_domain())
86         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
87         print("asking for ", q.name)
88         questions.append(q)
89
90         self.finish_name_packet(p, questions)
91         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
92         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
93         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
94         self.assertEquals(response.ancount, 1)
95         self.assertEquals(response.answers[0].rdata,
96                           self.server_ip)
97
98     def test_one_SOA_query(self):
99         "create a query packet containing one query record for the SOA"
100         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
101         questions = []
102
103         name = "%s" % (self.get_dns_domain())
104         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
105         print("asking for ", q.name)
106         questions.append(q)
107
108         self.finish_name_packet(p, questions)
109         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
110         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
111         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
112         self.assertEquals(response.ancount, 1)
113         self.assertEquals(response.answers[0].rdata.mname.upper(),
114                           ("%s.%s" % (self.server, self.get_dns_domain())).upper())
115
116     def test_one_a_query_tcp(self):
117         "create a query packet containing one query record via TCP"
118         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
119         questions = []
120
121         name = "%s.%s" % (self.server, self.get_dns_domain())
122         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
123         print("asking for ", q.name)
124         questions.append(q)
125
126         self.finish_name_packet(p, questions)
127         (response, response_packet) = self.dns_transaction_tcp(p, host=server_ip)
128         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
129         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
130         self.assertEquals(response.ancount, 1)
131         self.assertEquals(response.answers[0].rdata,
132                           self.server_ip)
133
134     def test_one_mx_query(self):
135         "create a query packet causing an empty RCODE_OK answer"
136         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
137         questions = []
138
139         name = "%s.%s" % (self.server, self.get_dns_domain())
140         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
141         print("asking for ", q.name)
142         questions.append(q)
143
144         self.finish_name_packet(p, questions)
145         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
146         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
147         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
148         self.assertEquals(response.ancount, 0)
149
150         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
151         questions = []
152
153         name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
154         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
155         print("asking for ", q.name)
156         questions.append(q)
157
158         self.finish_name_packet(p, questions)
159         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
160         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
161         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
162         self.assertEquals(response.ancount, 0)
163
164     def test_two_queries(self):
165         "create a query packet containing two query records"
166         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
167         questions = []
168
169         name = "%s.%s" % (self.server, self.get_dns_domain())
170         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
171         questions.append(q)
172
173         name = "%s.%s" % ('bogusname', self.get_dns_domain())
174         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
175         questions.append(q)
176
177         self.finish_name_packet(p, questions)
178         try:
179             (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
180             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
181         except socket.timeout:
182             # Windows chooses not to respond to incorrectly formatted queries.
183             # Although this appears to be non-deterministic even for the same
184             # request twice, it also appears to be based on a how poorly the
185             # request is formatted.
186             pass
187
188     def test_qtype_all_query(self):
189         "create a QTYPE_ALL query"
190         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
191         questions = []
192
193         name = "%s.%s" % (self.server, self.get_dns_domain())
194         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
195         print("asking for ", q.name)
196         questions.append(q)
197
198         self.finish_name_packet(p, questions)
199         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
200
201         num_answers = 1
202         dc_ipv6 = os.getenv('SERVER_IPV6')
203         if dc_ipv6 is not None:
204             num_answers += 1
205
206         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
207         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
208         self.assertEquals(response.ancount, num_answers)
209         self.assertEquals(response.answers[0].rdata,
210                           self.server_ip)
211         if dc_ipv6 is not None:
212             self.assertEquals(response.answers[1].rdata, dc_ipv6)
213
214     def test_qclass_none_query(self):
215         "create a QCLASS_NONE query"
216         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
217         questions = []
218
219         name = "%s.%s" % (self.server, self.get_dns_domain())
220         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
221         questions.append(q)
222
223         self.finish_name_packet(p, questions)
224         try:
225             (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
226             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
227         except socket.timeout:
228             # Windows chooses not to respond to incorrectly formatted queries.
229             # Although this appears to be non-deterministic even for the same
230             # request twice, it also appears to be based on a how poorly the
231             # request is formatted.
232             pass
233
234     def test_soa_hostname_query(self):
235         "create a SOA query for a hostname"
236         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
237         questions = []
238
239         name = "%s.%s" % (self.server, self.get_dns_domain())
240         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
241         questions.append(q)
242
243         self.finish_name_packet(p, questions)
244         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
245         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
246         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
247         # We don't get SOA records for single hosts
248         self.assertEquals(response.ancount, 0)
249         # But we do respond with an authority section
250         self.assertEqual(response.nscount, 1)
251
252     def test_soa_domain_query(self):
253         "create a SOA query for a domain"
254         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
255         questions = []
256
257         name = self.get_dns_domain()
258         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
259         questions.append(q)
260
261         self.finish_name_packet(p, questions)
262         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
263         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
264         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
265         self.assertEquals(response.ancount, 1)
266         self.assertEquals(response.answers[0].rdata.minimum, 3600)
267
268
269 class TestDNSUpdates(DNSTest):
270     def setUp(self):
271         super(TestDNSUpdates, self).setUp()
272         global server, server_ip, lp, creds, timeout
273         self.server = server_name
274         self.server_ip = server_ip
275         self.lp = lp
276         self.creds = creds
277         self.timeout = timeout
278
279     def test_two_updates(self):
280         "create two update requests"
281         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
282         updates = []
283
284         name = "%s.%s" % (self.server, self.get_dns_domain())
285         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
286         updates.append(u)
287
288         name = self.get_dns_domain()
289         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
290         updates.append(u)
291
292         self.finish_name_packet(p, updates)
293         try:
294             (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
295             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
296         except socket.timeout:
297             # Windows chooses not to respond to incorrectly formatted queries.
298             # Although this appears to be non-deterministic even for the same
299             # request twice, it also appears to be based on a how poorly the
300             # request is formatted.
301             pass
302
303     def test_update_wrong_qclass(self):
304         "create update with DNS_QCLASS_NONE"
305         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
306         updates = []
307
308         name = self.get_dns_domain()
309         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
310         updates.append(u)
311
312         self.finish_name_packet(p, updates)
313         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
314         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
315
316     def test_update_prereq_with_non_null_ttl(self):
317         "test update with a non-null TTL"
318         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
319         updates = []
320
321         name = self.get_dns_domain()
322
323         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
324         updates.append(u)
325         self.finish_name_packet(p, updates)
326
327         prereqs = []
328         r = dns.res_rec()
329         r.name = "%s.%s" % (self.server, self.get_dns_domain())
330         r.rr_type = dns.DNS_QTYPE_TXT
331         r.rr_class = dns.DNS_QCLASS_NONE
332         r.ttl = 1
333         r.length = 0
334         prereqs.append(r)
335
336         p.ancount = len(prereqs)
337         p.answers = prereqs
338
339         try:
340             (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
341             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
342         except socket.timeout:
343             # Windows chooses not to respond to incorrectly formatted queries.
344             # Although this appears to be non-deterministic even for the same
345             # request twice, it also appears to be based on a how poorly the
346             # request is formatted.
347             pass
348
349     def test_update_prereq_with_non_null_length(self):
350         "test update with a non-null length"
351         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
352         updates = []
353
354         name = self.get_dns_domain()
355
356         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
357         updates.append(u)
358         self.finish_name_packet(p, updates)
359
360         prereqs = []
361         r = dns.res_rec()
362         r.name = "%s.%s" % (self.server, self.get_dns_domain())
363         r.rr_type = dns.DNS_QTYPE_TXT
364         r.rr_class = dns.DNS_QCLASS_ANY
365         r.ttl = 0
366         r.length = 1
367         prereqs.append(r)
368
369         p.ancount = len(prereqs)
370         p.answers = prereqs
371
372         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
373         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
374
375     def test_update_prereq_nonexisting_name(self):
376         "test update with a nonexisting name"
377         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
378         updates = []
379
380         name = self.get_dns_domain()
381
382         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
383         updates.append(u)
384         self.finish_name_packet(p, updates)
385
386         prereqs = []
387         r = dns.res_rec()
388         r.name = "idontexist.%s" % self.get_dns_domain()
389         r.rr_type = dns.DNS_QTYPE_TXT
390         r.rr_class = dns.DNS_QCLASS_ANY
391         r.ttl = 0
392         r.length = 0
393         prereqs.append(r)
394
395         p.ancount = len(prereqs)
396         p.answers = prereqs
397
398         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
399         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
400
401     def test_update_add_txt_record(self):
402         "test adding records works"
403         prefix, txt = 'textrec', ['"This is a test"']
404         p = self.make_txt_update(prefix, txt)
405         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
406         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
407         self.check_query_txt(prefix, txt)
408
409     def test_delete_record(self):
410         "Test if deleting records works"
411
412         NAME = "deleterec.%s" % self.get_dns_domain()
413
414         # First, create a record to make sure we have a record to delete.
415         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
416         updates = []
417
418         name = self.get_dns_domain()
419
420         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
421         updates.append(u)
422         self.finish_name_packet(p, updates)
423
424         updates = []
425         r = dns.res_rec()
426         r.name = NAME
427         r.rr_type = dns.DNS_QTYPE_TXT
428         r.rr_class = dns.DNS_QCLASS_IN
429         r.ttl = 900
430         r.length = 0xffff
431         rdata = self.make_txt_record(['"This is a test"'])
432         r.rdata = rdata
433         updates.append(r)
434         p.nscount = len(updates)
435         p.nsrecs = updates
436
437         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
438         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
439
440         # Now check the record is around
441         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
442         questions = []
443         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
444         questions.append(q)
445
446         self.finish_name_packet(p, questions)
447         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
448         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
449
450         # Now delete the record
451         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
452         updates = []
453
454         name = self.get_dns_domain()
455
456         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
457         updates.append(u)
458         self.finish_name_packet(p, updates)
459
460         updates = []
461         r = dns.res_rec()
462         r.name = NAME
463         r.rr_type = dns.DNS_QTYPE_TXT
464         r.rr_class = dns.DNS_QCLASS_NONE
465         r.ttl = 0
466         r.length = 0xffff
467         rdata = self.make_txt_record(['"This is a test"'])
468         r.rdata = rdata
469         updates.append(r)
470         p.nscount = len(updates)
471         p.nsrecs = updates
472
473         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
474         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
475
476         # And finally check it's gone
477         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
478         questions = []
479
480         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
481         questions.append(q)
482
483         self.finish_name_packet(p, questions)
484         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
485         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
486
487     def test_readd_record(self):
488         "Test if adding, deleting and then readding a records works"
489
490         NAME = "readdrec.%s" % self.get_dns_domain()
491
492         # Create the record
493         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
494         updates = []
495
496         name = self.get_dns_domain()
497
498         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
499         updates.append(u)
500         self.finish_name_packet(p, updates)
501
502         updates = []
503         r = dns.res_rec()
504         r.name = NAME
505         r.rr_type = dns.DNS_QTYPE_TXT
506         r.rr_class = dns.DNS_QCLASS_IN
507         r.ttl = 900
508         r.length = 0xffff
509         rdata = self.make_txt_record(['"This is a test"'])
510         r.rdata = rdata
511         updates.append(r)
512         p.nscount = len(updates)
513         p.nsrecs = updates
514
515         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
516         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
517
518         # Now check the record is around
519         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
520         questions = []
521         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
522         questions.append(q)
523
524         self.finish_name_packet(p, questions)
525         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
526         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
527
528         # Now delete the record
529         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
530         updates = []
531
532         name = self.get_dns_domain()
533
534         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
535         updates.append(u)
536         self.finish_name_packet(p, updates)
537
538         updates = []
539         r = dns.res_rec()
540         r.name = NAME
541         r.rr_type = dns.DNS_QTYPE_TXT
542         r.rr_class = dns.DNS_QCLASS_NONE
543         r.ttl = 0
544         r.length = 0xffff
545         rdata = self.make_txt_record(['"This is a test"'])
546         r.rdata = rdata
547         updates.append(r)
548         p.nscount = len(updates)
549         p.nsrecs = updates
550
551         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
552         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
553
554         # check it's gone
555         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
556         questions = []
557
558         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
559         questions.append(q)
560
561         self.finish_name_packet(p, questions)
562         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
563         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
564
565         # recreate the record
566         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
567         updates = []
568
569         name = self.get_dns_domain()
570
571         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
572         updates.append(u)
573         self.finish_name_packet(p, updates)
574
575         updates = []
576         r = dns.res_rec()
577         r.name = NAME
578         r.rr_type = dns.DNS_QTYPE_TXT
579         r.rr_class = dns.DNS_QCLASS_IN
580         r.ttl = 900
581         r.length = 0xffff
582         rdata = self.make_txt_record(['"This is a test"'])
583         r.rdata = rdata
584         updates.append(r)
585         p.nscount = len(updates)
586         p.nsrecs = updates
587
588         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
589         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
590
591         # Now check the record is around
592         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
593         questions = []
594         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
595         questions.append(q)
596
597         self.finish_name_packet(p, questions)
598         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
599         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
600
601     def test_update_add_mx_record(self):
602         "test adding MX records works"
603         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
604         updates = []
605
606         name = self.get_dns_domain()
607
608         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
609         updates.append(u)
610         self.finish_name_packet(p, updates)
611
612         updates = []
613         r = dns.res_rec()
614         r.name = "%s" % self.get_dns_domain()
615         r.rr_type = dns.DNS_QTYPE_MX
616         r.rr_class = dns.DNS_QCLASS_IN
617         r.ttl = 900
618         r.length = 0xffff
619         rdata = dns.mx_record()
620         rdata.preference = 10
621         rdata.exchange = 'mail.%s' % self.get_dns_domain()
622         r.rdata = rdata
623         updates.append(r)
624         p.nscount = len(updates)
625         p.nsrecs = updates
626
627         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
628         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
629
630         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
631         questions = []
632
633         name = "%s" % self.get_dns_domain()
634         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
635         questions.append(q)
636
637         self.finish_name_packet(p, questions)
638         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
639         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
640         self.assertEqual(response.ancount, 1)
641         ans = response.answers[0]
642         self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
643         self.assertEqual(ans.rdata.preference, 10)
644         self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
645
646
647 class TestComplexQueries(DNSTest):
648     def make_dns_update(self, key, value, qtype):
649         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
650
651         name = self.get_dns_domain()
652         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
653         self.finish_name_packet(p, [u])
654
655         r = dns.res_rec()
656         r.name = key
657         r.rr_type = qtype
658         r.rr_class = dns.DNS_QCLASS_IN
659         r.ttl = 900
660         r.length = 0xffff
661         r.rdata = value
662         p.nscount = 1
663         p.nsrecs = [r]
664         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
665         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
666
667     def setUp(self):
668         super(TestComplexQueries, self).setUp()
669
670         global server, server_ip, lp, creds, timeout
671         self.server = server_name
672         self.server_ip = server_ip
673         self.lp = lp
674         self.creds = creds
675         self.timeout = timeout
676
677     def test_one_a_query(self):
678         "create a query packet containing one query record"
679
680         try:
681
682             # Create the record
683             name = "cname_test.%s" % self.get_dns_domain()
684             rdata = "%s.%s" % (self.server, self.get_dns_domain())
685             self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
686
687             p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
688             questions = []
689
690             # Check the record
691             name = "cname_test.%s" % self.get_dns_domain()
692             q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
693             print("asking for ", q.name)
694             questions.append(q)
695
696             self.finish_name_packet(p, questions)
697             (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
698             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
699             self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
700             self.assertEquals(response.ancount, 2)
701             self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
702             self.assertEquals(response.answers[0].rdata, "%s.%s" %
703                               (self.server, self.get_dns_domain()))
704             self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
705             self.assertEquals(response.answers[1].rdata,
706                               self.server_ip)
707
708         finally:
709             # Delete the record
710             p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
711             updates = []
712
713             name = self.get_dns_domain()
714
715             u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
716             updates.append(u)
717             self.finish_name_packet(p, updates)
718
719             updates = []
720             r = dns.res_rec()
721             r.name = "cname_test.%s" % self.get_dns_domain()
722             r.rr_type = dns.DNS_QTYPE_CNAME
723             r.rr_class = dns.DNS_QCLASS_NONE
724             r.ttl = 0
725             r.length = 0xffff
726             r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
727             updates.append(r)
728             p.nscount = len(updates)
729             p.nsrecs = updates
730
731             (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
732             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
733
734     def test_cname_two_chain(self):
735         name0 = "cnamechain0.%s" % self.get_dns_domain()
736         name1 = "cnamechain1.%s" % self.get_dns_domain()
737         name2 = "cnamechain2.%s" % self.get_dns_domain()
738         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
739         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
740         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
741
742         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
743         questions = []
744         q = self.make_name_question(name1, dns.DNS_QTYPE_A,
745                                     dns.DNS_QCLASS_IN)
746         questions.append(q)
747
748         self.finish_name_packet(p, questions)
749         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
750         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
751         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
752         self.assertEquals(response.ancount, 3)
753
754         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
755         self.assertEquals(response.answers[0].name, name1)
756         self.assertEquals(response.answers[0].rdata, name2)
757
758         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
759         self.assertEquals(response.answers[1].name, name2)
760         self.assertEquals(response.answers[1].rdata, name0)
761
762         self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
763         self.assertEquals(response.answers[2].rdata,
764                           self.server_ip)
765
766     def test_invalid_empty_cname(self):
767         name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
768         try:
769             self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
770         except AssertionError as e:
771             pass
772         else:
773             self.fail("Successfully added empty CNAME, which is invalid.")
774
775     def test_cname_two_chain_not_matching_qtype(self):
776         name0 = "cnamechain0.%s" % self.get_dns_domain()
777         name1 = "cnamechain1.%s" % self.get_dns_domain()
778         name2 = "cnamechain2.%s" % self.get_dns_domain()
779         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
780         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
781         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
782
783         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
784         questions = []
785         q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
786                                     dns.DNS_QCLASS_IN)
787         questions.append(q)
788
789         self.finish_name_packet(p, questions)
790         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
791         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
792         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
793
794         # CNAME should return all intermediate results!
795         # Only the A records exists, not the TXT.
796         self.assertEquals(response.ancount, 2)
797
798         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
799         self.assertEquals(response.answers[0].name, name1)
800         self.assertEquals(response.answers[0].rdata, name2)
801
802         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
803         self.assertEquals(response.answers[1].name, name2)
804         self.assertEquals(response.answers[1].rdata, name0)
805
806 class TestInvalidQueries(DNSTest):
807     def setUp(self):
808         super(TestInvalidQueries, self).setUp()
809         global server, server_ip, lp, creds, timeout
810         self.server = server_name
811         self.server_ip = server_ip
812         self.lp = lp
813         self.creds = creds
814         self.timeout = timeout
815
816     def test_one_a_query(self):
817         "send 0 bytes follows by create a query packet containing one query record"
818
819         s = None
820         try:
821             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
822             s.connect((self.server_ip, 53))
823             s.send("", 0)
824         finally:
825             if s is not None:
826                 s.close()
827
828         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
829         questions = []
830
831         name = "%s.%s" % (self.server, self.get_dns_domain())
832         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
833         print("asking for ", q.name)
834         questions.append(q)
835
836         self.finish_name_packet(p, questions)
837         (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
838         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
839         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
840         self.assertEquals(response.ancount, 1)
841         self.assertEquals(response.answers[0].rdata,
842                           self.server_ip)
843
844     def test_one_a_reply(self):
845         "send a reply instead of a query"
846         global timeout
847
848         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
849         questions = []
850
851         name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
852         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
853         print("asking for ", q.name)
854         questions.append(q)
855
856         self.finish_name_packet(p, questions)
857         p.operation |= dns.DNS_FLAG_REPLY
858         s = None
859         try:
860             send_packet = ndr.ndr_pack(p)
861             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
862             s.settimeout(timeout)
863             host=self.server_ip
864             s.connect((host, 53))
865             tcp_packet = struct.pack('!H', len(send_packet))
866             tcp_packet += send_packet
867             s.send(tcp_packet, 0)
868             recv_packet = s.recv(0xffff + 2, 0)
869             self.assertEquals(0, len(recv_packet))
870         except socket.timeout:
871             # Windows chooses not to respond to incorrectly formatted queries.
872             # Although this appears to be non-deterministic even for the same
873             # request twice, it also appears to be based on a how poorly the
874             # request is formatted.
875             pass
876         finally:
877             if s is not None:
878                 s.close()
879
880 class TestZones(DNSTest):
881     def setUp(self):
882         super(TestZones, self).setUp()
883         global server, server_ip, lp, creds, timeout
884         self.server = server_name
885         self.server_ip = server_ip
886         self.lp = lp
887         self.creds = creds
888         self.timeout = timeout
889
890         self.zone = "test.lan"
891         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %\
892                                             (self.server_ip),
893                                             self.lp, self.creds)
894
895         self.samdb = SamDB(url="ldap://" + self.server_ip,
896                            lp = self.get_loadparm(),
897                            session_info=system_session(),
898                            credentials=self.creds)
899
900         self.zone_dn = "DC=" + self.zone +\
901                        ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
902                        str(self.samdb.get_default_basedn())
903
904     def tearDown(self):
905         super(TestZones, self).tearDown()
906
907         try:
908             self.delete_zone(self.zone)
909         except RuntimeError as e:
910             (num, string) = e.args
911             if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
912                 raise
913
914     def create_zone(self, zone, aging_enabled=False):
915         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
916         zone_create.pszZoneName = zone
917         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
918         zone_create.fAging = int(aging_enabled)
919         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
920         zone_create.fDsIntegrated = 1
921         zone_create.fLoadExisting = 1
922         zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
923         try:
924             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
925             self.rpc_conn.DnssrvOperation2(client_version,
926                                            0,
927                                            self.server_ip,
928                                            None,
929                                            0,
930                                            'ZoneCreate',
931                                            dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
932                                            zone_create)
933         except WERRORError as e:
934             self.fail(str(e))
935
936     def set_params(self, **kwargs):
937         zone = kwargs.pop('zone', None)
938         for key,val in kwargs.items():
939             name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
940             name_param.dwParam = val
941             name_param.pszNodeName = key
942
943             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
944             nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
945             try:
946                 self.rpc_conn.DnssrvOperation2(client_version, 0, self.server, zone,
947                                                0, 'ResetDwordProperty', nap_type,
948                                                name_param)
949             except WERRORError as e:
950                 self.fail(str(e))
951
952     def ldap_modify_dnsrecs(self, name, func):
953         dn = 'DC={},{}'.format(name, self.zone_dn)
954         dns_recs = self.ldap_get_dns_records(name)
955         for rec in dns_recs:
956             func(rec)
957         update_dict = {'dn':dn, 'dnsRecord':[ndr_pack(r) for r in dns_recs]}
958         self.samdb.modify(ldb.Message.from_dict(self.samdb,
959                                                 update_dict,
960                                                 ldb.FLAG_MOD_REPLACE))
961
962     def dns_update_record(self, prefix, txt):
963         p = self.make_txt_update(prefix, txt, self.zone)
964         (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
965         self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
966         recs = self.ldap_get_dns_records(prefix)
967         recs = [r for r in recs if r.data.str == txt]
968         self.assertEqual(len(recs), 1)
969         return recs[0]
970
971     def ldap_get_records(self, name):
972         dn = 'DC={},{}'.format(name, self.zone_dn)
973         expr = "(&(objectClass=dnsNode)(name={}))".format(name)
974         return self.samdb.search(base=dn, scope=ldb.SCOPE_SUBTREE,
975                                  expression=expr, attrs=["*"])
976
977     def ldap_get_dns_records(self, name):
978         records = self.ldap_get_records(name)
979         return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
980                 for r in records[0].get('dnsRecord')]
981
982     def ldap_get_zone_settings(self):
983         records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
984                    expression="(&(objectClass=dnsZone)"+\
985                                 "(name={}))".format(self.zone),
986                                     attrs=["dNSProperty"])
987         self.assertEqual(len(records), 1)
988         props = [ndr_unpack(dnsp.DnsProperty, r)
989                  for r in records[0].get('dNSProperty')]
990
991         #We have no choice but to repeat these here.
992         zone_prop_ids = {0x00: "EMPTY",
993                          0x01: "TYPE",
994                          0x02: "ALLOW_UPDATE",
995                          0x08: "SECURE_TIME",
996                          0x10: "NOREFRESH_INTERVAL",
997                          0x11: "SCAVENGING_SERVERS",
998                          0x12: "AGING_ENABLED_TIME",
999                          0x20: "REFRESH_INTERVAL",
1000                          0x40: "AGING_STATE",
1001                          0x80: "DELETED_FROM_HOSTNAME",
1002                          0x81: "MASTER_SERVERS",
1003                          0x82: "AUTO_NS_SERVERS",
1004                          0x83: "DCPROMO_CONVERT",
1005                          0x90: "SCAVENGING_SERVERS_DA",
1006                          0x91: "MASTER_SERVERS_DA",
1007                          0x92: "NS_SERVERS_DA",
1008                          0x100: "NODE_DBFLAGS"}
1009         return {zone_prop_ids[p.id].lower(): p.data for p in props}
1010
1011     def set_aging(self, enable=False):
1012         self.create_zone(self.zone, aging_enabled=enable)
1013         self.set_params(NoRefreshInterval=1, RefreshInterval=1,
1014                         Aging=int(bool(enable)), zone=self.zone,
1015                         AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
1016
1017     def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
1018         self.set_aging(enable=True)
1019         settings = self.ldap_get_zone_settings()
1020         self.assertTrue(settings['aging_state'] is not None)
1021         self.assertTrue(settings['aging_state'])
1022
1023         rec = self.dns_update_record('agingtest', ['test txt'])
1024         self.assertNotEqual(rec.dwTimeStamp, 0)
1025
1026     def test_set_aging_disabled(self):
1027         self.set_aging(enable=False)
1028         settings = self.ldap_get_zone_settings()
1029         self.assertTrue(settings['aging_state'] is not None)
1030         self.assertFalse(settings['aging_state'])
1031
1032         rec = self.dns_update_record('agingtest', ['test txt'])
1033         self.assertNotEqual(rec.dwTimeStamp, 0)
1034
1035     def test_aging_update(self, enable=True):
1036         name, txt = 'agingtest', ['test txt']
1037         self.set_aging(enable=True)
1038         before_mod = self.dns_update_record(name, txt)
1039         if not enable:
1040             self.set_params(zone=self.zone, Aging=0)
1041         dec = 2
1042         def mod_ts(rec):
1043             self.assertTrue(rec.dwTimeStamp > 0)
1044             rec.dwTimeStamp -= dec
1045         self.ldap_modify_dnsrecs(name, mod_ts)
1046         after_mod = self.ldap_get_dns_records(name)
1047         self.assertEqual(len(after_mod), 1)
1048         after_mod = after_mod[0]
1049         self.assertEqual(after_mod.dwTimeStamp,
1050                          before_mod.dwTimeStamp - dec)
1051         after_update = self.dns_update_record(name, txt)
1052         after_should_equal = before_mod if enable else after_mod
1053         self.assertEqual(after_should_equal.dwTimeStamp,
1054                          after_update.dwTimeStamp)
1055
1056     def test_aging_update_disabled(self):
1057         self.test_aging_update(enable=False)
1058
1059     def test_aging_refresh(self):
1060         name,txt = 'agingtest', ['test txt']
1061         self.create_zone(self.zone, aging_enabled=True)
1062         interval = 10
1063         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1064                         Aging=1, zone=self.zone,
1065                         AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
1066         before_mod = self.dns_update_record(name, txt)
1067         def mod_ts(rec):
1068             self.assertTrue(rec.dwTimeStamp > 0)
1069             rec.dwTimeStamp -= interval/2
1070         self.ldap_modify_dnsrecs(name, mod_ts)
1071         update_during_norefresh = self.dns_update_record(name, txt)
1072         def mod_ts(rec):
1073             self.assertTrue(rec.dwTimeStamp > 0)
1074             rec.dwTimeStamp -= interval + interval/2
1075         self.ldap_modify_dnsrecs(name, mod_ts)
1076         update_during_refresh = self.dns_update_record(name, txt)
1077         self.assertEqual(update_during_norefresh.dwTimeStamp,
1078                          before_mod.dwTimeStamp - interval/2)
1079         self.assertEqual(update_during_refresh.dwTimeStamp,
1080                          before_mod.dwTimeStamp)
1081
1082     def test_rpc_add_no_timestamp(self):
1083         name,txt = 'agingtest', ['test txt']
1084         self.set_aging(enable=True)
1085         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1086         rec_buf.rec = TXTRecord(txt)
1087         self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1088                                           0, self.server_ip,
1089                                           self.zone, name, rec_buf, None)
1090         recs = self.ldap_get_dns_records(name)
1091         self.assertEqual(len(recs), 1)
1092         self.assertEqual(recs[0].dwTimeStamp, 0)
1093
1094     def test_basic_scavenging(self):
1095         self.create_zone(self.zone, aging_enabled=True)
1096         interval = 1
1097         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1098                         zone=self.zone, Aging=1,
1099                         AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
1100         name, txt = 'agingtest', ['test txt']
1101         rec = self.dns_update_record(name,txt)
1102         rec = self.dns_update_record(name+'2',txt)
1103         def mod_ts(rec):
1104             self.assertTrue(rec.dwTimeStamp > 0)
1105             rec.dwTimeStamp -= interval*5
1106         self.ldap_modify_dnsrecs(name, mod_ts)
1107         self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1108         dsdb._scavenge_dns_records(self.samdb)
1109
1110         recs = self.ldap_get_dns_records(name)
1111         self.assertEqual(len(recs), 1)
1112         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1113         recs = self.ldap_get_dns_records(name+'2')
1114         self.assertEqual(len(recs), 1)
1115         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1116
1117     def delete_zone(self, zone):
1118         self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1119                                        0,
1120                                        self.server_ip,
1121                                        zone,
1122                                        0,
1123                                        'DeleteZoneFromDs',
1124                                        dnsserver.DNSSRV_TYPEID_NULL,
1125                                        None)
1126
1127     def test_soa_query(self):
1128         zone = "test.lan"
1129         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1130         questions = []
1131
1132         q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
1133         questions.append(q)
1134         self.finish_name_packet(p, questions)
1135
1136         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1137         # Windows returns OK while BIND logically seems to return NXDOMAIN
1138         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1139         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1140         self.assertEquals(response.ancount, 0)
1141
1142         self.create_zone(zone)
1143         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1144         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1145         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1146         self.assertEquals(response.ancount, 1)
1147         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
1148
1149         self.delete_zone(zone)
1150         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1151         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1152         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1153         self.assertEquals(response.ancount, 0)
1154
1155 class TestRPCRoundtrip(DNSTest):
1156     def setUp(self):
1157         super(TestRPCRoundtrip, self).setUp()
1158         global server, server_ip, lp, creds
1159         self.server = server_name
1160         self.server_ip = server_ip
1161         self.lp = lp
1162         self.creds = creds
1163         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
1164                                             self.lp, self.creds)
1165
1166     def tearDown(self):
1167         super(TestRPCRoundtrip, self).tearDown()
1168
1169     def test_update_add_txt_rpc_to_dns(self):
1170         prefix, txt = 'rpctextrec', ['"This is a test"']
1171
1172         name = "%s.%s" % (prefix, self.get_dns_domain())
1173
1174         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
1175         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1176         add_rec_buf.rec = rec
1177         try:
1178             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1179                                      0, self.server_ip, self.get_dns_domain(),
1180                                      name, add_rec_buf, None)
1181         except WERRORError as e:
1182             self.fail(str(e))
1183
1184         try:
1185             self.check_query_txt(prefix, txt)
1186         finally:
1187             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1188                                               0, self.server_ip, self.get_dns_domain(),
1189                                               name, None, add_rec_buf)
1190
1191     def test_update_add_null_padded_txt_record(self):
1192         "test adding records works"
1193         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1194         p = self.make_txt_update(prefix, txt)
1195         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1196         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1197         self.check_query_txt(prefix, txt)
1198         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1199                              self.get_dns_domain(),
1200                              "%s.%s" % (prefix, self.get_dns_domain()),
1201                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""'))
1202
1203         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1204         p = self.make_txt_update(prefix, txt)
1205         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1206         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1207         self.check_query_txt(prefix, txt)
1208         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1209                              self.get_dns_domain(),
1210                              "%s.%s" % (prefix, self.get_dns_domain()),
1211                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"'))
1212
1213         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1214         p = self.make_txt_update(prefix, txt)
1215         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1216         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1217         self.check_query_txt(prefix, txt)
1218         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1219                              self.get_dns_domain(),
1220                              "%s.%s" % (prefix, self.get_dns_domain()),
1221                              dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""'))
1222
1223     def test_update_add_padding_rpc_to_dns(self):
1224         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1225         prefix = 'rpc' + prefix
1226         name = "%s.%s" % (prefix, self.get_dns_domain())
1227
1228         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""')
1229         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1230         add_rec_buf.rec = rec
1231         try:
1232             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1233                                      0, self.server_ip, self.get_dns_domain(),
1234                                      name, add_rec_buf, None)
1235
1236         except WERRORError as e:
1237             self.fail(str(e))
1238
1239         try:
1240             self.check_query_txt(prefix, txt)
1241         finally:
1242             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1243                                               0, self.server_ip, self.get_dns_domain(),
1244                                               name, None, add_rec_buf)
1245
1246         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1247         prefix = 'rpc' + prefix
1248         name = "%s.%s" % (prefix, self.get_dns_domain())
1249
1250         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"')
1251         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1252         add_rec_buf.rec = rec
1253         try:
1254             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1255                                      0, self.server_ip, self.get_dns_domain(),
1256                                      name, add_rec_buf, None)
1257
1258         except WERRORError as e:
1259             self.fail(str(e))
1260
1261         try:
1262             self.check_query_txt(prefix, txt)
1263         finally:
1264             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1265                                               0, self.server_ip, self.get_dns_domain(),
1266                                               name, None, add_rec_buf)
1267
1268         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1269         prefix = 'rpc' + prefix
1270         name = "%s.%s" % (prefix, self.get_dns_domain())
1271
1272         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""')
1273         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1274         add_rec_buf.rec = rec
1275         try:
1276             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1277                                      0, self.server_ip, self.get_dns_domain(),
1278                                      name, add_rec_buf, None)
1279         except WERRORError as e:
1280             self.fail(str(e))
1281
1282         try:
1283             self.check_query_txt(prefix, txt)
1284         finally:
1285             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1286                                               0, self.server_ip, self.get_dns_domain(),
1287                                               name, None, add_rec_buf)
1288
1289     # Test is incomplete due to strlen against txt records
1290     def test_update_add_null_char_txt_record(self):
1291         "test adding records works"
1292         prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1293         p = self.make_txt_update(prefix, txt)
1294         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1295         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1296         self.check_query_txt(prefix, ['NULL'])
1297         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1298                              self.get_dns_domain(),
1299                              "%s.%s" % (prefix, self.get_dns_domain()),
1300                              dnsp.DNS_TYPE_TXT, '"NULL"'))
1301
1302         prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1303         p = self.make_txt_update(prefix, txt)
1304         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1305         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1306         self.check_query_txt(prefix, ['NULL', 'NULL'])
1307         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1308                              self.get_dns_domain(),
1309                              "%s.%s" % (prefix, self.get_dns_domain()),
1310                              dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1311
1312     def test_update_add_null_char_rpc_to_dns(self):
1313         prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1314         prefix = 'rpc' + prefix
1315         name = "%s.%s" % (prefix, self.get_dns_domain())
1316
1317         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL"')
1318         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1319         add_rec_buf.rec = rec
1320         try:
1321             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1322                                      0, self.server_ip, self.get_dns_domain(),
1323                                      name, add_rec_buf, None)
1324
1325         except WERRORError as e:
1326             self.fail(str(e))
1327
1328         try:
1329            self.check_query_txt(prefix, ['NULL'])
1330         finally:
1331             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1332                                               0, self.server_ip, self.get_dns_domain(),
1333                                               name, None, add_rec_buf)
1334
1335     def test_update_add_hex_char_txt_record(self):
1336         "test adding records works"
1337         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1338         p = self.make_txt_update(prefix, txt)
1339         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1340         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1341         self.check_query_txt(prefix, txt)
1342         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1343                              self.get_dns_domain(),
1344                              "%s.%s" % (prefix, self.get_dns_domain()),
1345                              dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
1346
1347     def test_update_add_hex_rpc_to_dns(self):
1348         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1349         prefix = 'rpc' + prefix
1350         name = "%s.%s" % (prefix, self.get_dns_domain())
1351
1352         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
1353         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1354         add_rec_buf.rec = rec
1355         try:
1356             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1357                                      0, self.server_ip, self.get_dns_domain(),
1358                                      name, add_rec_buf, None)
1359
1360         except WERRORError as e:
1361             self.fail(str(e))
1362
1363         try:
1364            self.check_query_txt(prefix, txt)
1365         finally:
1366             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1367                                               0, self.server_ip, self.get_dns_domain(),
1368                                               name, None, add_rec_buf)
1369
1370     def test_update_add_slash_txt_record(self):
1371         "test adding records works"
1372         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1373         p = self.make_txt_update(prefix, txt)
1374         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1375         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1376         self.check_query_txt(prefix, txt)
1377         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1378                              self.get_dns_domain(),
1379                              "%s.%s" % (prefix, self.get_dns_domain()),
1380                              dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
1381
1382     # This test fails against Windows as it eliminates slashes in RPC
1383     # One typical use for a slash is in records like 'var=value' to
1384     # escape '=' characters.
1385     def test_update_add_slash_rpc_to_dns(self):
1386         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1387         prefix = 'rpc' + prefix
1388         name = "%s.%s" % (prefix, self.get_dns_domain())
1389
1390         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
1391         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1392         add_rec_buf.rec = rec
1393         try:
1394             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1395                                      0, self.server_ip, self.get_dns_domain(),
1396                                      name, add_rec_buf, None)
1397
1398         except WERRORError as e:
1399             self.fail(str(e))
1400
1401         try:
1402             self.check_query_txt(prefix, txt)
1403
1404         finally:
1405             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1406                                               0, self.server_ip, self.get_dns_domain(),
1407                                               name, None, add_rec_buf)
1408
1409     def test_update_add_two_txt_records(self):
1410         "test adding two txt records works"
1411         prefix, txt = 'textrec2', ['"This is a test"',
1412                                    '"and this is a test, too"']
1413         p = self.make_txt_update(prefix, txt)
1414         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1415         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1416         self.check_query_txt(prefix, txt)
1417         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1418                              self.get_dns_domain(),
1419                              "%s.%s" % (prefix, self.get_dns_domain()),
1420                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
1421                              ' "\\"and this is a test, too\\""'))
1422
1423     def test_update_add_two_rpc_to_dns(self):
1424         prefix, txt = 'textrec2', ['"This is a test"',
1425                                    '"and this is a test, too"']
1426         prefix = 'rpc' + prefix
1427         name = "%s.%s" % (prefix, self.get_dns_domain())
1428
1429         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1430                                 '"\\"This is a test\\""' +
1431                                 ' "\\"and this is a test, too\\""')
1432         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1433         add_rec_buf.rec = rec
1434         try:
1435             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1436                                      0, self.server_ip, self.get_dns_domain(),
1437                                      name, add_rec_buf, None)
1438
1439         except WERRORError as e:
1440             self.fail(str(e))
1441
1442         try:
1443             self.check_query_txt(prefix, txt)
1444         finally:
1445             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1446                                               0, self.server_ip, self.get_dns_domain(),
1447                                               name, None, add_rec_buf)
1448
1449     def test_update_add_empty_txt_records(self):
1450         "test adding two txt records works"
1451         prefix, txt = 'emptytextrec', []
1452         p = self.make_txt_update(prefix, txt)
1453         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1454         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1455         self.check_query_txt(prefix, txt)
1456         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1457                              self.get_dns_domain(),
1458                              "%s.%s" % (prefix, self.get_dns_domain()),
1459                              dnsp.DNS_TYPE_TXT, ''))
1460
1461     def test_update_add_empty_rpc_to_dns(self):
1462         prefix, txt = 'rpcemptytextrec', []
1463
1464         name = "%s.%s" % (prefix, self.get_dns_domain())
1465
1466         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
1467         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1468         add_rec_buf.rec = rec
1469         try:
1470             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1471                                      0, self.server_ip, self.get_dns_domain(),
1472                                      name, add_rec_buf, None)
1473         except WERRORError as e:
1474             self.fail(str(e))
1475
1476         try:
1477             self.check_query_txt(prefix, txt)
1478         finally:
1479             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1480                                               0, self.server_ip, self.get_dns_domain(),
1481                                               name, None, add_rec_buf)
1482
1483 TestProgram(module=__name__, opts=subunitopts)