tests dns: dns.py remove flake8 warnings
[samba.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
24 import ldb
25 import os
26 import sys
27 import struct
28 import socket
29 import samba.ndr as ndr
30 from samba import credentials
31 from samba.dcerpc import dns, dnsp, dnsserver
32 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
33 from samba.tests.subunitrun import SubunitOptions, TestProgram
34 from samba import werror, WERRORError
35 from samba.tests.dns_base import DNSTest
36 import samba.getopt as options
37 import optparse
38
39 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
40 sambaopts = options.SambaOptions(parser)
41 parser.add_option_group(sambaopts)
42
43 # This timeout only has relevance when testing against Windows
44 # Format errors tend to return patchy responses, so a timeout is needed.
45 parser.add_option("--timeout", type="int", dest="timeout",
46                   help="Specify timeout for DNS requests")
47
48 # use command line creds if available
49 credopts = options.CredentialsOptions(parser)
50 parser.add_option_group(credopts)
51 subunitopts = SubunitOptions(parser)
52 parser.add_option_group(subunitopts)
53
54 opts, args = parser.parse_args()
55
56 lp = sambaopts.get_loadparm()
57 creds = credopts.get_credentials(lp)
58
59 timeout = opts.timeout
60
61 if len(args) < 2:
62     parser.print_usage()
63     sys.exit(1)
64
65 server_name = args[0]
66 server_ip = args[1]
67 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
68
69
70 class TestSimpleQueries(DNSTest):
71     def setUp(self):
72         super(TestSimpleQueries, self).setUp()
73         global server, server_ip, lp, creds, timeout
74         self.server = server_name
75         self.server_ip = server_ip
76         self.lp = lp
77         self.creds = creds
78         self.timeout = timeout
79
80     def test_one_a_query(self):
81         "create a query packet containing one query record"
82         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
83         questions = []
84
85         name = "%s.%s" % (self.server, self.get_dns_domain())
86         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
87         print("asking for ", q.name)
88         questions.append(q)
89
90         self.finish_name_packet(p, questions)
91         (response, response_packet) =\
92             self.dns_transaction_udp(p, host=server_ip)
93         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
94         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
95         self.assertEquals(response.ancount, 1)
96         self.assertEquals(response.answers[0].rdata,
97                           self.server_ip)
98
99     def test_one_SOA_query(self):
100         "create a query packet containing one query record for the SOA"
101         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
102         questions = []
103
104         name = "%s" % (self.get_dns_domain())
105         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
106         print("asking for ", q.name)
107         questions.append(q)
108
109         self.finish_name_packet(p, questions)
110         (response, response_packet) =\
111             self.dns_transaction_udp(p, host=server_ip)
112         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
113         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
114         self.assertEquals(response.ancount, 1)
115         self.assertEquals(
116             response.answers[0].rdata.mname.upper(),
117             ("%s.%s" % (self.server, self.get_dns_domain())).upper())
118
119     def test_one_a_query_tcp(self):
120         "create a query packet containing one query record via TCP"
121         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
122         questions = []
123
124         name = "%s.%s" % (self.server, self.get_dns_domain())
125         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
126         print("asking for ", q.name)
127         questions.append(q)
128
129         self.finish_name_packet(p, questions)
130         (response, response_packet) =\
131             self.dns_transaction_tcp(p, host=server_ip)
132         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
133         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
134         self.assertEquals(response.ancount, 1)
135         self.assertEquals(response.answers[0].rdata,
136                           self.server_ip)
137
138     def test_one_mx_query(self):
139         "create a query packet causing an empty RCODE_OK answer"
140         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
141         questions = []
142
143         name = "%s.%s" % (self.server, self.get_dns_domain())
144         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
145         print("asking for ", q.name)
146         questions.append(q)
147
148         self.finish_name_packet(p, questions)
149         (response, response_packet) =\
150             self.dns_transaction_udp(p, host=server_ip)
151         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
152         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
153         self.assertEquals(response.ancount, 0)
154
155         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
156         questions = []
157
158         name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
159         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
160         print("asking for ", q.name)
161         questions.append(q)
162
163         self.finish_name_packet(p, questions)
164         (response, response_packet) =\
165             self.dns_transaction_udp(p, host=server_ip)
166         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
167         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
168         self.assertEquals(response.ancount, 0)
169
170     def test_two_queries(self):
171         "create a query packet containing two query records"
172         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
173         questions = []
174
175         name = "%s.%s" % (self.server, self.get_dns_domain())
176         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
177         questions.append(q)
178
179         name = "%s.%s" % ('bogusname', self.get_dns_domain())
180         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
181         questions.append(q)
182
183         self.finish_name_packet(p, questions)
184         try:
185             (response, response_packet) =\
186                 self.dns_transaction_udp(p, host=server_ip)
187             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
188         except socket.timeout:
189             # Windows chooses not to respond to incorrectly formatted queries.
190             # Although this appears to be non-deterministic even for the same
191             # request twice, it also appears to be based on a how poorly the
192             # request is formatted.
193             pass
194
195     def test_qtype_all_query(self):
196         "create a QTYPE_ALL query"
197         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
198         questions = []
199
200         name = "%s.%s" % (self.server, self.get_dns_domain())
201         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
202         print("asking for ", q.name)
203         questions.append(q)
204
205         self.finish_name_packet(p, questions)
206         (response, response_packet) =\
207             self.dns_transaction_udp(p, host=server_ip)
208
209         num_answers = 1
210         dc_ipv6 = os.getenv('SERVER_IPV6')
211         if dc_ipv6 is not None:
212             num_answers += 1
213
214         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
215         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
216         self.assertEquals(response.ancount, num_answers)
217         self.assertEquals(response.answers[0].rdata,
218                           self.server_ip)
219         if dc_ipv6 is not None:
220             self.assertEquals(response.answers[1].rdata, dc_ipv6)
221
222     def test_qclass_none_query(self):
223         "create a QCLASS_NONE query"
224         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
225         questions = []
226
227         name = "%s.%s" % (self.server, self.get_dns_domain())
228         q = self.make_name_question(
229             name,
230             dns.DNS_QTYPE_ALL,
231             dns.DNS_QCLASS_NONE)
232         questions.append(q)
233
234         self.finish_name_packet(p, questions)
235         try:
236             (response, response_packet) =\
237                 self.dns_transaction_udp(p, host=server_ip)
238             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
239         except socket.timeout:
240             # Windows chooses not to respond to incorrectly formatted queries.
241             # Although this appears to be non-deterministic even for the same
242             # request twice, it also appears to be based on a how poorly the
243             # request is formatted.
244             pass
245
246     def test_soa_hostname_query(self):
247         "create a SOA query for a hostname"
248         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
249         questions = []
250
251         name = "%s.%s" % (self.server, self.get_dns_domain())
252         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
253         questions.append(q)
254
255         self.finish_name_packet(p, questions)
256         (response, response_packet) =\
257             self.dns_transaction_udp(p, host=server_ip)
258         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
259         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
260         # We don't get SOA records for single hosts
261         self.assertEquals(response.ancount, 0)
262         # But we do respond with an authority section
263         self.assertEqual(response.nscount, 1)
264
265     def test_soa_domain_query(self):
266         "create a SOA query for a domain"
267         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
268         questions = []
269
270         name = self.get_dns_domain()
271         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
272         questions.append(q)
273
274         self.finish_name_packet(p, questions)
275         (response, response_packet) =\
276             self.dns_transaction_udp(p, host=server_ip)
277         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
278         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
279         self.assertEquals(response.ancount, 1)
280         self.assertEquals(response.answers[0].rdata.minimum, 3600)
281
282
283 class TestDNSUpdates(DNSTest):
284     def setUp(self):
285         super(TestDNSUpdates, self).setUp()
286         global server, server_ip, lp, creds, timeout
287         self.server = server_name
288         self.server_ip = server_ip
289         self.lp = lp
290         self.creds = creds
291         self.timeout = timeout
292
293     def test_two_updates(self):
294         "create two update requests"
295         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
296         updates = []
297
298         name = "%s.%s" % (self.server, self.get_dns_domain())
299         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
300         updates.append(u)
301
302         name = self.get_dns_domain()
303         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
304         updates.append(u)
305
306         self.finish_name_packet(p, updates)
307         try:
308             (response, response_packet) =\
309                 self.dns_transaction_udp(p, host=server_ip)
310             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
311         except socket.timeout:
312             # Windows chooses not to respond to incorrectly formatted queries.
313             # Although this appears to be non-deterministic even for the same
314             # request twice, it also appears to be based on a how poorly the
315             # request is formatted.
316             pass
317
318     def test_update_wrong_qclass(self):
319         "create update with DNS_QCLASS_NONE"
320         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
321         updates = []
322
323         name = self.get_dns_domain()
324         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
325         updates.append(u)
326
327         self.finish_name_packet(p, updates)
328         (response, response_packet) =\
329             self.dns_transaction_udp(p, host=server_ip)
330         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
331
332     def test_update_prereq_with_non_null_ttl(self):
333         "test update with a non-null TTL"
334         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
335         updates = []
336
337         name = self.get_dns_domain()
338
339         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
340         updates.append(u)
341         self.finish_name_packet(p, updates)
342
343         prereqs = []
344         r = dns.res_rec()
345         r.name = "%s.%s" % (self.server, self.get_dns_domain())
346         r.rr_type = dns.DNS_QTYPE_TXT
347         r.rr_class = dns.DNS_QCLASS_NONE
348         r.ttl = 1
349         r.length = 0
350         prereqs.append(r)
351
352         p.ancount = len(prereqs)
353         p.answers = prereqs
354
355         try:
356             (response, response_packet) =\
357                 self.dns_transaction_udp(p, host=server_ip)
358             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
359         except socket.timeout:
360             # Windows chooses not to respond to incorrectly formatted queries.
361             # Although this appears to be non-deterministic even for the same
362             # request twice, it also appears to be based on a how poorly the
363             # request is formatted.
364             pass
365
366     def test_update_prereq_with_non_null_length(self):
367         "test update with a non-null length"
368         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
369         updates = []
370
371         name = self.get_dns_domain()
372
373         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
374         updates.append(u)
375         self.finish_name_packet(p, updates)
376
377         prereqs = []
378         r = dns.res_rec()
379         r.name = "%s.%s" % (self.server, self.get_dns_domain())
380         r.rr_type = dns.DNS_QTYPE_TXT
381         r.rr_class = dns.DNS_QCLASS_ANY
382         r.ttl = 0
383         r.length = 1
384         prereqs.append(r)
385
386         p.ancount = len(prereqs)
387         p.answers = prereqs
388
389         (response, response_packet) =\
390             self.dns_transaction_udp(p, host=server_ip)
391         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
392
393     def test_update_prereq_nonexisting_name(self):
394         "test update with a nonexisting name"
395         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
396         updates = []
397
398         name = self.get_dns_domain()
399
400         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
401         updates.append(u)
402         self.finish_name_packet(p, updates)
403
404         prereqs = []
405         r = dns.res_rec()
406         r.name = "idontexist.%s" % self.get_dns_domain()
407         r.rr_type = dns.DNS_QTYPE_TXT
408         r.rr_class = dns.DNS_QCLASS_ANY
409         r.ttl = 0
410         r.length = 0
411         prereqs.append(r)
412
413         p.ancount = len(prereqs)
414         p.answers = prereqs
415
416         (response, response_packet) =\
417             self.dns_transaction_udp(p, host=server_ip)
418         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
419
420     def test_update_add_txt_record(self):
421         "test adding records works"
422         prefix, txt = 'textrec', ['"This is a test"']
423         p = self.make_txt_update(prefix, txt)
424         (response, response_packet) =\
425             self.dns_transaction_udp(p, host=server_ip)
426         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
427         self.check_query_txt(prefix, txt)
428
429     def test_delete_record(self):
430         "Test if deleting records works"
431
432         NAME = "deleterec.%s" % self.get_dns_domain()
433
434         # First, create a record to make sure we have a record to delete.
435         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
436         updates = []
437
438         name = self.get_dns_domain()
439
440         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
441         updates.append(u)
442         self.finish_name_packet(p, updates)
443
444         updates = []
445         r = dns.res_rec()
446         r.name = NAME
447         r.rr_type = dns.DNS_QTYPE_TXT
448         r.rr_class = dns.DNS_QCLASS_IN
449         r.ttl = 900
450         r.length = 0xffff
451         rdata = self.make_txt_record(['"This is a test"'])
452         r.rdata = rdata
453         updates.append(r)
454         p.nscount = len(updates)
455         p.nsrecs = updates
456
457         (response, response_packet) =\
458             self.dns_transaction_udp(p, host=server_ip)
459         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
460
461         # Now check the record is around
462         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
463         questions = []
464         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
465         questions.append(q)
466
467         self.finish_name_packet(p, questions)
468         (response, response_packet) =\
469             self.dns_transaction_udp(p, host=server_ip)
470         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
471
472         # Now delete the record
473         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
474         updates = []
475
476         name = self.get_dns_domain()
477
478         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
479         updates.append(u)
480         self.finish_name_packet(p, updates)
481
482         updates = []
483         r = dns.res_rec()
484         r.name = NAME
485         r.rr_type = dns.DNS_QTYPE_TXT
486         r.rr_class = dns.DNS_QCLASS_NONE
487         r.ttl = 0
488         r.length = 0xffff
489         rdata = self.make_txt_record(['"This is a test"'])
490         r.rdata = rdata
491         updates.append(r)
492         p.nscount = len(updates)
493         p.nsrecs = updates
494
495         (response, response_packet) =\
496             self.dns_transaction_udp(p, host=server_ip)
497         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
498
499         # And finally check it's gone
500         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
501         questions = []
502
503         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
504         questions.append(q)
505
506         self.finish_name_packet(p, questions)
507         (response, response_packet) =\
508             self.dns_transaction_udp(p, host=server_ip)
509         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
510
511     def test_readd_record(self):
512         "Test if adding, deleting and then readding a records works"
513
514         NAME = "readdrec.%s" % self.get_dns_domain()
515
516         # Create the record
517         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
518         updates = []
519
520         name = self.get_dns_domain()
521
522         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
523         updates.append(u)
524         self.finish_name_packet(p, updates)
525
526         updates = []
527         r = dns.res_rec()
528         r.name = NAME
529         r.rr_type = dns.DNS_QTYPE_TXT
530         r.rr_class = dns.DNS_QCLASS_IN
531         r.ttl = 900
532         r.length = 0xffff
533         rdata = self.make_txt_record(['"This is a test"'])
534         r.rdata = rdata
535         updates.append(r)
536         p.nscount = len(updates)
537         p.nsrecs = updates
538
539         (response, response_packet) =\
540             self.dns_transaction_udp(p, host=server_ip)
541         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
542
543         # Now check the record is around
544         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
545         questions = []
546         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
547         questions.append(q)
548
549         self.finish_name_packet(p, questions)
550         (response, response_packet) =\
551             self.dns_transaction_udp(p, host=server_ip)
552         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
553
554         # Now delete the record
555         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
556         updates = []
557
558         name = self.get_dns_domain()
559
560         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
561         updates.append(u)
562         self.finish_name_packet(p, updates)
563
564         updates = []
565         r = dns.res_rec()
566         r.name = NAME
567         r.rr_type = dns.DNS_QTYPE_TXT
568         r.rr_class = dns.DNS_QCLASS_NONE
569         r.ttl = 0
570         r.length = 0xffff
571         rdata = self.make_txt_record(['"This is a test"'])
572         r.rdata = rdata
573         updates.append(r)
574         p.nscount = len(updates)
575         p.nsrecs = updates
576
577         (response, response_packet) =\
578             self.dns_transaction_udp(p, host=server_ip)
579         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
580
581         # check it's gone
582         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
583         questions = []
584
585         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
586         questions.append(q)
587
588         self.finish_name_packet(p, questions)
589         (response, response_packet) =\
590             self.dns_transaction_udp(p, host=server_ip)
591         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
592
593         # recreate the record
594         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
595         updates = []
596
597         name = self.get_dns_domain()
598
599         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
600         updates.append(u)
601         self.finish_name_packet(p, updates)
602
603         updates = []
604         r = dns.res_rec()
605         r.name = NAME
606         r.rr_type = dns.DNS_QTYPE_TXT
607         r.rr_class = dns.DNS_QCLASS_IN
608         r.ttl = 900
609         r.length = 0xffff
610         rdata = self.make_txt_record(['"This is a test"'])
611         r.rdata = rdata
612         updates.append(r)
613         p.nscount = len(updates)
614         p.nsrecs = updates
615
616         (response, response_packet) =\
617             self.dns_transaction_udp(p, host=server_ip)
618         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
619
620         # Now check the record is around
621         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
622         questions = []
623         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
624         questions.append(q)
625
626         self.finish_name_packet(p, questions)
627         (response, response_packet) =\
628             self.dns_transaction_udp(p, host=server_ip)
629         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
630
631     def test_update_add_mx_record(self):
632         "test adding MX records works"
633         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
634         updates = []
635
636         name = self.get_dns_domain()
637
638         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
639         updates.append(u)
640         self.finish_name_packet(p, updates)
641
642         updates = []
643         r = dns.res_rec()
644         r.name = "%s" % self.get_dns_domain()
645         r.rr_type = dns.DNS_QTYPE_MX
646         r.rr_class = dns.DNS_QCLASS_IN
647         r.ttl = 900
648         r.length = 0xffff
649         rdata = dns.mx_record()
650         rdata.preference = 10
651         rdata.exchange = 'mail.%s' % self.get_dns_domain()
652         r.rdata = rdata
653         updates.append(r)
654         p.nscount = len(updates)
655         p.nsrecs = updates
656
657         (response, response_packet) =\
658             self.dns_transaction_udp(p, host=server_ip)
659         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
660
661         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
662         questions = []
663
664         name = "%s" % self.get_dns_domain()
665         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
666         questions.append(q)
667
668         self.finish_name_packet(p, questions)
669         (response, response_packet) =\
670             self.dns_transaction_udp(p, host=server_ip)
671         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
672         self.assertEqual(response.ancount, 1)
673         ans = response.answers[0]
674         self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
675         self.assertEqual(ans.rdata.preference, 10)
676         self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
677
678
679 class TestComplexQueries(DNSTest):
680     def make_dns_update(self, key, value, qtype):
681         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
682
683         name = self.get_dns_domain()
684         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
685         self.finish_name_packet(p, [u])
686
687         r = dns.res_rec()
688         r.name = key
689         r.rr_type = qtype
690         r.rr_class = dns.DNS_QCLASS_IN
691         r.ttl = 900
692         r.length = 0xffff
693         r.rdata = value
694         p.nscount = 1
695         p.nsrecs = [r]
696         (response, response_packet) =\
697             self.dns_transaction_udp(p, host=server_ip)
698         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
699
700     def setUp(self):
701         super(TestComplexQueries, self).setUp()
702
703         global server, server_ip, lp, creds, timeout
704         self.server = server_name
705         self.server_ip = server_ip
706         self.lp = lp
707         self.creds = creds
708         self.timeout = timeout
709
710     def test_one_a_query(self):
711         "create a query packet containing one query record"
712
713         try:
714
715             # Create the record
716             name = "cname_test.%s" % self.get_dns_domain()
717             rdata = "%s.%s" % (self.server, self.get_dns_domain())
718             self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
719
720             p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
721             questions = []
722
723             # Check the record
724             name = "cname_test.%s" % self.get_dns_domain()
725             q = self.make_name_question(name,
726                                         dns.DNS_QTYPE_A,
727                                         dns.DNS_QCLASS_IN)
728             print("asking for ", q.name)
729             questions.append(q)
730
731             self.finish_name_packet(p, questions)
732             (response, response_packet) =\
733                 self.dns_transaction_udp(p, host=self.server_ip)
734             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
735             self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
736             self.assertEquals(response.ancount, 2)
737             self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
738             self.assertEquals(response.answers[0].rdata, "%s.%s" %
739                               (self.server, self.get_dns_domain()))
740             self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
741             self.assertEquals(response.answers[1].rdata,
742                               self.server_ip)
743
744         finally:
745             # Delete the record
746             p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
747             updates = []
748
749             name = self.get_dns_domain()
750
751             u = self.make_name_question(name,
752                                         dns.DNS_QTYPE_SOA,
753                                         dns.DNS_QCLASS_IN)
754             updates.append(u)
755             self.finish_name_packet(p, updates)
756
757             updates = []
758             r = dns.res_rec()
759             r.name = "cname_test.%s" % self.get_dns_domain()
760             r.rr_type = dns.DNS_QTYPE_CNAME
761             r.rr_class = dns.DNS_QCLASS_NONE
762             r.ttl = 0
763             r.length = 0xffff
764             r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
765             updates.append(r)
766             p.nscount = len(updates)
767             p.nsrecs = updates
768
769             (response, response_packet) =\
770                 self.dns_transaction_udp(p, host=self.server_ip)
771             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
772
773     def test_cname_two_chain(self):
774         name0 = "cnamechain0.%s" % self.get_dns_domain()
775         name1 = "cnamechain1.%s" % self.get_dns_domain()
776         name2 = "cnamechain2.%s" % self.get_dns_domain()
777         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
778         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
779         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
780
781         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
782         questions = []
783         q = self.make_name_question(name1, dns.DNS_QTYPE_A,
784                                     dns.DNS_QCLASS_IN)
785         questions.append(q)
786
787         self.finish_name_packet(p, questions)
788         (response, response_packet) =\
789             self.dns_transaction_udp(p, host=server_ip)
790         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
791         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
792         self.assertEquals(response.ancount, 3)
793
794         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
795         self.assertEquals(response.answers[0].name, name1)
796         self.assertEquals(response.answers[0].rdata, name2)
797
798         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
799         self.assertEquals(response.answers[1].name, name2)
800         self.assertEquals(response.answers[1].rdata, name0)
801
802         self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
803         self.assertEquals(response.answers[2].rdata,
804                           self.server_ip)
805
806     def test_invalid_empty_cname(self):
807         name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
808         try:
809             self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
810         except AssertionError:
811             pass
812         else:
813             self.fail("Successfully added empty CNAME, which is invalid.")
814
815     def test_cname_two_chain_not_matching_qtype(self):
816         name0 = "cnamechain0.%s" % self.get_dns_domain()
817         name1 = "cnamechain1.%s" % self.get_dns_domain()
818         name2 = "cnamechain2.%s" % self.get_dns_domain()
819         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
820         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
821         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
822
823         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
824         questions = []
825         q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
826                                     dns.DNS_QCLASS_IN)
827         questions.append(q)
828
829         self.finish_name_packet(p, questions)
830         (response, response_packet) =\
831             self.dns_transaction_udp(p, host=server_ip)
832         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
833         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
834
835         # CNAME should return all intermediate results!
836         # Only the A records exists, not the TXT.
837         self.assertEquals(response.ancount, 2)
838
839         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
840         self.assertEquals(response.answers[0].name, name1)
841         self.assertEquals(response.answers[0].rdata, name2)
842
843         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
844         self.assertEquals(response.answers[1].name, name2)
845         self.assertEquals(response.answers[1].rdata, name0)
846
847
848 class TestInvalidQueries(DNSTest):
849     def setUp(self):
850         super(TestInvalidQueries, self).setUp()
851         global server, server_ip, lp, creds, timeout
852         self.server = server_name
853         self.server_ip = server_ip
854         self.lp = lp
855         self.creds = creds
856         self.timeout = timeout
857
858     def test_one_a_query(self):
859         """send 0 bytes follows by create a query packet
860            containing one query record"""
861
862         s = None
863         try:
864             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
865             s.connect((self.server_ip, 53))
866             s.send("", 0)
867         finally:
868             if s is not None:
869                 s.close()
870
871         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
872         questions = []
873
874         name = "%s.%s" % (self.server, self.get_dns_domain())
875         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
876         print("asking for ", q.name)
877         questions.append(q)
878
879         self.finish_name_packet(p, questions)
880         (response, response_packet) =\
881             self.dns_transaction_udp(p, host=self.server_ip)
882         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
883         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
884         self.assertEquals(response.ancount, 1)
885         self.assertEquals(response.answers[0].rdata,
886                           self.server_ip)
887
888     def test_one_a_reply(self):
889         "send a reply instead of a query"
890         global timeout
891
892         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
893         questions = []
894
895         name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
896         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
897         print("asking for ", q.name)
898         questions.append(q)
899
900         self.finish_name_packet(p, questions)
901         p.operation |= dns.DNS_FLAG_REPLY
902         s = None
903         try:
904             send_packet = ndr.ndr_pack(p)
905             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
906             s.settimeout(timeout)
907             host = self.server_ip
908             s.connect((host, 53))
909             tcp_packet = struct.pack('!H', len(send_packet))
910             tcp_packet += send_packet
911             s.send(tcp_packet, 0)
912             recv_packet = s.recv(0xffff + 2, 0)
913             self.assertEquals(0, len(recv_packet))
914         except socket.timeout:
915             # Windows chooses not to respond to incorrectly formatted queries.
916             # Although this appears to be non-deterministic even for the same
917             # request twice, it also appears to be based on a how poorly the
918             # request is formatted.
919             pass
920         finally:
921             if s is not None:
922                 s.close()
923
924
925 class TestZones(DNSTest):
926     def setUp(self):
927         super(TestZones, self).setUp()
928         global server, server_ip, lp, creds, timeout
929         self.server = server_name
930         self.server_ip = server_ip
931         self.lp = lp
932         self.creds = creds
933         self.timeout = timeout
934
935         self.zone = "test.lan"
936         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
937                                             (self.server_ip),
938                                             self.lp, self.creds)
939
940         self.samdb = SamDB(url="ldap://" + self.server_ip,
941                            lp=self.get_loadparm(),
942                            session_info=system_session(),
943                            credentials=self.creds)
944
945         self.zone_dn = "DC=" + self.zone +\
946                        ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
947                        str(self.samdb.get_default_basedn())
948
949     def tearDown(self):
950         super(TestZones, self).tearDown()
951
952         try:
953             self.delete_zone(self.zone)
954         except RuntimeError as e:
955             (num, string) = e.args
956             if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
957                 raise
958
959     def create_zone(self, zone, aging_enabled=False):
960         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
961         zone_create.pszZoneName = zone
962         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
963         zone_create.fAging = int(aging_enabled)
964         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
965         zone_create.fDsIntegrated = 1
966         zone_create.fLoadExisting = 1
967         zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
968         try:
969             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
970             self.rpc_conn.DnssrvOperation2(client_version,
971                                            0,
972                                            self.server_ip,
973                                            None,
974                                            0,
975                                            'ZoneCreate',
976                                            dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
977                                            zone_create)
978         except WERRORError as e:
979             self.fail(str(e))
980
981     def set_params(self, **kwargs):
982         zone = kwargs.pop('zone', None)
983         for key, val in kwargs.items():
984             name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
985             name_param.dwParam = val
986             name_param.pszNodeName = key
987
988             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
989             nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
990             try:
991                 self.rpc_conn.DnssrvOperation2(client_version,
992                                                0,
993                                                self.server,
994                                                zone,
995                                                0,
996                                                'ResetDwordProperty',
997                                                nap_type,
998                                                name_param)
999             except WERRORError as e:
1000                 self.fail(str(e))
1001
1002     def ldap_modify_dnsrecs(self, name, func):
1003         dn = 'DC={},{}'.format(name, self.zone_dn)
1004         dns_recs = self.ldap_get_dns_records(name)
1005         for rec in dns_recs:
1006             func(rec)
1007         update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]}
1008         self.samdb.modify(ldb.Message.from_dict(self.samdb,
1009                                                 update_dict,
1010                                                 ldb.FLAG_MOD_REPLACE))
1011
1012     def dns_update_record(self, prefix, txt):
1013         p = self.make_txt_update(prefix, txt, self.zone)
1014         (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
1015         self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
1016         recs = self.ldap_get_dns_records(prefix)
1017         recs = [r for r in recs if r.data.str == txt]
1018         self.assertEqual(len(recs), 1)
1019         return recs[0]
1020
1021     def ldap_get_records(self, name):
1022         # The use of SCOPE_SUBTREE here avoids raising an exception in the
1023         # 0 results case for a test below.
1024
1025         expr = "(&(objectClass=dnsNode)(name={}))".format(name)
1026         return self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1027                                  expression=expr, attrs=["*"])
1028
1029     def ldap_get_dns_records(self, name):
1030         records = self.ldap_get_records(name)
1031         return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
1032                 for r in records[0].get('dnsRecord')]
1033
1034     def ldap_get_zone_settings(self):
1035         records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
1036                    expression="(&(objectClass=dnsZone)" +
1037                                 "(name={}))".format(self.zone),
1038                                     attrs=["dNSProperty"])
1039         self.assertEqual(len(records), 1)
1040         props = [ndr_unpack(dnsp.DnsProperty, r)
1041                  for r in records[0].get('dNSProperty')]
1042
1043         # We have no choice but to repeat these here.
1044         zone_prop_ids = {0x00: "EMPTY",
1045                          0x01: "TYPE",
1046                          0x02: "ALLOW_UPDATE",
1047                          0x08: "SECURE_TIME",
1048                          0x10: "NOREFRESH_INTERVAL",
1049                          0x11: "SCAVENGING_SERVERS",
1050                          0x12: "AGING_ENABLED_TIME",
1051                          0x20: "REFRESH_INTERVAL",
1052                          0x40: "AGING_STATE",
1053                          0x80: "DELETED_FROM_HOSTNAME",
1054                          0x81: "MASTER_SERVERS",
1055                          0x82: "AUTO_NS_SERVERS",
1056                          0x83: "DCPROMO_CONVERT",
1057                          0x90: "SCAVENGING_SERVERS_DA",
1058                          0x91: "MASTER_SERVERS_DA",
1059                          0x92: "NS_SERVERS_DA",
1060                          0x100: "NODE_DBFLAGS"}
1061         return {zone_prop_ids[p.id].lower(): p.data for p in props}
1062
1063     def set_aging(self, enable=False):
1064         self.create_zone(self.zone, aging_enabled=enable)
1065         self.set_params(NoRefreshInterval=1, RefreshInterval=1,
1066                         Aging=int(bool(enable)), zone=self.zone,
1067                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1068
1069     def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
1070         self.set_aging(enable=True)
1071         settings = self.ldap_get_zone_settings()
1072         self.assertTrue(settings['aging_state'] is not None)
1073         self.assertTrue(settings['aging_state'])
1074
1075         rec = self.dns_update_record('agingtest', ['test txt'])
1076         self.assertNotEqual(rec.dwTimeStamp, 0)
1077
1078     def test_set_aging_disabled(self):
1079         self.set_aging(enable=False)
1080         settings = self.ldap_get_zone_settings()
1081         self.assertTrue(settings['aging_state'] is not None)
1082         self.assertFalse(settings['aging_state'])
1083
1084         rec = self.dns_update_record('agingtest', ['test txt'])
1085         self.assertNotEqual(rec.dwTimeStamp, 0)
1086
1087     def test_aging_update(self, enable=True):
1088         name, txt = 'agingtest', ['test txt']
1089         self.set_aging(enable=True)
1090         before_mod = self.dns_update_record(name, txt)
1091         if not enable:
1092             self.set_params(zone=self.zone, Aging=0)
1093         dec = 2
1094
1095         def mod_ts(rec):
1096             self.assertTrue(rec.dwTimeStamp > 0)
1097             rec.dwTimeStamp -= dec
1098         self.ldap_modify_dnsrecs(name, mod_ts)
1099         after_mod = self.ldap_get_dns_records(name)
1100         self.assertEqual(len(after_mod), 1)
1101         after_mod = after_mod[0]
1102         self.assertEqual(after_mod.dwTimeStamp,
1103                          before_mod.dwTimeStamp - dec)
1104         after_update = self.dns_update_record(name, txt)
1105         after_should_equal = before_mod if enable else after_mod
1106         self.assertEqual(after_should_equal.dwTimeStamp,
1107                          after_update.dwTimeStamp)
1108
1109     def test_aging_update_disabled(self):
1110         self.test_aging_update(enable=False)
1111
1112     def test_aging_refresh(self):
1113         name, txt = 'agingtest', ['test txt']
1114         self.create_zone(self.zone, aging_enabled=True)
1115         interval = 10
1116         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1117                         Aging=1, zone=self.zone,
1118                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1119         before_mod = self.dns_update_record(name, txt)
1120
1121         def mod_ts(rec):
1122             self.assertTrue(rec.dwTimeStamp > 0)
1123             rec.dwTimeStamp -= interval / 2
1124         self.ldap_modify_dnsrecs(name, mod_ts)
1125         update_during_norefresh = self.dns_update_record(name, txt)
1126
1127         def mod_ts(rec):
1128             self.assertTrue(rec.dwTimeStamp > 0)
1129             rec.dwTimeStamp -= interval + interval / 2
1130         self.ldap_modify_dnsrecs(name, mod_ts)
1131         update_during_refresh = self.dns_update_record(name, txt)
1132         self.assertEqual(update_during_norefresh.dwTimeStamp,
1133                          before_mod.dwTimeStamp - interval / 2)
1134         self.assertEqual(update_during_refresh.dwTimeStamp,
1135                          before_mod.dwTimeStamp)
1136
1137     def test_rpc_add_no_timestamp(self):
1138         name, txt = 'agingtest', ['test txt']
1139         self.set_aging(enable=True)
1140         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1141         rec_buf.rec = TXTRecord(txt)
1142         self.rpc_conn.DnssrvUpdateRecord2(
1143             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1144             0,
1145             self.server_ip,
1146             self.zone,
1147             name,
1148             rec_buf,
1149             None)
1150         recs = self.ldap_get_dns_records(name)
1151         self.assertEqual(len(recs), 1)
1152         self.assertEqual(recs[0].dwTimeStamp, 0)
1153
1154     def test_static_record_dynamic_update(self):
1155         name, txt = 'agingtest', ['test txt']
1156         txt2 = ['test txt2']
1157         self.set_aging(enable=True)
1158         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1159         rec_buf.rec = TXTRecord(txt)
1160         self.rpc_conn.DnssrvUpdateRecord2(
1161             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1162             0,
1163             self.server_ip,
1164             self.zone,
1165             name,
1166             rec_buf,
1167             None)
1168
1169         rec2 = self.dns_update_record(name, txt2)
1170         self.assertEqual(rec2.dwTimeStamp, 0)
1171
1172     def test_dynamic_record_static_update(self):
1173         name, txt = 'agingtest', ['test txt']
1174         txt2 = ['test txt2']
1175         txt3 = ['test txt3']
1176         self.set_aging(enable=True)
1177
1178         self.dns_update_record(name, txt)
1179
1180         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1181         rec_buf.rec = TXTRecord(txt2)
1182         self.rpc_conn.DnssrvUpdateRecord2(
1183             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1184             0,
1185             self.server_ip,
1186             self.zone,
1187             name,
1188             rec_buf,
1189             None)
1190
1191         self.dns_update_record(name, txt3)
1192
1193         recs = self.ldap_get_dns_records(name)
1194         # Put in dict because ldap recs might be out of order
1195         recs = {str(r.data.str): r for r in recs}
1196         self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0)
1197         self.assertEqual(recs[str(txt2)].dwTimeStamp, 0)
1198         self.assertEqual(recs[str(txt3)].dwTimeStamp, 0)
1199
1200     def test_dns_tombstone_custom_match_rule(self):
1201         lp = self.get_loadparm()
1202         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1203                            session_info=system_session(),
1204                            credentials=self.creds)
1205
1206         name, txt = 'agingtest', ['test txt']
1207         name2, txt2 = 'agingtest2', ['test txt2']
1208         name3, txt3 = 'agingtest3', ['test txt3']
1209         self.create_zone(self.zone, aging_enabled=True)
1210         interval = 10
1211         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1212                         Aging=1, zone=self.zone,
1213                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1214
1215         self.dns_update_record(name, txt),
1216
1217         self.dns_update_record(name2, txt),
1218         self.dns_update_record(name2, txt2),
1219
1220         self.dns_update_record(name3, txt),
1221         self.dns_update_record(name3, txt2),
1222         last_update = self.dns_update_record(name3, txt3)
1223
1224         # Modify txt1 of the first 2 names
1225         def mod_ts(rec):
1226             if rec.data.str == txt:
1227                 rec.dwTimeStamp -= 2
1228         self.ldap_modify_dnsrecs(name, mod_ts)
1229         self.ldap_modify_dnsrecs(name2, mod_ts)
1230
1231         self.ldap_get_dns_records(name3)
1232         expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
1233         expr = expr.format(int(last_update.dwTimeStamp) - 1)
1234         try:
1235             res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1236                                     expression=expr, attrs=["*"])
1237         except ldb.LdbError as e:
1238             self.fail(str(e))
1239         updated_names = {str(r.get('name')) for r in res}
1240         self.assertEqual(updated_names, set([name, name2]))
1241
1242     def test_dns_tombstone_custom_match_rule_fail(self):
1243         self.create_zone(self.zone, aging_enabled=True)
1244
1245         # The check here is that this does not blow up on silly input
1246         expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1247         res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1248                                 expression=expr, attrs=["*"])
1249         self.assertEquals(len(res), 0)
1250
1251     def test_basic_scavenging(self):
1252         lp = self.get_loadparm()
1253         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1254                            session_info=system_session(),
1255                            credentials=self.creds)
1256
1257         self.create_zone(self.zone, aging_enabled=True)
1258         interval = 1
1259         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1260                         zone=self.zone, Aging=1,
1261                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1262         name, txt = 'agingtest', ['test txt']
1263         name2, txt2 = 'agingtest2', ['test txt2']
1264         name3, txt3 = 'agingtest3', ['test txt3']
1265         self.dns_update_record(name, txt)
1266         self.dns_update_record(name2, txt)
1267         self.dns_update_record(name2, txt2)
1268         self.dns_update_record(name3, txt)
1269         self.dns_update_record(name3, txt2)
1270         last_add = self.dns_update_record(name3, txt3)
1271
1272         def mod_ts(rec):
1273             self.assertTrue(rec.dwTimeStamp > 0)
1274             if rec.data.str == txt:
1275                 rec.dwTimeStamp -= interval * 5
1276         self.ldap_modify_dnsrecs(name, mod_ts)
1277         self.ldap_modify_dnsrecs(name2, mod_ts)
1278         self.ldap_modify_dnsrecs(name3, mod_ts)
1279         self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1280         dsdb._scavenge_dns_records(self.samdb)
1281
1282         recs = self.ldap_get_dns_records(name)
1283         self.assertEqual(len(recs), 1)
1284         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1285
1286         recs = self.ldap_get_dns_records(name2)
1287         self.assertEqual(len(recs), 1)
1288         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1289         self.assertEqual(recs[0].data.str, txt2)
1290
1291         recs = self.ldap_get_dns_records(name3)
1292         self.assertEqual(len(recs), 2)
1293         txts = {str(r.data.str) for r in recs}
1294         self.assertEqual(txts, {str(txt2), str(txt3)})
1295         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1296         self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT)
1297
1298         for make_it_work in [False, True]:
1299             inc = -1 if make_it_work else 1
1300
1301             def mod_ts(rec):
1302                 rec.data = (last_add.dwTimeStamp - 24 * 14) + inc
1303             self.ldap_modify_dnsrecs(name, mod_ts)
1304             dsdb._dns_delete_tombstones(self.samdb)
1305             recs = self.ldap_get_records(name)
1306             if make_it_work:
1307                 self.assertEqual(len(recs), 0)
1308             else:
1309                 self.assertEqual(len(recs), 1)
1310
1311     def delete_zone(self, zone):
1312         self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1313                                        0,
1314                                        self.server_ip,
1315                                        zone,
1316                                        0,
1317                                        'DeleteZoneFromDs',
1318                                        dnsserver.DNSSRV_TYPEID_NULL,
1319                                        None)
1320
1321     def test_soa_query(self):
1322         zone = "test.lan"
1323         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1324         questions = []
1325
1326         q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
1327         questions.append(q)
1328         self.finish_name_packet(p, questions)
1329
1330         (response, response_packet) =\
1331             self.dns_transaction_udp(p, host=server_ip)
1332         # Windows returns OK while BIND logically seems to return NXDOMAIN
1333         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1334         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1335         self.assertEquals(response.ancount, 0)
1336
1337         self.create_zone(zone)
1338         (response, response_packet) =\
1339             self.dns_transaction_udp(p, host=server_ip)
1340         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1341         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1342         self.assertEquals(response.ancount, 1)
1343         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
1344
1345         self.delete_zone(zone)
1346         (response, response_packet) =\
1347             self.dns_transaction_udp(p, host=server_ip)
1348         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1349         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1350         self.assertEquals(response.ancount, 0)
1351
1352
1353 class TestRPCRoundtrip(DNSTest):
1354     def setUp(self):
1355         super(TestRPCRoundtrip, self).setUp()
1356         global server, server_ip, lp, creds
1357         self.server = server_name
1358         self.server_ip = server_ip
1359         self.lp = lp
1360         self.creds = creds
1361         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1362                                             (self.server_ip),
1363                                             self.lp,
1364                                             self.creds)
1365
1366     def tearDown(self):
1367         super(TestRPCRoundtrip, self).tearDown()
1368
1369     def test_update_add_txt_rpc_to_dns(self):
1370         prefix, txt = 'rpctextrec', ['"This is a test"']
1371
1372         name = "%s.%s" % (prefix, self.get_dns_domain())
1373
1374         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
1375         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1376         add_rec_buf.rec = rec
1377         try:
1378             self.rpc_conn.DnssrvUpdateRecord2(
1379                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1380                 0,
1381                 self.server_ip,
1382                 self.get_dns_domain(),
1383                 name,
1384                 add_rec_buf,
1385                 None)
1386
1387         except WERRORError as e:
1388             self.fail(str(e))
1389
1390         try:
1391             self.check_query_txt(prefix, txt)
1392         finally:
1393             self.rpc_conn.DnssrvUpdateRecord2(
1394                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1395                 0,
1396                 self.server_ip,
1397                 self.get_dns_domain(),
1398                 name,
1399                 None,
1400                 add_rec_buf)
1401
1402     def test_update_add_null_padded_txt_record(self):
1403         "test adding records works"
1404         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1405         p = self.make_txt_update(prefix, txt)
1406         (response, response_packet) =\
1407             self.dns_transaction_udp(p, host=server_ip)
1408         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1409         self.check_query_txt(prefix, txt)
1410         self.assertIsNotNone(
1411             dns_record_match(self.rpc_conn,
1412                 self.server_ip,
1413                 self.get_dns_domain(),
1414                 "%s.%s" % (prefix, self.get_dns_domain()),
1415                 dnsp.DNS_TYPE_TXT,
1416                 '"\\"This is a test\\"" "" ""'))
1417
1418         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1419         p = self.make_txt_update(prefix, txt)
1420         (response, response_packet) =\
1421             self.dns_transaction_udp(p, host=server_ip)
1422         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1423         self.check_query_txt(prefix, txt)
1424         self.assertIsNotNone(
1425             dns_record_match(
1426                 self.rpc_conn,
1427                 self.server_ip,
1428                 self.get_dns_domain(),
1429                 "%s.%s" % (prefix, self.get_dns_domain()),
1430                 dnsp.DNS_TYPE_TXT,
1431                 '"\\"This is a test\\"" "" "" "more text"'))
1432
1433         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1434         p = self.make_txt_update(prefix, txt)
1435         (response, response_packet) =\
1436             self.dns_transaction_udp(p, host=server_ip)
1437         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1438         self.check_query_txt(prefix, txt)
1439         self.assertIsNotNone(
1440             dns_record_match(
1441                 self.rpc_conn,
1442                 self.server_ip,
1443                 self.get_dns_domain(),
1444                 "%s.%s" % (prefix, self.get_dns_domain()),
1445                 dnsp.DNS_TYPE_TXT,
1446                 '"" "" "\\"This is a test\\""'))
1447
1448     def test_update_add_padding_rpc_to_dns(self):
1449         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1450         prefix = 'rpc' + prefix
1451         name = "%s.%s" % (prefix, self.get_dns_domain())
1452
1453         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1454                                  '"\\"This is a test\\"" "" ""')
1455         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1456         add_rec_buf.rec = rec
1457         try:
1458             self.rpc_conn.DnssrvUpdateRecord2(
1459                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1460                 0,
1461                 self.server_ip,
1462                 self.get_dns_domain(),
1463                 name,
1464                 add_rec_buf,
1465                 None)
1466
1467         except WERRORError as e:
1468             self.fail(str(e))
1469
1470         try:
1471             self.check_query_txt(prefix, txt)
1472         finally:
1473             self.rpc_conn.DnssrvUpdateRecord2(
1474                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1475                 0,
1476                 self.server_ip,
1477                 self.get_dns_domain(),
1478                 name,
1479                 None,
1480                 add_rec_buf)
1481
1482         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1483         prefix = 'rpc' + prefix
1484         name = "%s.%s" % (prefix, self.get_dns_domain())
1485
1486         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1487                                  '"\\"This is a test\\"" "" "" "more text"')
1488         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1489         add_rec_buf.rec = rec
1490         try:
1491             self.rpc_conn.DnssrvUpdateRecord2(
1492                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1493                 0,
1494                 self.server_ip,
1495                 self.get_dns_domain(),
1496                 name,
1497                 add_rec_buf,
1498                 None)
1499
1500         except WERRORError as e:
1501             self.fail(str(e))
1502
1503         try:
1504             self.check_query_txt(prefix, txt)
1505         finally:
1506             self.rpc_conn.DnssrvUpdateRecord2(
1507                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1508                 0,
1509                 self.server_ip,
1510                 self.get_dns_domain(),
1511                 name,
1512                 None,
1513                 add_rec_buf)
1514
1515         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1516         prefix = 'rpc' + prefix
1517         name = "%s.%s" % (prefix, self.get_dns_domain())
1518
1519         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1520                                  '"" "" "\\"This is a test\\""')
1521         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1522         add_rec_buf.rec = rec
1523         try:
1524             self.rpc_conn.DnssrvUpdateRecord2(
1525                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1526                 0,
1527                 self.server_ip,
1528                 self.get_dns_domain(),
1529                 name,
1530                 add_rec_buf,
1531                 None)
1532         except WERRORError as e:
1533             self.fail(str(e))
1534
1535         try:
1536             self.check_query_txt(prefix, txt)
1537         finally:
1538             self.rpc_conn.DnssrvUpdateRecord2(
1539                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1540                 0,
1541                 self.server_ip,
1542                 self.get_dns_domain(),
1543                 name,
1544                 None,
1545                 add_rec_buf)
1546
1547     # Test is incomplete due to strlen against txt records
1548     def test_update_add_null_char_txt_record(self):
1549         "test adding records works"
1550         prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1551         p = self.make_txt_update(prefix, txt)
1552         (response, response_packet) =\
1553             self.dns_transaction_udp(p, host=server_ip)
1554         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1555         self.check_query_txt(prefix, ['NULL'])
1556         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1557                              self.get_dns_domain(),
1558                              "%s.%s" % (prefix, self.get_dns_domain()),
1559                              dnsp.DNS_TYPE_TXT, '"NULL"'))
1560
1561         prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1562         p = self.make_txt_update(prefix, txt)
1563         (response, response_packet) =\
1564             self.dns_transaction_udp(p, host=server_ip)
1565         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1566         self.check_query_txt(prefix, ['NULL', 'NULL'])
1567         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1568                              self.get_dns_domain(),
1569                              "%s.%s" % (prefix, self.get_dns_domain()),
1570                              dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1571
1572     def test_update_add_null_char_rpc_to_dns(self):
1573         prefix = 'rpcnulltextrec'
1574         name = "%s.%s" % (prefix, self.get_dns_domain())
1575
1576         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL\x00BYTE"')
1577         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1578         add_rec_buf.rec = rec
1579         try:
1580             self.rpc_conn.DnssrvUpdateRecord2(
1581                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1582                 0,
1583                 self.server_ip,
1584                 self.get_dns_domain(),
1585                 name,
1586                 add_rec_buf,
1587                 None)
1588
1589         except WERRORError as e:
1590             self.fail(str(e))
1591
1592         try:
1593             self.check_query_txt(prefix, ['NULL'])
1594         finally:
1595             self.rpc_conn.DnssrvUpdateRecord2(
1596                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1597                 0,
1598                 self.server_ip,
1599                 self.get_dns_domain(),
1600                 name,
1601                 None,
1602                 add_rec_buf)
1603
1604     def test_update_add_hex_char_txt_record(self):
1605         "test adding records works"
1606         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1607         p = self.make_txt_update(prefix, txt)
1608         (response, response_packet) =\
1609             self.dns_transaction_udp(p, host=server_ip)
1610         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1611         self.check_query_txt(prefix, txt)
1612         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1613                              self.get_dns_domain(),
1614                              "%s.%s" % (prefix, self.get_dns_domain()),
1615                              dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
1616
1617     def test_update_add_hex_rpc_to_dns(self):
1618         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1619         prefix = 'rpc' + prefix
1620         name = "%s.%s" % (prefix, self.get_dns_domain())
1621
1622         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
1623         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1624         add_rec_buf.rec = rec
1625         try:
1626             self.rpc_conn.DnssrvUpdateRecord2(
1627                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1628                 0,
1629                 self.server_ip,
1630                 self.get_dns_domain(),
1631                 name,
1632                 add_rec_buf,
1633                 None)
1634
1635         except WERRORError as e:
1636             self.fail(str(e))
1637
1638         try:
1639             self.check_query_txt(prefix, txt)
1640         finally:
1641             self.rpc_conn.DnssrvUpdateRecord2(
1642                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1643                 0,
1644                 self.server_ip,
1645                 self.get_dns_domain(),
1646                 name,
1647                 None,
1648                 add_rec_buf)
1649
1650     def test_update_add_slash_txt_record(self):
1651         "test adding records works"
1652         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1653         p = self.make_txt_update(prefix, txt)
1654         (response, response_packet) =\
1655             self.dns_transaction_udp(p, host=server_ip)
1656         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1657         self.check_query_txt(prefix, txt)
1658         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1659                              self.get_dns_domain(),
1660                              "%s.%s" % (prefix, self.get_dns_domain()),
1661                              dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
1662
1663     # This test fails against Windows as it eliminates slashes in RPC
1664     # One typical use for a slash is in records like 'var=value' to
1665     # escape '=' characters.
1666     def test_update_add_slash_rpc_to_dns(self):
1667         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1668         prefix = 'rpc' + prefix
1669         name = "%s.%s" % (prefix, self.get_dns_domain())
1670
1671         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
1672         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1673         add_rec_buf.rec = rec
1674         try:
1675             self.rpc_conn.DnssrvUpdateRecord2(
1676                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1677                 0,
1678                 self.server_ip,
1679                 self.get_dns_domain(),
1680                 name,
1681                 add_rec_buf,
1682                 None)
1683
1684         except WERRORError as e:
1685             self.fail(str(e))
1686
1687         try:
1688             self.check_query_txt(prefix, txt)
1689
1690         finally:
1691             self.rpc_conn.DnssrvUpdateRecord2(
1692                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1693                 0,
1694                 self.server_ip,
1695                 self.get_dns_domain(),
1696                 name,
1697                 None,
1698                 add_rec_buf)
1699
1700     def test_update_add_two_txt_records(self):
1701         "test adding two txt records works"
1702         prefix, txt = 'textrec2', ['"This is a test"',
1703                                    '"and this is a test, too"']
1704         p = self.make_txt_update(prefix, txt)
1705         (response, response_packet) =\
1706             self.dns_transaction_udp(p, host=server_ip)
1707         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1708         self.check_query_txt(prefix, txt)
1709         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1710                              self.get_dns_domain(),
1711                              "%s.%s" % (prefix, self.get_dns_domain()),
1712                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
1713                              ' "\\"and this is a test, too\\""'))
1714
1715     def test_update_add_two_rpc_to_dns(self):
1716         prefix, txt = 'textrec2', ['"This is a test"',
1717                                    '"and this is a test, too"']
1718         prefix = 'rpc' + prefix
1719         name = "%s.%s" % (prefix, self.get_dns_domain())
1720
1721         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1722                                 '"\\"This is a test\\""' +
1723                                 ' "\\"and this is a test, too\\""')
1724         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1725         add_rec_buf.rec = rec
1726         try:
1727             self.rpc_conn.DnssrvUpdateRecord2(
1728                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1729                 0,
1730                 self.server_ip,
1731                 self.get_dns_domain(),
1732                 name,
1733                 add_rec_buf,
1734                 None)
1735
1736         except WERRORError as e:
1737             self.fail(str(e))
1738
1739         try:
1740             self.check_query_txt(prefix, txt)
1741         finally:
1742             self.rpc_conn.DnssrvUpdateRecord2(
1743                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1744                 0,
1745                 self.server_ip,
1746                 self.get_dns_domain(),
1747                 name,
1748                 None,
1749                 add_rec_buf)
1750
1751     def test_update_add_empty_txt_records(self):
1752         "test adding two txt records works"
1753         prefix, txt = 'emptytextrec', []
1754         p = self.make_txt_update(prefix, txt)
1755         (response, response_packet) =\
1756             self.dns_transaction_udp(p, host=server_ip)
1757         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1758         self.check_query_txt(prefix, txt)
1759         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1760                              self.get_dns_domain(),
1761                              "%s.%s" % (prefix, self.get_dns_domain()),
1762                              dnsp.DNS_TYPE_TXT, ''))
1763
1764     def test_update_add_empty_rpc_to_dns(self):
1765         prefix, txt = 'rpcemptytextrec', []
1766
1767         name = "%s.%s" % (prefix, self.get_dns_domain())
1768
1769         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
1770         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1771         add_rec_buf.rec = rec
1772         try:
1773             self.rpc_conn.DnssrvUpdateRecord2(
1774                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1775                 0,
1776                 self.server_ip,
1777                 self.get_dns_domain(),
1778                 name,
1779                 add_rec_buf,
1780                 None)
1781         except WERRORError as e:
1782             self.fail(str(e))
1783
1784         try:
1785             self.check_query_txt(prefix, txt)
1786         finally:
1787             self.rpc_conn.DnssrvUpdateRecord2(
1788                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1789                 0,
1790                 self.server_ip,
1791                 self.get_dns_domain(),
1792                 name,
1793                 None,
1794                 add_rec_buf)
1795
1796 TestProgram(module=__name__, opts=subunitopts)