Move dnspython to third_party.
[samba.git] / third_party / dnspython / dns / rdataset.py
1 # Copyright (C) 2001-2007, 2009-2011 Nominum, Inc.
2 #
3 # Permission to use, copy, modify, and distribute this software and its
4 # documentation for any purpose with or without fee is hereby granted,
5 # provided that the above copyright notice and this permission notice
6 # appear in all copies.
7 #
8 # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
9 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
11 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
14 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16 """DNS rdatasets (an rdataset is a set of rdatas of a given type and class)"""
17
18 import random
19 import StringIO
20 import struct
21
22 import dns.exception
23 import dns.rdatatype
24 import dns.rdataclass
25 import dns.rdata
26 import dns.set
27
28 # define SimpleSet here for backwards compatibility
29 SimpleSet = dns.set.Set
30
31 class DifferingCovers(dns.exception.DNSException):
32     """Raised if an attempt is made to add a SIG/RRSIG whose covered type
33     is not the same as that of the other rdatas in the rdataset."""
34     pass
35
36 class IncompatibleTypes(dns.exception.DNSException):
37     """Raised if an attempt is made to add rdata of an incompatible type."""
38     pass
39
40 class Rdataset(dns.set.Set):
41     """A DNS rdataset.
42
43     @ivar rdclass: The class of the rdataset
44     @type rdclass: int
45     @ivar rdtype: The type of the rdataset
46     @type rdtype: int
47     @ivar covers: The covered type.  Usually this value is
48     dns.rdatatype.NONE, but if the rdtype is dns.rdatatype.SIG or
49     dns.rdatatype.RRSIG, then the covers value will be the rdata
50     type the SIG/RRSIG covers.  The library treats the SIG and RRSIG
51     types as if they were a family of
52     types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).  This makes RRSIGs much
53     easier to work with than if RRSIGs covering different rdata
54     types were aggregated into a single RRSIG rdataset.
55     @type covers: int
56     @ivar ttl: The DNS TTL (Time To Live) value
57     @type ttl: int
58     """
59
60     __slots__ = ['rdclass', 'rdtype', 'covers', 'ttl']
61
62     def __init__(self, rdclass, rdtype, covers=dns.rdatatype.NONE):
63         """Create a new rdataset of the specified class and type.
64
65         @see: the description of the class instance variables for the
66         meaning of I{rdclass} and I{rdtype}"""
67
68         super(Rdataset, self).__init__()
69         self.rdclass = rdclass
70         self.rdtype = rdtype
71         self.covers = covers
72         self.ttl = 0
73
74     def _clone(self):
75         obj = super(Rdataset, self)._clone()
76         obj.rdclass = self.rdclass
77         obj.rdtype = self.rdtype
78         obj.covers = self.covers
79         obj.ttl = self.ttl
80         return obj
81
82     def update_ttl(self, ttl):
83         """Set the TTL of the rdataset to be the lesser of the set's current
84         TTL or the specified TTL.  If the set contains no rdatas, set the TTL
85         to the specified TTL.
86         @param ttl: The TTL
87         @type ttl: int"""
88
89         if len(self) == 0:
90             self.ttl = ttl
91         elif ttl < self.ttl:
92             self.ttl = ttl
93
94     def add(self, rd, ttl=None):
95         """Add the specified rdata to the rdataset.
96
97         If the optional I{ttl} parameter is supplied, then
98         self.update_ttl(ttl) will be called prior to adding the rdata.
99
100         @param rd: The rdata
101         @type rd: dns.rdata.Rdata object
102         @param ttl: The TTL
103         @type ttl: int"""
104
105         #
106         # If we're adding a signature, do some special handling to
107         # check that the signature covers the same type as the
108         # other rdatas in this rdataset.  If this is the first rdata
109         # in the set, initialize the covers field.
110         #
111         if self.rdclass != rd.rdclass or self.rdtype != rd.rdtype:
112             raise IncompatibleTypes
113         if not ttl is None:
114             self.update_ttl(ttl)
115         if self.rdtype == dns.rdatatype.RRSIG or \
116            self.rdtype == dns.rdatatype.SIG:
117             covers = rd.covers()
118             if len(self) == 0 and self.covers == dns.rdatatype.NONE:
119                 self.covers = covers
120             elif self.covers != covers:
121                 raise DifferingCovers
122         if dns.rdatatype.is_singleton(rd.rdtype) and len(self) > 0:
123             self.clear()
124         super(Rdataset, self).add(rd)
125
126     def union_update(self, other):
127         self.update_ttl(other.ttl)
128         super(Rdataset, self).union_update(other)
129
130     def intersection_update(self, other):
131         self.update_ttl(other.ttl)
132         super(Rdataset, self).intersection_update(other)
133
134     def update(self, other):
135         """Add all rdatas in other to self.
136
137         @param other: The rdataset from which to update
138         @type other: dns.rdataset.Rdataset object"""
139
140         self.update_ttl(other.ttl)
141         super(Rdataset, self).update(other)
142
143     def __repr__(self):
144         if self.covers == 0:
145             ctext = ''
146         else:
147             ctext = '(' + dns.rdatatype.to_text(self.covers) + ')'
148         return '<DNS ' + dns.rdataclass.to_text(self.rdclass) + ' ' + \
149                dns.rdatatype.to_text(self.rdtype) + ctext + ' rdataset>'
150
151     def __str__(self):
152         return self.to_text()
153
154     def __eq__(self, other):
155         """Two rdatasets are equal if they have the same class, type, and
156         covers, and contain the same rdata.
157         @rtype: bool"""
158
159         if not isinstance(other, Rdataset):
160             return False
161         if self.rdclass != other.rdclass or \
162            self.rdtype != other.rdtype or \
163            self.covers != other.covers:
164             return False
165         return super(Rdataset, self).__eq__(other)
166
167     def __ne__(self, other):
168         return not self.__eq__(other)
169
170     def to_text(self, name=None, origin=None, relativize=True,
171                 override_rdclass=None, **kw):
172         """Convert the rdataset into DNS master file format.
173
174         @see: L{dns.name.Name.choose_relativity} for more information
175         on how I{origin} and I{relativize} determine the way names
176         are emitted.
177
178         Any additional keyword arguments are passed on to the rdata
179         to_text() method.
180
181         @param name: If name is not None, emit a RRs with I{name} as
182         the owner name.
183         @type name: dns.name.Name object
184         @param origin: The origin for relative names, or None.
185         @type origin: dns.name.Name object
186         @param relativize: True if names should names be relativized
187         @type relativize: bool"""
188         if not name is None:
189             name = name.choose_relativity(origin, relativize)
190             ntext = str(name)
191             pad = ' '
192         else:
193             ntext = ''
194             pad = ''
195         s = StringIO.StringIO()
196         if not override_rdclass is None:
197             rdclass = override_rdclass
198         else:
199             rdclass = self.rdclass
200         if len(self) == 0:
201             #
202             # Empty rdatasets are used for the question section, and in
203             # some dynamic updates, so we don't need to print out the TTL
204             # (which is meaningless anyway).
205             #
206             print >> s, '%s%s%s %s' % (ntext, pad,
207                                        dns.rdataclass.to_text(rdclass),
208                                        dns.rdatatype.to_text(self.rdtype))
209         else:
210             for rd in self:
211                 print >> s, '%s%s%d %s %s %s' % \
212                       (ntext, pad, self.ttl, dns.rdataclass.to_text(rdclass),
213                        dns.rdatatype.to_text(self.rdtype),
214                        rd.to_text(origin=origin, relativize=relativize, **kw))
215         #
216         # We strip off the final \n for the caller's convenience in printing
217         #
218         return s.getvalue()[:-1]
219
220     def to_wire(self, name, file, compress=None, origin=None,
221                 override_rdclass=None, want_shuffle=True):
222         """Convert the rdataset to wire format.
223
224         @param name: The owner name of the RRset that will be emitted
225         @type name: dns.name.Name object
226         @param file: The file to which the wire format data will be appended
227         @type file: file
228         @param compress: The compression table to use; the default is None.
229         @type compress: dict
230         @param origin: The origin to be appended to any relative names when
231         they are emitted.  The default is None.
232         @returns: the number of records emitted
233         @rtype: int
234         """
235
236         if not override_rdclass is None:
237             rdclass =  override_rdclass
238             want_shuffle = False
239         else:
240             rdclass = self.rdclass
241         file.seek(0, 2)
242         if len(self) == 0:
243             name.to_wire(file, compress, origin)
244             stuff = struct.pack("!HHIH", self.rdtype, rdclass, 0, 0)
245             file.write(stuff)
246             return 1
247         else:
248             if want_shuffle:
249                 l = list(self)
250                 random.shuffle(l)
251             else:
252                 l = self
253             for rd in l:
254                 name.to_wire(file, compress, origin)
255                 stuff = struct.pack("!HHIH", self.rdtype, rdclass,
256                                     self.ttl, 0)
257                 file.write(stuff)
258                 start = file.tell()
259                 rd.to_wire(file, compress, origin)
260                 end = file.tell()
261                 assert end - start < 65536
262                 file.seek(start - 2)
263                 stuff = struct.pack("!H", end - start)
264                 file.write(stuff)
265                 file.seek(0, 2)
266             return len(self)
267
268     def match(self, rdclass, rdtype, covers):
269         """Returns True if this rdataset matches the specified class, type,
270         and covers"""
271         if self.rdclass == rdclass and \
272            self.rdtype == rdtype and \
273            self.covers == covers:
274             return True
275         return False
276
277 def from_text_list(rdclass, rdtype, ttl, text_rdatas):
278     """Create an rdataset with the specified class, type, and TTL, and with
279     the specified list of rdatas in text format.
280
281     @rtype: dns.rdataset.Rdataset object
282     """
283
284     if isinstance(rdclass, (str, unicode)):
285         rdclass = dns.rdataclass.from_text(rdclass)
286     if isinstance(rdtype, (str, unicode)):
287         rdtype = dns.rdatatype.from_text(rdtype)
288     r = Rdataset(rdclass, rdtype)
289     r.update_ttl(ttl)
290     for t in text_rdatas:
291         rd = dns.rdata.from_text(r.rdclass, r.rdtype, t)
292         r.add(rd)
293     return r
294
295 def from_text(rdclass, rdtype, ttl, *text_rdatas):
296     """Create an rdataset with the specified class, type, and TTL, and with
297     the specified rdatas in text format.
298
299     @rtype: dns.rdataset.Rdataset object
300     """
301
302     return from_text_list(rdclass, rdtype, ttl, text_rdatas)
303
304 def from_rdata_list(ttl, rdatas):
305     """Create an rdataset with the specified TTL, and with
306     the specified list of rdata objects.
307
308     @rtype: dns.rdataset.Rdataset object
309     """
310
311     if len(rdatas) == 0:
312         raise ValueError("rdata list must not be empty")
313     r = None
314     for rd in rdatas:
315         if r is None:
316             r = Rdataset(rd.rdclass, rd.rdtype)
317             r.update_ttl(ttl)
318             first_time = False
319         r.add(rd)
320     return r
321
322 def from_rdata(ttl, *rdatas):
323     """Create an rdataset with the specified TTL, and with
324     the specified rdata objects.
325
326     @rtype: dns.rdataset.Rdataset object
327     """
328
329     return from_rdata_list(ttl, rdatas)