c95148b70e287e0d9be90b7f1a5b6d42d73b6a49
[sfrench/samba-autobuild/.git] / source4 / scripting / python / samba / tests / dns.py
1 #!/usr/bin/env python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin  <kai@samba.org> 2011
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import os
21 import sys
22 import struct
23 import random
24 from samba import socket
25 import samba.ndr as ndr
26 import samba.dcerpc.dns as dns
27 from samba.tests import TestCase
28
29 class DNSTest(TestCase):
30
31     def assert_dns_rcode_equals(self, packet, rcode):
32         "Helper function to check return code"
33         p_errcode = packet.operation & 0x000F
34         self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" % \
35                             (rcode, p_errcode))
36
37     def assert_dns_opcode_equals(self, packet, opcode):
38         "Helper function to check opcode"
39         p_opcode = packet.operation & 0x7800
40         self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" % \
41                             (opcode, p_opcode))
42
43     def make_name_packet(self, opcode, qid=None):
44         "Helper creating a dns.name_packet"
45         p = dns.name_packet()
46         if qid is None:
47             p.id = random.randint(0x0, 0xffff)
48         p.operation = opcode
49         p.questions = []
50         return p
51
52     def finish_name_packet(self, packet, questions):
53         "Helper to finalize a dns.name_packet"
54         packet.qdcount = len(questions)
55         packet.questions = questions
56
57     def make_name_question(self, name, qtype, qclass):
58         "Helper creating a dns.name_question"
59         q = dns.name_question()
60         q.name = name
61         q.question_type = qtype
62         q.question_class = qclass
63         return q
64
65     def get_dns_domain(self):
66         "Helper to get dns domain"
67         return os.getenv('REALM', 'example.com').lower()
68
69     def dns_transaction_udp(self, packet, host=os.getenv('DC_SERVER_IP')):
70         "send a DNS query and read the reply"
71         s = None
72         try:
73             send_packet = ndr.ndr_pack(packet)
74             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
75             s.connect((host, 53))
76             s.send(send_packet, 0)
77             recv_packet = s.recv(2048, 0)
78             return ndr.ndr_unpack(dns.name_packet, recv_packet)
79         finally:
80             if s is not None:
81                 s.close()
82
83     def test_one_a_query(self):
84         "create a query packet containing one query record"
85         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
86         questions = []
87
88         name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
89         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
90         print "asking for ", q.name
91         questions.append(q)
92
93         self.finish_name_packet(p, questions)
94         response = self.dns_transaction_udp(p)
95         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
96         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
97         self.assertEquals(response.ancount, 1)
98         self.assertEquals(response.answers[0].rdata,
99                           os.getenv('DC_SERVER_IP'))
100
101     def test_two_queries(self):
102         "create a query packet containing two query records"
103         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
104         questions = []
105
106         name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
107         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
108         questions.append(q)
109
110         name = "%s.%s" % ('bogusname', self.get_dns_domain())
111         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
112         questions.append(q)
113
114         self.finish_name_packet(p, questions)
115         response = self.dns_transaction_udp(p)
116         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
117
118     def test_qtype_all_query(self):
119         "create a QTYPE_ALL query"
120         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
121         questions = []
122
123         name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
124         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
125         print "asking for ", q.name
126         questions.append(q)
127
128         self.finish_name_packet(p, questions)
129         response = self.dns_transaction_udp(p)
130
131         num_answers = 1
132         dc_ipv6 = os.getenv('DC_SERVER_IPV6')
133         if dc_ipv6 is not None:
134             num_answers += 1
135
136         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
137         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
138         self.assertEquals(response.ancount, num_answers)
139         self.assertEquals(response.answers[0].rdata,
140                           os.getenv('DC_SERVER_IP'))
141         if dc_ipv6 is not None:
142             self.assertEquals(response.answers[1].rdata, dc_ipv6)
143
144     def test_qclass_none_query(self):
145         "create a QCLASS_NONE query"
146         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
147         questions = []
148
149         name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
150         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
151         questions.append(q)
152
153         self.finish_name_packet(p, questions)
154         response = self.dns_transaction_udp(p)
155         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
156
# Only BIND and Windows DNS return an authority section entry for this query.
# FIXME: Enable once Samba implements this feature.
159 #    def test_soa_hostname_query(self):
160 #        "create a SOA query for a hostname"
161 #        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
162 #        questions = []
163 #
164 #        name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
165 #        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
166 #        questions.append(q)
167 #
168 #        self.finish_name_packet(p, questions)
169 #        response = self.dns_transaction_udp(p)
170 #        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
171 #        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
172 #        # We don't get SOA records for single hosts
173 #        self.assertEquals(response.ancount, 0)
174
175     def test_soa_domain_query(self):
176         "create a SOA query for a domain"
177         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
178         questions = []
179
180         name = self.get_dns_domain()
181         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
182         questions.append(q)
183
184         self.finish_name_packet(p, questions)
185         response = self.dns_transaction_udp(p)
186         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
187         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
188         self.assertEquals(response.ancount, 1)
189
190     def test_two_updates(self):
191         "create two update requests"
192         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
193         updates = []
194
195         name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
196         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
197         updates.append(u)
198
199         name = self.get_dns_domain()
200         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
201         updates.append(u)
202
203         self.finish_name_packet(p, updates)
204         response = self.dns_transaction_udp(p)
205         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
206
207
if __name__ == "__main__":
    # Run the tests in this module when the file is executed as a script.
    from unittest import main
    main()