response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
+ def test_update_add_mx_record(self):
+ "test adding MX records works"
+ p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
+ updates = []
+
+ name = self.get_dns_domain()
+
+ u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
+ updates.append(u)
+ self.finish_name_packet(p, updates)
+
+ updates = []
+ r = dns.res_rec()
+ r.name = "%s" % self.get_dns_domain()
+ r.rr_type = dns.DNS_QTYPE_MX
+ r.rr_class = dns.DNS_QCLASS_IN
+ r.ttl = 900
+ r.length = 0xffff
+ rdata = dns.mx_record()
+ rdata.preference = 10
+ rdata.exchange = 'mail.%s' % self.get_dns_domain()
+ r.rdata = rdata
+ updates.append(r)
+ p.nscount = len(updates)
+ p.nsrecs = updates
+
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+
+ p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+ questions = []
+
+ name = "%s" % self.get_dns_domain()
+ q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
+ questions.append(q)
+
+ self.finish_name_packet(p, questions)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.assertEqual(response.ancount, 1)
+ self.assertEqual(response.answers[0].rdata.preference, 10)
+ self.assertEqual(response.answers[0].rdata.exchange, 'mail.%s' % self.get_dns_domain())
+
class TestComplexQueries(DNSTest):
case DNS_QTYPE_PTR:
ans[ai].rdata.ptr_record = talloc_strdup(ans, rec->data.ptr);
break;
+ case DNS_QTYPE_MX:
+ ans[ai].rdata.mx_record.preference = rec->data.mx.wPriority;
+ ans[ai].rdata.mx_record.exchange = talloc_strdup(
+ ans, rec->data.mx.nameTarget);
+ if (ans[ai].rdata.mx_record.exchange == NULL) {
+ return WERR_NOMEM;
+ }
+ break;
case DNS_QTYPE_TXT:
tmp = talloc_asprintf(ans, "\"%s\"", rec->data.txt.str[0]);
W_ERROR_HAVE_NO_MEMORY(tmp);