python/tests: fix typo to use correct var
[nivanova/samba-autobuild/.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import sys
20 import struct
21 import random
22 import socket
23 import samba.ndr as ndr
24 from samba import credentials, param
25 from samba.tests import TestCase
26 from samba.dcerpc import dns, dnsp, dnsserver
27 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
28 from samba.tests.subunitrun import SubunitOptions, TestProgram
29 import samba.getopt as options
30 import optparse
31
# Command-line interface: two positional arguments (<server name> and
# <server ip>) plus the Samba, credentials and subunit option groups.
# NOTE: everything below runs at import time, as module-level side effects.
parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

# This timeout only has relevance when testing against Windows
# Format errors tend to return patchy responses, so a timeout is needed.
parser.add_option("--timeout", type="int", dest="timeout",
                  help="Specify timeout for DNS requests")

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

opts, args = parser.parse_args()

# Loadparm context and credentials shared by every test case (copied onto
# each instance in DNSTest.setUp).
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)

# No default is registered for --timeout, so this is None unless the option
# is given. It becomes the default socket timeout of the
# DNSTest.dns_transaction_* helpers.
timeout = opts.timeout

# Both positional arguments are mandatory; print the usage line and exit
# with status 1 when either is missing.
if len(args) < 2:
    parser.print_usage()
    sys.exit(1)

server_name = args[0]
server_ip = args[1]
creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
61
def make_txt_record(records):
    """Wrap the list of strings *records* in a ``dns.txt_record``.

    The strings are stored in a ``dnsp.string_list`` whose ``count`` is the
    number of entries, and that list becomes the record's ``txt`` member.
    """
    strings = dnsp.string_list()
    strings.count = len(records)
    strings.str = records

    txt_rdata = dns.txt_record()
    txt_rdata.txt = strings
    return txt_rdata
