python/tests: add tests for samba-tool dns
[samba.git] / python / samba / tests / samba_tool / dnscmd.py
diff --git a/python/samba/tests/samba_tool/dnscmd.py b/python/samba/tests/samba_tool/dnscmd.py
new file mode 100644 (file)
index 0000000..c1cb7df
--- /dev/null
@@ -0,0 +1,557 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Andrew Bartlett <abartlet@catalyst.net.nz>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+import os
+import ldb
+
+from samba.auth import system_session
+from samba.samdb import SamDB
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.dcerpc import dnsp
+from samba.tests.samba_tool.base import SambaToolCmdTest
+
+class DnsCmdTestCase(SambaToolCmdTest):
+    def setUp(self):
+        super(DnsCmdTestCase, self).setUp()
+
+        self.dburl = "ldap://%s" % os.environ["SERVER"]
+        self.creds_string = "-U%s%%%s" % (os.environ["DC_USERNAME"],
+                                          os.environ["DC_PASSWORD"])
+
+        self.samdb = self.getSamDB("-H", self.dburl, self.creds_string)
+        self.config_dn = str(self.samdb.get_config_basedn())
+
+        self.testip = "192.168.0.193"
+        self.testip2 = "192.168.0.194"
+
+        self.addZone()
+
+        # Note: SOA types don't work (and shouldn't), as we only have one zone per DNS record.
+
+        good_dns = ["SAMDOM.EXAMPLE.COM",
+                    "1.EXAMPLE.COM",
+                    "%sEXAMPLE.COM" % ("1."*100),
+                    "EXAMPLE",
+                    "\n.COM",
+                    "!@#$%^&*()_",
+                    "HIGH\xFFBYTE",
+                    "@.EXAMPLE.COM",
+                    "."]
+        bad_dns = ["...",
+                   ".EXAMPLE.COM",
+                   ".EXAMPLE.",
+                   "",
+                   "SAMDOM..EXAMPLE.COM"]
+
+        good_mx = ["SAMDOM.EXAMPLE.COM 65530"]
+        bad_mx = ["SAMDOM.EXAMPLE.COM -1",
+                  "SAMDOM.EXAMPLE.COM",
+                  " ",
+                  "SAMDOM.EXAMPLE.COM 1 1",
+                  "SAMDOM.EXAMPLE.COM SAMDOM.EXAMPLE.COM"]
+
+        good_srv = ["SAMDOM.EXAMPLE.COM 65530 65530 65530"]
+        bad_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
+                   "SAMDOM.EXAMPLE.COM 0 0 65536",
+                   "SAMDOM.EXAMPLE.COM 65536 0 0" ]
+
+        for bad_dn in bad_dns:
+            bad_mx.append("%s 1" % bad_dn)
+            bad_srv.append("%s 0 0 0" % bad_dn)
+        for good_dn in good_dns:
+            good_mx.append("%s 1" % good_dn)
+            good_srv.append("%s 0 0 0" % good_dn)
+
+        self.good_records = {
+                "A":["192.168.0.1", "255.255.255.255"],
+                "AAAA":["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
+                        "0000:0000:0000:0000:0000:0000:0000:0000",
+                        "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
+                        "1234:1234:1234::",
+                        "1234:5678:9ABC:DEF0::",
+                        "0000:0000::0000",
+                        "1234::5678:9ABC:0000:0000:0000:0000",
+                        "::1",
+                        "::",
+                        "1:1:1:1:1:1:1:1"],
+                "PTR":good_dns,
+                "CNAME":good_dns,
+                "NS":good_dns,
+                "MX":good_mx,
+                "SRV":good_srv,
+                "TXT":["text", "", "@#!", "\n"]
+        }
+
+        self.bad_records = {
+                "A":["192.168.0.500",
+                     "255.255.255.255/32"],
+                "AAAA":["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
+                        "0000:0000:0000:0000:0000:0000:0000:0000/1",
+                        "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
+                        "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
+                        "1234:5678:9ABC:DEF0:1234:5678:9ABC",
+                        "1111::1111::1111"],
+                "PTR":bad_dns,
+                "CNAME":bad_dns,
+                "NS":bad_dns,
+                "MX":bad_mx,
+                "SRV":bad_srv
+        }
+
+    def tearDown(self):
+        self.deleteZone()
+        super(DnsCmdTestCase, self).tearDown()
+
+    def resetZone(self):
+        self.deleteZone()
+        self.addZone()
+
+    def addZone(self):
+        self.zone = "zone"
+        result, out, err = self.runsubcmd("dns",
+                                          "zonecreate",
+                                          os.environ["SERVER"],
+                                          self.zone,
+                                          self.creds_string)
+        self.assertCmdSuccess(result, out, err)
+
+    def deleteZone(self):
+        result, out, err = self.runsubcmd("dns",
+                                          "zonedelete",
+                                          os.environ["SERVER"],
+                                          self.zone,
+                                          self.creds_string)
+        self.assertCmdSuccess(result, out, err)
+
+    def get_record_from_db(self, zone_name, record_name):
+        zones = self.samdb.search(base="DC=DomainDnsZones,%s"
+                                  % self.samdb.get_default_basedn(),
+                                  scope=ldb.SCOPE_SUBTREE,
+                                  expression="(objectClass=dnsZone)",
+                                  attrs=["cn"])
+
+        for zone in zones:
+            if zone_name in str(zone.dn):
+                zone_dn = zone.dn
+                break
+
+        records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
+                                    expression="(objectClass=dnsNode)",
+                                    attrs=["dnsRecord"])
+
+        for old_packed_record in records:
+            if record_name in str(old_packed_record.dn):
+                return (old_packed_record.dn,
+                        ndr_unpack(dnsp.DnssrvRpcRecord,
+                                   old_packed_record["dnsRecord"][0]))
+
+    def test_rank_none(self):
+        record_str = "192.168.50.50"
+        record_type_str = "A"
+
+        result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
+                                          self.zone, "testrecord", record_type_str,
+                                          record_str, self.creds_string)
+        self.assertCmdSuccess(result, out, err,
+                              "Failed to add record '%s' with type %s."
+                              % (record_str, record_type_str))
+
+        dn, record = self.get_record_from_db(self.zone, "testrecord")
+        record.rank = 0 # DNS_RANK_NONE
+        res = self.samdb.dns_replace_by_dn(dn, [record])
+        if res is not None:
+            self.fail("Unable to update dns record to have DNS_RANK_NONE.")
+
+        errors = []
+
+        # The record should still exist
+        result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
+                                          self.zone, "testrecord", record_type_str,
+                                          self.creds_string)
+        try:
+            self.assertCmdSuccess(result, out, err,
+                                  "Failed to query for a record" \
+                                  "which had DNS_RANK_NONE.")
+            self.assertTrue("testrecord" in out and record_str in out,
+                            "Query for a record which had DNS_RANK_NONE" \
+                            "succeeded but produced no resulting records.")
+        except AssertionError, e:
+            # Windows produces no resulting records
+            pass
+
+        # We should not be able to add a duplicate
+        result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
+                                          self.zone, "testrecord", record_type_str,
+                                          record_str, self.creds_string)
+        try:
+            self.assertCmdFail(result, "Successfully added duplicate record" \
+                               "of one which had DNS_RANK_NONE.")
+        except AssertionError, e:
+            errors.append(e)
+
+        # We should be able to delete it
+        result, out, err = self.runsubcmd("dns", "delete", os.environ["SERVER"],
+                                          self.zone, "testrecord", record_type_str,
+                                          record_str, self.creds_string)
+        try:
+            self.assertCmdSuccess(result, out, err, "Failed to delete record" \
+                                  "which had DNS_RANK_NONE.")
+        except AssertionError, e:
+            errors.append(e)
+
+        # Now the record should not exist
+        result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
+                                          self.zone, "testrecord",
+                                          record_type_str, self.creds_string)
+        try:
+            self.assertCmdFail(result, "Successfully queried for deleted record" \
+                               "which had DNS_RANK_NONE.")
+        except AssertionError, e:
+            errors.append(e)
+
+        if len(errors) > 0:
+            err_str = "Failed appropriate behaviour with DNS_RANK_NONE:"
+            for error in errors:
+                err_str = err_str + "\n" + str(error)
+            raise AssertionError(err_str)
+
+    def test_accept_valid_commands(self):
+        """
+        For all good records, attempt to add, query and delete them.
+        """
+        num_failures = 0
+        failure_msgs = []
+        for dnstype in self.good_records:
+            for record in self.good_records[dnstype]:
+                try:
+                    result, out, err = self.runsubcmd("dns", "add",
+                                                      os.environ["SERVER"],
+                                                      self.zone, "testrecord",
+                                                      dnstype, record,
+                                                      self.creds_string)
+                    self.assertCmdSuccess(result, out, err, "Failed to add" \
+                                          "record %s with type %s."
+                                          % (record, dnstype))
+
+                    result, out, err = self.runsubcmd("dns", "query",
+                                                      os.environ["SERVER"],
+                                                      self.zone, "testrecord",
+                                                      dnstype,
+                                                      self.creds_string)
+                    self.assertCmdSuccess(result, out, err, "Failed to query" \
+                                          "record %s with qualifier %s."
+                                          % (record, dnstype))
+
+                    result, out, err = self.runsubcmd("dns", "delete",
+                                                      os.environ["SERVER"],
+                                                      self.zone, "testrecord",
+                                                      dnstype, record,
+                                                      self.creds_string)
+                    self.assertCmdSuccess(result, out, err, "Failed to remove" \
+                                          "record %s with type %s."
+                                          % (record, dnstype))
+                except AssertionError as e:
+                    num_failures = num_failures + 1
+                    failure_msgs.append(e)
+
+        if num_failures > 0:
+            for msg in failure_msgs:
+                print(msg)
+            self.fail("Failed to accept valid commands. %d total failures." \
+                      "Errors above." % num_failures)
+
+    def test_reject_invalid_commands(self):
+        """
+        For all bad records, attempt to add them and update to them,
+        making sure that both operations fail.
+        """
+        num_failures = 0
+        failure_msgs = []
+
+        # Add invalid records and make sure they fail to be added
+        for dnstype in self.bad_records:
+            for record in self.bad_records[dnstype]:
+                try:
+                    result, out, err = self.runsubcmd("dns", "add",
+                                                      os.environ["SERVER"],
+                                                      self.zone, "testrecord",
+                                                      dnstype, record,
+                                                      self.creds_string)
+                    self.assertCmdFail(result, "Successfully added invalid" \
+                                       "record '%s' of type '%s'."
+                                       % (record, dnstype))
+                except AssertionError as e:
+                    num_failures = num_failures + 1
+                    failure_msgs.append(e)
+                    self.resetZone()
+                try:
+                    result, out, err = self.runsubcmd("dns", "delete",
+                                                      os.environ["SERVER"],
+                                                      self.zone, "testrecord",
+                                                      dnstype, record,
+                                                      self.creds_string)
+                    self.assertCmdFail(result, "Successfully deleted invalid" \
+                                       "record '%s' of type '%s' which" \
+                                       "shouldn't exist." % (record, dnstype))
+                except AssertionError as e:
+                    num_failures = num_failures + 1
+                    failure_msgs.append(e)
+                    self.resetZone()
+
+        # Update valid records to invalid ones and make sure they
+        # fail to be updated
+        for dnstype in self.bad_records:
+            for bad_record in self.bad_records[dnstype]:
+                good_record = self.good_records[dnstype][0]
+
+                try:
+                    result, out, err = self.runsubcmd("dns", "add",
+                                                      os.environ["SERVER"],
+                                                      self.zone, "testrecord",
+                                                      dnstype, good_record,
+                                                      self.creds_string)
+                    self.assertCmdSuccess(result, out, err, "Failed to add " \
+                                          "record '%s' with type %s."
+                                          % (record, dnstype))
+
+                    result, out, err = self.runsubcmd("dns", "update",
+                                                      os.environ["SERVER"],
+                                                      self.zone, "testrecord",
+                                                      dnstype, good_record,
+                                                      bad_record,
+                                                      self.creds_string)
+                    self.assertCmdFail(result, "Successfully updated valid " \
+                                       "record '%s' of type '%s' to invalid " \
+                                       "record '%s' of the same type."
+                                       % (good_record, dnstype, bad_record))
+
+                    result, out, err = self.runsubcmd("dns", "delete",
+                                                      os.environ["SERVER"],
+                                                      self.zone, "testrecord",
+                                                      dnstype, good_record,
+                                                      self.creds_string)
+                    self.assertCmdSuccess(result, out, err, "Could not delete " \
+                                          "valid record '%s' of type '%s'."
+                                          % (good_record, dnstype))
+                except AssertionError as e:
+                    num_failures = num_failures + 1
+                    failure_msgs.append(e)
+                    self.resetZone()
+
+        if num_failures > 0:
+            for msg in failure_msgs:
+                print(msg)
+            self.fail("Failed to reject invalid commands. %d total failures. " \
+                      "Errors above." % num_failures)
+
+    def test_update_invalid_type(self):
+        """
+        Make sure that a record can't be updated to one of a different type.
+        """
+        for dnstype1 in self.good_records:
+            record1 = self.good_records[dnstype1][0]
+            result, out, err = self.runsubcmd("dns", "add",
+                                              os.environ["SERVER"],
+                                              self.zone, "testrecord",
+                                              dnstype1, record1,
+                                              self.creds_string)
+            self.assertCmdSuccess(result, out, err, "Failed to add " \
+                                  "record %s with type %s."
+                                  % (record1, dnstype1))
+
+            for dnstype2 in self.good_records:
+                record2 = self.good_records[dnstype2][0]
+
+                # Make sure that record2 isn't a valid entry of dnstype1.
+                # For example, any A-type will also be a valid TXT-type.
+                result, out, err = self.runsubcmd("dns", "add",
+                                                  os.environ["SERVER"],
+                                                  self.zone, "testrecord",
+                                                  dnstype1, record2,
+                                                  self.creds_string)
+                try:
+                    self.assertCmdFail(result)
+                except AssertionError:
+                    continue # Don't check this one, because record2 _is_ a valid entry of dnstype1.
+
+                # Check both ways: Give the current type and try to update,
+                # and give the new type and try to update.
+                result, out, err = self.runsubcmd("dns", "update",
+                                                  os.environ["SERVER"],
+                                                  self.zone, "testrecord",
+                                                  dnstype1, record1,
+                                                  record2, self.creds_string)
+                self.assertCmdFail(result, "Successfully updated record '%s' " \
+                                   "to '%s', even though the latter is of " \
+                                   "type '%s' where '%s' was expected."
+                                   % (record1, record2, dnstype2, dnstype1))
+
+                result, out, err = self.runsubcmd("dns", "update",
+                                                  os.environ["SERVER"],
+                                                  self.zone, "testrecord",
+                                                  dnstype2, record1, record2,
+                                                  self.creds_string)
+                self.assertCmdFail(result, "Successfully updated record " \
+                                   "'%s' to '%s', even though the former " \
+                                   "is of type '%s' where '%s' was expected."
+                                   % (record1, record2, dnstype1, dnstype2))
+
+
+    def test_update_valid_type(self):
+        for dnstype in self.good_records:
+            for record in self.good_records[dnstype]:
+                result, out, err = self.runsubcmd("dns", "add",
+                                                  os.environ["SERVER"],
+                                                  self.zone, "testrecord",
+                                                  dnstype, record,
+                                                  self.creds_string)
+                self.assertCmdSuccess(result, out, err, "Failed to add " \
+                                      "record %s with type %s."
+                                      % (record, dnstype))
+
+                # Update the record to be the same.
+                result, out, err = self.runsubcmd("dns", "update",
+                                                  os.environ["SERVER"],
+                                                  self.zone, "testrecord",
+                                                  dnstype, record, record,
+                                                  self.creds_string)
+                self.assertCmdFail(result, "Successfully updated record " \
+                                   "'%s' to be exactly the same." % record)
+
+                result, out, err = self.runsubcmd("dns", "delete",
+                                                  os.environ["SERVER"],
+                                                  self.zone, "testrecord",
+                                                  dnstype, record,
+                                                  self.creds_string)
+                self.assertCmdSuccess(result, out, err, "Could not delete " \
+                                      "valid record '%s' of type '%s'."
+                                      % (record, dnstype))
+
+        for record in self.good_records["SRV"]:
+            result, out, err = self.runsubcmd("dns", "add",
+                                              os.environ["SERVER"],
+                                              self.zone, "testrecord",
+                                              "SRV", record,
+                                              self.creds_string)
+            self.assertCmdSuccess(result, out, err, "Failed to add " \
+                                  "record %s with type 'SRV'." % record)
+
+            split = record.split(' ')
+            new_bit = str(int(split[3]) + 1)
+            new_record = '%s %s %s %s' % (split[0], split[1], split[2], new_bit)
+
+            result, out, err = self.runsubcmd("dns", "update",
+                                              os.environ["SERVER"],
+                                              self.zone, "testrecord",
+                                              "SRV", record,
+                                              new_record, self.creds_string)
+            self.assertCmdSuccess(result, out, err, "Failed to update record " \
+                                  "'%s' of type '%s' to '%s'."
+                                  % (record, "SRV", new_record))
+
+            result, out, err = self.runsubcmd("dns", "query",
+                                              os.environ["SERVER"],
+                                              self.zone, "testrecord",
+                                              "SRV", self.creds_string)
+            self.assertCmdSuccess(result, out, err, "Failed to query for " \
+                                  "record '%s' of type '%s'."
+                                  % (new_record, "SRV"))
+
+            result, out, err = self.runsubcmd("dns", "delete",
+                                              os.environ["SERVER"],
+                                              self.zone, "testrecord",
+                                              "SRV", new_record,
+                                              self.creds_string)
+            self.assertCmdSuccess(result, out, err, "Could not delete " \
+                                  "valid record '%s' of type '%s'."
+                                  % (new_record, "SRV"))
+
+        # Since 'dns update' takes the current value as a parameter, make sure
+        # we can't enter the wrong current value for a given record.
+        for dnstype in self.good_records:
+            if len(self.good_records[dnstype]) < 3:
+                continue # Not enough records of this type to do this test
+
+            used_record = self.good_records[dnstype][0]
+            unused_record = self.good_records[dnstype][1]
+            new_record = self.good_records[dnstype][2]
+
+            result, out, err = self.runsubcmd("dns", "add",
+                                              os.environ["SERVER"],
+                                              self.zone, "testrecord",
+                                              dnstype, used_record,
+                                              self.creds_string)
+            self.assertCmdSuccess(result, out, err, "Failed to add record %s " \
+                                  "with type %s." % (used_record, dnstype))
+
+            result, out, err = self.runsubcmd("dns", "update",
+                                              os.environ["SERVER"],
+                                              self.zone, "testrecord",
+                                              dnstype, unused_record,
+                                              new_record,
+                                              self.creds_string)
+            self.assertCmdFail(result, "Successfully updated record '%s' " \
+                               "from '%s' to '%s', even though the given " \
+                               "source record is incorrect."
+                               % (used_record, unused_record, new_record))
+
+    def test_invalid_types(self):
+        result, out, err = self.runsubcmd("dns", "add",
+                                          os.environ["SERVER"],
+                                          self.zone, "testrecord",
+                                          "SOA", "test",
+                                          self.creds_string)
+        self.assertCmdFail(result, "Successfully added record of type SOA, " \
+                           "when this type should not be available.")
+        self.assertTrue("type SOA is not supported" in err,
+                        "Invalid error message '%s' when attempting to " \
+                        "add record of type SOA." % err)
+
+    def test_query_deleted_record(self):
+        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+                       "testrecord", "A", self.testip, self.creds_string)
+        self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
+                       "testrecord", "A", self.testip, self.creds_string)
+
+        result, out, err = self.runsubcmd("dns", "query",
+                                          os.environ["SERVER"],
+                                          self.zone, "testrecord",
+                                          "A", self.creds_string)
+        self.assertCmdFail(result)
+
+    def test_remove_deleted_record(self):
+        self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
+                       "testrecord", "A", self.testip, self.creds_string)
+        self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
+                       "testrecord", "A", self.testip, self.creds_string)
+
+        # Attempting to delete a record that has already been deleted or has never existed should fail
+        result, out, err = self.runsubcmd("dns", "delete",
+                                          os.environ["SERVER"],
+                                          self.zone, "testrecord",
+                                          "A", self.testip, self.creds_string)
+        self.assertCmdFail(result)
+        result, out, err = self.runsubcmd("dns", "query",
+                                          os.environ["SERVER"],
+                                          self.zone, "testrecord",
+                                          "A", self.creds_string)
+        self.assertCmdFail(result)
+        result, out, err = self.runsubcmd("dns", "delete",
+                                          os.environ["SERVER"],
+                                          self.zone, "testrecord2",
+                                          "A", self.testip, self.creds_string)
+        self.assertCmdFail(result)