1 # Copyright (C) 2003-2007, 2009, 2011 Nominum, Inc.
3 # Permission to use, copy, modify, and distribute this software and its
4 # documentation for any purpose with or without fee is hereby granted,
5 # provided that the above copyright notice and this permission notice
6 # appear in all copies.
8 # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
9 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
11 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
14 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16 """Common DNSSEC-related functions and constants."""
31 class UnsupportedAlgorithm(dns.exception.DNSException):
32 """Raised if an algorithm is not supported."""
35 class ValidationFailure(dns.exception.DNSException):
36 """The DNSSEC signature is invalid."""
52 _algorithm_by_text = {
58 'DSANSEC3SHA1' : DSANSEC3SHA1,
59 'RSASHA1NSEC3SHA1' : RSASHA1NSEC3SHA1,
60 'RSASHA256' : RSASHA256,
61 'RSASHA512' : RSASHA512,
62 'INDIRECT' : INDIRECT,
63 'PRIVATEDNS' : PRIVATEDNS,
64 'PRIVATEOID' : PRIVATEOID,
67 # We construct the inverse mapping programmatically to ensure that we
68 # cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
69 # would cause the mapping not to be true inverse.
71 _algorithm_by_value = dict([(y, x) for x, y in _algorithm_by_text.iteritems()])
73 def algorithm_from_text(text):
74 """Convert text into a DNSSEC algorithm value
77 value = _algorithm_by_text.get(text.upper())
82 def algorithm_to_text(value):
83 """Convert a DNSSEC algorithm value to text
86 text = _algorithm_by_value.get(value)
91 def _to_rdata(record, origin):
92 s = cStringIO.StringIO()
93 record.to_wire(s, origin=origin)
96 def key_id(key, origin=None):
97 rdata = _to_rdata(key, origin)
98 if key.algorithm == RSAMD5:
99 return (ord(rdata[-3]) << 8) + ord(rdata[-2])
102 for i in range(len(rdata) // 2):
103 total += (ord(rdata[2 * i]) << 8) + ord(rdata[2 * i + 1])
104 if len(rdata) % 2 != 0:
105 total += ord(rdata[len(rdata) - 1]) << 8
106 total += ((total >> 16) & 0xffff);
107 return total & 0xffff
109 def make_ds(name, key, algorithm, origin=None):
110 if algorithm.upper() == 'SHA1':
112 hash = dns.hash.get('SHA1')()
113 elif algorithm.upper() == 'SHA256':
115 hash = dns.hash.get('SHA256')()
117 raise UnsupportedAlgorithm, 'unsupported algorithm "%s"' % algorithm
119 if isinstance(name, (str, unicode)):
120 name = dns.name.from_text(name, origin)
121 hash.update(name.canonicalize().to_wire())
122 hash.update(_to_rdata(key, origin))
123 digest = hash.digest()
125 dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest
126 return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0,
129 def _find_key(keys, rrsig):
130 value = keys.get(rrsig.signer)
133 if isinstance(value, dns.node.Node):
135 rdataset = node.find_rdataset(dns.rdataclass.IN,
136 dns.rdatatype.DNSKEY)
141 for rdata in rdataset:
142 if rdata.algorithm == rrsig.algorithm and \
143 key_id(rdata) == rrsig.key_tag:
147 def _is_rsa(algorithm):
148 return algorithm in (RSAMD5, RSASHA1,
149 RSASHA1NSEC3SHA1, RSASHA256,
152 def _is_dsa(algorithm):
153 return algorithm in (DSA, DSANSEC3SHA1)
155 def _is_md5(algorithm):
156 return algorithm == RSAMD5
158 def _is_sha1(algorithm):
159 return algorithm in (DSA, RSASHA1,
160 DSANSEC3SHA1, RSASHA1NSEC3SHA1)
162 def _is_sha256(algorithm):
163 return algorithm == RSASHA256
165 def _is_sha512(algorithm):
166 return algorithm == RSASHA512
168 def _make_hash(algorithm):
169 if _is_md5(algorithm):
170 return dns.hash.get('MD5')()
171 if _is_sha1(algorithm):
172 return dns.hash.get('SHA1')()
173 if _is_sha256(algorithm):
174 return dns.hash.get('SHA256')()
175 if _is_sha512(algorithm):
176 return dns.hash.get('SHA512')()
177 raise ValidationFailure, 'unknown hash for algorithm %u' % algorithm
179 def _make_algorithm_id(algorithm):
180 if _is_md5(algorithm):
181 oid = [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05]
182 elif _is_sha1(algorithm):
183 oid = [0x2b, 0x0e, 0x03, 0x02, 0x1a]
184 elif _is_sha256(algorithm):
185 oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01]
186 elif _is_sha512(algorithm):
187 oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03]
189 raise ValidationFailure, 'unknown algorithm %u' % algorithm
191 dlen = _make_hash(algorithm).digest_size
192 idbytes = [0x30] + [8 + olen + dlen] + \
193 [0x30, olen + 4] + [0x06, olen] + oid + \
194 [0x05, 0x00] + [0x04, dlen]
195 return ''.join(map(chr, idbytes))
197 def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
198 """Validate an RRset against a single signature rdata
200 The owner name of the rrsig is assumed to be the same as the owner name
203 @param rrset: The RRset to validate
204 @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
206 @param rrsig: The signature rdata
207 @type rrsig: dns.rrset.Rdata
208 @param keys: The key dictionary.
209 @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
210 @param origin: The origin to use for relative names
211 @type origin: dns.name.Name or None
212 @param now: The time to use when validating the signatures. The default
217 if isinstance(origin, (str, unicode)):
218 origin = dns.name.from_text(origin, dns.name.root)
220 key = _find_key(keys, rrsig)
222 raise ValidationFailure, 'unknown key'
224 # For convenience, allow the rrset to be specified as a (name, rdataset)
225 # tuple as well as a proper rrset
226 if isinstance(rrset, tuple):
235 if rrsig.expiration < now:
236 raise ValidationFailure, 'expired'
237 if rrsig.inception > now:
238 raise ValidationFailure, 'not yet valid'
240 hash = _make_hash(rrsig.algorithm)
242 if _is_rsa(rrsig.algorithm):
244 (bytes,) = struct.unpack('!B', keyptr[0:1])
247 (bytes,) = struct.unpack('!H', keyptr[0:2])
249 rsa_e = keyptr[0:bytes]
250 rsa_n = keyptr[bytes:]
251 keylen = len(rsa_n) * 8
252 pubkey = Crypto.PublicKey.RSA.construct(
253 (Crypto.Util.number.bytes_to_long(rsa_n),
254 Crypto.Util.number.bytes_to_long(rsa_e)))
255 sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
256 elif _is_dsa(rrsig.algorithm):
258 (t,) = struct.unpack('!B', keyptr[0:1])
263 dsa_p = keyptr[0:octets]
264 keyptr = keyptr[octets:]
265 dsa_g = keyptr[0:octets]
266 keyptr = keyptr[octets:]
267 dsa_y = keyptr[0:octets]
268 pubkey = Crypto.PublicKey.DSA.construct(
269 (Crypto.Util.number.bytes_to_long(dsa_y),
270 Crypto.Util.number.bytes_to_long(dsa_g),
271 Crypto.Util.number.bytes_to_long(dsa_p),
272 Crypto.Util.number.bytes_to_long(dsa_q)))
273 (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
274 sig = (Crypto.Util.number.bytes_to_long(dsa_r),
275 Crypto.Util.number.bytes_to_long(dsa_s))
277 raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
279 hash.update(_to_rdata(rrsig, origin)[:18])
280 hash.update(rrsig.signer.to_digestable(origin))
282 if rrsig.labels < len(rrname) - 1:
283 suffix = rrname.split(rrsig.labels + 1)[1]
284 rrname = dns.name.from_text('*', suffix)
285 rrnamebuf = rrname.to_digestable(origin)
286 rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
288 rrlist = sorted(rdataset);
290 hash.update(rrnamebuf)
292 rrdata = rr.to_digestable(origin)
293 rrlen = struct.pack('!H', len(rrdata))
297 digest = hash.digest()
299 if _is_rsa(rrsig.algorithm):
300 # PKCS1 algorithm identifier goop
301 digest = _make_algorithm_id(rrsig.algorithm) + digest
302 padlen = keylen // 8 - len(digest) - 3
303 digest = chr(0) + chr(1) + chr(0xFF) * padlen + chr(0) + digest
304 elif _is_dsa(rrsig.algorithm):
307 # Raise here for code clarity; this won't actually ever happen
308 # since if the algorithm is really unknown we'd already have
309 # raised an exception above
310 raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
312 if not pubkey.verify(digest, sig):
313 raise ValidationFailure, 'verify failure'
315 def _validate(rrset, rrsigset, keys, origin=None, now=None):
318 @param rrset: The RRset to validate
319 @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
321 @param rrsigset: The signature RRset
322 @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
324 @param keys: The key dictionary.
325 @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
326 @param origin: The origin to use for relative names
327 @type origin: dns.name.Name or None
328 @param now: The time to use when validating the signatures. The default
333 if isinstance(origin, (str, unicode)):
334 origin = dns.name.from_text(origin, dns.name.root)
336 if isinstance(rrset, tuple):
341 if isinstance(rrsigset, tuple):
342 rrsigname = rrsigset[0]
343 rrsigrdataset = rrsigset[1]
345 rrsigname = rrsigset.name
346 rrsigrdataset = rrsigset
348 rrname = rrname.choose_relativity(origin)
349 rrsigname = rrname.choose_relativity(origin)
350 if rrname != rrsigname:
351 raise ValidationFailure, "owner names do not match"
353 for rrsig in rrsigrdataset:
355 _validate_rrsig(rrset, rrsig, keys, origin, now)
357 except ValidationFailure, e:
359 raise ValidationFailure, "no RRSIGs validated"
361 def _need_pycrypto(*args, **kwargs):
362 raise NotImplementedError, "DNSSEC validation requires pycrypto"
365 import Crypto.PublicKey.RSA
366 import Crypto.PublicKey.DSA
367 import Crypto.Util.number
369 validate_rrsig = _validate_rrsig
371 validate = _need_pycrypto
372 validate_rrsig = _need_pycrypto