PEP8: fix E128: continuation line under-indented for visual indent
[bbaumbach/samba-autobuild/.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19
20 from samba import dsdb
21 from samba.ndr import ndr_unpack, ndr_pack
22 from samba.samdb import SamDB
23 from samba.auth import system_session
24 import ldb
25 from ldb import ERR_OPERATIONS_ERROR
26 import os
27 import sys
28 import struct
29 import socket
30 import samba.ndr as ndr
31 from samba import credentials
32 from samba.dcerpc import dns, dnsp, dnsserver
33 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
34 from samba.tests.subunitrun import SubunitOptions, TestProgram
35 from samba import werror, WERRORError
36 from samba.tests.dns_base import DNSTest
37 import samba.getopt as options
38 import optparse
39 import samba.dcerpc.dnsp
40
41
# Command-line handling: two positional arguments (<server name> and
# <server ip>) plus the Samba, credentials and subunit option groups.
parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")

sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

# This timeout only has relevance when testing against Windows.
# Format errors tend to return patchy responses, so a timeout is needed.
parser.add_option("--timeout", type="int", dest="timeout",
                  help="Specify timeout for DNS requests")

# Use command line creds if available.
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

opts, args = parser.parse_args()

# Module-level settings read by every test case's setUp().
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
timeout = opts.timeout

# Both positional arguments are mandatory.
if len(args) < 2:
    parser.print_usage()
    sys.exit(1)

server_name, server_ip = args[:2]
creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
71
72
class TestSimpleQueries(DNSTest):
    """Single-packet DNS queries against the server given on the command line.

    Each test builds a request with the DNSTest packet helpers, sends it to
    ``self.server_ip`` and checks the response code, opcode and answer
    section of the reply.
    """

    def setUp(self):
        """Copy the command-line derived module settings onto the test case."""
        super(TestSimpleQueries, self).setUp()
        # The module-level names are only read here, so no ``global``
        # declaration is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def _server_fqdn(self):
        """Return "<server name>.<dns domain>" for the server under test."""
        return "%s.%s" % (self.server, self.get_dns_domain())

    def _query_packet(self, questions):
        """Return a QUERY packet whose question section is *questions*."""
        packet = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        self.finish_name_packet(packet, questions)
        return packet

    def _ask(self, name, qtype, qclass=dns.DNS_QCLASS_IN, tcp=False,
             announce=False):
        """Send a one-question query for *name* and return the response.

        :param name: name to ask for
        :param qtype: dns.DNS_QTYPE_* value of the question
        :param qclass: dns.DNS_QCLASS_* value of the question
        :param tcp: use dns_transaction_tcp instead of dns_transaction_udp
        :param announce: print the name being asked for before sending
        :return: the response object returned by the transaction helper
        """
        question = self.make_name_question(name, qtype, qclass)
        if announce:
            print("asking for ", question.name)
        packet = self._query_packet([question])
        if tcp:
            transact = self.dns_transaction_tcp
        else:
            transact = self.dns_transaction_udp
        response, _ = transact(packet, host=self.server_ip)
        return response

    def _assert_query_reply(self, response, rcode):
        """Assert *response* carries *rcode* and the QUERY opcode."""
        self.assert_dns_rcode_equals(response, rcode)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)

    def test_one_a_query(self):
        "create a query packet containing one query record"
        response = self._ask(self._server_fqdn(), dns.DNS_QTYPE_A,
                             announce=True)
        self._assert_query_reply(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_SOA_query(self):
        "create a query packet containing one query record for the SOA"
        response = self._ask(self.get_dns_domain(), dns.DNS_QTYPE_SOA,
                             announce=True)
        self._assert_query_reply(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        # The comparison is case-insensitive.
        self.assertEqual(response.answers[0].rdata.mname.upper(),
                         self._server_fqdn().upper())

    def test_one_a_query_tcp(self):
        "create a query packet containing one query record via TCP"
        response = self._ask(self._server_fqdn(), dns.DNS_QTYPE_A,
                             tcp=True, announce=True)
        self._assert_query_reply(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_mx_query(self):
        "create a query packet causing an empty RCODE_OK answer"
        # The server's own name exists but has no MX record: OK, no answers.
        response = self._ask(self._server_fqdn(), dns.DNS_QTYPE_MX,
                             announce=True)
        self._assert_query_reply(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 0)

        # A name that does not exist at all yields NXDOMAIN instead.
        response = self._ask("invalid-%s" % self._server_fqdn(),
                             dns.DNS_QTYPE_MX, announce=True)
        self._assert_query_reply(response, dns.DNS_RCODE_NXDOMAIN)
        self.assertEqual(response.ancount, 0)

    def test_two_queries(self):
        "create a query packet containing two query records"
        names = (self._server_fqdn(),
                 "%s.%s" % ('bogusname', self.get_dns_domain()))
        questions = [self.make_name_question(name, dns.DNS_QTYPE_A,
                                             dns.DNS_QCLASS_IN)
                     for name in names]
        packet = self._query_packet(questions)
        try:
            response, _ = self.dns_transaction_udp(packet,
                                                   host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass

    def test_qtype_all_query(self):
        "create a QTYPE_ALL query"
        response = self._ask(self._server_fqdn(), dns.DNS_QTYPE_ALL,
                             announce=True)

        # One answer for the IPv4 address, plus one more when the test
        # environment exports an IPv6 address for the server.
        num_answers = 1
        dc_ipv6 = os.getenv('SERVER_IPV6')
        if dc_ipv6 is not None:
            num_answers += 1

        self._assert_query_reply(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, num_answers)
        self.assertEqual(response.answers[0].rdata, self.server_ip)
        if dc_ipv6 is not None:
            self.assertEqual(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        "create a QCLASS_NONE query"
        try:
            response = self._ask(self._server_fqdn(), dns.DNS_QTYPE_ALL,
                                 qclass=dns.DNS_QCLASS_NONE)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass

    def test_soa_hostname_query(self):
        "create a SOA query for a hostname"
        response = self._ask(self._server_fqdn(), dns.DNS_QTYPE_SOA)
        self._assert_query_reply(response, dns.DNS_RCODE_OK)
        # We don't get SOA records for single hosts
        self.assertEqual(response.ancount, 0)
        # But we do respond with an authority section
        self.assertEqual(response.nscount, 1)

    def test_soa_domain_query(self):
        "create a SOA query for a domain"
        response = self._ask(self.get_dns_domain(), dns.DNS_QTYPE_SOA)
        self._assert_query_reply(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.minimum, 3600)
284
285
class TestDNSUpdates(DNSTest):
    """DNS UPDATE requests against the server given on the command line.

    The tests cover malformed updates, prerequisite handling, and adding,
    deleting and re-adding records in the server's DNS domain.
    """

    # TXT payload shared by the add/delete/re-add tests.
    _TEST_TXT = ['"This is a test"']

    def setUp(self):
        """Copy the command-line derived module settings onto the test case."""
        super(TestDNSUpdates, self).setUp()
        # The module-level names are only read here, so no ``global``
        # declaration is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def _zone_update_packet(self):
        """Return an UPDATE packet whose zone entry is the DNS domain's SOA."""
        packet = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        zone = self.make_name_question(self.get_dns_domain(),
                                       dns.DNS_QTYPE_SOA,
                                       dns.DNS_QCLASS_IN)
        self.finish_name_packet(packet, [zone])
        return packet

    def _txt_prereq_response(self, name, rr_class, ttl, length):
        """Send an update carrying one TXT prerequisite; return the response.

        The prerequisite is placed in the packet's ``answers`` list with
        ``ancount`` set to 1.

        :param name: owner name of the prerequisite record
        :param rr_class: dns.DNS_QCLASS_* value of the prerequisite
        :param ttl: TTL to put on the prerequisite
        :param length: value for the record's ``length`` field
        """
        packet = self._zone_update_packet()
        prereq = dns.res_rec()
        prereq.name = name
        prereq.rr_type = dns.DNS_QTYPE_TXT
        prereq.rr_class = rr_class
        prereq.ttl = ttl
        prereq.length = length
        packet.ancount = 1
        packet.answers = [prereq]
        response, _ = self.dns_transaction_udp(packet, host=self.server_ip)
        return response

    def _update_test_txt(self, name, rr_class, ttl):
        """Send an update for the shared test TXT record and expect RCODE OK.

        The tests add the record with class IN and a TTL of 900, and delete
        it with class NONE and a TTL of 0.
        """
        packet = self._zone_update_packet()
        record = dns.res_rec()
        record.name = name
        record.rr_type = dns.DNS_QTYPE_TXT
        record.rr_class = rr_class
        record.ttl = ttl
        record.length = 0xffff
        record.rdata = self.make_txt_record(self._TEST_TXT)
        packet.nscount = 1
        packet.nsrecs = [record]
        response, _ = self.dns_transaction_udp(packet, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def _assert_txt_query_rcode(self, name, rcode):
        """Query TXT for *name* and assert the response code is *rcode*."""
        packet = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        question = self.make_name_question(name, dns.DNS_QTYPE_TXT,
                                           dns.DNS_QCLASS_IN)
        self.finish_name_packet(packet, [question])
        response, _ = self.dns_transaction_udp(packet, host=self.server_ip)
        self.assert_dns_rcode_equals(response, rcode)

    def test_two_updates(self):
        "create two update requests"
        packet = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        names = ("%s.%s" % (self.server, self.get_dns_domain()),
                 self.get_dns_domain())
        updates = [self.make_name_question(name, dns.DNS_QTYPE_A,
                                           dns.DNS_QCLASS_IN)
                   for name in names]
        self.finish_name_packet(packet, updates)
        try:
            response, _ = self.dns_transaction_udp(packet,
                                                   host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass

    def test_update_wrong_qclass(self):
        "create update with DNS_QCLASS_NONE"
        packet = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        zone = self.make_name_question(self.get_dns_domain(),
                                       dns.DNS_QTYPE_A,
                                       dns.DNS_QCLASS_NONE)
        self.finish_name_packet(packet, [zone])
        response, _ = self.dns_transaction_udp(packet, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        "test update with a non-null TTL"
        name = "%s.%s" % (self.server, self.get_dns_domain())
        try:
            response = self._txt_prereq_response(name, dns.DNS_QCLASS_NONE,
                                                 ttl=1, length=0)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass

    def test_update_prereq_with_non_null_length(self):
        "test update with a non-null length"
        name = "%s.%s" % (self.server, self.get_dns_domain())
        response = self._txt_prereq_response(name, dns.DNS_QCLASS_ANY,
                                             ttl=0, length=1)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_prereq_nonexisting_name(self):
        "test update with a nonexisting name"
        name = "idontexist.%s" % self.get_dns_domain()
        response = self._txt_prereq_response(name, dns.DNS_QCLASS_ANY,
                                             ttl=0, length=0)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        "test adding records works"
        prefix, txt = 'textrec', ['"This is a test"']
        packet = self.make_txt_update(prefix, txt)
        response, _ = self.dns_transaction_udp(packet, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, txt)

    def test_delete_record(self):
        "Test if deleting records works"
        name = "deleterec.%s" % self.get_dns_domain()

        # First, create a record to make sure we have a record to delete.
        self._update_test_txt(name, dns.DNS_QCLASS_IN, ttl=900)

        # Now check the record is around
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_OK)

        # Now delete the record
        self._update_test_txt(name, dns.DNS_QCLASS_NONE, ttl=0)

        # And finally check it's gone
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_NXDOMAIN)

    def test_readd_record(self):
        "Test if adding, deleting and then readding a record works"
        name = "readdrec.%s" % self.get_dns_domain()

        # Create the record
        self._update_test_txt(name, dns.DNS_QCLASS_IN, ttl=900)

        # Now check the record is around
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_OK)

        # Now delete the record
        self._update_test_txt(name, dns.DNS_QCLASS_NONE, ttl=0)

        # check it's gone
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_NXDOMAIN)

        # recreate the record
        self._update_test_txt(name, dns.DNS_QCLASS_IN, ttl=900)

        # Now check the record is around
        self._assert_txt_query_rcode(name, dns.DNS_RCODE_OK)

    def test_update_add_mx_record(self):
        "test adding MX records works"
        domain = self.get_dns_domain()
        exchange = 'mail.%s' % domain

        # Add an MX record for the domain itself.
        packet = self._zone_update_packet()
        record = dns.res_rec()
        record.name = domain
        record.rr_type = dns.DNS_QTYPE_MX
        record.rr_class = dns.DNS_QCLASS_IN
        record.ttl = 900
        record.length = 0xffff
        mx = dns.mx_record()
        mx.preference = 10
        mx.exchange = exchange
        record.rdata = mx
        packet.nscount = 1
        packet.nsrecs = [record]

        response, _ = self.dns_transaction_udp(packet, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Query it back and check the contents.
        query = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        question = self.make_name_question(domain, dns.DNS_QTYPE_MX,
                                           dns.DNS_QCLASS_IN)
        self.finish_name_packet(query, [question])
        response, _ = self.dns_transaction_udp(query, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        ans = response.answers[0]
        self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
        self.assertEqual(ans.rdata.preference, 10)
        self.assertEqual(ans.rdata.exchange, exchange)
680
681
682 class TestComplexQueries(DNSTest):
683     def make_dns_update(self, key, value, qtype):
684         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
685
686         name = self.get_dns_domain()
687         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
688         self.finish_name_packet(p, [u])
689
690         r = dns.res_rec()
691         r.name = key
692         r.rr_type = qtype
693         r.rr_class = dns.DNS_QCLASS_IN
694         r.ttl = 900
695         r.length = 0xffff
696         r.rdata = value
697         p.nscount = 1
698         p.nsrecs = [r]
699         (response, response_packet) =\
700             self.dns_transaction_udp(p, host=server_ip)
701         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
702
703     def setUp(self):
704         super(TestComplexQueries, self).setUp()
705
706         global server, server_ip, lp, creds, timeout
707         self.server = server_name
708         self.server_ip = server_ip
709         self.lp = lp
710         self.creds = creds
711         self.timeout = timeout
712
713     def test_one_a_query(self):
714         "create a query packet containing one query record"
715
716         try:
717
718             # Create the record
719             name = "cname_test.%s" % self.get_dns_domain()
720             rdata = "%s.%s" % (self.server, self.get_dns_domain())
721             self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
722
723             p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
724             questions = []
725
726             # Check the record
727             name = "cname_test.%s" % self.get_dns_domain()
728             q = self.make_name_question(name,
729                                         dns.DNS_QTYPE_A,
730                                         dns.DNS_QCLASS_IN)
731             print("asking for ", q.name)
732             questions.append(q)
733
734             self.finish_name_packet(p, questions)
735             (response, response_packet) =\
736                 self.dns_transaction_udp(p, host=self.server_ip)
737             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
738             self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
739             self.assertEquals(response.ancount, 2)
740             self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
741             self.assertEquals(response.answers[0].rdata, "%s.%s" %
742                               (self.server, self.get_dns_domain()))
743             self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
744             self.assertEquals(response.answers[1].rdata,
745                               self.server_ip)
746
747         finally:
748             # Delete the record
749             p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
750             updates = []
751
752             name = self.get_dns_domain()
753
754             u = self.make_name_question(name,
755                                         dns.DNS_QTYPE_SOA,
756                                         dns.DNS_QCLASS_IN)
757             updates.append(u)
758             self.finish_name_packet(p, updates)
759
760             updates = []
761             r = dns.res_rec()
762             r.name = "cname_test.%s" % self.get_dns_domain()
763             r.rr_type = dns.DNS_QTYPE_CNAME
764             r.rr_class = dns.DNS_QCLASS_NONE
765             r.ttl = 0
766             r.length = 0xffff
767             r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
768             updates.append(r)
769             p.nscount = len(updates)
770             p.nsrecs = updates
771
772             (response, response_packet) =\
773                 self.dns_transaction_udp(p, host=self.server_ip)
774             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
775
776     def test_cname_two_chain(self):
777         name0 = "cnamechain0.%s" % self.get_dns_domain()
778         name1 = "cnamechain1.%s" % self.get_dns_domain()
779         name2 = "cnamechain2.%s" % self.get_dns_domain()
780         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
781         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
782         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
783
784         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
785         questions = []
786         q = self.make_name_question(name1, dns.DNS_QTYPE_A,
787                                     dns.DNS_QCLASS_IN)
788         questions.append(q)
789
790         self.finish_name_packet(p, questions)
791         (response, response_packet) =\
792             self.dns_transaction_udp(p, host=server_ip)
793         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
794         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
795         self.assertEquals(response.ancount, 3)
796
797         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
798         self.assertEquals(response.answers[0].name, name1)
799         self.assertEquals(response.answers[0].rdata, name2)
800
801         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
802         self.assertEquals(response.answers[1].name, name2)
803         self.assertEquals(response.answers[1].rdata, name0)
804
805         self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
806         self.assertEquals(response.answers[2].rdata,
807                           self.server_ip)
808
809     def test_invalid_empty_cname(self):
810         name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
811         try:
812             self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
813         except AssertionError:
814             pass
815         else:
816             self.fail("Successfully added empty CNAME, which is invalid.")
817
818     def test_cname_two_chain_not_matching_qtype(self):
819         name0 = "cnamechain0.%s" % self.get_dns_domain()
820         name1 = "cnamechain1.%s" % self.get_dns_domain()
821         name2 = "cnamechain2.%s" % self.get_dns_domain()
822         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
823         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
824         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
825
826         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
827         questions = []
828         q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
829                                     dns.DNS_QCLASS_IN)
830         questions.append(q)
831
832         self.finish_name_packet(p, questions)
833         (response, response_packet) =\
834             self.dns_transaction_udp(p, host=server_ip)
835         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
836         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
837
838         # CNAME should return all intermediate results!
839         # Only the A records exists, not the TXT.
840         self.assertEquals(response.ancount, 2)
841
842         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
843         self.assertEquals(response.answers[0].name, name1)
844         self.assertEquals(response.answers[0].rdata, name2)
845
846         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
847         self.assertEquals(response.answers[1].name, name2)
848         self.assertEquals(response.answers[1].rdata, name0)
849
850
class TestInvalidQueries(DNSTest):
    """Tests that send empty or wrongly flagged packets to the server."""

    def setUp(self):
        """Copy the module-level connection settings onto the test case."""
        super(TestInvalidQueries, self).setUp()
        # The module-level names are only read here, so no ``global``
        # declaration is required.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def test_one_a_query(self):
        """send 0 bytes follows by create a query packet
           containing one query record"""

        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((self.server_ip, 53))
            # Sockets carry bytes: a text string here raises TypeError
            # on Python 3, so send an empty bytes object instead.
            s.send(b"", 0)
        finally:
            if s is not None:
                s.close()

        # After the empty datagram, an ordinary A query must still work.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)
        questions.append(q)

        self.finish_name_packet(p, questions)
        (response, response_packet) =\
            self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         self.server_ip)

    def test_one_a_reply(self):
        "send a reply instead of a query"

        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for ", q.name)
        questions.append(q)

        self.finish_name_packet(p, questions)
        # Mark the packet as a reply even though it is sent as a request.
        p.operation |= dns.DNS_FLAG_REPLY
        s = None
        try:
            send_packet = ndr.ndr_pack(p)
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            # setUp() stored the module-level timeout on the instance.
            s.settimeout(self.timeout)
            host = self.server_ip
            s.connect((host, 53))
            # DNS over TCP: a two-byte big-endian length precedes the
            # message.
            tcp_packet = struct.pack('!H', len(send_packet))
            tcp_packet += send_packet
            s.send(tcp_packet, 0)
            recv_packet = s.recv(0xffff + 2, 0)
            # Nothing may come back for the bogus packet.
            self.assertEqual(0, len(recv_packet))
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on a how poorly the
            # request is formatted.
            pass
        finally:
            if s is not None:
                s.close()
926
927
928 class TestZones(DNSTest):
929     def setUp(self):
930         super(TestZones, self).setUp()
931         global server, server_ip, lp, creds, timeout
932         self.server = server_name
933         self.server_ip = server_ip
934         self.lp = lp
935         self.creds = creds
936         self.timeout = timeout
937
938         self.zone = "test.lan"
939         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
940                                             (self.server_ip),
941                                             self.lp, self.creds)
942
943         self.samdb = SamDB(url="ldap://" + self.server_ip,
944                            lp=self.get_loadparm(),
945                            session_info=system_session(),
946                            credentials=self.creds)
947         self.zone_dn = "DC=" + self.zone +\
948                        ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
949                        str(self.samdb.get_default_basedn())
950
951     def tearDown(self):
952         super(TestZones, self).tearDown()
953
954         try:
955             self.delete_zone(self.zone)
956         except RuntimeError as e:
957             (num, string) = e.args
958             if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
959                 raise
960
961     def create_zone(self, zone, aging_enabled=False):
962         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
963         zone_create.pszZoneName = zone
964         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
965         zone_create.fAging = int(aging_enabled)
966         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
967         zone_create.fDsIntegrated = 1
968         zone_create.fLoadExisting = 1
969         zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
970         try:
971             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
972             self.rpc_conn.DnssrvOperation2(client_version,
973                                            0,
974                                            self.server_ip,
975                                            None,
976                                            0,
977                                            'ZoneCreate',
978                                            dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
979                                            zone_create)
980         except WERRORError as e:
981             self.fail(str(e))
982
983     def set_params(self, **kwargs):
984         zone = kwargs.pop('zone', None)
985         for key, val in kwargs.items():
986             name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
987             name_param.dwParam = val
988             name_param.pszNodeName = key
989
990             client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
991             nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
992             try:
993                 self.rpc_conn.DnssrvOperation2(client_version,
994                                                0,
995                                                self.server,
996                                                zone,
997                                                0,
998                                                'ResetDwordProperty',
999                                                nap_type,
1000                                                name_param)
1001             except WERRORError as e:
1002                 self.fail(str(e))
1003
1004     def ldap_modify_dnsrecs(self, name, func):
1005         dn = 'DC={},{}'.format(name, self.zone_dn)
1006         dns_recs = self.ldap_get_dns_records(name)
1007         for rec in dns_recs:
1008             func(rec)
1009         update_dict = {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in dns_recs]}
1010         self.samdb.modify(ldb.Message.from_dict(self.samdb,
1011                                                 update_dict,
1012                                                 ldb.FLAG_MOD_REPLACE))
1013
1014     def dns_update_record(self, prefix, txt):
1015         p = self.make_txt_update(prefix, txt, self.zone)
1016         (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
1017         self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
1018         recs = self.ldap_get_dns_records(prefix)
1019         recs = [r for r in recs if r.data.str == txt]
1020         self.assertEqual(len(recs), 1)
1021         return recs[0]
1022
1023     def dns_tombstone(self, prefix, txt, zone):
1024         name = prefix + "." + zone
1025
1026         to = dnsp.DnssrvRpcRecord()
1027         to.dwTimeStamp = 1000
1028         to.wType = dnsp.DNS_TYPE_TOMBSTONE
1029
1030         self.samdb.dns_replace(name, [to])
1031
1032     def ldap_get_records(self, name):
1033         # The use of SCOPE_SUBTREE here avoids raising an exception in the
1034         # 0 results case for a test below.
1035
1036         expr = "(&(objectClass=dnsNode)(name={}))".format(name)
1037         return self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1038                                  expression=expr, attrs=["*"])
1039
1040     def ldap_get_dns_records(self, name):
1041         records = self.ldap_get_records(name)
1042         return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
1043                 for r in records[0].get('dnsRecord')]
1044
1045     def ldap_get_zone_settings(self):
1046         records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
1047                                     expression="(&(objectClass=dnsZone)" +
1048                                     "(name={}))".format(self.zone),
1049                                     attrs=["dNSProperty"])
1050         self.assertEqual(len(records), 1)
1051         props = [ndr_unpack(dnsp.DnsProperty, r)
1052                  for r in records[0].get('dNSProperty')]
1053
1054         # We have no choice but to repeat these here.
1055         zone_prop_ids = {0x00: "EMPTY",
1056                          0x01: "TYPE",
1057                          0x02: "ALLOW_UPDATE",
1058                          0x08: "SECURE_TIME",
1059                          0x10: "NOREFRESH_INTERVAL",
1060                          0x11: "SCAVENGING_SERVERS",
1061                          0x12: "AGING_ENABLED_TIME",
1062                          0x20: "REFRESH_INTERVAL",
1063                          0x40: "AGING_STATE",
1064                          0x80: "DELETED_FROM_HOSTNAME",
1065                          0x81: "MASTER_SERVERS",
1066                          0x82: "AUTO_NS_SERVERS",
1067                          0x83: "DCPROMO_CONVERT",
1068                          0x90: "SCAVENGING_SERVERS_DA",
1069                          0x91: "MASTER_SERVERS_DA",
1070                          0x92: "NS_SERVERS_DA",
1071                          0x100: "NODE_DBFLAGS"}
1072         return {zone_prop_ids[p.id].lower(): p.data for p in props}
1073
1074     def set_aging(self, enable=False):
1075         self.create_zone(self.zone, aging_enabled=enable)
1076         self.set_params(NoRefreshInterval=1, RefreshInterval=1,
1077                         Aging=int(bool(enable)), zone=self.zone,
1078                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1079
1080     def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
1081         self.set_aging(enable=True)
1082         settings = self.ldap_get_zone_settings()
1083         self.assertTrue(settings['aging_state'] is not None)
1084         self.assertTrue(settings['aging_state'])
1085
1086         rec = self.dns_update_record('agingtest', ['test txt'])
1087         self.assertNotEqual(rec.dwTimeStamp, 0)
1088
1089     def test_set_aging_disabled(self):
1090         self.set_aging(enable=False)
1091         settings = self.ldap_get_zone_settings()
1092         self.assertTrue(settings['aging_state'] is not None)
1093         self.assertFalse(settings['aging_state'])
1094
1095         rec = self.dns_update_record('agingtest', ['test txt'])
1096         self.assertNotEqual(rec.dwTimeStamp, 0)
1097
1098     def test_aging_update(self, enable=True):
1099         name, txt = 'agingtest', ['test txt']
1100         self.set_aging(enable=True)
1101         before_mod = self.dns_update_record(name, txt)
1102         if not enable:
1103             self.set_params(zone=self.zone, Aging=0)
1104         dec = 2
1105
1106         def mod_ts(rec):
1107             self.assertTrue(rec.dwTimeStamp > 0)
1108             rec.dwTimeStamp -= dec
1109         self.ldap_modify_dnsrecs(name, mod_ts)
1110         after_mod = self.ldap_get_dns_records(name)
1111         self.assertEqual(len(after_mod), 1)
1112         after_mod = after_mod[0]
1113         self.assertEqual(after_mod.dwTimeStamp,
1114                          before_mod.dwTimeStamp - dec)
1115         after_update = self.dns_update_record(name, txt)
1116         after_should_equal = before_mod if enable else after_mod
1117         self.assertEqual(after_should_equal.dwTimeStamp,
1118                          after_update.dwTimeStamp)
1119
1120     def test_aging_update_disabled(self):
1121         self.test_aging_update(enable=False)
1122
1123     def test_aging_refresh(self):
1124         name, txt = 'agingtest', ['test txt']
1125         self.create_zone(self.zone, aging_enabled=True)
1126         interval = 10
1127         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1128                         Aging=1, zone=self.zone,
1129                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1130         before_mod = self.dns_update_record(name, txt)
1131
1132         def mod_ts(rec):
1133             self.assertTrue(rec.dwTimeStamp > 0)
1134             rec.dwTimeStamp -= interval / 2
1135         self.ldap_modify_dnsrecs(name, mod_ts)
1136         update_during_norefresh = self.dns_update_record(name, txt)
1137
1138         def mod_ts(rec):
1139             self.assertTrue(rec.dwTimeStamp > 0)
1140             rec.dwTimeStamp -= interval + interval / 2
1141         self.ldap_modify_dnsrecs(name, mod_ts)
1142         update_during_refresh = self.dns_update_record(name, txt)
1143         self.assertEqual(update_during_norefresh.dwTimeStamp,
1144                          before_mod.dwTimeStamp - interval / 2)
1145         self.assertEqual(update_during_refresh.dwTimeStamp,
1146                          before_mod.dwTimeStamp)
1147
1148     def test_rpc_add_no_timestamp(self):
1149         name, txt = 'agingtest', ['test txt']
1150         self.set_aging(enable=True)
1151         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1152         rec_buf.rec = TXTRecord(txt)
1153         self.rpc_conn.DnssrvUpdateRecord2(
1154             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1155             0,
1156             self.server_ip,
1157             self.zone,
1158             name,
1159             rec_buf,
1160             None)
1161         recs = self.ldap_get_dns_records(name)
1162         self.assertEqual(len(recs), 1)
1163         self.assertEqual(recs[0].dwTimeStamp, 0)
1164
1165     def test_static_record_dynamic_update(self):
1166         name, txt = 'agingtest', ['test txt']
1167         txt2 = ['test txt2']
1168         self.set_aging(enable=True)
1169         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1170         rec_buf.rec = TXTRecord(txt)
1171         self.rpc_conn.DnssrvUpdateRecord2(
1172             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1173             0,
1174             self.server_ip,
1175             self.zone,
1176             name,
1177             rec_buf,
1178             None)
1179
1180         rec2 = self.dns_update_record(name, txt2)
1181         self.assertEqual(rec2.dwTimeStamp, 0)
1182
1183     def test_dynamic_record_static_update(self):
1184         name, txt = 'agingtest', ['test txt']
1185         txt2 = ['test txt2']
1186         txt3 = ['test txt3']
1187         self.set_aging(enable=True)
1188
1189         self.dns_update_record(name, txt)
1190
1191         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1192         rec_buf.rec = TXTRecord(txt2)
1193         self.rpc_conn.DnssrvUpdateRecord2(
1194             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1195             0,
1196             self.server_ip,
1197             self.zone,
1198             name,
1199             rec_buf,
1200             None)
1201
1202         self.dns_update_record(name, txt3)
1203
1204         recs = self.ldap_get_dns_records(name)
1205         # Put in dict because ldap recs might be out of order
1206         recs = {str(r.data.str): r for r in recs}
1207         self.assertNotEqual(recs[str(txt)].dwTimeStamp, 0)
1208         self.assertEqual(recs[str(txt2)].dwTimeStamp, 0)
1209         self.assertEqual(recs[str(txt3)].dwTimeStamp, 0)
1210
1211     def test_dns_tombstone_custom_match_rule(self):
1212         lp = self.get_loadparm()
1213         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1214                            session_info=system_session(),
1215                            credentials=self.creds)
1216
1217         name, txt = 'agingtest', ['test txt']
1218         name2, txt2 = 'agingtest2', ['test txt2']
1219         name3, txt3 = 'agingtest3', ['test txt3']
1220         name4, txt4 = 'agingtest4', ['test txt4']
1221         name5, txt5 = 'agingtest5', ['test txt5']
1222
1223         self.create_zone(self.zone, aging_enabled=True)
1224         interval = 10
1225         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1226                         Aging=1, zone=self.zone,
1227                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1228
1229         self.dns_update_record(name, txt)
1230
1231         self.dns_update_record(name2, txt)
1232         self.dns_update_record(name2, txt2)
1233
1234         self.dns_update_record(name3, txt)
1235         self.dns_update_record(name3, txt2)
1236         last_update = self.dns_update_record(name3, txt3)
1237
1238         # Modify txt1 of the first 2 names
1239         def mod_ts(rec):
1240             if rec.data.str == txt:
1241                 rec.dwTimeStamp -= 2
1242         self.ldap_modify_dnsrecs(name, mod_ts)
1243         self.ldap_modify_dnsrecs(name2, mod_ts)
1244
1245         # create a static dns record.
1246         rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1247         rec_buf.rec = TXTRecord(txt4)
1248         self.rpc_conn.DnssrvUpdateRecord2(
1249             dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1250             0,
1251             self.server_ip,
1252             self.zone,
1253             name4,
1254             rec_buf,
1255             None)
1256
1257         # Create a tomb stoned record.
1258         self.dns_update_record(name5, txt5)
1259         self.dns_tombstone(name5, txt5, self.zone)
1260
1261         self.ldap_get_dns_records(name3)
1262         expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
1263         expr = expr.format(int(last_update.dwTimeStamp) - 1)
1264         try:
1265             res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1266                                     expression=expr, attrs=["*"])
1267         except ldb.LdbError as e:
1268             self.fail(str(e))
1269         updated_names = {str(r.get('name')) for r in res}
1270         self.assertEqual(updated_names, set([name, name2]))
1271
1272     def test_dns_tombstone_custom_match_rule_no_records(self):
1273         lp = self.get_loadparm()
1274         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1275                            session_info=system_session(),
1276                            credentials=self.creds)
1277
1278         self.create_zone(self.zone, aging_enabled=True)
1279         interval = 10
1280         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1281                         Aging=1, zone=self.zone,
1282                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1283
1284         expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
1285         expr = expr.format(1)
1286
1287         try:
1288             res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1289                                     expression=expr, attrs=["*"])
1290         except ldb.LdbError as e:
1291             self.fail(str(e))
1292         self.assertEqual(0, len(res))
1293
1294     def test_dns_tombstone_custom_match_rule_fail(self):
1295         self.create_zone(self.zone, aging_enabled=True)
1296         samdb = SamDB(url=lp.samdb_url(),
1297                       lp=lp,
1298                       session_info=system_session(),
1299                       credentials=self.creds)
1300
1301         # Property name in not dnsRecord
1302         expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
1303         res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1304                            expression=expr, attrs=["*"])
1305         self.assertEquals(len(res), 0)
1306
1307         # No value for tombstone time
1308         try:
1309             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)"
1310             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1311                                expression=expr, attrs=["*"])
1312             self.assertEquals(len(res), 0)
1313             self.fail("Exception: ldb.ldbError not generated")
1314         except ldb.LdbError as e:
1315             (num, msg) = e.args
1316             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1317
1318         # Tombstone time = -
1319         try:
1320             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)"
1321             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1322                                expression=expr, attrs=["*"])
1323             self.assertEquals(len(res), 0)
1324             self.fail("Exception: ldb.ldbError not generated")
1325         except ldb.LdbError as e:
1326             (num, _) = e.args
1327             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1328
1329         # Tombstone time longer than 64 characters
1330         try:
1331             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
1332             expr = expr.format("1" * 65)
1333             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1334                                expression=expr, attrs=["*"])
1335             self.assertEquals(len(res), 0)
1336             self.fail("Exception: ldb.ldbError not generated")
1337         except ldb.LdbError as e:
1338             (num, _) = e.args
1339             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1340
1341         # Non numeric Tombstone time
1342         try:
1343             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)"
1344             res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1345                                expression=expr, attrs=["*"])
1346             self.assertEquals(len(res), 0)
1347             self.fail("Exception: ldb.ldbError not generated")
1348         except ldb.LdbError as e:
1349             (num, _) = e.args
1350             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1351
1352         # Non system session
1353         try:
1354             db = SamDB(url="ldap://" + self.server_ip,
1355                        lp=self.get_loadparm(),
1356                        credentials=self.creds)
1357
1358             expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)"
1359             res = db.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
1360                             expression=expr, attrs=["*"])
1361             self.assertEquals(len(res), 0)
1362             self.fail("Exception: ldb.ldbError not generated")
1363         except ldb.LdbError as e:
1364             (num, _) = e.args
1365             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1366
1367     def test_basic_scavenging(self):
1368         lp = self.get_loadparm()
1369         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
1370                            session_info=system_session(),
1371                            credentials=self.creds)
1372
1373         self.create_zone(self.zone, aging_enabled=True)
1374         interval = 1
1375         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
1376                         zone=self.zone, Aging=1,
1377                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
1378         name, txt = 'agingtest', ['test txt']
1379         name2, txt2 = 'agingtest2', ['test txt2']
1380         name3, txt3 = 'agingtest3', ['test txt3']
1381         self.dns_update_record(name, txt)
1382         self.dns_update_record(name2, txt)
1383         self.dns_update_record(name2, txt2)
1384         self.dns_update_record(name3, txt)
1385         self.dns_update_record(name3, txt2)
1386         last_add = self.dns_update_record(name3, txt3)
1387
1388         def mod_ts(rec):
1389             self.assertTrue(rec.dwTimeStamp > 0)
1390             if rec.data.str == txt:
1391                 rec.dwTimeStamp -= interval * 5
1392         self.ldap_modify_dnsrecs(name, mod_ts)
1393         self.ldap_modify_dnsrecs(name2, mod_ts)
1394         self.ldap_modify_dnsrecs(name3, mod_ts)
1395         self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
1396         dsdb._scavenge_dns_records(self.samdb)
1397
1398         recs = self.ldap_get_dns_records(name)
1399         self.assertEqual(len(recs), 1)
1400         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
1401
1402         recs = self.ldap_get_dns_records(name2)
1403         self.assertEqual(len(recs), 1)
1404         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1405         self.assertEqual(recs[0].data.str, txt2)
1406
1407         recs = self.ldap_get_dns_records(name3)
1408         self.assertEqual(len(recs), 2)
1409         txts = {str(r.data.str) for r in recs}
1410         self.assertEqual(txts, {str(txt2), str(txt3)})
1411         self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
1412         self.assertEqual(recs[1].wType, dnsp.DNS_TYPE_TXT)
1413
1414         for make_it_work in [False, True]:
1415             inc = -1 if make_it_work else 1
1416
1417             def mod_ts(rec):
1418                 rec.data = (last_add.dwTimeStamp - 24 * 14) + inc
1419             self.ldap_modify_dnsrecs(name, mod_ts)
1420             dsdb._dns_delete_tombstones(self.samdb)
1421             recs = self.ldap_get_records(name)
1422             if make_it_work:
1423                 self.assertEqual(len(recs), 0)
1424             else:
1425                 self.assertEqual(len(recs), 1)
1426
1427     def delete_zone(self, zone):
1428         self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1429                                        0,
1430                                        self.server_ip,
1431                                        zone,
1432                                        0,
1433                                        'DeleteZoneFromDs',
1434                                        dnsserver.DNSSRV_TYPEID_NULL,
1435                                        None)
1436
1437     def test_soa_query(self):
1438         zone = "test.lan"
1439         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
1440         questions = []
1441
1442         q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
1443         questions.append(q)
1444         self.finish_name_packet(p, questions)
1445
1446         (response, response_packet) =\
1447             self.dns_transaction_udp(p, host=server_ip)
1448         # Windows returns OK while BIND logically seems to return NXDOMAIN
1449         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1450         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1451         self.assertEquals(response.ancount, 0)
1452
1453         self.create_zone(zone)
1454         (response, response_packet) =\
1455             self.dns_transaction_udp(p, host=server_ip)
1456         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1457         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1458         self.assertEquals(response.ancount, 1)
1459         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
1460
1461         self.delete_zone(zone)
1462         (response, response_packet) =\
1463             self.dns_transaction_udp(p, host=server_ip)
1464         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
1465         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
1466         self.assertEquals(response.ancount, 0)
1467
1468
1469 class TestRPCRoundtrip(DNSTest):
1470     def setUp(self):
1471         super(TestRPCRoundtrip, self).setUp()
1472         global server, server_ip, lp, creds
1473         self.server = server_name
1474         self.server_ip = server_ip
1475         self.lp = lp
1476         self.creds = creds
1477         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %
1478                                             (self.server_ip),
1479                                             self.lp,
1480                                             self.creds)
1481
1482     def tearDown(self):
1483         super(TestRPCRoundtrip, self).tearDown()
1484
1485     def test_update_add_txt_rpc_to_dns(self):
1486         prefix, txt = 'rpctextrec', ['"This is a test"']
1487
1488         name = "%s.%s" % (prefix, self.get_dns_domain())
1489
1490         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
1491         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1492         add_rec_buf.rec = rec
1493         try:
1494             self.rpc_conn.DnssrvUpdateRecord2(
1495                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1496                 0,
1497                 self.server_ip,
1498                 self.get_dns_domain(),
1499                 name,
1500                 add_rec_buf,
1501                 None)
1502
1503         except WERRORError as e:
1504             self.fail(str(e))
1505
1506         try:
1507             self.check_query_txt(prefix, txt)
1508         finally:
1509             self.rpc_conn.DnssrvUpdateRecord2(
1510                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1511                 0,
1512                 self.server_ip,
1513                 self.get_dns_domain(),
1514                 name,
1515                 None,
1516                 add_rec_buf)
1517
1518     def test_update_add_null_padded_txt_record(self):
1519         "test adding records works"
1520         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1521         p = self.make_txt_update(prefix, txt)
1522         (response, response_packet) =\
1523             self.dns_transaction_udp(p, host=server_ip)
1524         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1525         self.check_query_txt(prefix, txt)
1526         self.assertIsNotNone(
1527             dns_record_match(self.rpc_conn,
1528                              self.server_ip,
1529                              self.get_dns_domain(),
1530                              "%s.%s" % (prefix, self.get_dns_domain()),
1531                              dnsp.DNS_TYPE_TXT,
1532                              '"\\"This is a test\\"" "" ""'))
1533
1534         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1535         p = self.make_txt_update(prefix, txt)
1536         (response, response_packet) =\
1537             self.dns_transaction_udp(p, host=server_ip)
1538         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1539         self.check_query_txt(prefix, txt)
1540         self.assertIsNotNone(
1541             dns_record_match(
1542                 self.rpc_conn,
1543                 self.server_ip,
1544                 self.get_dns_domain(),
1545                 "%s.%s" % (prefix, self.get_dns_domain()),
1546                 dnsp.DNS_TYPE_TXT,
1547                 '"\\"This is a test\\"" "" "" "more text"'))
1548
1549         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1550         p = self.make_txt_update(prefix, txt)
1551         (response, response_packet) =\
1552             self.dns_transaction_udp(p, host=server_ip)
1553         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1554         self.check_query_txt(prefix, txt)
1555         self.assertIsNotNone(
1556             dns_record_match(
1557                 self.rpc_conn,
1558                 self.server_ip,
1559                 self.get_dns_domain(),
1560                 "%s.%s" % (prefix, self.get_dns_domain()),
1561                 dnsp.DNS_TYPE_TXT,
1562                 '"" "" "\\"This is a test\\""'))
1563
1564     def test_update_add_padding_rpc_to_dns(self):
1565         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1566         prefix = 'rpc' + prefix
1567         name = "%s.%s" % (prefix, self.get_dns_domain())
1568
1569         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1570                                  '"\\"This is a test\\"" "" ""')
1571         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1572         add_rec_buf.rec = rec
1573         try:
1574             self.rpc_conn.DnssrvUpdateRecord2(
1575                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1576                 0,
1577                 self.server_ip,
1578                 self.get_dns_domain(),
1579                 name,
1580                 add_rec_buf,
1581                 None)
1582
1583         except WERRORError as e:
1584             self.fail(str(e))
1585
1586         try:
1587             self.check_query_txt(prefix, txt)
1588         finally:
1589             self.rpc_conn.DnssrvUpdateRecord2(
1590                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1591                 0,
1592                 self.server_ip,
1593                 self.get_dns_domain(),
1594                 name,
1595                 None,
1596                 add_rec_buf)
1597
1598         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1599         prefix = 'rpc' + prefix
1600         name = "%s.%s" % (prefix, self.get_dns_domain())
1601
1602         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1603                                  '"\\"This is a test\\"" "" "" "more text"')
1604         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1605         add_rec_buf.rec = rec
1606         try:
1607             self.rpc_conn.DnssrvUpdateRecord2(
1608                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1609                 0,
1610                 self.server_ip,
1611                 self.get_dns_domain(),
1612                 name,
1613                 add_rec_buf,
1614                 None)
1615
1616         except WERRORError as e:
1617             self.fail(str(e))
1618
1619         try:
1620             self.check_query_txt(prefix, txt)
1621         finally:
1622             self.rpc_conn.DnssrvUpdateRecord2(
1623                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1624                 0,
1625                 self.server_ip,
1626                 self.get_dns_domain(),
1627                 name,
1628                 None,
1629                 add_rec_buf)
1630
1631         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1632         prefix = 'rpc' + prefix
1633         name = "%s.%s" % (prefix, self.get_dns_domain())
1634
1635         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1636                                  '"" "" "\\"This is a test\\""')
1637         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1638         add_rec_buf.rec = rec
1639         try:
1640             self.rpc_conn.DnssrvUpdateRecord2(
1641                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1642                 0,
1643                 self.server_ip,
1644                 self.get_dns_domain(),
1645                 name,
1646                 add_rec_buf,
1647                 None)
1648         except WERRORError as e:
1649             self.fail(str(e))
1650
1651         try:
1652             self.check_query_txt(prefix, txt)
1653         finally:
1654             self.rpc_conn.DnssrvUpdateRecord2(
1655                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1656                 0,
1657                 self.server_ip,
1658                 self.get_dns_domain(),
1659                 name,
1660                 None,
1661                 add_rec_buf)
1662
1663     # Test is incomplete due to strlen against txt records
1664     def test_update_add_null_char_txt_record(self):
1665         "test adding records works"
1666         prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1667         p = self.make_txt_update(prefix, txt)
1668         (response, response_packet) =\
1669             self.dns_transaction_udp(p, host=server_ip)
1670         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1671         self.check_query_txt(prefix, ['NULL'])
1672         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1673                                               self.get_dns_domain(),
1674                                               "%s.%s" % (prefix, self.get_dns_domain()),
1675                                               dnsp.DNS_TYPE_TXT, '"NULL"'))
1676
1677         prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1678         p = self.make_txt_update(prefix, txt)
1679         (response, response_packet) =\
1680             self.dns_transaction_udp(p, host=server_ip)
1681         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1682         self.check_query_txt(prefix, ['NULL', 'NULL'])
1683         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1684                                               self.get_dns_domain(),
1685                                               "%s.%s" % (prefix, self.get_dns_domain()),
1686                                               dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1687
1688     def test_update_add_null_char_rpc_to_dns(self):
1689         prefix = 'rpcnulltextrec'
1690         name = "%s.%s" % (prefix, self.get_dns_domain())
1691
1692         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL\x00BYTE"')
1693         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1694         add_rec_buf.rec = rec
1695         try:
1696             self.rpc_conn.DnssrvUpdateRecord2(
1697                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1698                 0,
1699                 self.server_ip,
1700                 self.get_dns_domain(),
1701                 name,
1702                 add_rec_buf,
1703                 None)
1704
1705         except WERRORError as e:
1706             self.fail(str(e))
1707
1708         try:
1709             self.check_query_txt(prefix, ['NULL'])
1710         finally:
1711             self.rpc_conn.DnssrvUpdateRecord2(
1712                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1713                 0,
1714                 self.server_ip,
1715                 self.get_dns_domain(),
1716                 name,
1717                 None,
1718                 add_rec_buf)
1719
1720     def test_update_add_hex_char_txt_record(self):
1721         "test adding records works"
1722         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1723         p = self.make_txt_update(prefix, txt)
1724         (response, response_packet) =\
1725             self.dns_transaction_udp(p, host=server_ip)
1726         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1727         self.check_query_txt(prefix, txt)
1728         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1729                                               self.get_dns_domain(),
1730                                               "%s.%s" % (prefix, self.get_dns_domain()),
1731                                               dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
1732
1733     def test_update_add_hex_rpc_to_dns(self):
1734         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1735         prefix = 'rpc' + prefix
1736         name = "%s.%s" % (prefix, self.get_dns_domain())
1737
1738         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
1739         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1740         add_rec_buf.rec = rec
1741         try:
1742             self.rpc_conn.DnssrvUpdateRecord2(
1743                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1744                 0,
1745                 self.server_ip,
1746                 self.get_dns_domain(),
1747                 name,
1748                 add_rec_buf,
1749                 None)
1750
1751         except WERRORError as e:
1752             self.fail(str(e))
1753
1754         try:
1755             self.check_query_txt(prefix, txt)
1756         finally:
1757             self.rpc_conn.DnssrvUpdateRecord2(
1758                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1759                 0,
1760                 self.server_ip,
1761                 self.get_dns_domain(),
1762                 name,
1763                 None,
1764                 add_rec_buf)
1765
1766     def test_update_add_slash_txt_record(self):
1767         "test adding records works"
1768         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1769         p = self.make_txt_update(prefix, txt)
1770         (response, response_packet) =\
1771             self.dns_transaction_udp(p, host=server_ip)
1772         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1773         self.check_query_txt(prefix, txt)
1774         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1775                                               self.get_dns_domain(),
1776                                               "%s.%s" % (prefix, self.get_dns_domain()),
1777                                               dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
1778
    # This test fails against Windows, because Windows eliminates
    # backslashes in RPC. One typical use for a backslash is in records
    # like 'var=value', to escape '=' characters.
