python tests: fix format() strings for Python 2.6
[samba.git] / python / samba / tests / dns_wildcard.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett 2007
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import sys
19 from samba import credentials
20 from samba.dcerpc import dns, dnsserver
21 from samba.netcmd.dns import data_to_dns_record
22 from samba.tests.subunitrun import SubunitOptions, TestProgram
23 from samba import werror, WERRORError
24 from samba.tests.dns_base import DNSTest
25 import samba.getopt as options
26 import optparse
27
# Command-line handling.  The script takes two positional arguments (server
# name and server IP) plus the Samba, credentials and subunit option groups
# registered below.  All option groups must be added before parse_args().
parser = optparse.OptionParser(
    "dns_wildcard.py <server name> <server ip> [options]")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

# This timeout is only relevant when testing against Windows.
# Format errors tend to return patchy responses, so a timeout is needed.
parser.add_option("--timeout", type="int", dest="timeout",
                  help="Specify timeout for DNS requests")

# To run against Windows:
# python python/samba/tests/dns_wildcard.py computer_name ip
#                                  -U"Administrator%admin_password"
#                                  --realm=Domain_name
#                                  --timeout 10
#

# Use command-line credentials if available.
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

opts, args = parser.parse_args()

# The loadparm context is needed to build the credentials object.
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)

# None when --timeout was not given (optparse default for an unset option).
timeout = opts.timeout

# Both positional arguments are mandatory; print usage and exit otherwise.
if len(args) < 2:
    parser.print_usage()
    sys.exit(1)

server_name = args[0]
server_ip = args[1]
creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)

