import socket
import samba.ndr as ndr
import samba.dcerpc.dns as dns
+from samba import credentials, param
from samba.tests import TestCase
+from samba.dcerpc import dnsp, dnsserver
FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
if s is not None:
s.close()
+class TestZones(DNSTest):
+ def get_loadparm(self):
+ lp = param.LoadParm()
+ lp.load(os.getenv("SMB_CONF_PATH"))
+ return lp
+
+ def get_credentials(self, lp):
+ creds = credentials.Credentials()
+ creds.guess(lp)
+ creds.set_machine_account(lp)
+ creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
+ return creds
+
+ def setUp(self):
+ super(TestZones, self).setUp()
+ self.lp = self.get_loadparm()
+ self.creds = self.get_credentials(self.lp)
+ self.server = os.getenv("SERVER_IP")
+ self.zone = "test.lan"
+ self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server),
+ self.lp, self.creds)
+
+ def create_zone(self, zone):
+ zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
+ zone_create.pszZoneName = zone
+ zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
+ zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
+ zone_create.fAging = 0
+ zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
+ self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0,
+ self.server,
+ None,
+ 0,
+ 'ZoneCreate',
+ dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
+ zone_create)
+
+ def delete_zone(self, zone):
+ self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0,
+ self.server,
+ zone,
+ 0,
+ 'DeleteZoneFromDs',
+ dnsserver.DNSSRV_TYPEID_NULL,
+ None)
+
+ def test_soa_query(self):
+ zone = "test.lan"
+ p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+ questions = []
+
+ q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
+ questions.append(q)
+ self.finish_name_packet(p, questions)
+
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
+ self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+ self.assertEquals(response.ancount, 0)
+
+ self.create_zone(zone)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+ self.assertEquals(response.ancount, 1)
+ self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
+
+ self.delete_zone(zone)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
+ self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+ self.assertEquals(response.ancount, 0)
+
+
if __name__ == "__main__":
import unittest