dns: always add authority records
[kai/samba.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import struct
20 import random
21 import socket
22 import samba.ndr as ndr
23 import samba.dcerpc.dns as dns
24 from samba import credentials, param
25 from samba.tests import TestCase
26 from samba.dcerpc import dnsp, dnsserver
27
28
29 class DNSTest(TestCase):
30
31     def errstr(self, errcode):
32         "Return a readable error code"
33         string_codes = [
34             "OK",
35             "FORMERR",
36             "SERVFAIL",
37             "NXDOMAIN",
38             "NOTIMP",
39             "REFUSED",
40             "YXDOMAIN",
41             "YXRRSET",
42             "NXRRSET",
43             "NOTAUTH",
44             "NOTZONE",
45         ]
46
47         return string_codes[errcode]
48
49
50     def assert_dns_rcode_equals(self, packet, rcode):
51         "Helper function to check return code"
52         p_errcode = packet.operation & 0x000F
53         self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" %
54                             (self.errstr(rcode), self.errstr(p_errcode)))
55
56     def assert_dns_opcode_equals(self, packet, opcode):
57         "Helper function to check opcode"
58         p_opcode = packet.operation & 0x7800
59         self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" %
60                             (opcode, p_opcode))
61
62     def make_name_packet(self, opcode, qid=None):
63         "Helper creating a dns.name_packet"
64         p = dns.name_packet()
65         if qid is None:
66             p.id = random.randint(0x0, 0xffff)
67         p.operation = opcode
68         p.questions = []
69         return p
70
71     def finish_name_packet(self, packet, questions):
72         "Helper to finalize a dns.name_packet"
73         packet.qdcount = len(questions)
74         packet.questions = questions
75
76     def make_name_question(self, name, qtype, qclass):
77         "Helper creating a dns.name_question"
78         q = dns.name_question()
79         q.name = name
80         q.question_type = qtype
81         q.question_class = qclass
82         return q
83
84     def get_dns_domain(self):
85         "Helper to get dns domain"
86         return os.getenv('REALM', 'example.com').lower()
87
88     def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
89         "send a DNS query and read the reply"
90         s = None
91         try:
92             send_packet = ndr.ndr_pack(packet)
93             if dump:
94                 print self.hexdump(send_packet)
95             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
96             s.connect((host, 53))
97             s.send(send_packet, 0)
98             recv_packet = s.recv(2048, 0)
99             if dump:
100                 print self.hexdump(recv_packet)
101             return ndr.ndr_unpack(dns.name_packet, recv_packet)
102         finally:
103             if s is not None:
104                 s.close()
105
106     def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
107         "send a DNS query and read the reply"
108         s = None
109         try:
110             send_packet = ndr.ndr_pack(packet)
111             if dump:
112                 print self.hexdump(send_packet)
113             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
114             s.connect((host, 53))
115             tcp_packet = struct.pack('!H', len(send_packet))
116             tcp_packet += send_packet
117             s.send(tcp_packet, 0)
118             recv_packet = s.recv(0xffff + 2, 0)
119             if dump:
120                 print self.hexdump(recv_packet)
121             return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
122         finally:
123                 if s is not None:
124                     s.close()
125
126 class TestSimpleQueries(DNSTest):
127
128     def test_one_a_query(self):
129         "create a query packet containing one query record"
130         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
131         questions = []
132
133         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
134         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
135         print "asking for ", q.name
136         questions.append(q)
137
138         self.finish_name_packet(p, questions)
139         response = self.dns_transaction_udp(p)
140         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
141         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
142         self.assertEquals(response.ancount, 1)
143         self.assertEquals(response.answers[0].rdata,
144                           os.getenv('SERVER_IP'))
145
146     def test_one_a_query_tcp(self):
147         "create a query packet containing one query record via TCP"
148         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
149         questions = []
150
151         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
152         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
153         print "asking for ", q.name
154         questions.append(q)
155
156         self.finish_name_packet(p, questions)
157         response = self.dns_transaction_tcp(p)
158         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
159         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
160         self.assertEquals(response.ancount, 1)
161         self.assertEquals(response.answers[0].rdata,
162                           os.getenv('SERVER_IP'))
163
164     def test_one_mx_query(self):
165         "create a query packet causing an empty RCODE_OK answer"
166         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
167         questions = []
168
169         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
170         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
171         print "asking for ", q.name
172         questions.append(q)
173
174         self.finish_name_packet(p, questions)
175         response = self.dns_transaction_udp(p)
176         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
177         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
178         self.assertEquals(response.ancount, 0)
179
180         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
181         questions = []
182
183         name = "invalid-%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
184         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
185         print "asking for ", q.name
186         questions.append(q)
187
188         self.finish_name_packet(p, questions)
189         response = self.dns_transaction_udp(p)
190         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
191         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
192         self.assertEquals(response.ancount, 0)
193
194     def test_two_queries(self):
195         "create a query packet containing two query records"
196         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
197         questions = []
198
199         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
200         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
201         questions.append(q)
202
203         name = "%s.%s" % ('bogusname', self.get_dns_domain())
204         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
205         questions.append(q)
206
207         self.finish_name_packet(p, questions)
208         response = self.dns_transaction_udp(p)
209         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
210
211     def test_qtype_all_query(self):
212         "create a QTYPE_ALL query"
213         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
214         questions = []
215
216         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
217         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
218         print "asking for ", q.name
219         questions.append(q)
220
221         self.finish_name_packet(p, questions)
222         response = self.dns_transaction_udp(p)
223
224         num_answers = 1
225         dc_ipv6 = os.getenv('SERVER_IPV6')
226         if dc_ipv6 is not None:
227             num_answers += 1
228
229         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
230         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
231         self.assertEquals(response.ancount, num_answers)
232         self.assertEquals(response.answers[0].rdata,
233                           os.getenv('SERVER_IP'))
234         if dc_ipv6 is not None:
235             self.assertEquals(response.answers[1].rdata, dc_ipv6)
236
237     def test_qclass_none_query(self):
238         "create a QCLASS_NONE query"
239         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
240         questions = []
241
242         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
243         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
244         questions.append(q)
245
246         self.finish_name_packet(p, questions)
247         response = self.dns_transaction_udp(p)
248         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
249
250     def test_soa_hostname_query(self):
251         "create a SOA query for a hostname"
252         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
253         questions = []
254
255         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
256         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
257         questions.append(q)
258
259         self.finish_name_packet(p, questions)
260         response = self.dns_transaction_udp(p)
261         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
262         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
263         # We don't get SOA records for single hosts
264         self.assertEquals(response.ancount, 0)
265         # But we do respond with an authority section
266         self.assertEqual(response.nscount, 1)
267
268     def test_soa_domain_query(self):
269         "create a SOA query for a domain"
270         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
271         questions = []
272
273         name = self.get_dns_domain()
274         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
275         questions.append(q)
276
277         self.finish_name_packet(p, questions)
278         response = self.dns_transaction_udp(p)
279         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
280         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
281         self.assertEquals(response.ancount, 1)
282         self.assertEquals(response.answers[0].rdata.minimum, 3600)
283
284
class TestDNSUpdates(DNSTest):
    "UPDATE requests: malformed ones, prerequisites, and adding/deleting records"

    def _zone_update_packet(self):
        "Return an UPDATE packet whose only question is the SOA of our domain"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        zone = self.make_name_question(self.get_dns_domain(),
                                       dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [zone])
        return p

    def _send_prereq(self, name, rr_class, ttl):
        "Send an UPDATE with one zero-length TXT prerequisite; return the reply"
        p = self._zone_update_packet()

        r = dns.res_rec()
        r.name = name
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = 0
        prereqs = [r]

        # The prerequisites travel in the packet's answer section.
        p.ancount = len(prereqs)
        p.answers = prereqs

        return self.dns_transaction_udp(p)

    def _send_record(self, name, rr_type, rr_class, ttl, rdata):
        "Send an UPDATE with one record in the nsrecs section; return the reply"
        p = self._zone_update_packet()

        r = dns.res_rec()
        r.name = name
        # NOTE(review): rr_type is always assigned before rdata in this
        # file; presumably the binding needs it to interpret rdata, so the
        # order is kept.
        r.rr_type = rr_type
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = 0xffff
        r.rdata = rdata
        updates = [r]

        p.nscount = len(updates)
        p.nsrecs = updates

        return self.dns_transaction_udp(p)

    def _txt_rdata(self, txt):
        "Return a dns.txt_record holding txt"
        rdata = dns.txt_record()
        rdata.txt = txt
        return rdata

    def _add_txt(self, name, txt):
        "Add a TXT record (class IN, TTL 900) and return the reply"
        return self._send_record(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN,
                                 900, self._txt_rdata(txt))

    def _delete_txt(self, name, txt):
        "Delete a TXT record (class NONE, TTL 0) and return the reply"
        return self._send_record(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_NONE,
                                 0, self._txt_rdata(txt))

    def _query(self, name, qtype):
        "Send a one-question class IN QUERY and return the reply"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, qtype, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        return self.dns_transaction_udp(p)

    def test_two_updates(self):
        "create two update requests"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        self.finish_name_packet(p, updates)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_wrong_qclass(self):
        "create update with DNS_QCLASS_NONE"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)

        self.finish_name_packet(p, [u])
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        "test update with a non-null TTL"
        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        response = self._send_prereq(name, dns.DNS_QCLASS_NONE, 1)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

