s4-python: Move dnspython to lib/, like the other Python modules
[samba.git] / lib / dnspython / dns / rdata.py
diff --git a/lib/dnspython/dns/rdata.py b/lib/dnspython/dns/rdata.py
new file mode 100644 (file)
index 0000000..ce02686
--- /dev/null
@@ -0,0 +1,456 @@
+# Copyright (C) 2001-2007, 2009, 2010 Nominum, Inc.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose with or without fee is hereby granted,
+# provided that the above copyright notice and this permission notice
+# appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""DNS rdata.
+
+@var _rdata_modules: A dictionary mapping a (rdclass, rdtype) tuple to
+the module which implements that type.
+@type _rdata_modules: dict
+@var _module_prefix: The prefix to use when forming modules names.  The
+default is 'dns.rdtypes'.  Changing this value will break the library.
+@type _module_prefix: string
+@var _hex_chunk: At most this many octets that will be represented in each
+chunk of hexstring that _hexify() produces before whitespace occurs.
+@type _hex_chunk: int"""
+
+import cStringIO
+
+import dns.exception
+import dns.rdataclass
+import dns.rdatatype
+import dns.tokenizer
+
+_hex_chunksize = 32
+
+def _hexify(data, chunksize=None):
+    """Convert a binary string into its hex encoding, broken up into chunks
+    of I{chunksize} characters separated by a space.
+
+    @param data: the binary string
+    @type data: string
+    @param chunksize: the chunk size.  Default is L{dns.rdata._hex_chunksize}
+    @rtype: string
+    """
+
+    if chunksize is None:
+        chunksize = _hex_chunksize
+    hex = data.encode('hex_codec')
+    l = len(hex)
+    if l > chunksize:
+        chunks = []
+        i = 0
+        while i < l:
+            chunks.append(hex[i : i + chunksize])
+            i += chunksize
+        hex = ' '.join(chunks)
+    return hex
+
+_base64_chunksize = 32
+
+def _base64ify(data, chunksize=None):
+    """Convert a binary string into its base64 encoding, broken up into chunks
+    of I{chunksize} characters separated by a space.
+
+    @param data: the binary string
+    @type data: string
+    @param chunksize: the chunk size.  Default is
+    L{dns.rdata._base64_chunksize}
+    @rtype: string
+    """
+
+    if chunksize is None:
+        chunksize = _base64_chunksize
+    b64 = data.encode('base64_codec')
+    b64 = b64.replace('\n', '')
+    l = len(b64)
+    if l > chunksize:
+        chunks = []
+        i = 0
+        while i < l:
+            chunks.append(b64[i : i + chunksize])
+            i += chunksize
+        b64 = ' '.join(chunks)
+    return b64
+
+__escaped = {
+    '"' : True,
+    '\\' : True,
+    }
+
+def _escapify(qstring):
+    """Escape the characters in a quoted string which need it.
+
+    @param qstring: the string
+    @type qstring: string
+    @returns: the escaped string
+    @rtype: string
+    """
+
+    text = ''
+    for c in qstring:
+        if c in __escaped:
+            text += '\\' + c
+        elif ord(c) >= 0x20 and ord(c) < 0x7F:
+            text += c
+        else:
+            text += '\\%03d' % ord(c)
+    return text
+
+def _truncate_bitmap(what):
+    """Determine the index of greatest byte that isn't all zeros, and
+    return the bitmap that contains all the bytes less than that index.
+
+    @param what: a string of octets representing a bitmap.
+    @type what: string
+    @rtype: string
+    """
+
+    for i in xrange(len(what) - 1, -1, -1):
+        if what[i] != '\x00':
+            break
+    return ''.join(what[0 : i + 1])
+
+class Rdata(object):
+    """Base class for all DNS rdata types.
+    """
+
+    __slots__ = ['rdclass', 'rdtype']
+
+    def __init__(self, rdclass, rdtype):
+        """Initialize an rdata.
+        @param rdclass: The rdata class
+        @type rdclass: int
+        @param rdtype: The rdata type
+        @type rdtype: int
+        """
+
+        self.rdclass = rdclass
+        self.rdtype = rdtype
+
+    def covers(self):
+        """DNS SIG/RRSIG rdatas apply to a specific type; this type is
+        returned by the covers() function.  If the rdata type is not
+        SIG or RRSIG, dns.rdatatype.NONE is returned.  This is useful when
+        creating rdatasets, allowing the rdataset to contain only RRSIGs
+        of a particular type, e.g. RRSIG(NS).
+        @rtype: int
+        """
+
+        return dns.rdatatype.NONE
+
+    def extended_rdatatype(self):
+        """Return a 32-bit type value, the least significant 16 bits of
+        which are the ordinary DNS type, and the upper 16 bits of which are
+        the "covered" type, if any.
+        @rtype: int
+        """
+
+        return self.covers() << 16 | self.rdtype
+
+    def to_text(self, origin=None, relativize=True, **kw):
+        """Convert an rdata to text format.
+        @rtype: string
+        """
+        raise NotImplementedError
+
+    def to_wire(self, file, compress = None, origin = None):
+        """Convert an rdata to wire format.
+        @rtype: string
+        """
+
+        raise NotImplementedError
+
+    def to_digestable(self, origin = None):
+        """Convert rdata to a format suitable for digesting in hashes.  This
+        is also the DNSSEC canonical form."""
+        f = cStringIO.StringIO()
+        self.to_wire(f, None, origin)
+        return f.getvalue()
+
+    def validate(self):
+        """Check that the current contents of the rdata's fields are
+        valid.  If you change an rdata by assigning to its fields,
+        it is a good idea to call validate() when you are done making
+        changes.
+        """
+        dns.rdata.from_text(self.rdclass, self.rdtype, self.to_text())
+
+    def __repr__(self):
+        covers = self.covers()
+        if covers == dns.rdatatype.NONE:
+            ctext = ''
+        else:
+            ctext = '(' + dns.rdatatype.to_text(covers) + ')'
+        return '<DNS ' + dns.rdataclass.to_text(self.rdclass) + ' ' + \
+               dns.rdatatype.to_text(self.rdtype) + ctext + ' rdata: ' + \
+               str(self) + '>'
+
+    def __str__(self):
+        return self.to_text()
+
+    def _cmp(self, other):
+        """Compare an rdata with another rdata of the same rdtype and
+        rdclass.  Return < 0 if self < other in the DNSSEC ordering,
+        0 if self == other, and > 0 if self > other.
+        """
+
+        raise NotImplementedError
+
+    def __eq__(self, other):
+        if not isinstance(other, Rdata):
+            return False
+        if self.rdclass != other.rdclass or \
+           self.rdtype != other.rdtype:
+            return False
+        return self._cmp(other) == 0
+
+    def __ne__(self, other):
+        if not isinstance(other, Rdata):
+            return True
+        if self.rdclass != other.rdclass or \
+           self.rdtype != other.rdtype:
+            return True
+        return self._cmp(other) != 0
+
+    def __lt__(self, other):
+        if not isinstance(other, Rdata) or \
+               self.rdclass != other.rdclass or \
+               self.rdtype != other.rdtype:
+            return NotImplemented
+        return self._cmp(other) < 0
+
+    def __le__(self, other):
+        if not isinstance(other, Rdata) or \
+               self.rdclass != other.rdclass or \
+               self.rdtype != other.rdtype:
+            return NotImplemented
+        return self._cmp(other) <= 0
+
+    def __ge__(self, other):
+        if not isinstance(other, Rdata) or \
+               self.rdclass != other.rdclass or \
+               self.rdtype != other.rdtype:
+            return NotImplemented
+        return self._cmp(other) >= 0
+
+    def __gt__(self, other):
+        if not isinstance(other, Rdata) or \
+               self.rdclass != other.rdclass or \
+               self.rdtype != other.rdtype:
+            return NotImplemented
+        return self._cmp(other) > 0
+
+    def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
+        """Build an rdata object from text format.
+
+        @param rdclass: The rdata class
+        @type rdclass: int
+        @param rdtype: The rdata type
+        @type rdtype: int
+        @param tok: The tokenizer
+        @type tok: dns.tokenizer.Tokenizer
+        @param origin: The origin to use for relative names
+        @type origin: dns.name.Name
+        @param relativize: should names be relativized?
+        @type relativize: bool
+        @rtype: dns.rdata.Rdata instance
+        """
+
+        raise NotImplementedError
+
+    from_text = classmethod(from_text)
+
+    def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
+        """Build an rdata object from wire format
+
+        @param rdclass: The rdata class
+        @type rdclass: int
+        @param rdtype: The rdata type
+        @type rdtype: int
+        @param wire: The wire-format message
+        @type wire: string
+        @param current: The offet in wire of the beginning of the rdata.
+        @type current: int
+        @param rdlen: The length of the wire-format rdata
+        @type rdlen: int
+        @param origin: The origin to use for relative names
+        @type origin: dns.name.Name
+        @rtype: dns.rdata.Rdata instance
+        """
+
+        raise NotImplementedError
+
+    from_wire = classmethod(from_wire)
+
+    def choose_relativity(self, origin = None, relativize = True):
+        """Convert any domain names in the rdata to the specified
+        relativization.
+        """
+
+        pass
+
+
+class GenericRdata(Rdata):
+    """Generate Rdata Class
+
+    This class is used for rdata types for which we have no better
+    implementation.  It implements the DNS "unknown RRs" scheme.
+    """
+
+    __slots__ = ['data']
+
+    def __init__(self, rdclass, rdtype, data):
+        super(GenericRdata, self).__init__(rdclass, rdtype)
+        self.data = data
+
+    def to_text(self, origin=None, relativize=True, **kw):
+        return r'\# %d ' % len(self.data) + _hexify(self.data)
+
+    def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
+        token = tok.get()
+        if not token.is_identifier() or token.value != '\#':
+            raise dns.exception.SyntaxError(r'generic rdata does not start with \#')
+        length = tok.get_int()
+        chunks = []
+        while 1:
+            token = tok.get()
+            if token.is_eol_or_eof():
+                break
+            chunks.append(token.value)
+        hex = ''.join(chunks)
+        data = hex.decode('hex_codec')
+        if len(data) != length:
+            raise dns.exception.SyntaxError('generic rdata hex data has wrong length')
+        return cls(rdclass, rdtype, data)
+
+    from_text = classmethod(from_text)
+
+    def to_wire(self, file, compress = None, origin = None):
+        file.write(self.data)
+
+    def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
+        return cls(rdclass, rdtype, wire[current : current + rdlen])
+
+    from_wire = classmethod(from_wire)
+
+    def _cmp(self, other):
+        return cmp(self.data, other.data)
+
+_rdata_modules = {}
+_module_prefix = 'dns.rdtypes'
+
+def get_rdata_class(rdclass, rdtype):
+
+    def import_module(name):
+        mod = __import__(name)
+        components = name.split('.')
+        for comp in components[1:]:
+            mod = getattr(mod, comp)
+        return mod
+
+    mod = _rdata_modules.get((rdclass, rdtype))
+    rdclass_text = dns.rdataclass.to_text(rdclass)
+    rdtype_text = dns.rdatatype.to_text(rdtype)
+    rdtype_text = rdtype_text.replace('-', '_')
+    if not mod:
+        mod = _rdata_modules.get((dns.rdatatype.ANY, rdtype))
+        if not mod:
+            try:
+                mod = import_module('.'.join([_module_prefix,
+                                              rdclass_text, rdtype_text]))
+                _rdata_modules[(rdclass, rdtype)] = mod
+            except ImportError:
+                try:
+                    mod = import_module('.'.join([_module_prefix,
+                                                  'ANY', rdtype_text]))
+                    _rdata_modules[(dns.rdataclass.ANY, rdtype)] = mod
+                except ImportError:
+                    mod = None
+    if mod:
+        cls = getattr(mod, rdtype_text)
+    else:
+        cls = GenericRdata
+    return cls
+
+def from_text(rdclass, rdtype, tok, origin = None, relativize = True):
+    """Build an rdata object from text format.
+
+    This function attempts to dynamically load a class which
+    implements the specified rdata class and type.  If there is no
+    class-and-type-specific implementation, the GenericRdata class
+    is used.
+
+    Once a class is chosen, its from_text() class method is called
+    with the parameters to this function.
+
+    @param rdclass: The rdata class
+    @type rdclass: int
+    @param rdtype: The rdata type
+    @type rdtype: int
+    @param tok: The tokenizer
+    @type tok: dns.tokenizer.Tokenizer
+    @param origin: The origin to use for relative names
+    @type origin: dns.name.Name
+    @param relativize: Should names be relativized?
+    @type relativize: bool
+    @rtype: dns.rdata.Rdata instance"""
+
+    if isinstance(tok, str):
+        tok = dns.tokenizer.Tokenizer(tok)
+    cls = get_rdata_class(rdclass, rdtype)
+    if cls != GenericRdata:
+        # peek at first token
+        token = tok.get()
+        tok.unget(token)
+        if token.is_identifier() and \
+           token.value == r'\#':
+            #
+            # Known type using the generic syntax.  Extract the
+            # wire form from the generic syntax, and then run
+            # from_wire on it.
+            #
+            rdata = GenericRdata.from_text(rdclass, rdtype, tok, origin,
+                                           relativize)
+            return from_wire(rdclass, rdtype, rdata.data, 0, len(rdata.data),
+                             origin)
+    return cls.from_text(rdclass, rdtype, tok, origin, relativize)
+
+def from_wire(rdclass, rdtype, wire, current, rdlen, origin = None):
+    """Build an rdata object from wire format
+
+    This function attempts to dynamically load a class which
+    implements the specified rdata class and type.  If there is no
+    class-and-type-specific implementation, the GenericRdata class
+    is used.
+
+    Once a class is chosen, its from_wire() class method is called
+    with the parameters to this function.
+
+    @param rdclass: The rdata class
+    @type rdclass: int
+    @param rdtype: The rdata type
+    @type rdtype: int
+    @param wire: The wire-format message
+    @type wire: string
+    @param current: The offet in wire of the beginning of the rdata.
+    @type current: int
+    @param rdlen: The length of the wire-format rdata
+    @type rdlen: int
+    @param origin: The origin to use for relative names
+    @type origin: dns.name.Name
+    @rtype: dns.rdata.Rdata instance"""
+
+    cls = get_rdata_class(rdclass, rdtype)
+    return cls.from_wire(rdclass, rdtype, wire, current, rdlen, origin)