69
class DNSTest(TestCase):
    """Base class with packet-building and transport helpers for DNS tests.

    The server name, server address, loadparm context and credentials are
    taken from the module-level values set while parsing the command line.
    """

    def setUp(self):
        """Copy the module-level connection settings onto the instance."""
        super(DNSTest, self).setUp()
        # The module-level names are only read here, so no ``global``
        # declaration is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds

    def errstr(self, errcode):
        """Return a readable name for the numeric RCODE *errcode*.

        Codes without an entry in the table are rendered as
        ``UNKNOWN(<code>)`` so that building an assertion message can never
        raise IndexError and hide the real failure.
        """
        string_codes = [
            "OK",
            "FORMERR",
            "SERVFAIL",
            "NXDOMAIN",
            "NOTIMP",
            "REFUSED",
            "YXDOMAIN",
            "YXRRSET",
            "NXRRSET",
            "NOTAUTH",
            "NOTZONE",
        ]

        if 0 <= errcode < len(string_codes):
            return string_codes[errcode]
        return "UNKNOWN(%s)" % errcode

    def assert_dns_rcode_equals(self, packet, rcode):
        """Assert that the RCODE of *packet* equals *rcode*.

        The RCODE is read from the low four bits of ``packet.operation``.
        """
        p_errcode = packet.operation & 0x000F
        self.assertEqual(p_errcode, rcode, "Expected RCODE %s, got %s" %
                         (self.errstr(rcode), self.errstr(p_errcode)))

    def assert_dns_opcode_equals(self, packet, opcode):
        """Assert that the opcode bits of *packet* equal *opcode*.

        ``packet.operation`` is masked with 0x7800 and compared, unshifted,
        against *opcode*.
        """
        p_opcode = packet.operation & 0x7800
        self.assertEqual(p_opcode, opcode, "Expected OPCODE %s, got %s" %
                         (opcode, p_opcode))

    def make_name_packet(self, opcode, qid=None):
        """Create a ``dns.name_packet`` with the given *opcode*.

        *qid* is the packet id to use; when it is None a random 16-bit id
        is chosen. The packet starts with an empty question list.
        """
        p = dns.name_packet()
        # Honour an explicitly requested id instead of silently dropping it.
        if qid is None:
            p.id = random.randint(0x0, 0xffff)
        else:
            p.id = qid
        p.operation = opcode
        p.questions = []
        return p

    def finish_name_packet(self, packet, questions):
        """Store *questions* in *packet* and set ``qdcount`` to match."""
        packet.qdcount = len(questions)
        packet.questions = questions

    def make_name_question(self, name, qtype, qclass):
        """Create a ``dns.name_question`` for *name*, *qtype* and *qclass*."""
        q = dns.name_question()
        q.name = name
        q.question_type = qtype
        q.question_class = qclass
        return q

    def get_dns_domain(self):
        """Return the lower-cased realm of the test credentials."""
        return self.creds.get_realm().lower()

    def dns_transaction_udp(self, packet, host=server_ip,
                            dump=False, timeout=timeout):
        """Send *packet* over UDP to port 53 of *host* and return the reply.

        *dump* prints a hexdump of the request and of the reply.
        *timeout* is handed to ``socket.settimeout``; ``socket.timeout``
        propagates to the caller when no reply arrives in time.
        At most 2048 bytes of reply are read and unpacked into a
        ``dns.name_packet``.
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            if dump:
                print(self.hexdump(send_packet))
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.settimeout(timeout)
            s.connect((host, 53))
            s.send(send_packet, 0)
            recv_packet = s.recv(2048, 0)
            if dump:
                print(self.hexdump(recv_packet))
            return ndr.ndr_unpack(dns.name_packet, recv_packet)
        finally:
            if s is not None:
                s.close()

    @staticmethod
    def _recv_exact(sock, count):
        """Read up to *count* bytes from *sock*, looping over short reads.

        Fewer bytes are returned only when the peer closes the connection.
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def dns_transaction_tcp(self, packet, host=server_ip,
                            dump=False, timeout=timeout):
        """Send *packet* over TCP to port 53 of *host* and return the reply.

        The request is prefixed with its length as a two-byte big-endian
        integer. The reply is read the same way: first the two-byte
        length, then exactly that many bytes, since a stream socket may
        deliver the message in several pieces.

        *dump* prints a hexdump of the request and of the raw reply
        (length prefix included). *timeout* is handed to
        ``socket.settimeout``. The reply body is unpacked into a
        ``dns.name_packet``.
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            if dump:
                print(self.hexdump(send_packet))
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            s.settimeout(timeout)
            s.connect((host, 53))
            tcp_packet = struct.pack('!H', len(send_packet))
            tcp_packet += send_packet
            # sendall() keeps writing until the whole request is queued.
            s.sendall(tcp_packet)

            header = self._recv_exact(s, 2)
            body = b""
            if len(header) == 2:
                (body_len,) = struct.unpack('!H', header)
                body = self._recv_exact(s, body_len)
            recv_packet = header + body
            if dump:
                print(self.hexdump(recv_packet))
            # A truncated reply is left for ndr_unpack to reject.
            return ndr.ndr_unpack(dns.name_packet, body)
        finally:
            if s is not None:
                s.close()

    def make_txt_update(self, prefix, txt_array):
        """Build an UPDATE packet adding a TXT record ``<prefix>.<domain>``.

        The question section holds the domain's SOA question. The TXT
        record, carrying the strings in *txt_array* with a TTL of 900, is
        placed in the ``nsrecs`` section. The packet is returned unsent.
        """
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        updates.append(u)
        self.finish_name_packet(p, updates)

        updates = []
        r = dns.res_rec()
        r.name = "%s.%s" % (prefix, self.get_dns_domain())
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = dns.DNS_QCLASS_IN
        r.ttl = 900
        r.length = 0xffff
        rdata = make_txt_record(txt_array)
        r.rdata = rdata
        updates.append(r)
        p.nscount = len(updates)
        p.nsrecs = updates

        return p

    def check_query_txt(self, prefix, txt_array):
        """Query TXT for ``<prefix>.<domain>`` over UDP and verify the reply.

        The reply must have RCODE OK and exactly one answer whose strings
        equal *txt_array*.
        """
        name = "%s.%s" % (prefix, self.get_dns_domain())
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt.str, txt_array)
216
class TestSimpleQueries(DNSTest):
    """QUERY-opcode tests sent to the server under test."""

    def _server_fqdn(self):
        """Return the server's host name qualified with the DNS domain."""
        return "%s.%s" % (self.server, self.get_dns_domain())

    def _query_packet(self, questions):
        """Return a QUERY packet carrying *questions*."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        self.finish_name_packet(p, questions)
        return p

    def _announce(self, question):
        """Print the name being asked for as a trace in the test output."""
        # A single parenthesised argument prints the same text whether
        # ``print`` is a statement or a function.
        print("%s %s" % ("asking for ", question.name))

    def _assert_query_reply(self, response, rcode, ancount):
        """Check the RCODE, the QUERY opcode and the answer count."""
        self.assert_dns_rcode_equals(response, rcode)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, ancount)

    def _assert_rcode_or_timeout(self, packet, rcode):
        """Send *packet* over UDP and expect *rcode*, tolerating a timeout."""
        try:
            response = self.dns_transaction_udp(packet)
            self.assert_dns_rcode_equals(response, rcode)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass

    def test_one_a_query(self):
        """A single A question for the server's name returns its address."""
        q = self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN)
        self._announce(q)

        response = self.dns_transaction_udp(self._query_packet([q]))
        self._assert_query_reply(response, dns.DNS_RCODE_OK, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_a_query_tcp(self):
        """Same as test_one_a_query, but sent over TCP."""
        q = self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN)
        self._announce(q)

        response = self.dns_transaction_tcp(self._query_packet([q]))
        self._assert_query_reply(response, dns.DNS_RCODE_OK, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_mx_query(self):
        """MX queries: empty OK answer for the server, NXDOMAIN otherwise."""
        # The server's own name: RCODE OK with no answers.
        q = self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_MX,
                                    dns.DNS_QCLASS_IN)
        self._announce(q)
        response = self.dns_transaction_udp(self._query_packet([q]))
        self._assert_query_reply(response, dns.DNS_RCODE_OK, 0)

        # The same name with an "invalid-" prefix: NXDOMAIN, no answers.
        name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
        self._announce(q)
        response = self.dns_transaction_udp(self._query_packet([q]))
        self._assert_query_reply(response, dns.DNS_RCODE_NXDOMAIN, 0)

    def test_two_queries(self):
        """A packet with two questions gets FORMERR (or no reply at all)."""
        bogus = "%s.%s" % ('bogusname', self.get_dns_domain())
        questions = [
            self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN),
            self.make_name_question(bogus, dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN),
        ]
        self._assert_rcode_or_timeout(self._query_packet(questions),
                                      dns.DNS_RCODE_FORMERR)

    def test_qtype_all_query(self):
        """A QTYPE_ALL query for the server returns all of its addresses."""
        q = self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_ALL,
                                    dns.DNS_QCLASS_IN)
        self._announce(q)
        response = self.dns_transaction_udp(self._query_packet([q]))

        # One answer holding the server IP is always expected; a second one
        # holding SERVER_IPV6 is expected when that variable is set.
        dc_ipv6 = os.getenv('SERVER_IPV6')
        num_answers = 1 if dc_ipv6 is None else 2

        self._assert_query_reply(response, dns.DNS_RCODE_OK, num_answers)
        self.assertEqual(response.answers[0].rdata, self.server_ip)
        if dc_ipv6 is not None:
            self.assertEqual(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        """A QCLASS_NONE query gets NOTIMP (or no reply at all)."""
        q = self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_ALL,
                                    dns.DNS_QCLASS_NONE)
        self._assert_rcode_or_timeout(self._query_packet([q]),
                                      dns.DNS_RCODE_NOTIMP)

    def test_soa_hostname_query(self):
        """A SOA query for a host name has no answer, only an authority."""
        q = self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_SOA,
                                    dns.DNS_QCLASS_IN)
        response = self.dns_transaction_udp(self._query_packet([q]))
        # We don't get SOA records for single hosts
        self._assert_query_reply(response, dns.DNS_RCODE_OK, 0)
        # But we do respond with an authority section
        self.assertEqual(response.nscount, 1)

    def test_soa_domain_query(self):
        """A SOA query for the domain returns one record, minimum 3600."""
        q = self.make_name_question(self.get_dns_domain(), dns.DNS_QTYPE_SOA,
                                    dns.DNS_QCLASS_IN)
        response = self.dns_transaction_udp(self._query_packet([q]))
        self._assert_query_reply(response, dns.DNS_RCODE_OK, 1)
        self.assertEqual(response.answers[0].rdata.minimum, 3600)
