import samba.dcerpc.dns as dns
from samba.tests import TestCase
+FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
+
+
class DNSTest(TestCase):
def errstr(self, errcode):
"Helper to get dns domain"
return os.getenv('REALM', 'example.com').lower()
- def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP')):
+ def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
"send a DNS query and read the reply"
s = None
try:
send_packet = ndr.ndr_pack(packet)
+ if dump:
+ print self.hexdump(send_packet)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
s.connect((host, 53))
s.send(send_packet, 0)
recv_packet = s.recv(2048, 0)
+ if dump:
+ print self.hexdump(recv_packet)
return ndr.ndr_unpack(dns.name_packet, recv_packet)
finally:
if s is not None:
if s is not None:
s.close()
+ def hexdump(self, src, length=8):
+ N=0; result=''
+ while src:
+ s,src = src[:length],src[length:]
+ hexa = ' '.join(["%02X"%ord(x) for x in s])
+ s = s.translate(FILTER)
+ result += "%04X %-*s %s\n" % (N, length*3, hexa, s)
+ N+=length
+ return result
class TestSimpleQueries(DNSTest):
r.length = 0xffff
r.rdata = dns.mx_record()
r.rdata.preference = 10
- r.rdata.exchange = 'mail.%s' % self.get_dns_domain()
+ r.rdata.exchange = 'mail' #% self.get_dns_domain()
updates.append(r)
p.nscount = len(updates)
p.nsrecs = updates
- response = self.dns_transaction_udp(p)
+ print ndr.ndr_print(p)
+ response = self.dns_transaction_udp(p, dump=True)
+ print ndr.ndr_print(response)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.assertEqual(response.ancount, 1)
- self.assertEqual(response.answers[0].rdata.preference, 10)
- self.assertEqual(response.answers[0].rdata.exchange, 'mail.%s' % self.get_dns_domain())
+ ans = response.answers[0]
+ self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
+ self.assertEqual(ans.rdata.preference, 10)
+ self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
class TestComplexQueries(DNSTest):