From: Samuel Cabrero Date: Tue, 16 Dec 2014 17:04:13 +0000 (+0100) Subject: dns.py: Test dns server reload zones from DSDB when are created or deleted X-Git-Tag: ldb-1.1.20~167 X-Git-Url: http://git.samba.org/samba.git/?p=kai%2Fsamba-autobuild%2F.git;a=commitdiff_plain;h=336ffb29b50298a0597c15b9f60416adb745bc3d dns.py: Test dns server reload zones from DSDB when are created or deleted Signed-off-by: Samuel Cabrero Reviewed-by: Andrew Bartlett Reviewed-by: Garming Sam --- diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py index 34edf6b66e6..ac946af4591 100644 --- a/python/samba/tests/dns.py +++ b/python/samba/tests/dns.py @@ -21,7 +21,9 @@ import random import socket import samba.ndr as ndr import samba.dcerpc.dns as dns +from samba import credentials, param from samba.tests import TestCase +from samba.dcerpc import dnsp, dnsserver FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)]) @@ -864,6 +866,82 @@ class TestInvalidQueries(DNSTest): if s is not None: s.close() +class TestZones(DNSTest): + def get_loadparm(self): + lp = param.LoadParm() + lp.load(os.getenv("SMB_CONF_PATH")) + return lp + + def get_credentials(self, lp): + creds = credentials.Credentials() + creds.guess(lp) + creds.set_machine_account(lp) + creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE) + return creds + + def setUp(self): + super(TestZones, self).setUp() + self.lp = self.get_loadparm() + self.creds = self.get_credentials(self.lp) + self.server = os.getenv("SERVER_IP") + self.zone = "test.lan" + self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server), + self.lp, self.creds) + + def create_zone(self, zone): + zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() + zone_create.pszZoneName = zone + zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY + zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE + zone_create.fAging = 0 + zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT + self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + None, + 0, + 'ZoneCreate', + dnsserver.DNSSRV_TYPEID_ZONE_CREATE, + zone_create) + + def delete_zone(self, zone): + self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + zone, + 0, + 'DeleteZoneFromDs', + dnsserver.DNSSRV_TYPEID_NULL, + None) + + def test_soa_query(self): + zone = "test.lan" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + questions.append(q) + self.finish_name_packet(p, questions) + + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, 0) + + self.create_zone(zone) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, 1) + self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA) + + self.delete_zone(zone) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, 0) + + if __name__ == "__main__": import unittest