# I'd love to test this one, but it segfaults. :)
#    def test_update_prereq_with_non_null_length(self):
#        "test update with a non-null length"
#        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
#        updates = []
#
#        name = self.get_dns_domain()
#
#        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
#        updates.append(u)
#        self.finish_name_packet(p, updates)
#
#        prereqs = []
#        r = dns.res_rec()
#        r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
#        r.rr_type = dns.DNS_QTYPE_TXT
#        r.rr_class = dns.DNS_QCLASS_ANY
#        r.ttl = 0
#        r.length = 1
#        prereqs.append(r)
#
#        p.ancount = len(prereqs)
#        p.answers = prereqs
#
#        response = self.dns_transaction_udp(p)
#        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_prereq_nonexisting_name(self):
        "test update with a nonexisting name"
        name = "idontexist.%s" % self.get_dns_domain()
        response = self._send_prereq(name, dns.DNS_QCLASS_ANY, 0)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        "test adding records works"
        name = "textrec.%s" % self.get_dns_domain()
        txt = '"This is a test"'

        response = self._add_txt(name, txt)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        response = self._query(name, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt, txt)

    def test_update_add_two_txt_records(self):
        "test adding two txt records works"
        name = "textrec2.%s" % self.get_dns_domain()
        txt = '"This is a test" "and this is a test, too"'

        response = self._add_txt(name, txt)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        response = self._query(name, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt, txt)

    def test_delete_record(self):
        "Test if deleting records works"

        NAME = "deleterec.%s" % self.get_dns_domain()
        txt = '"This is a test"'

        # First, create a record to make sure we have a record to delete.
        response = self._add_txt(NAME, txt)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Now check the record is around
        response = self._query(NAME, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Now delete the record
        response = self._delete_txt(NAME, txt)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # And finally check it's gone
        response = self._query(NAME, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)

    def test_readd_record(self):
        "Test if adding, deleting and then readding a records works"

        NAME = "readdrec.%s" % self.get_dns_domain()
        txt = '"This is a test"'

        # Create the record
        response = self._add_txt(NAME, txt)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Now check the record is around
        response = self._query(NAME, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Now delete the record
        response = self._delete_txt(NAME, txt)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # check it's gone
        response = self._query(NAME, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)

        # recreate the record
        response = self._add_txt(NAME, txt)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Now check the record is around
        response = self._query(NAME, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def test_update_add_mx_record(self):
        "test adding MX records works"
        name = "%s" % self.get_dns_domain()
        exchange = 'mail.%s' % self.get_dns_domain()

        rdata = dns.mx_record()
        rdata.preference = 10
        rdata.exchange = exchange

        response = self._send_record(name, dns.DNS_QTYPE_MX,
                                     dns.DNS_QCLASS_IN, 900, rdata)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        response = self._query(name, dns.DNS_QTYPE_MX)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        ans = response.answers[0]
        self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
        self.assertEqual(ans.rdata.preference, 10)
        self.assertEqual(ans.rdata.exchange, exchange)
719
720
721 class TestComplexQueries(DNSTest):
722
723     def setUp(self):
724         super(TestComplexQueries, self).setUp()
725         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
726         updates = []
727
728         name = self.get_dns_domain()
729
730         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
731         updates.append(u)
732         self.finish_name_packet(p, updates)
733
734         updates = []
735         r = dns.res_rec()
736         r.name = "cname_test.%s" % self.get_dns_domain()
737         r.rr_type = dns.DNS_QTYPE_CNAME
738         r.rr_class = dns.DNS_QCLASS_IN
739         r.ttl = 900
740         r.length = 0xffff
741         r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
742         updates.append(r)
743         p.nscount = len(updates)
744         p.nsrecs = updates
745
746         response = self.dns_transaction_udp(p)
747         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
748
749     def tearDown(self):
750         super(TestComplexQueries, self).tearDown()
751         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
752         updates = []
753
754         name = self.get_dns_domain()
755
756         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
757         updates.append(u)
758         self.finish_name_packet(p, updates)
759
760         updates = []
761         r = dns.res_rec()
762         r.name = "cname_test.%s" % self.get_dns_domain()
763         r.rr_type = dns.DNS_QTYPE_CNAME
764         r.rr_class = dns.DNS_QCLASS_NONE
765         r.ttl = 0
766         r.length = 0xffff
767         r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
768         updates.append(r)
769         p.nscount = len(updates)
770         p.nsrecs = updates
771
772         response = self.dns_transaction_udp(p)
773         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
774
775     def test_one_a_query(self):
776         "create a query packet containing one query record"
777         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
778         questions = []
779
780         name = "cname_test.%s" % self.get_dns_domain()
781         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
782         print "asking for ", q.name
783         questions.append(q)
784
785         self.finish_name_packet(p, questions)
786         response = self.dns_transaction_udp(p)
787         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
788         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
789         self.assertEquals(response.ancount, 2)
790         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
791         self.assertEquals(response.answers[0].rdata, "%s.%s" %
792                           (os.getenv('SERVER'), self.get_dns_domain()))
793         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
794         self.assertEquals(response.answers[1].rdata,
795                           os.getenv('SERVER_IP'))
796
class TestInvalidQueries(DNSTest):
    """Send unusual traffic to the DNS server and check how it reacts."""

    def test_one_a_query(self):
        "send 0 bytes followed by a query packet containing one query record"

        # First push an empty UDP datagram at the server; the regular query
        # below then verifies that the server still answers afterwards.
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((os.getenv('SERVER_IP'), 53))
            # Bytes literal: same object type as "" on Python 2, and the only
            # type socket.send() accepts on Python 3.
            s.send(b"", 0)
        finally:
            if s is not None:
                s.close()

        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        # Single-argument print() parses on both Python 2 and 3 and emits the
        # same text the old two-item print statement produced.
        print("%s %s" % ("asking for ", q.name))
        questions = [q]

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('SERVER_IP'))

    def test_one_a_reply(self):
        "send a reply instead of a query"

        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("%s %s" % ("asking for ", q.name))
        questions = [q]

        self.finish_name_packet(p, questions)
        # Turn the otherwise ordinary query into a reply packet.
        p.operation |= dns.DNS_FLAG_REPLY
        s = None
        try:
            send_packet = ndr.ndr_pack(p)
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            host = os.getenv('SERVER_IP')
            s.connect((host, 53))
            # Over TCP the packet is preceded by its length as an unsigned
            # 16-bit big-endian integer.
            tcp_packet = struct.pack('!H', len(send_packet)) + send_packet
            # sendall() keeps writing until the whole buffer is out, whereas
            # send() may stop after a partial write.
            s.sendall(tcp_packet, 0)
            # NOTE(review): the socket has no timeout, so this blocks for as
            # long as the server keeps the connection open without answering.
            recv_packet = s.recv(0xffff + 2, 0)
            # A zero-length read means the peer closed the connection
            # without sending any data back.
            self.assertEqual(0, len(recv_packet))
        finally:
            if s is not None:
                s.close()
854
class TestZones(DNSTest):
    """Create and delete a zone over the dnsserver RPC interface and check
    the effect with SOA queries."""

    # Error number tearDown tolerates when the zone is already gone.
    WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST = 9601

    def get_loadparm(self):
        "Return a LoadParm loaded from the file named by SMB_CONF_PATH"
        lp = param.LoadParm()
        lp.load(os.getenv("SMB_CONF_PATH"))
        return lp

    def get_credentials(self, lp):
        "Return machine account credentials derived from the loadparm lp"
        creds = credentials.Credentials()
        creds.guess(lp)
        creds.set_machine_account(lp)
        creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
        return creds

    def setUp(self):
        "Open the dnsserver RPC connection to SERVER_IP used by the tests"
        super(TestZones, self).setUp()
        self.lp = self.get_loadparm()
        self.creds = self.get_credentials(self.lp)
        self.server = os.getenv("SERVER_IP")
        self.zone = "test.lan"
        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server),
                                            self.lp, self.creds)

    def tearDown(self):
        "Remove the test zone again, tolerating that it may not exist"
        super(TestZones, self).tearDown()
        try:
            self.delete_zone(self.zone)
        except RuntimeError as e:
            # "except ... as" parses on Python 2.6+ and Python 3. Reading
            # the code from e.args (instead of tuple-unpacking the exception)
            # means an error with an unexpected shape is re-raised as itself
            # rather than hidden behind an unpacking failure.
            # NOTE(review): assumes the error number is the first element of
            # args, as the original (num, string) unpacking implied.
            if not e.args or \
               e.args[0] != self.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
                raise

    def create_zone(self, zone):
        "Create the primary zone `zone` via the 'ZoneCreate' RPC operation"
        zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
        zone_create.pszZoneName = zone
        zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
        zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
        zone_create.fAging = 0
        zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
        self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                       0,
                                       self.server,
                                       None,
                                       0,
                                       'ZoneCreate',
                                       dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
                                       zone_create)

    def delete_zone(self, zone):
        "Delete the zone `zone` via the 'DeleteZoneFromDs' RPC operation"
        self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                       0,
                                       self.server,
                                       zone,
                                       0,
                                       'DeleteZoneFromDs',
                                       dnsserver.DNSSRV_TYPEID_NULL,
                                       None)

    def test_soa_query(self):
        "SOA query gives NXDOMAIN unless the zone exists, then one SOA record"
        # Use the zone tearDown cleans up, so a failure between create and
        # delete cannot leave a differently named zone behind.
        zone = self.zone
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        questions = [q]
        self.finish_name_packet(p, questions)

        # Before the zone is created
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

        # While the zone exists
        self.create_zone(zone)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)

        # After the zone has been deleted again
        self.delete_zone(zone)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)
937
938
939
if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner.
    from unittest import main as run_tests
    run_tests()