388
389
class TestDNSUpdates(DNSTest):
    """UPDATE-opcode tests sent to the server under test."""

    # Strings carried by every TXT record these tests add or delete.
    _TEST_TXT = ['"This is a test"']

    def _update_packet(self):
        """Return an UPDATE packet whose only question is the domain's SOA."""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        zone = self.make_name_question(self.get_dns_domain(),
                                       dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [zone])
        return p

    def _make_rr(self, name, rr_type, rr_class, ttl, length, rdata=None):
        """Return a ``dns.res_rec`` filled from the arguments.

        ``rdata`` is only assigned when given, and always after ``rr_type``,
        matching the order the fields were originally set in.
        """
        r = dns.res_rec()
        r.name = name
        r.rr_type = rr_type
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = length
        if rdata is not None:
            r.rdata = rdata
        return r

    def _txt_rr(self, name, rr_class, ttl):
        """Return a TXT record for *name* carrying the test strings."""
        return self._make_rr(name, dns.DNS_QTYPE_TXT, rr_class, ttl, 0xffff,
                             make_txt_record(list(self._TEST_TXT)))

    def _prereq_packet(self, name, rr_class, ttl, length):
        """Return an UPDATE packet with one TXT prerequisite for *name*.

        The prerequisite carries no rdata and is placed in the packet's
        ``answers`` section.
        """
        p = self._update_packet()
        prereqs = [self._make_rr(name, dns.DNS_QTYPE_TXT, rr_class, ttl,
                                 length)]
        p.ancount = len(prereqs)
        p.answers = prereqs
        return p

    def _send_update(self, record):
        """Send *record* in the ``nsrecs`` section and require RCODE OK."""
        p = self._update_packet()
        updates = [record]
        p.nscount = len(updates)
        p.nsrecs = updates
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def _assert_txt_query_rcode(self, name, rcode):
        """Query TXT for *name* over UDP and require *rcode*."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, dns.DNS_QTYPE_TXT,
                                    dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, rcode)

    def _assert_rcode_or_timeout(self, packet, rcode):
        """Send *packet* over UDP and expect *rcode*, tolerating a timeout."""
        try:
            response = self.dns_transaction_udp(packet)
            self.assert_dns_rcode_equals(response, rcode)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass

    def test_two_updates(self):
        """Two questions in one UPDATE get FORMERR (or no reply at all)."""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        host = "%s.%s" % (self.server, self.get_dns_domain())
        updates = [
            self.make_name_question(host, dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN),
            self.make_name_question(self.get_dns_domain(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN),
        ]
        self.finish_name_packet(p, updates)
        self._assert_rcode_or_timeout(p, dns.DNS_RCODE_FORMERR)

    def test_update_wrong_qclass(self):
        """An UPDATE whose question uses DNS_QCLASS_NONE gets NOTIMP."""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_NONE)
        self.finish_name_packet(p, [u])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        """A prerequisite with a non-zero TTL gets FORMERR (or no reply)."""
        name = "%s.%s" % (self.server, self.get_dns_domain())
        p = self._prereq_packet(name, dns.DNS_QCLASS_NONE, 1, 0)
        self._assert_rcode_or_timeout(p, dns.DNS_RCODE_FORMERR)

    def test_update_prereq_with_non_null_length(self):
        """A prerequisite with a non-zero length gets NXRRSET."""
        name = "%s.%s" % (self.server, self.get_dns_domain())
        p = self._prereq_packet(name, dns.DNS_QCLASS_ANY, 0, 1)

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_prereq_nonexisting_name(self):
        """A prerequisite naming a nonexistent host gets NXRRSET."""
        name = "idontexist.%s" % self.get_dns_domain()
        p = self._prereq_packet(name, dns.DNS_QCLASS_ANY, 0, 0)

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        """Adding a TXT record works and the record can be queried back."""
        prefix, txt = 'textrec', ['"This is a test"']
        p = self.make_txt_update(prefix, txt)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, txt)

    def test_delete_record(self):
        """Deleting a TXT record makes later queries return NXDOMAIN."""
        NAME = "deleterec.%s" % self.get_dns_domain()

        # First, create a record to make sure we have a record to delete.
        self._send_update(self._txt_rr(NAME, dns.DNS_QCLASS_IN, 900))

        # Now check the record is around
        self._assert_txt_query_rcode(NAME, dns.DNS_RCODE_OK)

        # Now delete the record (class NONE, TTL 0, same rdata)
        self._send_update(self._txt_rr(NAME, dns.DNS_QCLASS_NONE, 0))

        # And finally check it's gone
        self._assert_txt_query_rcode(NAME, dns.DNS_RCODE_NXDOMAIN)

    def test_readd_record(self):
        """Adding, deleting and then re-adding a TXT record works."""
        NAME = "readdrec.%s" % self.get_dns_domain()

        # Create the record
        self._send_update(self._txt_rr(NAME, dns.DNS_QCLASS_IN, 900))

        # Now check the record is around
        self._assert_txt_query_rcode(NAME, dns.DNS_RCODE_OK)

        # Now delete the record
        self._send_update(self._txt_rr(NAME, dns.DNS_QCLASS_NONE, 0))

        # check it's gone
        self._assert_txt_query_rcode(NAME, dns.DNS_RCODE_NXDOMAIN)

        # recreate the record
        self._send_update(self._txt_rr(NAME, dns.DNS_QCLASS_IN, 900))

        # Now check the record is around
        self._assert_txt_query_rcode(NAME, dns.DNS_RCODE_OK)

    def test_update_add_mx_record(self):
        """Adding an MX record works and the record can be queried back."""
        rdata = dns.mx_record()
        rdata.preference = 10
        rdata.exchange = 'mail.%s' % self.get_dns_domain()
        record = self._make_rr("%s" % self.get_dns_domain(),
                               dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN,
                               900, 0xffff, rdata)
        self._send_update(record)

        # Ask for the MX record of the domain and compare every field.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        name = "%s" % self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        ans = response.answers[0]
        self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
        self.assertEqual(ans.rdata.preference, 10)
        self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
758
759
760 class TestComplexQueries(DNSTest):
761     def make_dns_update(self, key, value, qtype):
762         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
763
764         name = self.get_dns_domain()
765         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
766         self.finish_name_packet(p, [u])
767
768         r = dns.res_rec()
769         r.name = key
770         r.rr_type = qtype
771         r.rr_class = dns.DNS_QCLASS_IN
772         r.ttl = 900
773         r.length = 0xffff
774         rdata = value
775         r.rdata = rdata
776         updates = [r]
777         p.nscount = 1
778         p.nsrecs = updates
779         response = self.dns_transaction_udp(p)
780         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
781
782     def setUp(self):
783         super(TestComplexQueries, self).setUp()
784         name = "cname_test.%s" % self.get_dns_domain()
785         rdata = "%s.%s" % (self.server, self.get_dns_domain())
786         self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
787
788     def tearDown(self):
789         super(TestComplexQueries, self).tearDown()
790         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
791         updates = []
792
793         name = self.get_dns_domain()
794
795         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
796         updates.append(u)
797         self.finish_name_packet(p, updates)
798
799         updates = []
800         r = dns.res_rec()
801         r.name = "cname_test.%s" % self.get_dns_domain()
802         r.rr_type = dns.DNS_QTYPE_CNAME
803         r.rr_class = dns.DNS_QCLASS_NONE
804         r.ttl = 0
805         r.length = 0xffff
806         r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
807         updates.append(r)
808         p.nscount = len(updates)
809         p.nsrecs = updates
810
811         response = self.dns_transaction_udp(p)
812         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
813
814     def test_one_a_query(self):
815         "create a query packet containing one query record"
816         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
817         questions = []
818
819         name = "cname_test.%s" % self.get_dns_domain()
820         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
821         print "asking for ", q.name
822         questions.append(q)
823
824         self.finish_name_packet(p, questions)
825         response = self.dns_transaction_udp(p)
826         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
827         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
828         self.assertEquals(response.ancount, 2)
829         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
830         self.assertEquals(response.answers[0].rdata, "%s.%s" %
831                           (self.server, self.get_dns_domain()))
832         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
833         self.assertEquals(response.answers[1].rdata,
834                           self.server_ip)
835
836     def test_cname_two_chain(self):
837         name0 = "cnamechain0.%s" % self.get_dns_domain()
838         name1 = "cnamechain1.%s" % self.get_dns_domain()
839         name2 = "cnamechain2.%s" % self.get_dns_domain()
840         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
841         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
842         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
843
844         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
845         questions = []
846         q = self.make_name_question(name1, dns.DNS_QTYPE_A,
847                                     dns.DNS_QCLASS_IN)
848         questions.append(q)
849
850         self.finish_name_packet(p, questions)
851         response = self.dns_transaction_udp(p)
852         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
853         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
854         self.assertEquals(response.ancount, 3)
855
856         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
857         self.assertEquals(response.answers[0].name, name1)
858         self.assertEquals(response.answers[0].rdata, name2)
859
860         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
861         self.assertEquals(response.answers[1].name, name2)
862         self.assertEquals(response.answers[1].rdata, name0)
863
864         self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
865         self.assertEquals(response.answers[2].rdata,
866                           self.server_ip)
867
868     def test_cname_two_chain_not_matching_qtype(self):
869         name0 = "cnamechain0.%s" % self.get_dns_domain()
870         name1 = "cnamechain1.%s" % self.get_dns_domain()
871         name2 = "cnamechain2.%s" % self.get_dns_domain()
872         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
873         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
874         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
875
876         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
877         questions = []
878         q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
879                                     dns.DNS_QCLASS_IN)
880         questions.append(q)
881
882         self.finish_name_packet(p, questions)
883         response = self.dns_transaction_udp(p)
884         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
885         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
886
887         # CNAME should return all intermediate results!
888         # Only the A records exists, not the TXT.
889         self.assertEquals(response.ancount, 2)
890
891         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
892         self.assertEquals(response.answers[0].name, name1)
893         self.assertEquals(response.answers[0].rdata, name2)
894
895         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
896         self.assertEquals(response.answers[1].name, name2)
897         self.assertEquals(response.answers[1].rdata, name0)
898
899 class TestInvalidQueries(DNSTest):
900
901     def test_one_a_query(self):
902         "send 0 bytes follows by create a query packet containing one query record"
903
904         s = None
905         try:
906             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
907             s.connect((self.server_ip, 53))
908             s.send("", 0)
909         finally:
910             if s is not None:
911                 s.close()
912
913         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
914         questions = []
915
916         name = "%s.%s" % (self.server, self.get_dns_domain())
917         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
918         print "asking for ", q.name
919         questions.append(q)
920
921         self.finish_name_packet(p, questions)
922         response = self.dns_transaction_udp(p)
923         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
924         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
925         self.assertEquals(response.ancount, 1)
926         self.assertEquals(response.answers[0].rdata,
927                           self.server_ip)
928
929     def test_one_a_reply(self):
930         "send a reply instead of a query"
931         global timeout
932
933         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
934         questions = []
935
936         name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
937         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
938         print "asking for ", q.name
939         questions.append(q)
940
941         self.finish_name_packet(p, questions)
942         p.operation |= dns.DNS_FLAG_REPLY
943         s = None
944         try:
945             send_packet = ndr.ndr_pack(p)
946             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
947             s.settimeout(timeout)
948             host=self.server_ip
949             s.connect((host, 53))
950             tcp_packet = struct.pack('!H', len(send_packet))
951             tcp_packet += send_packet
952             s.send(tcp_packet, 0)
953             recv_packet = s.recv(0xffff + 2, 0)
954             self.assertEquals(0, len(recv_packet))
955         except socket.timeout:
956             # Windows chooses not to respond to incorrectly formatted queries.
957             # Although this appears to be non-deterministic even for the same
958             # request twice, it also appears to be based on a how poorly the
959             # request is formatted.
960             pass
961         finally:
962             if s is not None:
963                 s.close()
964
class TestZones(DNSTest):
    """Create and delete a zone over the dnsserver RPC pipe and watch the
    effect through SOA queries."""

    # First element of the RuntimeError raised by the RPC call when the
    # zone to delete is not there.
    WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST = 9601

    def setUp(self):
        """Pick the scratch zone name and open the RPC connection."""
        super(TestZones, self).setUp()
        self.zone = "test.lan"
        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
                                            self.lp, self.creds)

    def tearDown(self):
        """Delete the scratch zone; a zone that is already gone is fine.

        Any RuntimeError other than "zone does not exist" is re-raised.
        """
        super(TestZones, self).tearDown()
        try:
            self.delete_zone(self.zone)
        except RuntimeError as e:
            # The error number is expected as the first exception
            # argument; anything else (including no arguments at all)
            # is not the benign case and is passed on.
            num = e.args[0] if e.args else None
            if num != self.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
                raise

    def create_zone(self, zone):
        """Create a primary zone named `zone` via the ZoneCreate RPC
        operation (secure updates, aging off, domain default partition)."""
        zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
        zone_create.pszZoneName = zone
        zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
        zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
        zone_create.fAging = 0
        zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
        self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                       0,
                                       self.server_ip,
                                       None,
                                       0,
                                       'ZoneCreate',
                                       dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
                                       zone_create)

    def delete_zone(self, zone):
        """Delete the zone named `zone` via the DeleteZoneFromDs RPC
        operation."""
        self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                       0,
                                       self.server_ip,
                                       zone,
                                       0,
                                       'DeleteZoneFromDs',
                                       dnsserver.DNSSRV_TYPEID_NULL,
                                       None)

    def test_soa_query(self):
        """The SOA query follows the zone's lifetime: NXDOMAIN before the
        zone exists, one SOA answer while it exists, NXDOMAIN again after
        it has been deleted."""
        # Same zone that tearDown cleans up if an assertion fails midway.
        zone = self.zone
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        questions.append(q)
        self.finish_name_packet(p, questions)

        response = self.dns_transaction_udp(p)
        # Windows returns OK while BIND logically seems to return NXDOMAIN
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEquals(response.ancount, 0)

        self.create_zone(zone)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEquals(response.ancount, 1)
        self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)

        self.delete_zone(zone)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEquals(response.ancount, 0)
1033
class TestRPCRoundtrip(DNSTest):
    """TXT records seen through DNS and through the dnsserver RPC pipe.

    Tests named *_txt_record(s) add the record with a DNS update and then
    look for it over RPC; tests named *_rpc_to_dns add the record over
    RPC and then query it over DNS.
    """

    # Shared by the two padding tests:
    # (name prefix, TXT strings, the same record in RPC text form)
    _PADDED_TXT_CASES = (
        ('pad1textrec', ['"This is a test"', '', ''],
         '"\\"This is a test\\"" "" ""'),
        ('pad2textrec', ['"This is a test"', '', '', 'more text'],
         '"\\"This is a test\\"" "" "" "more text"'),
        ('pad3textrec', ['', '', '"This is a test"'],
         '"" "" "\\"This is a test\\""'),
    )

    def setUp(self):
        """Open the RPC connection used next to the plain DNS queries."""
        super(TestRPCRoundtrip, self).setUp()
        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
                                            self.lp, self.creds)

    def _rpc_update_record(self, name, add_rec_buf, del_rec_buf):
        """Issue one DnssrvUpdateRecord2 call for `name` in the DNS domain
        zone, passing the given add and delete record buffers."""
        self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                          0, self.server_ip,
                                          self.get_dns_domain(),
                                          name, add_rec_buf, del_rec_buf)

    def _check_rpc_to_dns(self, prefix, rpc_data, expected_txt):
        """Add a TXT record over RPC and check it with a DNS query.

        prefix       -- host part of the record name in the DNS domain
        rpc_data     -- record text handed to data_to_dns_record()
        expected_txt -- list of strings check_query_txt() must find

        The record is deleted again over RPC afterwards; the delete is
        attempted even when the add or the query check failed.
        """
        name = "%s.%s" % (prefix, self.get_dns_domain())

        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, rpc_data)
        rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
        rec_buf.rec = rec
        try:
            self._rpc_update_record(name, rec_buf, None)
            self.check_query_txt(prefix, expected_txt)
        finally:
            self._rpc_update_record(name, None, rec_buf)

    def _check_dns_to_rpc(self, prefix, txt, rpc_data, expected_txt=None):
        """Add a TXT record with a DNS update and look for it over RPC.

        prefix       -- host part of the record name in the DNS domain
        txt          -- list of strings sent in the DNS update
        rpc_data     -- record text dns_record_match() must find
        expected_txt -- list of strings check_query_txt() must find;
                        defaults to `txt`
        """
        if expected_txt is None:
            expected_txt = txt
        p = self.make_txt_update(prefix, txt)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, expected_txt)
        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
                             self.get_dns_domain(),
                             "%s.%s" % (prefix, self.get_dns_domain()),
                             dnsp.DNS_TYPE_TXT, rpc_data))

    def test_update_add_txt_rpc_to_dns(self):
        """a quoted TXT string added over RPC is visible over DNS"""
        self._check_rpc_to_dns('rpctextrec',
                               '"\\"This is a test\\""',
                               ['"This is a test"'])

    def test_update_add_null_padded_txt_record(self):
        """empty strings around a TXT string survive a DNS update"""
        for prefix, txt, rpc_data in self._PADDED_TXT_CASES:
            self._check_dns_to_rpc(prefix, list(txt), rpc_data)

    def test_update_add_padding_rpc_to_dns(self):
        """empty strings around a TXT string survive an RPC add"""
        for prefix, txt, rpc_data in self._PADDED_TXT_CASES:
            self._check_rpc_to_dns('rpc' + prefix, rpc_data, list(txt))

    # Test is incomplete due to strlen against txt records
    def test_update_add_null_char_txt_record(self):
        """TXT strings with a NUL byte come back cut off at the NUL"""
        self._check_dns_to_rpc('nulltextrec', ['NULL\x00BYTE'],
                               '"NULL"',
                               expected_txt=['NULL'])

        self._check_dns_to_rpc('nulltextrec2',
                               ['NULL\x00BYTE', 'NULL\x00BYTE'],
                               '"NULL" "NULL"',
                               expected_txt=['NULL', 'NULL'])

    def test_update_add_null_char_rpc_to_dns(self):
        """RPC counterpart of the NUL byte test, using the truncated text"""
        self._check_rpc_to_dns('rpcnulltextrec', '"NULL"', ['NULL'])

    def test_update_add_hex_char_txt_record(self):
        """a 0xFF byte in a TXT string survives a DNS update"""
        self._check_dns_to_rpc('hextextrec', ['HIGH\xFFBYTE'],
                               '"HIGH\xFFBYTE"')

    def test_update_add_hex_rpc_to_dns(self):
        """a 0xFF byte in a TXT string survives an RPC add"""
        self._check_rpc_to_dns('rpchextextrec',
                               '"HIGH\xFFBYTE"',
                               ['HIGH\xFFBYTE'])

    def test_update_add_slash_txt_record(self):
        """a backslash in a TXT string survives a DNS update"""
        self._check_dns_to_rpc('slashtextrec', ['Th\\=is=is a test'],
                               '"Th\\\\=is=is a test"')

    # This test fails against Windows as it eliminates slashes in RPC
    # One typical use for a slash is in records like 'var=value' to
    # escape '=' characters.
    def test_update_add_slash_rpc_to_dns(self):
        """a backslash in a TXT string survives an RPC add"""
        self._check_rpc_to_dns('rpcslashtextrec',
                               '"Th\\\\=is=is a test"',
                               ['Th\\=is=is a test'])

    def test_update_add_two_txt_records(self):
        """test adding two txt records works"""
        self._check_dns_to_rpc('textrec2',
                               ['"This is a test"',
                                '"and this is a test, too"'],
                               '"\\"This is a test\\""' +
                               ' "\\"and this is a test, too\\""')

    def test_update_add_two_rpc_to_dns(self):
        """two TXT strings added over RPC are visible over DNS"""
        self._check_rpc_to_dns('rpctextrec2',
                               '"\\"This is a test\\""' +
                               ' "\\"and this is a test, too\\""',
                               ['"This is a test"',
                                '"and this is a test, too"'])

    def test_update_add_empty_txt_records(self):
        """a TXT record without any strings survives a DNS update"""
        self._check_dns_to_rpc('emptytextrec', [], '')

    def test_update_add_empty_rpc_to_dns(self):
        """a TXT record without any strings survives an RPC add"""
        self._check_rpc_to_dns('rpcemptytextrec', '', [])
1322
# Entry point: hand the tests of this module to the subunit test runner,
# configured with the subunit options parsed from the command line.
TestProgram(opts=subunitopts, module=__name__)