1782     def test_update_add_slash_rpc_to_dns(self):
1783         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1784         prefix = 'rpc' + prefix
1785         name = "%s.%s" % (prefix, self.get_dns_domain())
1786
1787         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
1788         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1789         add_rec_buf.rec = rec
1790         try:
1791             self.rpc_conn.DnssrvUpdateRecord2(
1792                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1793                 0,
1794                 self.server_ip,
1795                 self.get_dns_domain(),
1796                 name,
1797                 add_rec_buf,
1798                 None)
1799
1800         except WERRORError as e:
1801             self.fail(str(e))
1802
1803         try:
1804             self.check_query_txt(prefix, txt)
1805
1806         finally:
1807             self.rpc_conn.DnssrvUpdateRecord2(
1808                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1809                 0,
1810                 self.server_ip,
1811                 self.get_dns_domain(),
1812                 name,
1813                 None,
1814                 add_rec_buf)
1815
1816     def test_update_add_two_txt_records(self):
1817         "test adding two txt records works"
1818         prefix, txt = 'textrec2', ['"This is a test"',
1819                                    '"and this is a test, too"']
1820         p = self.make_txt_update(prefix, txt)
1821         (response, response_packet) =\
1822             self.dns_transaction_udp(p, host=server_ip)
1823         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1824         self.check_query_txt(prefix, txt)
1825         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1826                                               self.get_dns_domain(),
1827                                               "%s.%s" % (prefix, self.get_dns_domain()),
1828                                               dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
1829                                               ' "\\"and this is a test, too\\""'))
1830
1831     def test_update_add_two_rpc_to_dns(self):
1832         prefix, txt = 'textrec2', ['"This is a test"',
1833                                    '"and this is a test, too"']
1834         prefix = 'rpc' + prefix
1835         name = "%s.%s" % (prefix, self.get_dns_domain())
1836
1837         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1838                                  '"\\"This is a test\\""' +
1839                                  ' "\\"and this is a test, too\\""')
1840         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1841         add_rec_buf.rec = rec
1842         try:
1843             self.rpc_conn.DnssrvUpdateRecord2(
1844                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1845                 0,
1846                 self.server_ip,
1847                 self.get_dns_domain(),
1848                 name,
1849                 add_rec_buf,
1850                 None)
1851
1852         except WERRORError as e:
1853             self.fail(str(e))
1854
1855         try:
1856             self.check_query_txt(prefix, txt)
1857         finally:
1858             self.rpc_conn.DnssrvUpdateRecord2(
1859                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1860                 0,
1861                 self.server_ip,
1862                 self.get_dns_domain(),
1863                 name,
1864                 None,
1865                 add_rec_buf)
1866
1867     def test_update_add_empty_txt_records(self):
1868         "test adding two txt records works"
1869         prefix, txt = 'emptytextrec', []
1870         p = self.make_txt_update(prefix, txt)
1871         (response, response_packet) =\
1872             self.dns_transaction_udp(p, host=server_ip)
1873         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1874         self.check_query_txt(prefix, txt)
1875         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1876                                               self.get_dns_domain(),
1877                                               "%s.%s" % (prefix, self.get_dns_domain()),
1878                                               dnsp.DNS_TYPE_TXT, ''))
1879
1880     def test_update_add_empty_rpc_to_dns(self):
1881         prefix, txt = 'rpcemptytextrec', []
1882
1883         name = "%s.%s" % (prefix, self.get_dns_domain())
1884
1885         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
1886         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1887         add_rec_buf.rec = rec
1888         try:
1889             self.rpc_conn.DnssrvUpdateRecord2(
1890                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1891                 0,
1892                 self.server_ip,
1893                 self.get_dns_domain(),
1894                 name,
1895                 add_rec_buf,
1896                 None)
1897         except WERRORError as e:
1898             self.fail(str(e))
1899
1900         try:
1901             self.check_query_txt(prefix, txt)
1902         finally:
1903             self.rpc_conn.DnssrvUpdateRecord2(
1904                 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1905                 0,
1906                 self.server_ip,
1907                 self.get_dns_domain(),
1908                 name,
1909                 None,
1910                 add_rec_buf)
1911
# Run this module's tests through TestProgram, passing along subunitopts.
TestProgram(module=__name__, opts=subunitopts)