dns.py: Test dns server reload zones from DSDB when are created or deleted
authorSamuel Cabrero <samuelcabrero@kernevil.me>
Tue, 16 Dec 2014 17:04:13 +0000 (18:04 +0100)
committerGarming Sam <garming@samba.org>
Mon, 22 Dec 2014 04:57:08 +0000 (05:57 +0100)
Signed-off-by: Samuel Cabrero <samuelcabrero@kernevil.me>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
python/samba/tests/dns.py

index 34edf6b66e63ae0cd19dbf346e89d9ea93e56129..ac946af4591a9b148c34301cd68b1743d2d5d399 100644 (file)
@@ -21,7 +21,9 @@ import random
 import socket
 import samba.ndr as ndr
 import samba.dcerpc.dns as dns
+from samba import credentials, param
 from samba.tests import TestCase
+from samba.dcerpc import dnsp, dnsserver
 
 FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
 
@@ -864,6 +866,82 @@ class TestInvalidQueries(DNSTest):
             if s is not None:
                 s.close()
 
+class TestZones(DNSTest):
+    def get_loadparm(self):
+        lp = param.LoadParm()
+        lp.load(os.getenv("SMB_CONF_PATH"))
+        return lp
+
+    def get_credentials(self, lp):
+        creds = credentials.Credentials()
+        creds.guess(lp)
+        creds.set_machine_account(lp)
+        creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
+        return creds
+
+    def setUp(self):
+        super(TestZones, self).setUp()
+        self.lp = self.get_loadparm()
+        self.creds = self.get_credentials(self.lp)
+        self.server = os.getenv("SERVER_IP")
+        self.zone = "test.lan"
+        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server),
+                                            self.lp, self.creds)
+
+    def create_zone(self, zone):
+        zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
+        zone_create.pszZoneName = zone
+        zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
+        zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
+        zone_create.fAging = 0
+        zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
+        self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                       0,
+                                       self.server,
+                                       None,
+                                       0,
+                                       'ZoneCreate',
+                                       dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
+                                       zone_create)
+
+    def delete_zone(self, zone):
+        self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                       0,
+                                       self.server,
+                                       zone,
+                                       0,
+                                       'DeleteZoneFromDs',
+                                       dnsserver.DNSSRV_TYPEID_NULL,
+                                       None)
+
+    def test_soa_query(self):
+        zone = "test.lan"
+        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+        questions = []
+
+        q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
+        questions.append(q)
+        self.finish_name_packet(p, questions)
+
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
+        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+        self.assertEquals(response.ancount, 0)
+
+        self.create_zone(zone)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+        self.assertEquals(response.ancount, 1)
+        self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
+
+        self.delete_zone(zone)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
+        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+        self.assertEquals(response.ancount, 0)
+
+
 
 if __name__ == "__main__":
     import unittest