1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@catalyst.net.nz>
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from samba.auth import system_session
22 from samba.samdb import SamDB
23 from samba.ndr import ndr_unpack, ndr_pack
24 from samba.dcerpc import dnsp
25 from samba.tests.samba_tool.base import SambaToolCmdTest
28 class DnsCmdTestCase(SambaToolCmdTest):
30 super(DnsCmdTestCase, self).setUp()
32 self.dburl = "ldap://%s" % os.environ["SERVER"]
33 self.creds_string = "-U%s%%%s" % (os.environ["DC_USERNAME"],
34 os.environ["DC_PASSWORD"])
36 self.samdb = self.getSamDB("-H", self.dburl, self.creds_string)
37 self.config_dn = str(self.samdb.get_config_basedn())
39 self.testip = "192.168.0.193"
40 self.testip2 = "192.168.0.194"
44 # Note: SOA types don't work (and shouldn't), as we only have one zone per DNS record.
46 good_dns = ["SAMDOM.EXAMPLE.COM",
48 "%sEXAMPLE.COM" % ("1." * 100),
59 "SAMDOM..EXAMPLE.COM"]
61 good_mx = ["SAMDOM.EXAMPLE.COM 65530"]
62 bad_mx = ["SAMDOM.EXAMPLE.COM -1",
65 "SAMDOM.EXAMPLE.COM 1 1",
66 "SAMDOM.EXAMPLE.COM SAMDOM.EXAMPLE.COM"]
68 good_srv = ["SAMDOM.EXAMPLE.COM 65530 65530 65530"]
69 bad_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
70 "SAMDOM.EXAMPLE.COM 0 0 65536",
71 "SAMDOM.EXAMPLE.COM 65536 0 0"]
73 for bad_dn in bad_dns:
74 bad_mx.append("%s 1" % bad_dn)
75 bad_srv.append("%s 0 0 0" % bad_dn)
76 for good_dn in good_dns:
77 good_mx.append("%s 1" % good_dn)
78 good_srv.append("%s 0 0 0" % good_dn)
81 "A":["192.168.0.1", "255.255.255.255"],
82 "AAAA":["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
83 "0000:0000:0000:0000:0000:0000:0000:0000",
84 "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
86 "1234:5678:9ABC:DEF0::",
88 "1234::5678:9ABC:0000:0000:0000:0000",
97 "TXT": ["text", "", "@#!", "\n"]
101 "A":["192.168.0.500",
102 "255.255.255.255/32"],
103 "AAAA":["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
104 "0000:0000:0000:0000:0000:0000:0000:0000/1",
105 "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
106 "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
107 "1234:5678:9ABC:DEF0:1234:5678:9ABC",
118 super(DnsCmdTestCase, self).tearDown()
126 result, out, err = self.runsubcmd("dns",
128 os.environ["SERVER"],
131 self.assertCmdSuccess(result, out, err)
133 def deleteZone(self):
134 result, out, err = self.runsubcmd("dns",
136 os.environ["SERVER"],
139 self.assertCmdSuccess(result, out, err)
141 def get_record_from_db(self, zone_name, record_name):
142 zones = self.samdb.search(base="DC=DomainDnsZones,%s"
143 % self.samdb.get_default_basedn(),
144 scope=ldb.SCOPE_SUBTREE,
145 expression="(objectClass=dnsZone)",
149 if zone_name in str(zone.dn):
153 records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
154 expression="(objectClass=dnsNode)",
157 for old_packed_record in records:
158 if record_name in str(old_packed_record.dn):
159 return (old_packed_record.dn,
160 ndr_unpack(dnsp.DnssrvRpcRecord,
161 old_packed_record["dnsRecord"][0]))
163 def test_rank_none(self):
164 record_str = "192.168.50.50"
165 record_type_str = "A"
167 result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
168 self.zone, "testrecord", record_type_str,
169 record_str, self.creds_string)
170 self.assertCmdSuccess(result, out, err,
171 "Failed to add record '%s' with type %s."
172 % (record_str, record_type_str))
174 dn, record = self.get_record_from_db(self.zone, "testrecord")
175 record.rank = 0 # DNS_RANK_NONE
176 res = self.samdb.dns_replace_by_dn(dn, [record])
178 self.fail("Unable to update dns record to have DNS_RANK_NONE.")
182 # The record should still exist
183 result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
184 self.zone, "testrecord", record_type_str,
187 self.assertCmdSuccess(result, out, err,
188 "Failed to query for a record" \
189 "which had DNS_RANK_NONE.")
190 self.assertTrue("testrecord" in out and record_str in out,
191 "Query for a record which had DNS_RANK_NONE" \
192 "succeeded but produced no resulting records.")
193 except AssertionError as e:
194 # Windows produces no resulting records
197 # We should not be able to add a duplicate
198 result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
199 self.zone, "testrecord", record_type_str,
200 record_str, self.creds_string)
202 self.assertCmdFail(result, "Successfully added duplicate record" \
203 "of one which had DNS_RANK_NONE.")
204 except AssertionError as e:
207 # We should be able to delete it
208 result, out, err = self.runsubcmd("dns", "delete", os.environ["SERVER"],
209 self.zone, "testrecord", record_type_str,
210 record_str, self.creds_string)
212 self.assertCmdSuccess(result, out, err, "Failed to delete record" \
213 "which had DNS_RANK_NONE.")
214 except AssertionError as e:
217 # Now the record should not exist
218 result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
219 self.zone, "testrecord",
220 record_type_str, self.creds_string)
222 self.assertCmdFail(result, "Successfully queried for deleted record" \
223 "which had DNS_RANK_NONE.")
224 except AssertionError as e:
228 err_str = "Failed appropriate behaviour with DNS_RANK_NONE:"
230 err_str = err_str + "\n" + str(error)
231 raise AssertionError(err_str)
233 def test_accept_valid_commands(self):
235 For all good records, attempt to add, query and delete them.
239 for dnstype in self.good_records:
240 for record in self.good_records[dnstype]:
242 result, out, err = self.runsubcmd("dns", "add",
243 os.environ["SERVER"],
244 self.zone, "testrecord",
247 self.assertCmdSuccess(result, out, err, "Failed to add" \
248 "record %s with type %s."
251 result, out, err = self.runsubcmd("dns", "query",
252 os.environ["SERVER"],
253 self.zone, "testrecord",
256 self.assertCmdSuccess(result, out, err, "Failed to query" \
257 "record %s with qualifier %s."
260 result, out, err = self.runsubcmd("dns", "delete",
261 os.environ["SERVER"],
262 self.zone, "testrecord",
265 self.assertCmdSuccess(result, out, err, "Failed to remove" \
266 "record %s with type %s."
268 except AssertionError as e:
269 num_failures = num_failures + 1
270 failure_msgs.append(e)
273 for msg in failure_msgs:
275 self.fail("Failed to accept valid commands. %d total failures." \
276 "Errors above." % num_failures)
278 def test_reject_invalid_commands(self):
280 For all bad records, attempt to add them and update to them,
281 making sure that both operations fail.
286 # Add invalid records and make sure they fail to be added
287 for dnstype in self.bad_records:
288 for record in self.bad_records[dnstype]:
290 result, out, err = self.runsubcmd("dns", "add",
291 os.environ["SERVER"],
292 self.zone, "testrecord",
295 self.assertCmdFail(result, "Successfully added invalid" \
296 "record '%s' of type '%s'."
298 except AssertionError as e:
299 num_failures = num_failures + 1
300 failure_msgs.append(e)
303 result, out, err = self.runsubcmd("dns", "delete",
304 os.environ["SERVER"],
305 self.zone, "testrecord",
308 self.assertCmdFail(result, "Successfully deleted invalid" \
309 "record '%s' of type '%s' which" \
310 "shouldn't exist." % (record, dnstype))
311 except AssertionError as e:
312 num_failures = num_failures + 1
313 failure_msgs.append(e)
316 # Update valid records to invalid ones and make sure they
318 for dnstype in self.bad_records:
319 for bad_record in self.bad_records[dnstype]:
320 good_record = self.good_records[dnstype][0]
323 result, out, err = self.runsubcmd("dns", "add",
324 os.environ["SERVER"],
325 self.zone, "testrecord",
326 dnstype, good_record,
328 self.assertCmdSuccess(result, out, err, "Failed to add " \
329 "record '%s' with type %s."
332 result, out, err = self.runsubcmd("dns", "update",
333 os.environ["SERVER"],
334 self.zone, "testrecord",
335 dnstype, good_record,
338 self.assertCmdFail(result, "Successfully updated valid " \
339 "record '%s' of type '%s' to invalid " \
340 "record '%s' of the same type."
341 % (good_record, dnstype, bad_record))
343 result, out, err = self.runsubcmd("dns", "delete",
344 os.environ["SERVER"],
345 self.zone, "testrecord",
346 dnstype, good_record,
348 self.assertCmdSuccess(result, out, err, "Could not delete " \
349 "valid record '%s' of type '%s'."
350 % (good_record, dnstype))
351 except AssertionError as e:
352 num_failures = num_failures + 1
353 failure_msgs.append(e)
357 for msg in failure_msgs:
359 self.fail("Failed to reject invalid commands. %d total failures. " \
360 "Errors above." % num_failures)
362 def test_update_invalid_type(self):
364 Make sure that a record can't be updated to one of a different type.
366 for dnstype1 in self.good_records:
367 record1 = self.good_records[dnstype1][0]
368 result, out, err = self.runsubcmd("dns", "add",
369 os.environ["SERVER"],
370 self.zone, "testrecord",
373 self.assertCmdSuccess(result, out, err, "Failed to add " \
374 "record %s with type %s."
375 % (record1, dnstype1))
377 for dnstype2 in self.good_records:
378 record2 = self.good_records[dnstype2][0]
380 # Make sure that record2 isn't a valid entry of dnstype1.
381 # For example, any A-type will also be a valid TXT-type.
382 result, out, err = self.runsubcmd("dns", "add",
383 os.environ["SERVER"],
384 self.zone, "testrecord",
388 self.assertCmdFail(result)
389 except AssertionError:
390 continue # Don't check this one, because record2 _is_ a valid entry of dnstype1.
392 # Check both ways: Give the current type and try to update,
393 # and give the new type and try to update.
394 result, out, err = self.runsubcmd("dns", "update",
395 os.environ["SERVER"],
396 self.zone, "testrecord",
398 record2, self.creds_string)
399 self.assertCmdFail(result, "Successfully updated record '%s' " \
400 "to '%s', even though the latter is of " \
401 "type '%s' where '%s' was expected."
402 % (record1, record2, dnstype2, dnstype1))
404 result, out, err = self.runsubcmd("dns", "update",
405 os.environ["SERVER"],
406 self.zone, "testrecord",
407 dnstype2, record1, record2,
409 self.assertCmdFail(result, "Successfully updated record " \
410 "'%s' to '%s', even though the former " \
411 "is of type '%s' where '%s' was expected."
412 % (record1, record2, dnstype1, dnstype2))
414 def test_update_valid_type(self):
415 for dnstype in self.good_records:
416 for record in self.good_records[dnstype]:
417 result, out, err = self.runsubcmd("dns", "add",
418 os.environ["SERVER"],
419 self.zone, "testrecord",
422 self.assertCmdSuccess(result, out, err, "Failed to add " \
423 "record %s with type %s."
426 # Update the record to be the same.
427 result, out, err = self.runsubcmd("dns", "update",
428 os.environ["SERVER"],
429 self.zone, "testrecord",
430 dnstype, record, record,
432 self.assertCmdFail(result, "Successfully updated record " \
433 "'%s' to be exactly the same." % record)
435 result, out, err = self.runsubcmd("dns", "delete",
436 os.environ["SERVER"],
437 self.zone, "testrecord",
440 self.assertCmdSuccess(result, out, err, "Could not delete " \
441 "valid record '%s' of type '%s'."
444 for record in self.good_records["SRV"]:
445 result, out, err = self.runsubcmd("dns", "add",
446 os.environ["SERVER"],
447 self.zone, "testrecord",
450 self.assertCmdSuccess(result, out, err, "Failed to add " \
451 "record %s with type 'SRV'." % record)
453 split = record.split(' ')
454 new_bit = str(int(split[3]) + 1)
455 new_record = '%s %s %s %s' % (split[0], split[1], split[2], new_bit)
457 result, out, err = self.runsubcmd("dns", "update",
458 os.environ["SERVER"],
459 self.zone, "testrecord",
461 new_record, self.creds_string)
462 self.assertCmdSuccess(result, out, err, "Failed to update record " \
463 "'%s' of type '%s' to '%s'."
464 % (record, "SRV", new_record))
466 result, out, err = self.runsubcmd("dns", "query",
467 os.environ["SERVER"],
468 self.zone, "testrecord",
469 "SRV", self.creds_string)
470 self.assertCmdSuccess(result, out, err, "Failed to query for " \
471 "record '%s' of type '%s'."
472 % (new_record, "SRV"))
474 result, out, err = self.runsubcmd("dns", "delete",
475 os.environ["SERVER"],
476 self.zone, "testrecord",
479 self.assertCmdSuccess(result, out, err, "Could not delete " \
480 "valid record '%s' of type '%s'."
481 % (new_record, "SRV"))
483 # Since 'dns update' takes the current value as a parameter, make sure
484 # we can't enter the wrong current value for a given record.
485 for dnstype in self.good_records:
486 if len(self.good_records[dnstype]) < 3:
487 continue # Not enough records of this type to do this test
489 used_record = self.good_records[dnstype][0]
490 unused_record = self.good_records[dnstype][1]
491 new_record = self.good_records[dnstype][2]
493 result, out, err = self.runsubcmd("dns", "add",
494 os.environ["SERVER"],
495 self.zone, "testrecord",
496 dnstype, used_record,
498 self.assertCmdSuccess(result, out, err, "Failed to add record %s " \
499 "with type %s." % (used_record, dnstype))
501 result, out, err = self.runsubcmd("dns", "update",
502 os.environ["SERVER"],
503 self.zone, "testrecord",
504 dnstype, unused_record,
507 self.assertCmdFail(result, "Successfully updated record '%s' " \
508 "from '%s' to '%s', even though the given " \
509 "source record is incorrect."
510 % (used_record, unused_record, new_record))
512 def test_invalid_types(self):
513 result, out, err = self.runsubcmd("dns", "add",
514 os.environ["SERVER"],
515 self.zone, "testrecord",
518 self.assertCmdFail(result, "Successfully added record of type SOA, " \
519 "when this type should not be available.")
520 self.assertTrue("type SOA is not supported" in err,
521 "Invalid error message '%s' when attempting to " \
522 "add record of type SOA." % err)
524 def test_add_overlapping_different_type(self):
526 Make sure that we can add an entry with the same name as an existing one but a different type.
530 for dnstype1 in self.good_records:
531 record1 = self.good_records[dnstype1][0]
532 for dnstype2 in self.good_records:
533 # Only do some subset of dns types, otherwise it takes a long time.
538 if dnstype1 == dnstype2:
541 record2 = self.good_records[dnstype2][0]
543 result, out, err = self.runsubcmd("dns", "add",
544 os.environ["SERVER"],
545 self.zone, "testrecord",
548 self.assertCmdSuccess(result, out, err, "Failed to add record " \
549 "'%s' of type '%s'." % (record1, dnstype1))
551 result, out, err = self.runsubcmd("dns", "add",
552 os.environ["SERVER"],
553 self.zone, "testrecord",
556 self.assertCmdSuccess(result, out, err, "Failed to add record " \
557 "'%s' of type '%s' when a record '%s' " \
558 "of type '%s' with the same name exists."
559 % (record1, dnstype1, record2, dnstype2))
561 result, out, err = self.runsubcmd("dns", "query",
562 os.environ["SERVER"],
563 self.zone, "testrecord",
564 dnstype1, self.creds_string)
565 self.assertCmdSuccess(result, out, err, "Failed to query for " \
566 "record '%s' of type '%s' when a new " \
567 "record '%s' of type '%s' with the same " \
569 % (record1, dnstype1, record2, dnstype2))
571 result, out, err = self.runsubcmd("dns", "query",
572 os.environ["SERVER"],
573 self.zone, "testrecord",
574 dnstype2, self.creds_string)
575 self.assertCmdSuccess(result, out, err, "Failed to query " \
576 "record '%s' of type '%s' which should " \
577 "have been added with the same name as " \
578 "record '%s' of type '%s'."
579 % (record2, dnstype2, record1, dnstype1))
581 result, out, err = self.runsubcmd("dns", "delete",
582 os.environ["SERVER"],
583 self.zone, "testrecord",
586 self.assertCmdSuccess(result, out, err, "Failed to delete " \
587 "record '%s' of type '%s'."
588 % (record1, dnstype1))
590 result, out, err = self.runsubcmd("dns", "delete",
591 os.environ["SERVER"],
592 self.zone, "testrecord",
595 self.assertCmdSuccess(result, out, err, "Failed to delete " \
596 "record '%s' of type '%s'."
597 % (record2, dnstype2))
599 def test_query_deleted_record(self):
600 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
601 "testrecord", "A", self.testip, self.creds_string)
602 self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
603 "testrecord", "A", self.testip, self.creds_string)
605 result, out, err = self.runsubcmd("dns", "query",
606 os.environ["SERVER"],
607 self.zone, "testrecord",
608 "A", self.creds_string)
609 self.assertCmdFail(result)
611 def test_add_duplicate_record(self):
612 for record_type in self.good_records:
613 result, out, err = self.runsubcmd("dns", "add",
614 os.environ["SERVER"],
615 self.zone, "testrecord",
617 self.good_records[record_type][0],
619 self.assertCmdSuccess(result, out, err)
620 result, out, err = self.runsubcmd("dns", "add",
621 os.environ["SERVER"],
622 self.zone, "testrecord",
624 self.good_records[record_type][0],
626 self.assertCmdFail(result)
627 result, out, err = self.runsubcmd("dns", "query",
628 os.environ["SERVER"],
629 self.zone, "testrecord",
630 record_type, self.creds_string)
631 self.assertCmdSuccess(result, out, err)
632 result, out, err = self.runsubcmd("dns", "delete",
633 os.environ["SERVER"],
634 self.zone, "testrecord",
636 self.good_records[record_type][0],
638 self.assertCmdSuccess(result, out, err)
640 def test_remove_deleted_record(self):
641 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
642 "testrecord", "A", self.testip, self.creds_string)
643 self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
644 "testrecord", "A", self.testip, self.creds_string)
646 # Attempting to delete a record that has already been deleted or has never existed should fail
647 result, out, err = self.runsubcmd("dns", "delete",
648 os.environ["SERVER"],
649 self.zone, "testrecord",
650 "A", self.testip, self.creds_string)
651 self.assertCmdFail(result)
652 result, out, err = self.runsubcmd("dns", "query",
653 os.environ["SERVER"],
654 self.zone, "testrecord",
655 "A", self.creds_string)
656 self.assertCmdFail(result)
657 result, out, err = self.runsubcmd("dns", "delete",
658 os.environ["SERVER"],
659 self.zone, "testrecord2",
660 "A", self.testip, self.creds_string)
661 self.assertCmdFail(result)
663 def test_cleanup_record(self):
665 Test dns cleanup command is working fine.
669 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
670 'testa', "A", self.testip, self.creds_string)
672 # the above A record points to this host
673 dnshostname = '{}.{}'.format('testa', self.zone.lower())
675 # add a CNAME record points to above host
676 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
677 'testcname', "CNAME", dnshostname, self.creds_string)
680 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
681 'testns', "NS", dnshostname, self.creds_string)
683 # add a PTR record points to above host
684 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
685 'testptr', "PTR", dnshostname, self.creds_string)
687 # add a SRV record points to above host
688 srv_record = "{} 65530 65530 65530".format(dnshostname)
689 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
690 'testsrv', "SRV", srv_record, self.creds_string)
692 # cleanup record for this dns host
693 self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
694 dnshostname, self.creds_string)
696 # all records should be marked as dNSTombstoned
697 for record_name in ['testa', 'testcname', 'testns', 'testptr', 'testsrv']:
699 records = self.samdb.search(
700 base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
701 scope=ldb.SCOPE_SUBTREE,
702 expression="(&(objectClass=dnsNode)(name={}))".format(record_name),
703 attrs=["dNSTombstoned"])
705 self.assertEqual(len(records), 1)
706 for record in records:
707 self.assertEqual(str(record['dNSTombstoned']), 'TRUE')
709 def test_cleanup_record_no_A_record(self):
711 Test dns cleanup command works with no A record.
715 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
716 'notesta', "A", self.testip, self.creds_string)
718 # the above A record points to this host
719 dnshostname = '{}.{}'.format('testa', self.zone.lower())
721 # add a CNAME record points to above host
722 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
723 'notestcname', "CNAME", dnshostname, self.creds_string)
726 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
727 'notestns', "NS", dnshostname, self.creds_string)
729 # add a PTR record points to above host
730 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
731 'notestptr', "PTR", dnshostname, self.creds_string)
733 # add a SRV record points to above host
734 srv_record = "{} 65530 65530 65530".format(dnshostname)
735 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
736 'notestsrv', "SRV", srv_record, self.creds_string)
738 # Remove the initial A record (leading to hanging references)
739 self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
740 'notesta', "A", self.testip, self.creds_string)
742 # cleanup record for this dns host
743 self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
744 dnshostname, self.creds_string)
746 # all records should be marked as dNSTombstoned
747 for record_name in ['notestcname', 'notestns', 'notestptr', 'notestsrv']:
749 records = self.samdb.search(
750 base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
751 scope=ldb.SCOPE_SUBTREE,
752 expression="(&(objectClass=dnsNode)(name={}))".format(record_name),
753 attrs=["dNSTombstoned"])
755 self.assertEqual(len(records), 1)
756 for record in records:
757 self.assertEqual(str(record['dNSTombstoned']), 'TRUE')
759 def test_cleanup_multi_srv_record(self):
761 Test dns cleanup command for multi-valued SRV record.
764 - Add 2 A records host1 and host2
765 - Add a SRV record srv1 and points to both host1 and host2
766 - Run cleanup command for host1
767 - Check records for srv1, data for host1 should be gone and host2 is kept.
770 hosts = ['host1', 'host2'] # A record names
775 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
776 host, "A", self.testip, self.creds_string)
778 # the above A record points to this host
779 dnshostname = '{}.{}'.format(host, self.zone.lower())
781 # add a SRV record points to above host
782 srv_record = "{} 65530 65530 65530".format(dnshostname)
783 self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
784 srv_name, "SRV", srv_record, self.creds_string)
786 records = self.samdb.search(
787 base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
788 scope=ldb.SCOPE_SUBTREE,
789 expression="(&(objectClass=dnsNode)(name={}))".format(srv_name),
791 # should have 2 records here
792 self.assertEqual(len(records[0]['dnsRecord']), 2)
794 # cleanup record for dns host1
795 dnshostname1 = 'host1.{}'.format(self.zone.lower())
796 self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
797 dnshostname1, self.creds_string)
799 records = self.samdb.search(
800 base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
801 scope=ldb.SCOPE_SUBTREE,
802 expression="(&(objectClass=dnsNode)(name={}))".format(srv_name),
803 attrs=['dnsRecord', 'dNSTombstoned'])
805 # dnsRecord for host1 should be deleted
806 self.assertEqual(len(records[0]['dnsRecord']), 1)
809 dns_record_bin = records[0]['dnsRecord'][0]
810 dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord, dns_record_bin)
812 # dnsRecord for host2 is still there and is the only one
813 dnshostname2 = 'host2.{}'.format(self.zone.lower())
814 self.assertEqual(dns_record_obj.data.nameTarget, dnshostname2)
816 # assert that the record isn't spuriously tombstoned
817 self.assertTrue('dNSTombstoned' not in records[0] or
818 str(record['dNSTombstoned']) == 'FALSE')
820 def test_dns_wildcards(self):
822 Ensure that DNS wild card entries can be added deleted and queried
826 records = [("*.", "MISS", "A", "1.1.1.1"),
827 ("*.SAMDOM", "MISS.SAMDOM", "A", "1.1.1.2")]
828 for (name, miss, dnstype, record) in records:
830 result, out, err = self.runsubcmd("dns", "add",
831 os.environ["SERVER"],
835 self.assertCmdSuccess(
839 ("Failed to add record %s (%s) with type %s."
840 % (name, record, dnstype)))
842 result, out, err = self.runsubcmd("dns", "query",
843 os.environ["SERVER"],
847 self.assertCmdSuccess(
851 ("Failed to query record %s with qualifier %s."
852 % (record, dnstype)))
854 # dns tool does not perform dns wildcard search if the name
856 result, out, err = self.runsubcmd("dns", "query",
857 os.environ["SERVER"],
863 ("Failed to query record %s with qualifier %s."
864 % (record, dnstype)))
866 result, out, err = self.runsubcmd("dns", "delete",
867 os.environ["SERVER"],
871 self.assertCmdSuccess(
875 ("Failed to remove record %s with type %s."
876 % (record, dnstype)))
877 except AssertionError as e:
878 num_failures = num_failures + 1
879 failure_msgs.append(e)
882 for msg in failure_msgs:
884 self.fail("Failed to accept valid commands. %d total failures."
885 "Errors above." % num_failures)