dns scavenging: Add extra tests for custom filter
authorGary Lockyer <gary@catalyst.net.nz>
Thu, 2 Aug 2018 02:52:16 +0000 (14:52 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Mon, 6 Aug 2018 03:36:42 +0000 (05:36 +0200)
Add extra tests for the custom ldb filter used by the dns scavenging
code.

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Autobuild-User(master): Andrew Bartlett <abartlet@samba.org>
Autobuild-Date(master): Mon Aug  6 05:36:43 CEST 2018 on sn-devel-144

python/samba/tests/dns.py
selftest/knownfail.d/dns

index 6771e3bb8c4be6cb6edb223328555231a434dfb6..0878054e5655287a8ca04c4abc3836a08cc17ac6 100644 (file)
@@ -22,6 +22,7 @@ from samba.ndr import ndr_unpack, ndr_pack
 from samba.samdb import SamDB
 from samba.auth import system_session
 import ldb
+from ldb import ERR_OPERATIONS_ERROR
 import os
 import sys
 import struct
@@ -35,6 +36,8 @@ from samba import werror, WERRORError
 from samba.tests.dns_base import DNSTest
 import samba.getopt as options
 import optparse
+import samba.dcerpc.dnsp
+
 
 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
 sambaopts = options.SambaOptions(parser)
@@ -941,7 +944,6 @@ class TestZones(DNSTest):
                            lp=self.get_loadparm(),
                            session_info=system_session(),
                            credentials=self.creds)
-
         self.zone_dn = "DC=" + self.zone +\
                        ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
                        str(self.samdb.get_default_basedn())
@@ -1018,6 +1020,15 @@ class TestZones(DNSTest):
         self.assertEqual(len(recs), 1)
         return recs[0]
 
+    def dns_tombstone(self, prefix, txt, zone):
+        name = prefix + "." + zone
+
+        to = dnsp.DnssrvRpcRecord()
+        to.dwTimeStamp = 1000
+        to.wType = dnsp.DNS_TYPE_TOMBSTONE
+
+        self.samdb.dns_replace(name, [to])
+
     def ldap_get_records(self, name):
         # The use of SCOPE_SUBTREE here avoids raising an exception in the
         # 0 results case for a test below.
@@ -1206,19 +1217,22 @@ class TestZones(DNSTest):
         name, txt = 'agingtest', ['test txt']
         name2, txt2 = 'agingtest2', ['test txt2']
         name3, txt3 = 'agingtest3', ['test txt3']
+        name4, txt4 = 'agingtest4', ['test txt4']
+        name5, txt5 = 'agingtest5', ['test txt5']
+
         self.create_zone(self.zone, aging_enabled=True)
         interval = 10
         self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
                         Aging=1, zone=self.zone,
                         AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
 
-        self.dns_update_record(name, txt),
+        self.dns_update_record(name, txt)
 
-        self.dns_update_record(name2, txt),
-        self.dns_update_record(name2, txt2),
+        self.dns_update_record(name2, txt)
+        self.dns_update_record(name2, txt2)
 
-        self.dns_update_record(name3, txt),
-        self.dns_update_record(name3, txt2),
+        self.dns_update_record(name3, txt)
+        self.dns_update_record(name3, txt2)
         last_update = self.dns_update_record(name3, txt3)
 
         # Modify txt1 of the first 2 names
@@ -1228,6 +1242,22 @@ class TestZones(DNSTest):
         self.ldap_modify_dnsrecs(name, mod_ts)
         self.ldap_modify_dnsrecs(name2, mod_ts)
 
+        # create a static dns record.
+        rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+        rec_buf.rec = TXTRecord(txt4)
+        self.rpc_conn.DnssrvUpdateRecord2(
+            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+            0,
+            self.server_ip,
+            self.zone,
+            name4,
+            rec_buf,
+            None)
+
+        # Create a tomb stoned record.
+        self.dns_update_record(name5, txt5)
+        self.dns_tombstone(name5, txt5, self.zone)
+
         self.ldap_get_dns_records(name3)
         expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
         expr = expr.format(int(last_update.dwTimeStamp) - 1)
@@ -1239,15 +1269,101 @@ class TestZones(DNSTest):
         updated_names = {str(r.get('name')) for r in res}
         self.assertEqual(updated_names, set([name, name2]))
 
