3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin <kai@samba.org> 2011
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from samba import socket
25 import samba.ndr as ndr
26 import samba.dcerpc.dns as dns
27 from samba.tests import TestCase
29 class DNSTest(TestCase):
31 def assert_dns_rcode_equals(self, packet, rcode):
32 "Helper function to check return code"
33 p_errcode = packet.operation & 0x000F
34 self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" % \
37 def assert_dns_opcode_equals(self, packet, opcode):
38 "Helper function to check opcode"
39 p_opcode = packet.operation & 0x7800
40 self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" % \
43 def make_name_packet(self, opcode, qid=None):
44 "Helper creating a dns.name_packet"
47 p.id = random.randint(0x0, 0xffff)
52 def finish_name_packet(self, packet, questions):
53 "Helper to finalize a dns.name_packet"
54 packet.qdcount = len(questions)
55 packet.questions = questions
57 def make_name_question(self, name, qtype, qclass):
58 "Helper creating a dns.name_question"
59 q = dns.name_question()
61 q.question_type = qtype
62 q.question_class = qclass
65 def get_dns_domain(self):
66 "Helper to get dns domain"
67 return os.getenv('REALM', 'example.com').lower()
69 def dns_transaction_udp(self, packet, host=os.getenv('DC_SERVER_IP')):
70 "send a DNS query and read the reply"
73 send_packet = ndr.ndr_pack(packet)
74 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
76 s.send(send_packet, 0)
77 recv_packet = s.recv(2048, 0)
78 return ndr.ndr_unpack(dns.name_packet, recv_packet)
83 def test_one_a_query(self):
84 "create a query packet containing one query record"
85 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
88 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
89 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
90 print "asking for ", q.name
93 self.finish_name_packet(p, questions)
94 response = self.dns_transaction_udp(p)
95 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
96 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
97 self.assertEquals(response.ancount, 1)
98 self.assertEquals(response.answers[0].rdata,
99 os.getenv('DC_SERVER_IP'))
101 def test_two_queries(self):
102 "create a query packet containing two query records"
103 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
106 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
107 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
110 name = "%s.%s" % ('bogusname', self.get_dns_domain())
111 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
114 self.finish_name_packet(p, questions)
115 response = self.dns_transaction_udp(p)
116 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
118 def test_qtype_all_query(self):
119 "create a QTYPE_ALL query"
120 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
123 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
124 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
125 print "asking for ", q.name
128 self.finish_name_packet(p, questions)
129 response = self.dns_transaction_udp(p)
132 dc_ipv6 = os.getenv('DC_SERVER_IPV6')
133 if dc_ipv6 is not None:
136 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
137 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
138 self.assertEquals(response.ancount, num_answers)
139 self.assertEquals(response.answers[0].rdata,
140 os.getenv('DC_SERVER_IP'))
141 if dc_ipv6 is not None:
142 self.assertEquals(response.answers[1].rdata, dc_ipv6)
144 def test_qclass_none_query(self):
145 "create a QCLASS_NONE query"
146 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
149 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
150 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
153 self.finish_name_packet(p, questions)
154 response = self.dns_transaction_udp(p)
155 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
157 # Only returns an authority section entry in BIND and Win DNS
158 # FIXME: Enable one Samba implements this feature
159 # def test_soa_hostname_query(self):
160 # "create a SOA query for a hostname"
161 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
164 # name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
165 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
166 # questions.append(q)
168 # self.finish_name_packet(p, questions)
169 # response = self.dns_transaction_udp(p)
170 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
171 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
172 # # We don't get SOA records for single hosts
173 # self.assertEquals(response.ancount, 0)
175 def test_soa_domain_query(self):
176 "create a SOA query for a domain"
177 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
180 name = self.get_dns_domain()
181 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
184 self.finish_name_packet(p, questions)
185 response = self.dns_transaction_udp(p)
186 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
187 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
188 self.assertEquals(response.ancount, 1)
190 def test_two_updates(self):
191 "create two update requests"
192 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
195 name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
196 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
199 name = self.get_dns_domain()
200 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
203 self.finish_name_packet(p, updates)
204 response = self.dns_transaction_udp(p)
205 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
208 if __name__ == "__main__":