PEP8: fix E305: expected 2 blank lines after class or function definition, found 1
[samba.git] / python / samba / tests / dns_tkey.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 # Copyright (C) Ralph Boehme  <slow@samba.org> 2016
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import sys
20 import struct
21 import random
22 import socket
23 import optparse
24 import uuid
25 import time
26 import samba.ndr as ndr
27 import samba.getopt as options
28 from samba import credentials
29 from samba.dcerpc import dns, dnsp
30 from samba.tests.subunitrun import SubunitOptions, TestProgram
31 from samba.tests.dns_base import DNSTKeyTest
32
33 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
34 sambaopts = options.SambaOptions(parser)
35 parser.add_option_group(sambaopts)
36
37 # This timeout only has relevance when testing against Windows
38 # Format errors tend to return patchy responses, so a timeout is needed.
39 parser.add_option("--timeout", type="int", dest="timeout",
40                   help="Specify timeout for DNS requests")
41
42 # use command line creds if available
43 credopts = options.CredentialsOptions(parser)
44 parser.add_option_group(credopts)
45 subunitopts = SubunitOptions(parser)
46 parser.add_option_group(subunitopts)
47
48 opts, args = parser.parse_args()
49 timeout = opts.timeout
50
51 if len(args) < 2:
52     parser.print_usage()
53     sys.exit(1)
54
55 server_name = args[0]
56 server_ip = args[1]
57
58
59 class TestDNSUpdates(DNSTKeyTest):
60     def setUp(self):
61         self.server = server_name
62         self.server_ip = server_ip
63         super(TestDNSUpdates, self).setUp()
64
65     def test_tkey(self):
66         "test DNS TKEY handshake"
67
68         self.tkey_trans()
69
70     def test_update_wo_tsig(self):
71         "test DNS update without TSIG record"
72
73         p = self.make_update_request()
74         (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
75         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_REFUSED)
76
77         rcode = self.search_record(self.newrecname)
78         self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
79
80     def test_update_tsig_bad_keyname(self):
81         "test DNS update with a TSIG record with a bad keyname"
82
83         self.tkey_trans()
84
85         p = self.make_update_request()
86         self.sign_packet(p, "badkey")
87         (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
88         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTAUTH)
89         tsig_record = response.additional[0].rdata
90         self.assertEquals(tsig_record.error, dns.DNS_RCODE_BADKEY)
91         self.assertEquals(tsig_record.mac_size, 0)
92
93         rcode = self.search_record(self.newrecname)
94         self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
95
96     def test_update_tsig_bad_mac(self):
97         "test DNS update with a TSIG record with a bad MAC"
98
99         self.tkey_trans()
100
101         p = self.make_update_request()
102         self.bad_sign_packet(p, self.key_name)
103         (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
104         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTAUTH)
105         tsig_record = response.additional[0].rdata
106         self.assertEquals(tsig_record.error, dns.DNS_RCODE_BADSIG)
107         self.assertEquals(tsig_record.mac_size, 0)
108
109         rcode = self.search_record(self.newrecname)
110         self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
111
112     def test_update_tsig(self):
113         "test DNS update with correct TSIG record"
114
115         self.tkey_trans()
116
117         p = self.make_update_request()
118         mac = self.sign_packet(p, self.key_name)
119         (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
120         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
121         self.verify_packet(response, response_p, mac)
122
123         # Check the record is around
124         rcode = self.search_record(self.newrecname)
125         self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
126
127         # Now delete the record
128         p = self.make_update_request(delete=True)
129         mac = self.sign_packet(p, self.key_name)
130         (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
131         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
132         self.verify_packet(response, response_p, mac)
133
134         # check it's gone
135         rcode = self.search_record(self.newrecname)
136         self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
137
138     def test_update_tsig_windows(self):
139         "test DNS update with correct TSIG record (follow Windows pattern)"
140
141         newrecname = "win" + self.newrecname
142         rr_class = dns.DNS_QCLASS_IN
143         ttl = 1200
144
145         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
146         q = self.make_name_question(self.get_dns_domain(),
147                                     dns.DNS_QTYPE_SOA,
148                                     dns.DNS_QCLASS_IN)
149         questions = []
150         questions.append(q)
151         self.finish_name_packet(p, questions)
152
153         updates = []
154         r = dns.res_rec()
155         r.name = newrecname
156         r.rr_type = dns.DNS_QTYPE_A
157         r.rr_class = dns.DNS_QCLASS_ANY
158         r.ttl = 0
159         r.length = 0
160         updates.append(r)
161         r = dns.res_rec()
162         r.name = newrecname
163         r.rr_type = dns.DNS_QTYPE_AAAA
164         r.rr_class = dns.DNS_QCLASS_ANY
165         r.ttl = 0
166         r.length = 0
167         updates.append(r)
168         r = dns.res_rec()
169         r.name = newrecname
170         r.rr_type = dns.DNS_QTYPE_A
171         r.rr_class = rr_class
172         r.ttl = ttl
173         r.length = 0xffff
174         r.rdata = "10.1.45.64"
175         updates.append(r)
176         p.nscount = len(updates)
177         p.nsrecs = updates
178
179         prereqs = []
180         r = dns.res_rec()
181         r.name = newrecname
182         r.rr_type = dns.DNS_QTYPE_CNAME
183         r.rr_class = dns.DNS_QCLASS_NONE
184         r.ttl = 0
185         r.length = 0
186         prereqs.append(r)
187         p.ancount = len(prereqs)
188         p.answers = prereqs
189
190         (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
191         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_REFUSED)
192
193         self.tkey_trans()
194         mac = self.sign_packet(p, self.key_name)
195         (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
196         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
197         self.verify_packet(response, response_p, mac)
198
199         # Check the record is around
200         rcode = self.search_record(newrecname)
201         self.assert_rcode_equals(rcode, dns.DNS_RCODE_OK)
202
203         # Now delete the record
204         p = self.make_update_request(delete=True)
205         mac = self.sign_packet(p, self.key_name)
206         (response, response_p) = self.dns_transaction_udp(p, self.server_ip)
207         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
208         self.verify_packet(response, response_p, mac)
209
210         # check it's gone
211         rcode = self.search_record(self.newrecname)
212         self.assert_rcode_equals(rcode, dns.DNS_RCODE_NXDOMAIN)
213
214
215 TestProgram(module=__name__, opts=subunitopts)