Move dnspython to third_party.
[samba.git] / third_party / dnspython / dns / dnssec.py
1 # Copyright (C) 2003-2007, 2009, 2011 Nominum, Inc.
2 #
3 # Permission to use, copy, modify, and distribute this software and its
4 # documentation for any purpose with or without fee is hereby granted,
5 # provided that the above copyright notice and this permission notice
6 # appear in all copies.
7 #
8 # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
9 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
11 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
14 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16 """Common DNSSEC-related functions and constants."""
17
18 import cStringIO
19 import struct
20 import time
21
22 import dns.exception
23 import dns.hash
24 import dns.name
25 import dns.node
26 import dns.rdataset
27 import dns.rdata
28 import dns.rdatatype
29 import dns.rdataclass
30
31 class UnsupportedAlgorithm(dns.exception.DNSException):
32     """Raised if an algorithm is not supported."""
33     pass
34
35 class ValidationFailure(dns.exception.DNSException):
36     """The DNSSEC signature is invalid."""
37     pass
38
39 RSAMD5 = 1
40 DH = 2
41 DSA = 3
42 ECC = 4
43 RSASHA1 = 5
44 DSANSEC3SHA1 = 6
45 RSASHA1NSEC3SHA1 = 7
46 RSASHA256 = 8
47 RSASHA512 = 10
48 INDIRECT = 252
49 PRIVATEDNS = 253
50 PRIVATEOID = 254
51
52 _algorithm_by_text = {
53     'RSAMD5' : RSAMD5,
54     'DH' : DH,
55     'DSA' : DSA,
56     'ECC' : ECC,
57     'RSASHA1' : RSASHA1,
58     'DSANSEC3SHA1' : DSANSEC3SHA1,
59     'RSASHA1NSEC3SHA1' : RSASHA1NSEC3SHA1,
60     'RSASHA256' : RSASHA256,
61     'RSASHA512' : RSASHA512,
62     'INDIRECT' : INDIRECT,
63     'PRIVATEDNS' : PRIVATEDNS,
64     'PRIVATEOID' : PRIVATEOID,
65     }
66
67 # We construct the inverse mapping programmatically to ensure that we
68 # cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
69 # would cause the mapping not to be true inverse.
70
71 _algorithm_by_value = dict([(y, x) for x, y in _algorithm_by_text.iteritems()])
72
73 def algorithm_from_text(text):
74     """Convert text into a DNSSEC algorithm value
75     @rtype: int"""
76
77     value = _algorithm_by_text.get(text.upper())
78     if value is None:
79         value = int(text)
80     return value
81
82 def algorithm_to_text(value):
83     """Convert a DNSSEC algorithm value to text
84     @rtype: string"""
85
86     text = _algorithm_by_value.get(value)
87     if text is None:
88         text = str(value)
89     return text
90
91 def _to_rdata(record, origin):
92     s = cStringIO.StringIO()
93     record.to_wire(s, origin=origin)
94     return s.getvalue()
95
96 def key_id(key, origin=None):
97     rdata = _to_rdata(key, origin)
98     if key.algorithm == RSAMD5:
99         return (ord(rdata[-3]) << 8) + ord(rdata[-2])
100     else:
101         total = 0
102         for i in range(len(rdata) // 2):
103             total += (ord(rdata[2 * i]) << 8) + ord(rdata[2 * i + 1])
104         if len(rdata) % 2 != 0:
105             total += ord(rdata[len(rdata) - 1]) << 8
106         total += ((total >> 16) & 0xffff);
107         return total & 0xffff
108
109 def make_ds(name, key, algorithm, origin=None):
110     if algorithm.upper() == 'SHA1':
111         dsalg = 1
112         hash = dns.hash.get('SHA1')()
113     elif algorithm.upper() == 'SHA256':
114         dsalg = 2
115         hash = dns.hash.get('SHA256')()
116     else:
117         raise UnsupportedAlgorithm, 'unsupported algorithm "%s"' % algorithm
118
119     if isinstance(name, (str, unicode)):
120         name = dns.name.from_text(name, origin)
121     hash.update(name.canonicalize().to_wire())
122     hash.update(_to_rdata(key, origin))
123     digest = hash.digest()
124
125     dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest
126     return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0,
127                                len(dsrdata))
128
129 def _find_key(keys, rrsig):
130     value = keys.get(rrsig.signer)
131     if value is None:
132         return None
133     if isinstance(value, dns.node.Node):
134         try:
135             rdataset = node.find_rdataset(dns.rdataclass.IN,
136                                           dns.rdatatype.DNSKEY)
137         except KeyError:
138             return None
139     else:
140         rdataset = value
141     for rdata in rdataset:
142         if rdata.algorithm == rrsig.algorithm and \
143                key_id(rdata) == rrsig.key_tag:
144             return rdata
145     return None
146
147 def _is_rsa(algorithm):
148     return algorithm in (RSAMD5, RSASHA1,
149                          RSASHA1NSEC3SHA1, RSASHA256,
150                          RSASHA512)
151
152 def _is_dsa(algorithm):
153     return algorithm in (DSA, DSANSEC3SHA1)
154
155 def _is_md5(algorithm):
156     return algorithm == RSAMD5
157
158 def _is_sha1(algorithm):
159     return algorithm in (DSA, RSASHA1,
160                          DSANSEC3SHA1, RSASHA1NSEC3SHA1)
161
162 def _is_sha256(algorithm):
163     return algorithm == RSASHA256
164
165 def _is_sha512(algorithm):
166     return algorithm == RSASHA512
167
168 def _make_hash(algorithm):
169     if _is_md5(algorithm):
170         return dns.hash.get('MD5')()
171     if _is_sha1(algorithm):
172         return dns.hash.get('SHA1')()
173     if _is_sha256(algorithm):
174         return dns.hash.get('SHA256')()
175     if _is_sha512(algorithm):
176         return dns.hash.get('SHA512')()
177     raise ValidationFailure, 'unknown hash for algorithm %u' % algorithm
178
179 def _make_algorithm_id(algorithm):
180     if _is_md5(algorithm):
181         oid = [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05]
182     elif _is_sha1(algorithm):
183         oid = [0x2b, 0x0e, 0x03, 0x02, 0x1a]
184     elif _is_sha256(algorithm):
185         oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01]
186     elif _is_sha512(algorithm):
187         oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03]
188     else:
189         raise ValidationFailure, 'unknown algorithm %u' % algorithm
190     olen = len(oid)
191     dlen = _make_hash(algorithm).digest_size
192     idbytes = [0x30] + [8 + olen + dlen] + \
193               [0x30, olen + 4] + [0x06, olen] + oid + \
194               [0x05, 0x00] + [0x04, dlen]
195     return ''.join(map(chr, idbytes))
196
197 def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
198     """Validate an RRset against a single signature rdata
199
200     The owner name of the rrsig is assumed to be the same as the owner name
201     of the rrset.
202
203     @param rrset: The RRset to validate
204     @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
205     tuple
206     @param rrsig: The signature rdata
207     @type rrsig: dns.rrset.Rdata
208     @param keys: The key dictionary.
209     @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
210     @param origin: The origin to use for relative names
211     @type origin: dns.name.Name or None
212     @param now: The time to use when validating the signatures.  The default
213     is the current time.
214     @type now: int
215     """
216
217     if isinstance(origin, (str, unicode)):
218         origin = dns.name.from_text(origin, dns.name.root)
219
220     key = _find_key(keys, rrsig)
221     if not key:
222         raise ValidationFailure, 'unknown key'
223
224     # For convenience, allow the rrset to be specified as a (name, rdataset)
225     # tuple as well as a proper rrset
226     if isinstance(rrset, tuple):
227         rrname = rrset[0]
228         rdataset = rrset[1]
229     else:
230         rrname = rrset.name
231         rdataset = rrset
232
233     if now is None:
234         now = time.time()
235     if rrsig.expiration < now:
236         raise ValidationFailure, 'expired'
237     if rrsig.inception > now:
238         raise ValidationFailure, 'not yet valid'
239
240     hash = _make_hash(rrsig.algorithm)
241
242     if _is_rsa(rrsig.algorithm):
243         keyptr = key.key
244         (bytes,) = struct.unpack('!B', keyptr[0:1])
245         keyptr = keyptr[1:]
246         if bytes == 0:
247             (bytes,) = struct.unpack('!H', keyptr[0:2])
248             keyptr = keyptr[2:]
249         rsa_e = keyptr[0:bytes]
250         rsa_n = keyptr[bytes:]
251         keylen = len(rsa_n) * 8
252         pubkey = Crypto.PublicKey.RSA.construct(
253             (Crypto.Util.number.bytes_to_long(rsa_n),
254              Crypto.Util.number.bytes_to_long(rsa_e)))
255         sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
256     elif _is_dsa(rrsig.algorithm):
257         keyptr = key.key
258         (t,) = struct.unpack('!B', keyptr[0:1])
259         keyptr = keyptr[1:]
260         octets = 64 + t * 8
261         dsa_q = keyptr[0:20]
262         keyptr = keyptr[20:]
263         dsa_p = keyptr[0:octets]
264         keyptr = keyptr[octets:]
265         dsa_g = keyptr[0:octets]
266         keyptr = keyptr[octets:]
267         dsa_y = keyptr[0:octets]
268         pubkey = Crypto.PublicKey.DSA.construct(
269             (Crypto.Util.number.bytes_to_long(dsa_y),
270              Crypto.Util.number.bytes_to_long(dsa_g),
271              Crypto.Util.number.bytes_to_long(dsa_p),
272              Crypto.Util.number.bytes_to_long(dsa_q)))
273         (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
274         sig = (Crypto.Util.number.bytes_to_long(dsa_r),
275                Crypto.Util.number.bytes_to_long(dsa_s))
276     else:
277         raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
278
279     hash.update(_to_rdata(rrsig, origin)[:18])
280     hash.update(rrsig.signer.to_digestable(origin))
281
282     if rrsig.labels < len(rrname) - 1:
283         suffix = rrname.split(rrsig.labels + 1)[1]
284         rrname = dns.name.from_text('*', suffix)
285     rrnamebuf = rrname.to_digestable(origin)
286     rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
287                           rrsig.original_ttl)
288     rrlist = sorted(rdataset);
289     for rr in rrlist:
290         hash.update(rrnamebuf)
291         hash.update(rrfixed)
292         rrdata = rr.to_digestable(origin)
293         rrlen = struct.pack('!H', len(rrdata))
294         hash.update(rrlen)
295         hash.update(rrdata)
296
297     digest = hash.digest()
298
299     if _is_rsa(rrsig.algorithm):
300         # PKCS1 algorithm identifier goop
301         digest = _make_algorithm_id(rrsig.algorithm) + digest
302         padlen = keylen // 8 - len(digest) - 3
303         digest = chr(0) + chr(1) + chr(0xFF) * padlen + chr(0) + digest
304     elif _is_dsa(rrsig.algorithm):
305         pass
306     else:
307         # Raise here for code clarity; this won't actually ever happen
308         # since if the algorithm is really unknown we'd already have
309         # raised an exception above
310         raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
311
312     if not pubkey.verify(digest, sig):
313         raise ValidationFailure, 'verify failure'
314
315 def _validate(rrset, rrsigset, keys, origin=None, now=None):
316     """Validate an RRset
317
318     @param rrset: The RRset to validate
319     @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
320     tuple
321     @param rrsigset: The signature RRset
322     @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
323     tuple
324     @param keys: The key dictionary.
325     @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
326     @param origin: The origin to use for relative names
327     @type origin: dns.name.Name or None
328     @param now: The time to use when validating the signatures.  The default
329     is the current time.
330     @type now: int
331     """
332
333     if isinstance(origin, (str, unicode)):
334         origin = dns.name.from_text(origin, dns.name.root)
335
336     if isinstance(rrset, tuple):
337         rrname = rrset[0]
338     else:
339         rrname = rrset.name
340
341     if isinstance(rrsigset, tuple):
342         rrsigname = rrsigset[0]
343         rrsigrdataset = rrsigset[1]
344     else:
345         rrsigname = rrsigset.name
346         rrsigrdataset = rrsigset
347
348     rrname = rrname.choose_relativity(origin)
349     rrsigname = rrname.choose_relativity(origin)
350     if rrname != rrsigname:
351         raise ValidationFailure, "owner names do not match"
352
353     for rrsig in rrsigrdataset:
354         try:
355             _validate_rrsig(rrset, rrsig, keys, origin, now)
356             return
357         except ValidationFailure, e:
358             pass
359     raise ValidationFailure, "no RRSIGs validated"
360
361 def _need_pycrypto(*args, **kwargs):
362     raise NotImplementedError, "DNSSEC validation requires pycrypto"
363
364 try:
365     import Crypto.PublicKey.RSA
366     import Crypto.PublicKey.DSA
367     import Crypto.Util.number
368     validate = _validate
369     validate_rrsig = _validate_rrsig
370 except ImportError:
371     validate = _need_pycrypto
372     validate_rrsig = _need_pycrypto