# Record names (relative to the DNS domain) and the A-record addresses that
# TestWildCardQueries.setUp registers for them.  Each wildcard entry has a
# sibling exact entry with a different address so the tests can tell which
# record answered a query.
WILDCARD_IP        = "1.1.1.1"
WILDCARD           = "*.wildcardtest"
EXACT_IP           = "1.1.1.2"
EXACT              = "exact.wildcardtest"
LEVEL2_WILDCARD_IP = "1.1.1.3"
LEVEL2_WILDCARD    = "*.level2.wildcardtest"
LEVEL2_EXACT_IP    = "1.1.1.4"
LEVEL2_EXACT       = "exact.level2.wildcardtest"
74
75
class TestWildCardQueries(DNSTest):
    """Check how the DNS server resolves names covered by wildcard records.

    setUp registers four A records under the DNS domain: a wildcard and an
    exact name at "wildcardtest", and the same pair one level deeper at
    "level2.wildcardtest".  Every record has a distinct address, so each
    test can tell which record produced the answer.  tearDown removes the
    records again.
    """

    def setUp(self):
        """Store the connection parameters and create the test records."""
        super(TestWildCardQueries, self).setUp()
        # These module-level values are only read here, so no "global"
        # declaration is required.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        # Not used directly in this class; presumably consumed by the
        # DNSTest transport helpers -- confirm against dns_base.
        self.timeout = timeout

        # (record type, fully qualified name, record data) for every record
        # the tests rely on; tearDown walks the same list to delete them.
        self.dns_records = [
            (dns.DNS_QTYPE_A,
             "%s.%s" % (label, self.get_dns_domain()),
             address)
            for (label, address) in ((WILDCARD, WILDCARD_IP),
                                     (EXACT, EXACT_IP),
                                     (LEVEL2_WILDCARD, LEVEL2_WILDCARD_IP),
                                     (LEVEL2_EXACT, LEVEL2_EXACT_IP))]

        c = self.dns_connect()
        for (typ, name, data) in self.dns_records:
            self.add_record(c, typ, name, data)

    def tearDown(self):
        """Delete the records created by setUp, then run parent cleanup."""
        try:
            c = self.dns_connect()
            for (typ, name, data) in self.dns_records:
                self.delete_record(c, typ, name, data)
        finally:
            # Always chain to the parent class, mirroring setUp, even when
            # removing a record failed.
            super(TestWildCardQueries, self).tearDown()

    def dns_connect(self):
        """Return a signed ncacn_ip_tcp dnsserver RPC connection."""
        binding_str = "ncacn_ip_tcp:%s[sign]" % self.server_ip
        return dnsserver.dnsserver(binding_str, self.lp, self.creds)

    def delete_record(self, dns_conn, typ, name, data):
        """Delete one record from the DNS domain zone.

        dns_conn is a connection returned by dns_connect(); typ, name and
        data identify the record.  A WERR_DNS_ERROR_NAME_DOES_NOT_EXIST
        failure is ignored; any other WERRORError is re-raised.
        """
        rec = data_to_dns_record(typ, data)
        del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
        del_rec_buf.rec = rec

        try:
            # The record goes in the last argument and None in the one
            # before it; add_record passes them the other way round.
            dns_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                         0,
                                         self.server,
                                         self.get_dns_domain(),
                                         name,
                                         None,
                                         del_rec_buf)
        except WERRORError as e:
            # Ignore record does not exist errors
            if e.args[0] != werror.WERR_DNS_ERROR_NAME_DOES_NOT_EXIST:
                # Bare raise keeps the original traceback.
                raise

    def add_record(self, dns_conn, typ, name, data):
        """Add one record to the DNS domain zone.

        dns_conn is a connection returned by dns_connect(); typ, name and
        data describe the record.  Any WERRORError raised by the RPC call
        propagates to the caller.
        """
        rec = data_to_dns_record(typ, data)
        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
        add_rec_buf.rec = rec
        dns_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                     0,
                                     self.server,
                                     self.get_dns_domain(),
                                     name,
                                     add_rec_buf,
                                     None)

    def _assert_single_a_answer(self, name, expected_ip):
        """Query name for an A record over UDP and verify the reply.

        The reply must have rcode OK, opcode QUERY and exactly one answer,
        which must be an A record whose rdata equals expected_ip.
        """
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name,
                                    dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])

        (response, response_packet) =\
            self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_A)
        self.assertEqual(response.answers[0].rdata, expected_ip)

    def test_one_a_query_match_wildcard(self):
        "Query an A record, should match the wildcard entry"
        name = "miss.wildcardtest.%s" % self.get_dns_domain()
        self._assert_single_a_answer(name, WILDCARD_IP)

    def test_one_a_query_match_wildcard_2_labels(self):
        """ Query an A record, should match the wild card entry
            have two labels to the left of the wild card target.
        """
        name = "label2.label1.wildcardtest.%s" % self.get_dns_domain()
        self._assert_single_a_answer(name, WILDCARD_IP)

    def test_one_a_query_wildcard_entry(self):
        "Query the wildcard entry"
        name = "%s.%s" % (WILDCARD, self.get_dns_domain())
        self._assert_single_a_answer(name, WILDCARD_IP)

    def test_one_a_query_exact_match(self):
        """Query an entry that matches the wild card but has an exact match as
         well.
         """
        name = "%s.%s" % (EXACT, self.get_dns_domain())
        self._assert_single_a_answer(name, EXACT_IP)

    def test_one_a_query_match_wildcard_l2(self):
        "Query an A record, should match the level 2 wildcard entry"
        name = "miss.level2.wildcardtest.%s" % self.get_dns_domain()
        self._assert_single_a_answer(name, LEVEL2_WILDCARD_IP)

    def test_one_a_query_match_wildcard_l2_2_labels(self):
        """Query an A record, should match the level 2 wild card entry
           have two labels to the left of the wild card target
        """
        name = "label1.label2.level2.wildcardtest.%s" % self.get_dns_domain()
        self._assert_single_a_answer(name, LEVEL2_WILDCARD_IP)

    def test_one_a_query_exact_match_l2(self):
        """Query an entry that matches the wild card but has an exact match as
         well.
         """
        name = "%s.%s" % (LEVEL2_EXACT, self.get_dns_domain())
        self._assert_single_a_answer(name, LEVEL2_EXACT_IP)

    def test_one_a_query_wildcard_entry_l2(self):
        "Query the level 2 wildcard entry"
        name = "%s.%s" % (LEVEL2_WILDCARD, self.get_dns_domain())
        self._assert_single_a_answer(name, LEVEL2_WILDCARD_IP)
334
335
# Script entry point: hand this module and the parsed subunit options to the
# TestProgram runner imported from samba.tests.subunitrun.  This runs at
# module level, so it executes whenever the file is run or imported.
TestProgram(module=__name__, opts=subunitopts)