zone_dn = None
for zone in zones:
- if zone_name in str(zone.dn):
+ if "DC=%s," % zone_name in str(zone.dn):
zone_dn = zone.dn
break
for old_packed_record in records:
if record_name in str(old_packed_record.dn):
- return (old_packed_record.dn, ndr_unpack(dnsp.DnssrvRpcRecord, old_packed_record["dnsRecord"][0]))
+ rec = ndr_unpack(dnsp.DnssrvRpcRecord, old_packed_record["dnsRecord"][0])
+ return (old_packed_record.dn, rec)
def test_duplicate_matching(self):
"""
self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
+ def check_params(self, wDataLength, rank, flags, dwTtlSeconds, dwReserved, data,
+ wType, dwTimeStamp=0, zone="zone", rec_name="testrecord"):
+ res = self.get_record_from_db(zone, rec_name)
+ self.assertIsNotNone(res, "Expected record %s but was not found over LDAP." % data)
+ (rec_dn, rec) = res
+ self.assertEqual(wDataLength, rec.wDataLength, "Unexpected data length for record %s. Got %s, expected %s." % (data, rec.wDataLength, wDataLength))
+ self.assertEqual(rank, rec.rank, "Unexpected rank for record %s. Got %s, expected %s." % (data, rec.rank, rank))
+ self.assertEqual(flags, rec.flags, "Unexpected flags for record %s. Got %s, expected %s." % (data, rec.flags, flags))
+ self.assertEqual(dwTtlSeconds, rec.dwTtlSeconds, "Unexpected time to live for record %s. Got %s, expected %s." % (data, rec.dwTtlSeconds, dwTtlSeconds))
+ self.assertEqual(dwReserved, rec.dwReserved, "Unexpected dwReserved for record %s. Got %s, expected %s." % (data, rec.dwReserved, dwReserved))
+ self.assertEqual(data.lower(), rec.data.lower(), "Unexpected data for record %s. Got %s, expected %s." % (data, rec.data.lower(), data.lower()))
+ self.assertEqual(wType, rec.wType, "Unexpected wType for record %s. Got %s, expected %s." % (data, rec.wType, wType))
+ self.assertEqual(dwTimeStamp, rec.dwTimeStamp, "Unexpected timestamp for record %s. Got %s, expected %s." % (data, rec.dwTimeStamp, dwTimeStamp))
+
+ def test_record_params(self):
+ """
+ Make sure that, when we add records to the database,
+ they're added with reasonable parameters.
+ """
+ self.add_record(self.custom_zone, "testrecord", "A", "192.168.50.50")
+ self.check_params(4, 240, 0, 900, 0, "192.168.50.50", 1)
+ self.delete_record(self.custom_zone, "testrecord", "A", "192.168.50.50")
+ self.add_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::")
+ self.check_params(16, 240, 0, 900, 0, "AAAA:AAAA:0000:0000:0000:0000:0000:0000", 28)
+ self.delete_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::")
+ self.add_record(self.custom_zone, "testrecord", "CNAME", "cnamedest")
+ self.check_params(13, 240, 0, 900, 0, "cnamedest", 5)
+ self.delete_record(self.custom_zone, "testrecord", "CNAME", "cnamedest")
+
def test_reject_invalid_commands(self):
"""
Make sure that we can't add a variety of invalid records,