+    def test_dns_tombstone_custom_match_rule_no_records(self):
+        lp = self.get_loadparm()
+        self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
+                           session_info=system_session(),
+                           credentials=self.creds)
+
+        self.create_zone(self.zone, aging_enabled=True)
+        interval = 10
+        self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
+                        Aging=1, zone=self.zone,
+                        AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
+
+        expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
+        expr = expr.format(1)
+
+        try:
+            res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
+                                    expression=expr, attrs=["*"])
+        except ldb.LdbError as e:
+            self.fail(str(e))
+        self.assertEqual(0, len(res))
+
     def test_dns_tombstone_custom_match_rule_fail(self):
         self.create_zone(self.zone, aging_enabled=True)
+        samdb = SamDB(url=lp.samdb_url(),
+                      lp=lp,
+                      session_info=system_session(),
+                      credentials=self.creds)
 
-        # The check here is that this does not blow up on silly input
+        # Property name in not dnsRecord
         expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
-        res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
-                                expression=expr, attrs=["*"])
+        res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
+                           expression=expr, attrs=["*"])
         self.assertEquals(len(res), 0)
 
+        # No value for tombstone time
+        try:
+            expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)"
+            res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
+                               expression=expr, attrs=["*"])
+            self.assertEquals(len(res), 0)
+            self.fail("Exception: ldb.ldbError not generated")
+        except ldb.LdbError as e:
+            (num, msg) = e.args
+            self.assertEquals(num, ERR_OPERATIONS_ERROR)
+
+        # Tombstone time = -
+        try:
+            expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)"
+            res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
+                               expression=expr, attrs=["*"])
+            self.assertEquals(len(res), 0)
+            self.fail("Exception: ldb.ldbError not generated")
+        except ldb.LdbError as e:
+            (num, _) = e.args
+            self.assertEquals(num, ERR_OPERATIONS_ERROR)
+
+        # Tombstone time longer than 64 characters
+        try:
+            expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
+            expr = expr.format("1" * 65)
+            res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
+                               expression=expr, attrs=["*"])
+            self.assertEquals(len(res), 0)
+            self.fail("Exception: ldb.ldbError not generated")
+        except ldb.LdbError as e:
+            (num, _) = e.args
+            self.assertEquals(num, ERR_OPERATIONS_ERROR)
+
+        # Non numeric Tombstone time
+        try:
+            expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)"
+            res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
+                               expression=expr, attrs=["*"])
+            self.assertEquals(len(res), 0)
+            self.fail("Exception: ldb.ldbError not generated")
+        except ldb.LdbError as e:
+            (num, _) = e.args
+            self.assertEquals(num, ERR_OPERATIONS_ERROR)
+
+        # Non system session
+        try:
+            db = SamDB(url="ldap://" + self.server_ip,
+                       lp=self.get_loadparm(),
+                       credentials=self.creds)
+
+            expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)"
+            res = db.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
+                            expression=expr, attrs=["*"])
+            self.assertEquals(len(res), 0)
+            self.fail("Exception: ldb.ldbError not generated")
+        except ldb.LdbError as e:
+            (num, _) = e.args
+            self.assertEquals(num, ERR_OPERATIONS_ERROR)
+
     def test_basic_scavenging(self):
         lp = self.get_loadparm()
         self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
index 99b0f1d63a08e785414e3ac4afbea170f5f071b7..d23f5ebe21c3041f23953aca20cbd8e9b6173617 100644 (file)
@@ -45,6 +45,7 @@ samba.tests.dns.__main__.TestZones.test_aging_refresh\(rodc:local\)
 samba.tests.dns.__main__.TestZones.test_rpc_add_no_timestamp\(rodc:local\)
 samba.tests.dns.__main__.TestZones.test_basic_scavenging\(rodc:local\)
 samba.tests.dns.__main__.TestZones.test_dns_tombstone_custom_match_rule\(rodc:local\)
+samba.tests.dns.__main__.TestZones.test_dns_tombstone_custom_match_rule_no_records\(rodc:local\)
 samba.tests.dns.__main__.TestZones.test_dns_tombstone_custom_match_rule_fail\(rodc:local\)
 samba.tests.dns.__main__.TestZones.test_dynamic_record_static_update\(rodc:local\)
 samba.tests.dns.__main__.TestZones.test_static_record_dynamic_update\(rodc:local\)