s4:dns.py: reproducer for (bug #9184)
[nivanova/samba-autobuild/.git] / source4 / scripting / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import struct
20 import random
21 from samba import socket
22 import samba.ndr as ndr
23 import samba.dcerpc.dns as dns
24 from samba.tests import TestCase
25
26 class DNSTest(TestCase):
27
28     def errstr(self, errcode):
29         "Return a readable error code"
30         string_codes = [
31             "OK",
32             "FORMERR",
33             "SERVFAIL",
34             "NXDOMAIN",
35             "NOTIMP",
36             "REFUSED",
37             "YXDOMAIN",
38             "YXRRSET",
39             "NXRRSET",
40             "NOTAUTH",
41             "NOTZONE",
42         ]
43
44         return string_codes[errcode]
45
46
47     def assert_dns_rcode_equals(self, packet, rcode):
48         "Helper function to check return code"
49         p_errcode = packet.operation & 0x000F
50         self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" % \
51                             (self.errstr(rcode), self.errstr(p_errcode)))
52
53     def assert_dns_opcode_equals(self, packet, opcode):
54         "Helper function to check opcode"
55         p_opcode = packet.operation & 0x7800
56         self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" % \
57                             (opcode, p_opcode))
58
59     def make_name_packet(self, opcode, qid=None):
60         "Helper creating a dns.name_packet"
61         p = dns.name_packet()
62         if qid is None:
63             p.id = random.randint(0x0, 0xffff)
64         p.operation = opcode
65         p.questions = []
66         return p
67
68     def finish_name_packet(self, packet, questions):
69         "Helper to finalize a dns.name_packet"
70         packet.qdcount = len(questions)
71         packet.questions = questions
72
73     def make_name_question(self, name, qtype, qclass):
74         "Helper creating a dns.name_question"
75         q = dns.name_question()
76         q.name = name
77         q.question_type = qtype
78         q.question_class = qclass
79         return q
80
81     def get_dns_domain(self):
82         "Helper to get dns domain"
83         return os.getenv('REALM', 'example.com').lower()
84
85     def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP')):
86         "send a DNS query and read the reply"
87         s = None
88         try:
89             send_packet = ndr.ndr_pack(packet)
90             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
91             s.connect((host, 53))
92             s.send(send_packet, 0)
93             recv_packet = s.recv(2048, 0)
94             return ndr.ndr_unpack(dns.name_packet, recv_packet)
95         finally:
96             if s is not None:
97                 s.close()
98
99     def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP')):
100         "send a DNS query and read the reply"
101         s = None
102         try:
103             send_packet = ndr.ndr_pack(packet)
104             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
105             s.connect((host, 53))
106             tcp_packet = struct.pack('!H', len(send_packet))
107             tcp_packet += send_packet
108             s.send(tcp_packet, 0)
109             recv_packet = s.recv(0xffff + 2, 0)
110             return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
111         finally:
112                 if s is not None:
113                     s.close()
114
115
116 class TestSimpleQueries(DNSTest):
117
118     def test_one_a_query(self):
119         "create a query packet containing one query record"
120         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
121         questions = []
122
123         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
124         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
125         print "asking for ", q.name
126         questions.append(q)
127
128         self.finish_name_packet(p, questions)
129         response = self.dns_transaction_udp(p)
130         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
131         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
132         self.assertEquals(response.ancount, 1)
133         self.assertEquals(response.answers[0].rdata,
134                           os.getenv('SERVER_IP'))
135
136     def test_one_a_query_tcp(self):
137         "create a query packet containing one query record via TCP"
138         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
139         questions = []
140
141         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
142         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
143         print "asking for ", q.name
144         questions.append(q)
145
146         self.finish_name_packet(p, questions)
147         response = self.dns_transaction_tcp(p)
148         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
149         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
150         self.assertEquals(response.ancount, 1)
151         self.assertEquals(response.answers[0].rdata,
152                           os.getenv('SERVER_IP'))
153
154     def test_two_queries(self):
155         "create a query packet containing two query records"
156         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
157         questions = []
158
159         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
160         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
161         questions.append(q)
162
163         name = "%s.%s" % ('bogusname', self.get_dns_domain())
164         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
165         questions.append(q)
166
167         self.finish_name_packet(p, questions)
168         response = self.dns_transaction_udp(p)
169         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
170
171     def test_qtype_all_query(self):
172         "create a QTYPE_ALL query"
173         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
174         questions = []
175
176         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
177         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
178         print "asking for ", q.name
179         questions.append(q)
180
181         self.finish_name_packet(p, questions)
182         response = self.dns_transaction_udp(p)
183
184         num_answers = 1
185         dc_ipv6 = os.getenv('SERVER_IPV6')
186         if dc_ipv6 is not None:
187             num_answers += 1
188
189         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
190         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
191         self.assertEquals(response.ancount, num_answers)
192         self.assertEquals(response.answers[0].rdata,
193                           os.getenv('SERVER_IP'))
194         if dc_ipv6 is not None:
195             self.assertEquals(response.answers[1].rdata, dc_ipv6)
196
197     def test_qclass_none_query(self):
198         "create a QCLASS_NONE query"
199         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
200         questions = []
201
202         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
203         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
204         questions.append(q)
205
206         self.finish_name_packet(p, questions)
207         response = self.dns_transaction_udp(p)
208         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
209
210 # Only returns an authority section entry in BIND and Win DNS
211 # FIXME: Enable one Samba implements this feature
212 #    def test_soa_hostname_query(self):
213 #        "create a SOA query for a hostname"
214 #        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
215 #        questions = []
216 #
217 #        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
218 #        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
219 #        questions.append(q)
220 #
221 #        self.finish_name_packet(p, questions)
222 #        response = self.dns_transaction_udp(p)
223 #        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
224 #        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
225 #        # We don't get SOA records for single hosts
226 #        self.assertEquals(response.ancount, 0)
227
228     def test_soa_domain_query(self):
229         "create a SOA query for a domain"
230         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
231         questions = []
232
233         name = self.get_dns_domain()
234         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
235         questions.append(q)
236
237         self.finish_name_packet(p, questions)
238         response = self.dns_transaction_udp(p)
239         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
240         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
241         self.assertEquals(response.ancount, 1)
242
243
244 class TestDNSUpdates(DNSTest):
245
246     def test_two_updates(self):
247         "create two update requests"
248         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
249         updates = []
250
251         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
252         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
253         updates.append(u)
254
255         name = self.get_dns_domain()
256         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
257         updates.append(u)
258
259         self.finish_name_packet(p, updates)
260         response = self.dns_transaction_udp(p)
261         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
262
263     def test_update_wrong_qclass(self):
264         "create update with DNS_QCLASS_NONE"
265         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
266         updates = []
267
268         name = self.get_dns_domain()
269         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
270         updates.append(u)
271
272         self.finish_name_packet(p, updates)
273         response = self.dns_transaction_udp(p)
274         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
275
276     def test_update_prereq_with_non_null_ttl(self):
277         "test update with a non-null TTL"
278         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
279         updates = []
280
281         name = self.get_dns_domain()
282
283         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
284         updates.append(u)
285         self.finish_name_packet(p, updates)
286
287         prereqs = []
288         r = dns.res_rec()
289         r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
290         r.rr_type = dns.DNS_QTYPE_TXT
291         r.rr_class = dns.DNS_QCLASS_NONE
292         r.ttl = 1
293         r.length = 0
294         prereqs.append(r)
295
296         p.ancount = len(prereqs)
297         p.answers = prereqs
298
299         response = self.dns_transaction_udp(p)
300         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
301
302 # I'd love to test this one, but it segfaults. :)
303 #    def test_update_prereq_with_non_null_length(self):
304 #        "test update with a non-null length"
305 #        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
306 #        updates = []
307 #
308 #        name = self.get_dns_domain()
309 #
310 #        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
311 #        updates.append(u)
312 #        self.finish_name_packet(p, updates)
313 #
314 #        prereqs = []
315 #        r = dns.res_rec()
316 #        r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
317 #        r.rr_type = dns.DNS_QTYPE_TXT
318 #        r.rr_class = dns.DNS_QCLASS_ANY
319 #        r.ttl = 0
320 #        r.length = 1
321 #        prereqs.append(r)
322 #
323 #        p.ancount = len(prereqs)
324 #        p.answers = prereqs
325 #
326 #        response = self.dns_transaction_udp(p)
327 #        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
328
329     def test_update_prereq_nonexisting_name(self):
330         "test update with a nonexisting name"
331         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
332         updates = []
333
334         name = self.get_dns_domain()
335
336         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
337         updates.append(u)
338         self.finish_name_packet(p, updates)
339
340         prereqs = []
341         r = dns.res_rec()
342         r.name = "idontexist.%s" % self.get_dns_domain()
343         r.rr_type = dns.DNS_QTYPE_TXT
344         r.rr_class = dns.DNS_QCLASS_ANY
345         r.ttl = 0
346         r.length = 0
347         prereqs.append(r)
348
349         p.ancount = len(prereqs)
350         p.answers = prereqs
351
352         response = self.dns_transaction_udp(p)
353         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
354
355     def test_update_add_txt_record(self):
356         "test adding records works"
357         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
358         updates = []
359
360         name = self.get_dns_domain()
361
362         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
363         updates.append(u)
364         self.finish_name_packet(p, updates)
365
366         updates = []
367         r = dns.res_rec()
368         r.name = "textrec.%s" % self.get_dns_domain()
369         r.rr_type = dns.DNS_QTYPE_TXT
370         r.rr_class = dns.DNS_QCLASS_IN
371         r.ttl = 900
372         r.length = 0xffff
373         r.rdata = dns.txt_record()
374         r.rdata.txt = '"This is a test"'
375         updates.append(r)
376         p.nscount = len(updates)
377         p.nsrecs = updates
378
379         response = self.dns_transaction_udp(p)
380         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
381
382         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
383         questions = []
384
385         name = "textrec.%s" % self.get_dns_domain()
386         q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
387         questions.append(q)
388
389         self.finish_name_packet(p, questions)
390         response = self.dns_transaction_udp(p)
391         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
392         self.assertEquals(response.ancount, 1)
393         self.assertEquals(response.answers[0].rdata.txt, '"This is a test"')
394
395     def test_update_add_two_txt_records(self):
396         "test adding two txt records works"
397         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
398         updates = []
399
400         name = self.get_dns_domain()
401
402         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
403         updates.append(u)
404         self.finish_name_packet(p, updates)
405
406         updates = []
407         r = dns.res_rec()
408         r.name = "textrec2.%s" % self.get_dns_domain()
409         r.rr_type = dns.DNS_QTYPE_TXT
410         r.rr_class = dns.DNS_QCLASS_IN
411         r.ttl = 900
412         r.length = 0xffff
413         r.rdata = dns.txt_record()
414         r.rdata.txt = '"This is a test" "and this is a test, too"'
415         updates.append(r)
416         p.nscount = len(updates)
417         p.nsrecs = updates
418
419         response = self.dns_transaction_udp(p)
420         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
421
422         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
423         questions = []
424
425         name = "textrec2.%s" % self.get_dns_domain()
426         q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
427         questions.append(q)
428
429         self.finish_name_packet(p, questions)
430         response = self.dns_transaction_udp(p)
431         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
432         self.assertEquals(response.ancount, 1)
433         self.assertEquals(response.answers[0].rdata.txt, '"This is a test" "and this is a test, too"')
434
435     def test_delete_record(self):
436         "Test if deleting records works"
437         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
438         updates = []
439
440         name = self.get_dns_domain()
441
442         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
443         updates.append(u)
444         self.finish_name_packet(p, updates)
445
446         updates = []
447         r = dns.res_rec()
448         r.name = "textrec.%s" % self.get_dns_domain()
449         r.rr_type = dns.DNS_QTYPE_TXT
450         r.rr_class = dns.DNS_QCLASS_NONE
451         r.ttl = 0
452         r.length = 0xffff
453         r.rdata = dns.txt_record()
454         r.rdata.txt = '"This is a test"'
455         updates.append(r)
456         p.nscount = len(updates)
457         p.nsrecs = updates
458
459         response = self.dns_transaction_udp(p)
460         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
461
462         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
463         questions = []
464
465         name = "textrec.%s" % self.get_dns_domain()
466         q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
467         questions.append(q)
468
469         self.finish_name_packet(p, questions)
470         response = self.dns_transaction_udp(p)
471         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
472
473
474 class TestComplexQueries(DNSTest):
475
476     def setUp(self):
477         super(TestComplexQueries, self).setUp()
478         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
479         updates = []
480
481         name = self.get_dns_domain()
482
483         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
484         updates.append(u)
485         self.finish_name_packet(p, updates)
486
487         updates = []
488         r = dns.res_rec()
489         r.name = "cname_test.%s" % self.get_dns_domain()
490         r.rr_type = dns.DNS_QTYPE_CNAME
491         r.rr_class = dns.DNS_QCLASS_IN
492         r.ttl = 900
493         r.length = 0xffff
494         r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
495         updates.append(r)
496         p.nscount = len(updates)
497         p.nsrecs = updates
498
499         response = self.dns_transaction_udp(p)
500         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
501
502     def tearDown(self):
503         super(TestComplexQueries, self).tearDown()
504         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
505         updates = []
506
507         name = self.get_dns_domain()
508
509         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
510         updates.append(u)
511         self.finish_name_packet(p, updates)
512
513         updates = []
514         r = dns.res_rec()
515         r.name = "cname_test.%s" % self.get_dns_domain()
516         r.rr_type = dns.DNS_QTYPE_CNAME
517         r.rr_class = dns.DNS_QCLASS_NONE
518         r.ttl = 0
519         r.length = 0xffff
520         r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
521         updates.append(r)
522         p.nscount = len(updates)
523         p.nsrecs = updates
524
525         response = self.dns_transaction_udp(p)
526         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
527
528     def test_one_a_query(self):
529         "create a query packet containing one query record"
530         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
531         questions = []
532
533         name = "cname_test.%s" % self.get_dns_domain()
534         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
535         print "asking for ", q.name
536         questions.append(q)
537
538         self.finish_name_packet(p, questions)
539         response = self.dns_transaction_udp(p)
540         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
541         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
542         self.assertEquals(response.ancount, 2)
543         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
544         self.assertEquals(response.answers[0].rdata, "%s.%s" %
545                           (os.getenv('SERVER'), self.get_dns_domain()))
546         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
547         self.assertEquals(response.answers[1].rdata,
548                           os.getenv('SERVER_IP'))
549
550 class TestInvalidQueries(DNSTest):
551
552     def test_one_a_query(self):
553         "send 0 bytes follows by create a query packet containing one query record"
554
555         s = None
556         try:
557             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
558             s.connect((os.getenv('SERVER_IP'), 53))
559             s.send("", 0)
560         finally:
561             if s is not None:
562                 s.close()
563
564         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
565         questions = []
566
567         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
568         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
569         print "asking for ", q.name
570         questions.append(q)
571
572         self.finish_name_packet(p, questions)
573         response = self.dns_transaction_udp(p)
574         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
575         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
576         self.assertEquals(response.ancount, 1)
577         self.assertEquals(response.answers[0].rdata,
578                           os.getenv('SERVER_IP'))
579
580 if __name__ == "__main__":
581     import unittest
582     unittest.main()