python/tests: add tests for samba-tool dns
[samba.git] / python / samba / tests / samba_tool / dnscmd.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@catalyst.net.nz>
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import ldb
20
21 from samba.auth import system_session
22 from samba.samdb import SamDB
23 from samba.ndr import ndr_unpack, ndr_pack
24 from samba.dcerpc import dnsp
25 from samba.tests.samba_tool.base import SambaToolCmdTest
26
27 class DnsCmdTestCase(SambaToolCmdTest):
28     def setUp(self):
29         super(DnsCmdTestCase, self).setUp()
30
31         self.dburl = "ldap://%s" % os.environ["SERVER"]
32         self.creds_string = "-U%s%%%s" % (os.environ["DC_USERNAME"],
33                                           os.environ["DC_PASSWORD"])
34
35         self.samdb = self.getSamDB("-H", self.dburl, self.creds_string)
36         self.config_dn = str(self.samdb.get_config_basedn())
37
38         self.testip = "192.168.0.193"
39         self.testip2 = "192.168.0.194"
40
41         self.addZone()
42
43         # Note: SOA types don't work (and shouldn't), as we only have one zone per DNS record.
44
45         good_dns = ["SAMDOM.EXAMPLE.COM",
46                     "1.EXAMPLE.COM",
47                     "%sEXAMPLE.COM" % ("1."*100),
48                     "EXAMPLE",
49                     "\n.COM",
50                     "!@#$%^&*()_",
51                     "HIGH\xFFBYTE",
52                     "@.EXAMPLE.COM",
53                     "."]
54         bad_dns = ["...",
55                    ".EXAMPLE.COM",
56                    ".EXAMPLE.",
57                    "",
58                    "SAMDOM..EXAMPLE.COM"]
59
60         good_mx = ["SAMDOM.EXAMPLE.COM 65530"]
61         bad_mx = ["SAMDOM.EXAMPLE.COM -1",
62                   "SAMDOM.EXAMPLE.COM",
63                   " ",
64                   "SAMDOM.EXAMPLE.COM 1 1",
65                   "SAMDOM.EXAMPLE.COM SAMDOM.EXAMPLE.COM"]
66
67         good_srv = ["SAMDOM.EXAMPLE.COM 65530 65530 65530"]
68         bad_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
69                    "SAMDOM.EXAMPLE.COM 0 0 65536",
70                    "SAMDOM.EXAMPLE.COM 65536 0 0" ]
71
72         for bad_dn in bad_dns:
73             bad_mx.append("%s 1" % bad_dn)
74             bad_srv.append("%s 0 0 0" % bad_dn)
75         for good_dn in good_dns:
76             good_mx.append("%s 1" % good_dn)
77             good_srv.append("%s 0 0 0" % good_dn)
78
79         self.good_records = {
80                 "A":["192.168.0.1", "255.255.255.255"],
81                 "AAAA":["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
82                         "0000:0000:0000:0000:0000:0000:0000:0000",
83                         "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
84                         "1234:1234:1234::",
85                         "1234:5678:9ABC:DEF0::",
86                         "0000:0000::0000",
87                         "1234::5678:9ABC:0000:0000:0000:0000",
88                         "::1",
89                         "::",
90                         "1:1:1:1:1:1:1:1"],
91                 "PTR":good_dns,
92                 "CNAME":good_dns,
93                 "NS":good_dns,
94                 "MX":good_mx,
95                 "SRV":good_srv,
96                 "TXT":["text", "", "@#!", "\n"]
97         }
98
99         self.bad_records = {
100                 "A":["192.168.0.500",
101                      "255.255.255.255/32"],
102                 "AAAA":["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
103                         "0000:0000:0000:0000:0000:0000:0000:0000/1",
104                         "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
105                         "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
106                         "1234:5678:9ABC:DEF0:1234:5678:9ABC",
107                         "1111::1111::1111"],
108                 "PTR":bad_dns,
109                 "CNAME":bad_dns,
110                 "NS":bad_dns,
111                 "MX":bad_mx,
112                 "SRV":bad_srv
113         }
114
115     def tearDown(self):
116         self.deleteZone()
117         super(DnsCmdTestCase, self).tearDown()
118
119     def resetZone(self):
120         self.deleteZone()
121         self.addZone()
122
123     def addZone(self):
124         self.zone = "zone"
125         result, out, err = self.runsubcmd("dns",
126                                           "zonecreate",
127                                           os.environ["SERVER"],
128                                           self.zone,
129                                           self.creds_string)
130         self.assertCmdSuccess(result, out, err)
131
132     def deleteZone(self):
133         result, out, err = self.runsubcmd("dns",
134                                           "zonedelete",
135                                           os.environ["SERVER"],
136                                           self.zone,
137                                           self.creds_string)
138         self.assertCmdSuccess(result, out, err)
139
140     def get_record_from_db(self, zone_name, record_name):
141         zones = self.samdb.search(base="DC=DomainDnsZones,%s"
142                                   % self.samdb.get_default_basedn(),
143                                   scope=ldb.SCOPE_SUBTREE,
144                                   expression="(objectClass=dnsZone)",
145                                   attrs=["cn"])
146
147         for zone in zones:
148             if zone_name in str(zone.dn):
149                 zone_dn = zone.dn
150                 break
151
152         records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
153                                     expression="(objectClass=dnsNode)",
154                                     attrs=["dnsRecord"])
155
156         for old_packed_record in records:
157             if record_name in str(old_packed_record.dn):
158                 return (old_packed_record.dn,
159                         ndr_unpack(dnsp.DnssrvRpcRecord,
160                                    old_packed_record["dnsRecord"][0]))
161
162     def test_rank_none(self):
163         record_str = "192.168.50.50"
164         record_type_str = "A"
165
166         result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
167                                           self.zone, "testrecord", record_type_str,
168                                           record_str, self.creds_string)
169         self.assertCmdSuccess(result, out, err,
170                               "Failed to add record '%s' with type %s."
171                               % (record_str, record_type_str))
172
173         dn, record = self.get_record_from_db(self.zone, "testrecord")
174         record.rank = 0 # DNS_RANK_NONE
175         res = self.samdb.dns_replace_by_dn(dn, [record])
176         if res is not None:
177             self.fail("Unable to update dns record to have DNS_RANK_NONE.")
178
179         errors = []
180
181         # The record should still exist
182         result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
183                                           self.zone, "testrecord", record_type_str,
184                                           self.creds_string)
185         try:
186             self.assertCmdSuccess(result, out, err,
187                                   "Failed to query for a record" \
188                                   "which had DNS_RANK_NONE.")
189             self.assertTrue("testrecord" in out and record_str in out,
190                             "Query for a record which had DNS_RANK_NONE" \
191                             "succeeded but produced no resulting records.")
192         except AssertionError, e:
193             # Windows produces no resulting records
194             pass
195
196         # We should not be able to add a duplicate
197         result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
198                                           self.zone, "testrecord", record_type_str,
199                                           record_str, self.creds_string)
200         try:
201             self.assertCmdFail(result, "Successfully added duplicate record" \
202                                "of one which had DNS_RANK_NONE.")
203         except AssertionError, e:
204             errors.append(e)
205
206         # We should be able to delete it
207         result, out, err = self.runsubcmd("dns", "delete", os.environ["SERVER"],
208                                           self.zone, "testrecord", record_type_str,
209                                           record_str, self.creds_string)
210         try:
211             self.assertCmdSuccess(result, out, err, "Failed to delete record" \
212                                   "which had DNS_RANK_NONE.")
213         except AssertionError, e:
214             errors.append(e)
215
216         # Now the record should not exist
217         result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
218                                           self.zone, "testrecord",
219                                           record_type_str, self.creds_string)
220         try:
221             self.assertCmdFail(result, "Successfully queried for deleted record" \
222                                "which had DNS_RANK_NONE.")
223         except AssertionError, e:
224             errors.append(e)
225
226         if len(errors) > 0:
227             err_str = "Failed appropriate behaviour with DNS_RANK_NONE:"
228             for error in errors:
229                 err_str = err_str + "\n" + str(error)
230             raise AssertionError(err_str)
231
232     def test_accept_valid_commands(self):
233         """
234         For all good records, attempt to add, query and delete them.
235         """
236         num_failures = 0
237         failure_msgs = []
238         for dnstype in self.good_records:
239             for record in self.good_records[dnstype]:
240                 try:
241                     result, out, err = self.runsubcmd("dns", "add",
242                                                       os.environ["SERVER"],
243                                                       self.zone, "testrecord",
244                                                       dnstype, record,
245                                                       self.creds_string)
246                     self.assertCmdSuccess(result, out, err, "Failed to add" \
247                                           "record %s with type %s."
248                                           % (record, dnstype))
249
250                     result, out, err = self.runsubcmd("dns", "query",
251                                                       os.environ["SERVER"],
252                                                       self.zone, "testrecord",
253                                                       dnstype,
254                                                       self.creds_string)
255                     self.assertCmdSuccess(result, out, err, "Failed to query" \
256                                           "record %s with qualifier %s."
257                                           % (record, dnstype))
258
259                     result, out, err = self.runsubcmd("dns", "delete",
260                                                       os.environ["SERVER"],
261                                                       self.zone, "testrecord",
262                                                       dnstype, record,
263                                                       self.creds_string)
264                     self.assertCmdSuccess(result, out, err, "Failed to remove" \
265                                           "record %s with type %s."
266                                           % (record, dnstype))
267                 except AssertionError as e:
268                     num_failures = num_failures + 1
269                     failure_msgs.append(e)
270
271         if num_failures > 0:
272             for msg in failure_msgs:
273                 print(msg)
274             self.fail("Failed to accept valid commands. %d total failures." \
275                       "Errors above." % num_failures)
276
277     def test_reject_invalid_commands(self):
278         """
279         For all bad records, attempt to add them and update to them,
280         making sure that both operations fail.
281         """
282         num_failures = 0
283         failure_msgs = []
284
285         # Add invalid records and make sure they fail to be added
286         for dnstype in self.bad_records:
287             for record in self.bad_records[dnstype]:
288                 try:
289                     result, out, err = self.runsubcmd("dns", "add",
290                                                       os.environ["SERVER"],
291                                                       self.zone, "testrecord",
292                                                       dnstype, record,
293                                                       self.creds_string)
294                     self.assertCmdFail(result, "Successfully added invalid" \
295                                        "record '%s' of type '%s'."
296                                        % (record, dnstype))
297                 except AssertionError as e:
298                     num_failures = num_failures + 1
299                     failure_msgs.append(e)
300                     self.resetZone()
301                 try:
302                     result, out, err = self.runsubcmd("dns", "delete",
303                                                       os.environ["SERVER"],
304                                                       self.zone, "testrecord",
305                                                       dnstype, record,
306                                                       self.creds_string)
307                     self.assertCmdFail(result, "Successfully deleted invalid" \
308                                        "record '%s' of type '%s' which" \
309                                        "shouldn't exist." % (record, dnstype))
310                 except AssertionError as e:
311                     num_failures = num_failures + 1
312                     failure_msgs.append(e)
313                     self.resetZone()
314
315         # Update valid records to invalid ones and make sure they
316         # fail to be updated
317         for dnstype in self.bad_records:
318             for bad_record in self.bad_records[dnstype]:
319                 good_record = self.good_records[dnstype][0]
320
321                 try:
322                     result, out, err = self.runsubcmd("dns", "add",
323                                                       os.environ["SERVER"],
324                                                       self.zone, "testrecord",
325                                                       dnstype, good_record,
326                                                       self.creds_string)
327                     self.assertCmdSuccess(result, out, err, "Failed to add " \
328                                           "record '%s' with type %s."
329                                           % (record, dnstype))
330
331                     result, out, err = self.runsubcmd("dns", "update",
332                                                       os.environ["SERVER"],
333                                                       self.zone, "testrecord",
334                                                       dnstype, good_record,
335                                                       bad_record,
336                                                       self.creds_string)
337                     self.assertCmdFail(result, "Successfully updated valid " \
338                                        "record '%s' of type '%s' to invalid " \
339                                        "record '%s' of the same type."
340                                        % (good_record, dnstype, bad_record))
341
342                     result, out, err = self.runsubcmd("dns", "delete",
343                                                       os.environ["SERVER"],
344                                                       self.zone, "testrecord",
345                                                       dnstype, good_record,
346                                                       self.creds_string)
347                     self.assertCmdSuccess(result, out, err, "Could not delete " \
348                                           "valid record '%s' of type '%s'."
349                                           % (good_record, dnstype))
350                 except AssertionError as e:
351                     num_failures = num_failures + 1
352                     failure_msgs.append(e)
353                     self.resetZone()
354
355         if num_failures > 0:
356             for msg in failure_msgs:
357                 print(msg)
358             self.fail("Failed to reject invalid commands. %d total failures. " \
359                       "Errors above." % num_failures)
360
361     def test_update_invalid_type(self):
362         """
363         Make sure that a record can't be updated to one of a different type.
364         """
365         for dnstype1 in self.good_records:
366             record1 = self.good_records[dnstype1][0]
367             result, out, err = self.runsubcmd("dns", "add",
368                                               os.environ["SERVER"],
369                                               self.zone, "testrecord",
370                                               dnstype1, record1,
371                                               self.creds_string)
372             self.assertCmdSuccess(result, out, err, "Failed to add " \
373                                   "record %s with type %s."
374                                   % (record1, dnstype1))
375
376             for dnstype2 in self.good_records:
377                 record2 = self.good_records[dnstype2][0]
378
379                 # Make sure that record2 isn't a valid entry of dnstype1.
380                 # For example, any A-type will also be a valid TXT-type.
381                 result, out, err = self.runsubcmd("dns", "add",
382                                                   os.environ["SERVER"],
383                                                   self.zone, "testrecord",
384                                                   dnstype1, record2,
385                                                   self.creds_string)
386                 try:
387                     self.assertCmdFail(result)
388                 except AssertionError:
389                     continue # Don't check this one, because record2 _is_ a valid entry of dnstype1.
390
391                 # Check both ways: Give the current type and try to update,
392                 # and give the new type and try to update.
393                 result, out, err = self.runsubcmd("dns", "update",
394                                                   os.environ["SERVER"],
395                                                   self.zone, "testrecord",
396                                                   dnstype1, record1,
397                                                   record2, self.creds_string)
398                 self.assertCmdFail(result, "Successfully updated record '%s' " \
399                                    "to '%s', even though the latter is of " \
400                                    "type '%s' where '%s' was expected."
401                                    % (record1, record2, dnstype2, dnstype1))
402
403                 result, out, err = self.runsubcmd("dns", "update",
404                                                   os.environ["SERVER"],
405                                                   self.zone, "testrecord",
406                                                   dnstype2, record1, record2,
407                                                   self.creds_string)
408                 self.assertCmdFail(result, "Successfully updated record " \
409                                    "'%s' to '%s', even though the former " \
410                                    "is of type '%s' where '%s' was expected."
411                                    % (record1, record2, dnstype1, dnstype2))
412
413
414     def test_update_valid_type(self):
415         for dnstype in self.good_records:
416             for record in self.good_records[dnstype]:
417                 result, out, err = self.runsubcmd("dns", "add",
418                                                   os.environ["SERVER"],
419                                                   self.zone, "testrecord",
420                                                   dnstype, record,
421                                                   self.creds_string)
422                 self.assertCmdSuccess(result, out, err, "Failed to add " \
423                                       "record %s with type %s."
424                                       % (record, dnstype))
425
426                 # Update the record to be the same.
427                 result, out, err = self.runsubcmd("dns", "update",
428                                                   os.environ["SERVER"],
429                                                   self.zone, "testrecord",
430                                                   dnstype, record, record,
431                                                   self.creds_string)
432                 self.assertCmdFail(result, "Successfully updated record " \
433                                    "'%s' to be exactly the same." % record)
434
435                 result, out, err = self.runsubcmd("dns", "delete",
436                                                   os.environ["SERVER"],
437                                                   self.zone, "testrecord",
438                                                   dnstype, record,
439                                                   self.creds_string)
440                 self.assertCmdSuccess(result, out, err, "Could not delete " \
441                                       "valid record '%s' of type '%s'."
442                                       % (record, dnstype))
443
444         for record in self.good_records["SRV"]:
445             result, out, err = self.runsubcmd("dns", "add",
446                                               os.environ["SERVER"],
447                                               self.zone, "testrecord",
448                                               "SRV", record,
449                                               self.creds_string)
450             self.assertCmdSuccess(result, out, err, "Failed to add " \
451                                   "record %s with type 'SRV'." % record)
452
453             split = record.split(' ')
454             new_bit = str(int(split[3]) + 1)
455             new_record = '%s %s %s %s' % (split[0], split[1], split[2], new_bit)
456
457             result, out, err = self.runsubcmd("dns", "update",
458                                               os.environ["SERVER"],
459                                               self.zone, "testrecord",
460                                               "SRV", record,
461                                               new_record, self.creds_string)
462             self.assertCmdSuccess(result, out, err, "Failed to update record " \
463                                   "'%s' of type '%s' to '%s'."
464                                   % (record, "SRV", new_record))
465
466             result, out, err = self.runsubcmd("dns", "query",
467                                               os.environ["SERVER"],
468                                               self.zone, "testrecord",
469                                               "SRV", self.creds_string)
470             self.assertCmdSuccess(result, out, err, "Failed to query for " \
471                                   "record '%s' of type '%s'."
472                                   % (new_record, "SRV"))
473
474             result, out, err = self.runsubcmd("dns", "delete",
475                                               os.environ["SERVER"],
476                                               self.zone, "testrecord",
477                                               "SRV", new_record,
478                                               self.creds_string)
479             self.assertCmdSuccess(result, out, err, "Could not delete " \
480                                   "valid record '%s' of type '%s'."
481                                   % (new_record, "SRV"))
482
483         # Since 'dns update' takes the current value as a parameter, make sure
484         # we can't enter the wrong current value for a given record.
485         for dnstype in self.good_records:
486             if len(self.good_records[dnstype]) < 3:
487                 continue # Not enough records of this type to do this test
488
489             used_record = self.good_records[dnstype][0]
490             unused_record = self.good_records[dnstype][1]
491             new_record = self.good_records[dnstype][2]
492
493             result, out, err = self.runsubcmd("dns", "add",
494                                               os.environ["SERVER"],
495                                               self.zone, "testrecord",
496                                               dnstype, used_record,
497                                               self.creds_string)
498             self.assertCmdSuccess(result, out, err, "Failed to add record %s " \
499                                   "with type %s." % (used_record, dnstype))
500
501             result, out, err = self.runsubcmd("dns", "update",
502                                               os.environ["SERVER"],
503                                               self.zone, "testrecord",
504                                               dnstype, unused_record,
505                                               new_record,
506                                               self.creds_string)
507             self.assertCmdFail(result, "Successfully updated record '%s' " \
508                                "from '%s' to '%s', even though the given " \
509                                "source record is incorrect."
510                                % (used_record, unused_record, new_record))
511
512     def test_invalid_types(self):
513         result, out, err = self.runsubcmd("dns", "add",
514                                           os.environ["SERVER"],
515                                           self.zone, "testrecord",
516                                           "SOA", "test",
517                                           self.creds_string)
518         self.assertCmdFail(result, "Successfully added record of type SOA, " \
519                            "when this type should not be available.")
520         self.assertTrue("type SOA is not supported" in err,
521                         "Invalid error message '%s' when attempting to " \
522                         "add record of type SOA." % err)
523
524     def test_query_deleted_record(self):
525         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
526                        "testrecord", "A", self.testip, self.creds_string)
527         self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
528                        "testrecord", "A", self.testip, self.creds_string)
529
530         result, out, err = self.runsubcmd("dns", "query",
531                                           os.environ["SERVER"],
532                                           self.zone, "testrecord",
533                                           "A", self.creds_string)
534         self.assertCmdFail(result)
535
536     def test_remove_deleted_record(self):
537         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
538                        "testrecord", "A", self.testip, self.creds_string)
539         self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
540                        "testrecord", "A", self.testip, self.creds_string)
541
542         # Attempting to delete a record that has already been deleted or has never existed should fail
543         result, out, err = self.runsubcmd("dns", "delete",
544                                           os.environ["SERVER"],
545                                           self.zone, "testrecord",
546                                           "A", self.testip, self.creds_string)
547         self.assertCmdFail(result)
548         result, out, err = self.runsubcmd("dns", "query",
549                                           os.environ["SERVER"],
550                                           self.zone, "testrecord",
551                                           "A", self.creds_string)
552         self.assertCmdFail(result)
553         result, out, err = self.runsubcmd("dns", "delete",
554                                           os.environ["SERVER"],
555                                           self.zone, "testrecord2",
556                                           "A", self.testip, self.creds_string)
557         self.assertCmdFail(result)