dns.py: Always remove the test zone in tearDown()
[samba.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import struct
20 import random
21 import socket
22 import samba.ndr as ndr
23 import samba.dcerpc.dns as dns
24 from samba import credentials, param
25 from samba.tests import TestCase
26 from samba.dcerpc import dnsp, dnsserver
27
28 FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
29
30
31 class DNSTest(TestCase):
32
33     def errstr(self, errcode):
34         "Return a readable error code"
35         string_codes = [
36             "OK",
37             "FORMERR",
38             "SERVFAIL",
39             "NXDOMAIN",
40             "NOTIMP",
41             "REFUSED",
42             "YXDOMAIN",
43             "YXRRSET",
44             "NXRRSET",
45             "NOTAUTH",
46             "NOTZONE",
47         ]
48
49         return string_codes[errcode]
50
51
52     def assert_dns_rcode_equals(self, packet, rcode):
53         "Helper function to check return code"
54         p_errcode = packet.operation & 0x000F
55         self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" %
56                             (self.errstr(rcode), self.errstr(p_errcode)))
57
58     def assert_dns_opcode_equals(self, packet, opcode):
59         "Helper function to check opcode"
60         p_opcode = packet.operation & 0x7800
61         self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" %
62                             (opcode, p_opcode))
63
64     def make_name_packet(self, opcode, qid=None):
65         "Helper creating a dns.name_packet"
66         p = dns.name_packet()
67         if qid is None:
68             p.id = random.randint(0x0, 0xffff)
69         p.operation = opcode
70         p.questions = []
71         return p
72
73     def finish_name_packet(self, packet, questions):
74         "Helper to finalize a dns.name_packet"
75         packet.qdcount = len(questions)
76         packet.questions = questions
77
78     def make_name_question(self, name, qtype, qclass):
79         "Helper creating a dns.name_question"
80         q = dns.name_question()
81         q.name = name
82         q.question_type = qtype
83         q.question_class = qclass
84         return q
85
86     def get_dns_domain(self):
87         "Helper to get dns domain"
88         return os.getenv('REALM', 'example.com').lower()
89
90     def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
91         "send a DNS query and read the reply"
92         s = None
93         try:
94             send_packet = ndr.ndr_pack(packet)
95             if dump:
96                 print self.hexdump(send_packet)
97             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
98             s.connect((host, 53))
99             s.send(send_packet, 0)
100             recv_packet = s.recv(2048, 0)
101             if dump:
102                 print self.hexdump(recv_packet)
103             return ndr.ndr_unpack(dns.name_packet, recv_packet)
104         finally:
105             if s is not None:
106                 s.close()
107
108     def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
109         "send a DNS query and read the reply"
110         s = None
111         try:
112             send_packet = ndr.ndr_pack(packet)
113             if dump:
114                 print self.hexdump(send_packet)
115             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
116             s.connect((host, 53))
117             tcp_packet = struct.pack('!H', len(send_packet))
118             tcp_packet += send_packet
119             s.send(tcp_packet, 0)
120             recv_packet = s.recv(0xffff + 2, 0)
121             if dump:
122                 print self.hexdump(recv_packet)
123             return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
124         finally:
125                 if s is not None:
126                     s.close()
127
128     def hexdump(self, src, length=8):
129         N = 0
130         result = ''
131         while src:
132             s, src = src[:length], src[length:]
133             hexa = ' '.join(["%02X" % ord(x) for x in s])
134             s = s.translate(FILTER)
135             result += "%04X   %-*s   %s\n" % (N, length*3, hexa, s)
136             N += length
137         return result
138
139
140 class TestSimpleQueries(DNSTest):
141
142     def test_one_a_query(self):
143         "create a query packet containing one query record"
144         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
145         questions = []
146
147         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
148         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
149         print "asking for ", q.name
150         questions.append(q)
151
152         self.finish_name_packet(p, questions)
153         response = self.dns_transaction_udp(p)
154         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
155         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
156         self.assertEquals(response.ancount, 1)
157         self.assertEquals(response.answers[0].rdata,
158                           os.getenv('SERVER_IP'))
159
160     def test_one_a_query_tcp(self):
161         "create a query packet containing one query record via TCP"
162         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
163         questions = []
164
165         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
166         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
167         print "asking for ", q.name
168         questions.append(q)
169
170         self.finish_name_packet(p, questions)
171         response = self.dns_transaction_tcp(p)
172         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
173         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
174         self.assertEquals(response.ancount, 1)
175         self.assertEquals(response.answers[0].rdata,
176                           os.getenv('SERVER_IP'))
177
178     def test_one_mx_query(self):
179         "create a query packet causing an empty RCODE_OK answer"
180         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
181         questions = []
182
183         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
184         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
185         print "asking for ", q.name
186         questions.append(q)
187
188         self.finish_name_packet(p, questions)
189         response = self.dns_transaction_udp(p)
190         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
191         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
192         self.assertEquals(response.ancount, 0)
193
194         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
195         questions = []
196
197         name = "invalid-%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
198         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
199         print "asking for ", q.name
200         questions.append(q)
201
202         self.finish_name_packet(p, questions)
203         response = self.dns_transaction_udp(p)
204         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
205         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
206         self.assertEquals(response.ancount, 0)
207
208     def test_two_queries(self):
209         "create a query packet containing two query records"
210         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
211         questions = []
212
213         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
214         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
215         questions.append(q)
216
217         name = "%s.%s" % ('bogusname', self.get_dns_domain())
218         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
219         questions.append(q)
220
221         self.finish_name_packet(p, questions)
222         response = self.dns_transaction_udp(p)
223         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
224
225     def test_qtype_all_query(self):
226         "create a QTYPE_ALL query"
227         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
228         questions = []
229
230         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
231         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
232         print "asking for ", q.name
233         questions.append(q)
234
235         self.finish_name_packet(p, questions)
236         response = self.dns_transaction_udp(p)
237
238         num_answers = 1
239         dc_ipv6 = os.getenv('SERVER_IPV6')
240         if dc_ipv6 is not None:
241             num_answers += 1
242
243         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
244         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
245         self.assertEquals(response.ancount, num_answers)
246         self.assertEquals(response.answers[0].rdata,
247                           os.getenv('SERVER_IP'))
248         if dc_ipv6 is not None:
249             self.assertEquals(response.answers[1].rdata, dc_ipv6)
250
251     def test_qclass_none_query(self):
252         "create a QCLASS_NONE query"
253         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
254         questions = []
255
256         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
257         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
258         questions.append(q)
259
260         self.finish_name_packet(p, questions)
261         response = self.dns_transaction_udp(p)
262         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
263
264 # Only returns an authority section entry in BIND and Win DNS
265 # FIXME: Enable one Samba implements this feature
266 #    def test_soa_hostname_query(self):
267 #        "create a SOA query for a hostname"
268 #        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
269 #        questions = []
270 #
271 #        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
272 #        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
273 #        questions.append(q)
274 #
275 #        self.finish_name_packet(p, questions)
276 #        response = self.dns_transaction_udp(p)
277 #        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
278 #        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
279 #        # We don't get SOA records for single hosts
280 #        self.assertEquals(response.ancount, 0)
281
282     def test_soa_domain_query(self):
283         "create a SOA query for a domain"
284         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
285         questions = []
286
287         name = self.get_dns_domain()
288         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
289         questions.append(q)
290
291         self.finish_name_packet(p, questions)
292         response = self.dns_transaction_udp(p)
293         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
294         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
295         self.assertEquals(response.ancount, 1)
296         self.assertEquals(response.answers[0].rdata.minimum, 3600)
297
298
299 class TestDNSUpdates(DNSTest):
300
301     def test_two_updates(self):
302         "create two update requests"
303         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
304         updates = []
305
306         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
307         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
308         updates.append(u)
309
310         name = self.get_dns_domain()
311         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
312         updates.append(u)
313
314         self.finish_name_packet(p, updates)
315         response = self.dns_transaction_udp(p)
316         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
317
318     def test_update_wrong_qclass(self):
319         "create update with DNS_QCLASS_NONE"
320         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
321         updates = []
322
323         name = self.get_dns_domain()
324         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
325         updates.append(u)
326
327         self.finish_name_packet(p, updates)
328         response = self.dns_transaction_udp(p)
329         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
330
331     def test_update_prereq_with_non_null_ttl(self):
332         "test update with a non-null TTL"
333         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
334         updates = []
335
336         name = self.get_dns_domain()
337
338         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
339         updates.append(u)
340         self.finish_name_packet(p, updates)
341
342         prereqs = []
343         r = dns.res_rec()
344         r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
345         r.rr_type = dns.DNS_QTYPE_TXT
346         r.rr_class = dns.DNS_QCLASS_NONE
347         r.ttl = 1
348         r.length = 0
349         prereqs.append(r)
350
351         p.ancount = len(prereqs)
352         p.answers = prereqs
353
354         response = self.dns_transaction_udp(p)
355         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
356
357 # I'd love to test this one, but it segfaults. :)
358 #    def test_update_prereq_with_non_null_length(self):
359 #        "test update with a non-null length"
360 #        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
361 #        updates = []
362 #
363 #        name = self.get_dns_domain()
364 #
365 #        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
366 #        updates.append(u)
367 #        self.finish_name_packet(p, updates)
368 #
369 #        prereqs = []
370 #        r = dns.res_rec()
371 #        r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
372 #        r.rr_type = dns.DNS_QTYPE_TXT
373 #        r.rr_class = dns.DNS_QCLASS_ANY
374 #        r.ttl = 0
375 #        r.length = 1
376 #        prereqs.append(r)
377 #
378 #        p.ancount = len(prereqs)
379 #        p.answers = prereqs
380 #
381 #        response = self.dns_transaction_udp(p)
382 #        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
383
384     def test_update_prereq_nonexisting_name(self):
385         "test update with a nonexisting name"
386         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
387         updates = []
388
389         name = self.get_dns_domain()
390
391         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
392         updates.append(u)
393         self.finish_name_packet(p, updates)
394
395         prereqs = []
396         r = dns.res_rec()
397         r.name = "idontexist.%s" % self.get_dns_domain()
398         r.rr_type = dns.DNS_QTYPE_TXT
399         r.rr_class = dns.DNS_QCLASS_ANY
400         r.ttl = 0
401         r.length = 0
402         prereqs.append(r)
403
404         p.ancount = len(prereqs)
405         p.answers = prereqs
406
407         response = self.dns_transaction_udp(p)
408         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
409
410     def test_update_add_txt_record(self):
411         "test adding records works"
412         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
413         updates = []
414
415         name = self.get_dns_domain()
416
417         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
418         updates.append(u)
419         self.finish_name_packet(p, updates)
420
421         updates = []
422         r = dns.res_rec()
423         r.name = "textrec.%s" % self.get_dns_domain()
424         r.rr_type = dns.DNS_QTYPE_TXT
425         r.rr_class = dns.DNS_QCLASS_IN
426         r.ttl = 900
427         r.length = 0xffff
428         rdata = dns.txt_record()
429         rdata.txt = '"This is a test"'
430         r.rdata = rdata
431         updates.append(r)
432         p.nscount = len(updates)
433         p.nsrecs = updates
434
435         response = self.dns_transaction_udp(p)
436         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
437
438         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
439         questions = []
440
441         name = "textrec.%s" % self.get_dns_domain()
442         q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
443         questions.append(q)
444
445         self.finish_name_packet(p, questions)
446         response = self.dns_transaction_udp(p)
447         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
448         self.assertEquals(response.ancount, 1)
449         self.assertEquals(response.answers[0].rdata.txt, '"This is a test"')
450
451     def test_update_add_two_txt_records(self):
452         "test adding two txt records works"
453         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
454         updates = []
455
456         name = self.get_dns_domain()
457
458         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
459         updates.append(u)
460         self.finish_name_packet(p, updates)
461
462         updates = []
463         r = dns.res_rec()
464         r.name = "textrec2.%s" % self.get_dns_domain()
465         r.rr_type = dns.DNS_QTYPE_TXT
466         r.rr_class = dns.DNS_QCLASS_IN
467         r.ttl = 900
468         r.length = 0xffff
469         rdata = dns.txt_record()
470         rdata.txt = '"This is a test" "and this is a test, too"'
471         r.rdata = rdata
472         updates.append(r)
473         p.nscount = len(updates)
474         p.nsrecs = updates
475
476         response = self.dns_transaction_udp(p)
477         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
478
479         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
480         questions = []
481
482         name = "textrec2.%s" % self.get_dns_domain()
483         q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
484         questions.append(q)
485
486         self.finish_name_packet(p, questions)
487         response = self.dns_transaction_udp(p)
488         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
489         self.assertEquals(response.ancount, 1)
490         self.assertEquals(response.answers[0].rdata.txt, '"This is a test" "and this is a test, too"')
491
492     def test_delete_record(self):
493         "Test if deleting records works"
494
495         NAME = "deleterec.%s" % self.get_dns_domain()
496
497         # First, create a record to make sure we have a record to delete.
498         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
499         updates = []
500
501         name = self.get_dns_domain()
502
503         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
504         updates.append(u)
505         self.finish_name_packet(p, updates)
506
507         updates = []
508         r = dns.res_rec()
509         r.name = NAME
510         r.rr_type = dns.DNS_QTYPE_TXT
511         r.rr_class = dns.DNS_QCLASS_IN
512         r.ttl = 900
513         r.length = 0xffff
514         rdata = dns.txt_record()
515         rdata.txt = '"This is a test"'
516         r.rdata = rdata
517         updates.append(r)
518         p.nscount = len(updates)
519         p.nsrecs = updates
520
521         response = self.dns_transaction_udp(p)
522         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
523
524         # Now check the record is around
525         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
526         questions = []
527         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
528         questions.append(q)
529
530         self.finish_name_packet(p, questions)
531         response = self.dns_transaction_udp(p)
532         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
533
534         # Now delete the record
535         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
536         updates = []
537
538         name = self.get_dns_domain()
539
540         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
541         updates.append(u)
542         self.finish_name_packet(p, updates)
543
544         updates = []
545         r = dns.res_rec()
546         r.name = NAME
547         r.rr_type = dns.DNS_QTYPE_TXT
548         r.rr_class = dns.DNS_QCLASS_NONE
549         r.ttl = 0
550         r.length = 0xffff
551         rdata = dns.txt_record()
552         rdata.txt = '"This is a test"'
553         r.rdata = rdata
554         updates.append(r)
555         p.nscount = len(updates)
556         p.nsrecs = updates
557
558         response = self.dns_transaction_udp(p)
559         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
560
561         # And finally check it's gone
562         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
563         questions = []
564
565         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
566         questions.append(q)
567
568         self.finish_name_packet(p, questions)
569         response = self.dns_transaction_udp(p)
570         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
571
572     def test_readd_record(self):
573         "Test if adding, deleting and then readding a records works"
574
575         NAME = "readdrec.%s" % self.get_dns_domain()
576
577         # Create the record
578         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
579         updates = []
580
581         name = self.get_dns_domain()
582
583         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
584         updates.append(u)
585         self.finish_name_packet(p, updates)
586
587         updates = []
588         r = dns.res_rec()
589         r.name = NAME
590         r.rr_type = dns.DNS_QTYPE_TXT
591         r.rr_class = dns.DNS_QCLASS_IN
592         r.ttl = 900
593         r.length = 0xffff
594         rdata = dns.txt_record()
595         rdata.txt = '"This is a test"'
596         r.rdata = rdata
597         updates.append(r)
598         p.nscount = len(updates)
599         p.nsrecs = updates
600
601         response = self.dns_transaction_udp(p)
602         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
603
604         # Now check the record is around
605         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
606         questions = []
607         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
608         questions.append(q)
609
610         self.finish_name_packet(p, questions)
611         response = self.dns_transaction_udp(p)
612         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
613
614         # Now delete the record
615         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
616         updates = []
617
618         name = self.get_dns_domain()
619
620         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
621         updates.append(u)
622         self.finish_name_packet(p, updates)
623
624         updates = []
625         r = dns.res_rec()
626         r.name = NAME
627         r.rr_type = dns.DNS_QTYPE_TXT
628         r.rr_class = dns.DNS_QCLASS_NONE
629         r.ttl = 0
630         r.length = 0xffff
631         rdata = dns.txt_record()
632         rdata.txt = '"This is a test"'
633         r.rdata = rdata
634         updates.append(r)
635         p.nscount = len(updates)
636         p.nsrecs = updates
637
638         response = self.dns_transaction_udp(p)
639         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
640
641         # check it's gone
642         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
643         questions = []
644
645         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
646         questions.append(q)
647
648         self.finish_name_packet(p, questions)
649         response = self.dns_transaction_udp(p)
650         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
651
652         # recreate the record
653         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
654         updates = []
655
656         name = self.get_dns_domain()
657
658         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
659         updates.append(u)
660         self.finish_name_packet(p, updates)
661
662         updates = []
663         r = dns.res_rec()
664         r.name = NAME
665         r.rr_type = dns.DNS_QTYPE_TXT
666         r.rr_class = dns.DNS_QCLASS_IN
667         r.ttl = 900
668         r.length = 0xffff
669         rdata = dns.txt_record()
670         rdata.txt = '"This is a test"'
671         r.rdata = rdata
672         updates.append(r)
673         p.nscount = len(updates)
674         p.nsrecs = updates
675
676         response = self.dns_transaction_udp(p)
677         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
678
679         # Now check the record is around
680         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
681         questions = []
682         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
683         questions.append(q)
684
685         self.finish_name_packet(p, questions)
686         response = self.dns_transaction_udp(p)
687         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
688
689     def test_update_add_mx_record(self):
690         "test adding MX records works"
691         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
692         updates = []
693
694         name = self.get_dns_domain()
695
696         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
697         updates.append(u)
698         self.finish_name_packet(p, updates)
699
700         updates = []
701         r = dns.res_rec()
702         r.name = "%s" % self.get_dns_domain()
703         r.rr_type = dns.DNS_QTYPE_MX
704         r.rr_class = dns.DNS_QCLASS_IN
705         r.ttl = 900
706         r.length = 0xffff
707         rdata = dns.mx_record()
708         rdata.preference = 10
709         rdata.exchange = 'mail.%s' % self.get_dns_domain()
710         r.rdata = rdata
711         updates.append(r)
712         p.nscount = len(updates)
713         p.nsrecs = updates
714
715         response = self.dns_transaction_udp(p)
716         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
717
718         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
719         questions = []
720
721         name = "%s" % self.get_dns_domain()
722         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
723         questions.append(q)
724
725         self.finish_name_packet(p, questions)
726         response = self.dns_transaction_udp(p)
727         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
728         self.assertEqual(response.ancount, 1)
729         ans = response.answers[0]
730         self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
731         self.assertEqual(ans.rdata.preference, 10)
732         self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
733
734
735 class TestComplexQueries(DNSTest):
736
737     def setUp(self):
738         super(TestComplexQueries, self).setUp()
739         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
740         updates = []
741
742         name = self.get_dns_domain()
743
744         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
745         updates.append(u)
746         self.finish_name_packet(p, updates)
747
748         updates = []
749         r = dns.res_rec()
750         r.name = "cname_test.%s" % self.get_dns_domain()
751         r.rr_type = dns.DNS_QTYPE_CNAME
752         r.rr_class = dns.DNS_QCLASS_IN
753         r.ttl = 900
754         r.length = 0xffff
755         r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
756         updates.append(r)
757         p.nscount = len(updates)
758         p.nsrecs = updates
759
760         response = self.dns_transaction_udp(p)
761         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
762
763     def tearDown(self):
764         super(TestComplexQueries, self).tearDown()
765         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
766         updates = []
767
768         name = self.get_dns_domain()
769
770         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
771         updates.append(u)
772         self.finish_name_packet(p, updates)
773
774         updates = []
775         r = dns.res_rec()
776         r.name = "cname_test.%s" % self.get_dns_domain()
777         r.rr_type = dns.DNS_QTYPE_CNAME
778         r.rr_class = dns.DNS_QCLASS_NONE
779         r.ttl = 0
780         r.length = 0xffff
781         r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
782         updates.append(r)
783         p.nscount = len(updates)
784         p.nsrecs = updates
785
786         response = self.dns_transaction_udp(p)
787         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
788
789     def test_one_a_query(self):
790         "create a query packet containing one query record"
791         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
792         questions = []
793
794         name = "cname_test.%s" % self.get_dns_domain()
795         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
796         print "asking for ", q.name
797         questions.append(q)
798
799         self.finish_name_packet(p, questions)
800         response = self.dns_transaction_udp(p)
801         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
802         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
803         self.assertEquals(response.ancount, 2)
804         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
805         self.assertEquals(response.answers[0].rdata, "%s.%s" %
806                           (os.getenv('SERVER'), self.get_dns_domain()))
807         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
808         self.assertEquals(response.answers[1].rdata,
809                           os.getenv('SERVER_IP'))
810
811 class TestInvalidQueries(DNSTest):
812
813     def test_one_a_query(self):
814         "send 0 bytes follows by create a query packet containing one query record"
815
816         s = None
817         try:
818             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
819             s.connect((os.getenv('SERVER_IP'), 53))
820             s.send("", 0)
821         finally:
822             if s is not None:
823                 s.close()
824
825         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
826         questions = []
827
828         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
829         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
830         print "asking for ", q.name
831         questions.append(q)
832
833         self.finish_name_packet(p, questions)
834         response = self.dns_transaction_udp(p)
835         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
836         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
837         self.assertEquals(response.ancount, 1)
838         self.assertEquals(response.answers[0].rdata,
839                           os.getenv('SERVER_IP'))
840
841     def test_one_a_reply(self):
842         "send a reply instead of a query"
843
844         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
845         questions = []
846
847         name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
848         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
849         print "asking for ", q.name
850         questions.append(q)
851
852         self.finish_name_packet(p, questions)
853         p.operation |= dns.DNS_FLAG_REPLY
854         s = None
855         try:
856             send_packet = ndr.ndr_pack(p)
857             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
858             host=os.getenv('SERVER_IP')
859             s.connect((host, 53))
860             tcp_packet = struct.pack('!H', len(send_packet))
861             tcp_packet += send_packet
862             s.send(tcp_packet, 0)
863             recv_packet = s.recv(0xffff + 2, 0)
864             self.assertEquals(0, len(recv_packet))
865         finally:
866             if s is not None:
867                 s.close()
868
869 class TestZones(DNSTest):
870     def get_loadparm(self):
871         lp = param.LoadParm()
872         lp.load(os.getenv("SMB_CONF_PATH"))
873         return lp
874
875     def get_credentials(self, lp):
876         creds = credentials.Credentials()
877         creds.guess(lp)
878         creds.set_machine_account(lp)
879         creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
880         return creds
881
882     def setUp(self):
883         super(TestZones, self).setUp()
884         self.lp = self.get_loadparm()
885         self.creds = self.get_credentials(self.lp)
886         self.server = os.getenv("SERVER_IP")
887         self.zone = "test.lan"
888         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server),
889                                             self.lp, self.creds)
890
891     def tearDown(self):
892         super(TestZones, self).tearDown()
893         try:
894             self.delete_zone(self.zone)
895         except RuntimeError, (num, string):
896             if num != 9601: #WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST
897                 raise
898
899     def create_zone(self, zone):
900         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
901         zone_create.pszZoneName = zone
902         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
903         zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
904         zone_create.fAging = 0
905         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
906         self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
907                                        0,
908                                        self.server,
909                                        None,
910                                        0,
911                                        'ZoneCreate',
912                                        dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
913                                        zone_create)
914
915     def delete_zone(self, zone):
916         self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
917                                        0,
918                                        self.server,
919                                        zone,
920                                        0,
921                                        'DeleteZoneFromDs',
922                                        dnsserver.DNSSRV_TYPEID_NULL,
923                                        None)
924
925     def test_soa_query(self):
926         zone = "test.lan"
927         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
928         questions = []
929
930         q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
931         questions.append(q)
932         self.finish_name_packet(p, questions)
933
934         response = self.dns_transaction_udp(p)
935         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
936         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
937         self.assertEquals(response.ancount, 0)
938
939         self.create_zone(zone)
940         response = self.dns_transaction_udp(p)
941         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
942         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
943         self.assertEquals(response.ancount, 1)
944         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
945
946         self.delete_zone(zone)
947         response = self.dns_transaction_udp(p)
948         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
949         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
950         self.assertEquals(response.ancount, 0)
951
952
953
954 if __name__ == "__main__":
955     import unittest
956     unittest.main()