s4-python: Format to PEP8, simplify tests.
[idra/samba.git] / source4 / scripting / python / samba_external / dnspython / dns / update.py
1 # Copyright (C) 2003-2007, 2009, 2010 Nominum, Inc.
2 #
3 # Permission to use, copy, modify, and distribute this software and its
4 # documentation for any purpose with or without fee is hereby granted,
5 # provided that the above copyright notice and this permission notice
6 # appear in all copies.
7 #
8 # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
9 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
11 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
14 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16 """DNS Dynamic Update Support"""
17
18 import dns.message
19 import dns.name
20 import dns.opcode
21 import dns.rdata
22 import dns.rdataclass
23 import dns.rdataset
24
25 class Update(dns.message.Message):
26     def __init__(self, zone, rdclass=dns.rdataclass.IN, keyring=None,
27                  keyname=None, keyalgorithm=dns.tsig.default_algorithm):
28         """Initialize a new DNS Update object.
29
30         @param zone: The zone which is being updated.
31         @type zone: A dns.name.Name or string
32         @param rdclass: The class of the zone; defaults to dns.rdataclass.IN.
33         @type rdclass: An int designating the class, or a string whose value
34         is the name of a class.
35         @param keyring: The TSIG keyring to use; defaults to None.
36         @type keyring: dict
37         @param keyname: The name of the TSIG key to use; defaults to None.
38         The key must be defined in the keyring.  If a keyring is specified
39         but a keyname is not, then the key used will be the first key in the
40         keyring.  Note that the order of keys in a dictionary is not defined,
41         so applications should supply a keyname when a keyring is used, unless
42         they know the keyring contains only one key.
43         @type keyname: dns.name.Name or string
44         @param keyalgorithm: The TSIG algorithm to use; defaults to
45         dns.tsig.default_algorithm
46         @type keyalgorithm: string
47         """
48         super(Update, self).__init__()
49         self.flags |= dns.opcode.to_flags(dns.opcode.UPDATE)
50         if isinstance(zone, (str, unicode)):
51             zone = dns.name.from_text(zone)
52         self.origin = zone
53         if isinstance(rdclass, str):
54             rdclass = dns.rdataclass.from_text(rdclass)
55         self.zone_rdclass = rdclass
56         self.find_rrset(self.question, self.origin, rdclass, dns.rdatatype.SOA,
57                         create=True, force_unique=True)
58         if not keyring is None:
59             self.use_tsig(keyring, keyname, keyalgorithm)
60
61     def _add_rr(self, name, ttl, rd, deleting=None, section=None):
62         """Add a single RR to the update section."""
63
64         if section is None:
65             section = self.authority
66         covers = rd.covers()
67         rrset = self.find_rrset(section, name, self.zone_rdclass, rd.rdtype,
68                                 covers, deleting, True, True)
69         rrset.add(rd, ttl)
70
71     def _add(self, replace, section, name, *args):
72         """Add records.  The first argument is the replace mode.  If
73         false, RRs are added to an existing RRset; if true, the RRset
74         is replaced with the specified contents.  The second
75         argument is the section to add to.  The third argument
76         is always a name.  The other arguments can be:
77
78                 - rdataset...
79
80                 - ttl, rdata...
81
82                 - ttl, rdtype, string..."""
83
84         if isinstance(name, (str, unicode)):
85             name = dns.name.from_text(name, None)
86         if isinstance(args[0], dns.rdataset.Rdataset):
87             for rds in args:
88                 if replace:
89                     self.delete(name, rds.rdtype)
90                 for rd in rds:
91                     self._add_rr(name, rds.ttl, rd, section=section)
92         else:
93             args = list(args)
94             ttl = int(args.pop(0))
95             if isinstance(args[0], dns.rdata.Rdata):
96                 if replace:
97                     self.delete(name, args[0].rdtype)
98                 for rd in args:
99                     self._add_rr(name, ttl, rd, section=section)
100             else:
101                 rdtype = args.pop(0)
102                 if isinstance(rdtype, str):
103                     rdtype = dns.rdatatype.from_text(rdtype)
104                 if replace:
105                     self.delete(name, rdtype)
106                 for s in args:
107                     rd = dns.rdata.from_text(self.zone_rdclass, rdtype, s,
108                                              self.origin)
109                     self._add_rr(name, ttl, rd, section=section)
110
111     def add(self, name, *args):
112         """Add records.  The first argument is always a name.  The other
113         arguments can be:
114
115                 - rdataset...
116
117                 - ttl, rdata...
118
119                 - ttl, rdtype, string..."""
120         self._add(False, self.authority, name, *args)
121
122     def delete(self, name, *args):
123         """Delete records.  The first argument is always a name.  The other
124         arguments can be:
125
126                 - I{nothing}
127
128                 - rdataset...
129
130                 - rdata...
131
132                 - rdtype, [string...]"""
133
134         if isinstance(name, (str, unicode)):
135             name = dns.name.from_text(name, None)
136         if len(args) == 0:
137             rrset = self.find_rrset(self.authority, name, dns.rdataclass.ANY,
138                                     dns.rdatatype.ANY, dns.rdatatype.NONE,
139                                     dns.rdatatype.ANY, True, True)
140         elif isinstance(args[0], dns.rdataset.Rdataset):
141             for rds in args:
142                 for rd in rds:
143                     self._add_rr(name, 0, rd, dns.rdataclass.NONE)
144         else:
145             args = list(args)
146             if isinstance(args[0], dns.rdata.Rdata):
147                 for rd in args:
148                     self._add_rr(name, 0, rd, dns.rdataclass.NONE)
149             else:
150                 rdtype = args.pop(0)
151                 if isinstance(rdtype, str):
152                     rdtype = dns.rdatatype.from_text(rdtype)
153                 if len(args) == 0:
154                     rrset = self.find_rrset(self.authority, name,
155                                             self.zone_rdclass, rdtype,
156                                             dns.rdatatype.NONE,
157                                             dns.rdataclass.ANY,
158                                             True, True)
159                 else:
160                     for s in args:
161                         rd = dns.rdata.from_text(self.zone_rdclass, rdtype, s,
162                                                  self.origin)
163                         self._add_rr(name, 0, rd, dns.rdataclass.NONE)
164
165     def replace(self, name, *args):
166         """Replace records.  The first argument is always a name.  The other
167         arguments can be:
168
169                 - rdataset...
170
171                 - ttl, rdata...
172
173                 - ttl, rdtype, string...
174
175         Note that if you want to replace the entire node, you should do
176         a delete of the name followed by one or more calls to add."""
177
178         self._add(True, self.authority, name, *args)
179
180     def present(self, name, *args):
181         """Require that an owner name (and optionally an rdata type,
182         or specific rdataset) exists as a prerequisite to the
183         execution of the update.  The first argument is always a name.
184         The other arguments can be:
185
186                 - rdataset...
187
188                 - rdata...
189
190                 - rdtype, string..."""
191
192         if isinstance(name, (str, unicode)):
193             name = dns.name.from_text(name, None)
194         if len(args) == 0:
195             rrset = self.find_rrset(self.answer, name,
196                                     dns.rdataclass.ANY, dns.rdatatype.ANY,
197                                     dns.rdatatype.NONE, None,
198                                     True, True)
199         elif isinstance(args[0], dns.rdataset.Rdataset) or \
200              isinstance(args[0], dns.rdata.Rdata) or \
201              len(args) > 1:
202             if not isinstance(args[0], dns.rdataset.Rdataset):
203                 # Add a 0 TTL
204                 args = list(args)
205                 args.insert(0, 0)
206             self._add(False, self.answer, name, *args)
207         else:
208             rdtype = args[0]
209             if isinstance(rdtype, str):
210                 rdtype = dns.rdatatype.from_text(rdtype)
211             rrset = self.find_rrset(self.answer, name,
212                                     dns.rdataclass.ANY, rdtype,
213                                     dns.rdatatype.NONE, None,
214                                     True, True)
215
216     def absent(self, name, rdtype=None):
217         """Require that an owner name (and optionally an rdata type) does
218         not exist as a prerequisite to the execution of the update."""
219
220         if isinstance(name, (str, unicode)):
221             name = dns.name.from_text(name, None)
222         if rdtype is None:
223             rrset = self.find_rrset(self.answer, name,
224                                     dns.rdataclass.NONE, dns.rdatatype.ANY,
225                                     dns.rdatatype.NONE, None,
226                                     True, True)
227         else:
228             if isinstance(rdtype, str):
229                 rdtype = dns.rdatatype.from_text(rdtype)
230             rrset = self.find_rrset(self.answer, name,
231                                     dns.rdataclass.NONE, rdtype,
232                                     dns.rdatatype.NONE, None,
233                                     True, True)
234
235     def to_wire(self, origin=None, max_size=65535):
236         """Return a string containing the update in DNS compressed wire
237         format.
238         @rtype: string"""
239         if origin is None:
240             origin = self.origin
241         return super(Update, self).to_wire(origin, max_size)