3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin <kai@samba.org> 2011
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from samba import socket
25 import samba.ndr as ndr
26 import samba.dcerpc.dns as dns
27 from samba.tests import TestCase
class DNSTest(TestCase):
    """Black-box tests that send DNS packets to a server over UDP.

    The tests read their target from the environment:
    DC_SERVER (host name), DC_SERVER_IP (IPv4 address), optionally
    DC_SERVER_IPV6, and REALM (DNS domain, defaults to 'example.com').
    """

    def errstr(self, errcode):
        """Return a readable name for a DNS response code.

        errcode is the numeric RCODE (RFC 1035 section 4.1.1, extended
        by RFC 2136).  Codes without a known name are rendered as
        "UNKNOWN(<n>)" so that building an assertion message can never
        raise and hide the real test failure.
        """
        string_codes = [
            "OK",
            "FORMERR",
            "SERVFAIL",
            "NXDOMAIN",
            "NOTIMP",
            "REFUSED",
            "YXDOMAIN",
            "YXRRSET",
            "NXRRSET",
            "NOTAUTH",
            "NOTZONE",
        ]

        if 0 <= errcode < len(string_codes):
            return string_codes[errcode]
        return "UNKNOWN(%d)" % errcode

    def assert_dns_rcode_equals(self, packet, rcode):
        """Fail unless the response code of packet equals rcode."""
        # The RCODE lives in the low four bits of the operation word.
        p_errcode = packet.operation & 0x000F
        self.assertEqual(p_errcode, rcode, "Expected RCODE %s, got %s" %
                         (self.errstr(rcode), self.errstr(p_errcode)))

    def assert_dns_opcode_equals(self, packet, opcode):
        """Fail unless the opcode of packet equals opcode."""
        # The OPCODE is compared in place (bits 11-14), without shifting,
        # so opcode must be given in the same encoding.
        p_opcode = packet.operation & 0x7800
        self.assertEqual(p_opcode, opcode, "Expected OPCODE %s, got %s" %
                         (opcode, p_opcode))

    def make_name_packet(self, opcode, qid=None):
        """Create a dns.name_packet for the given opcode.

        qid is the 16-bit packet id; a random one is chosen when it is
        None.  The packet starts with an empty question list, use
        finish_name_packet() to fill it in.
        """
        p = dns.name_packet()
        if qid is None:
            p.id = random.randint(0x0, 0xffff)
        else:
            p.id = qid
        p.operation = opcode
        p.questions = []
        return p

    def finish_name_packet(self, packet, questions):
        """Store questions on packet and set the matching question count."""
        packet.qdcount = len(questions)
        packet.questions = questions

    def make_name_question(self, name, qtype, qclass):
        """Create a dns.name_question for name with the given type/class."""
        q = dns.name_question()
        q.name = name
        q.question_type = qtype
        q.question_class = qclass
        return q

    def get_dns_domain(self):
        """Return the lower-cased DNS domain taken from $REALM."""
        return os.getenv('REALM', 'example.com').lower()

    def dns_transaction_udp(self, packet, host=os.getenv('DC_SERVER_IP')):
        """Send packet to host on UDP port 53 and return the parsed reply.

        The default for host is read from $DC_SERVER_IP once, when the
        class is defined.  The socket is always closed, even when
        sending, receiving or unpacking fails.
        """
        send_packet = ndr.ndr_pack(packet)
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((host, 53))
            s.send(send_packet, 0)
            recv_packet = s.recv(2048, 0)
            return ndr.ndr_unpack(dns.name_packet, recv_packet)
        finally:
            if s is not None:
                s.close()

    def test_one_a_query(self):
        """A single A query for the DC returns exactly its IPv4 address."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        # Parenthesised single argument: valid on Python 2 and 3 alike.
        print("asking for  %s" % (q.name,))
        questions = [q]

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('DC_SERVER_IP'))

    def test_two_queries(self):
        """A packet carrying two questions is rejected with FORMERR."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        name = "%s.%s" % ('bogusname', self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_qtype_all_query(self):
        """A QTYPE_ALL query returns the DC's IPv4 (and IPv6) address."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
        print("asking for  %s" % (q.name,))
        questions = [q]

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)

        # One answer for the IPv4 address, plus one more when the
        # environment advertises an IPv6 address for the DC.
        num_answers = 1
        dc_ipv6 = os.getenv('DC_SERVER_IPV6')
        if dc_ipv6 is not None:
            num_answers += 1

        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, num_answers)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('DC_SERVER_IP'))
        if dc_ipv6 is not None:
            self.assertEqual(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        """A query using QCLASS_NONE is answered with NOTIMP."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
        questions = [q]

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

# Only returns an authority section entry in BIND and Win DNS
# FIXME: Enable once Samba implements this feature
#    def test_soa_hostname_query(self):
#        "create a SOA query for a hostname"
#        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
#        questions = []
#
#        name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
#        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
#        questions.append(q)
#
#        self.finish_name_packet(p, questions)
#        response = self.dns_transaction_udp(p)
#        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
#        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
#        # We don't get SOA records for single hosts
#        self.assertEquals(response.ancount, 0)

    def test_soa_domain_query(self):
        """A SOA query for the domain itself returns exactly one answer."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        questions = [q]

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)

    def test_two_updates(self):
        """An update packet carrying two zone entries yields FORMERR."""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        self.finish_name_packet(p, updates)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_wrong_qclass(self):
        """An update using QCLASS_NONE is answered with NOTIMP."""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
        updates = [u]

        self.finish_name_packet(p, updates)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
239 if __name__ == "__main__":