c204055389ec61daf8a2a4f06535284f9616016d
[samba.git] / python / samba / tests / samba_tool / dnscmd.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@catalyst.net.nz>
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import ldb
20
21 from samba.auth import system_session
22 from samba.samdb import SamDB
23 from samba.ndr import ndr_unpack, ndr_pack
24 from samba.dcerpc import dnsp
25 from samba.tests.samba_tool.base import SambaToolCmdTest
26
27
28 class DnsCmdTestCase(SambaToolCmdTest):
29     def setUp(self):
30         super(DnsCmdTestCase, self).setUp()
31
32         self.dburl = "ldap://%s" % os.environ["SERVER"]
33         self.creds_string = "-U%s%%%s" % (os.environ["DC_USERNAME"],
34                                           os.environ["DC_PASSWORD"])
35
36         self.samdb = self.getSamDB("-H", self.dburl, self.creds_string)
37         self.config_dn = str(self.samdb.get_config_basedn())
38
39         self.testip = "192.168.0.193"
40         self.testip2 = "192.168.0.194"
41
42         self.addZone()
43
44         # Note: SOA types don't work (and shouldn't), as we only have one zone per DNS record.
45
46         good_dns = ["SAMDOM.EXAMPLE.COM",
47                     "1.EXAMPLE.COM",
48                     "%sEXAMPLE.COM" % ("1." * 100),
49                     "EXAMPLE",
50                     "\n.COM",
51                     "!@#$%^&*()_",
52                     "HIGH\xFFBYTE",
53                     "@.EXAMPLE.COM",
54                     "."]
55         bad_dns = ["...",
56                    ".EXAMPLE.COM",
57                    ".EXAMPLE.",
58                    "",
59                    "SAMDOM..EXAMPLE.COM"]
60
61         good_mx = ["SAMDOM.EXAMPLE.COM 65530"]
62         bad_mx = ["SAMDOM.EXAMPLE.COM -1",
63                   "SAMDOM.EXAMPLE.COM",
64                   " ",
65                   "SAMDOM.EXAMPLE.COM 1 1",
66                   "SAMDOM.EXAMPLE.COM SAMDOM.EXAMPLE.COM"]
67
68         good_srv = ["SAMDOM.EXAMPLE.COM 65530 65530 65530"]
69         bad_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
70                    "SAMDOM.EXAMPLE.COM 0 0 65536",
71                    "SAMDOM.EXAMPLE.COM 65536 0 0"]
72
73         for bad_dn in bad_dns:
74             bad_mx.append("%s 1" % bad_dn)
75             bad_srv.append("%s 0 0 0" % bad_dn)
76         for good_dn in good_dns:
77             good_mx.append("%s 1" % good_dn)
78             good_srv.append("%s 0 0 0" % good_dn)
79
80         self.good_records = {
81                 "A":["192.168.0.1", "255.255.255.255"],
82                 "AAAA":["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
83                         "0000:0000:0000:0000:0000:0000:0000:0000",
84                         "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
85                         "1234:1234:1234::",
86                         "1234:5678:9ABC:DEF0::",
87                         "0000:0000::0000",
88                         "1234::5678:9ABC:0000:0000:0000:0000",
89                         "::1",
90                         "::",
91                         "1:1:1:1:1:1:1:1"],
92                 "PTR": good_dns,
93                 "CNAME": good_dns,
94                 "NS": good_dns,
95                 "MX": good_mx,
96                 "SRV": good_srv,
97                 "TXT": ["text", "", "@#!", "\n"]
98         }
99
100         self.bad_records = {
101                 "A":["192.168.0.500",
102                      "255.255.255.255/32"],
103                 "AAAA":["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
104                         "0000:0000:0000:0000:0000:0000:0000:0000/1",
105                         "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
106                         "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
107                         "1234:5678:9ABC:DEF0:1234:5678:9ABC",
108                         "1111::1111::1111"],
109                 "PTR": bad_dns,
110                 "CNAME": bad_dns,
111                 "NS": bad_dns,
112                 "MX": bad_mx,
113                 "SRV": bad_srv
114         }
115
    def tearDown(self):
        """Delete the test zone, then run the base class tearDown."""
        self.deleteZone()
        super(DnsCmdTestCase, self).tearDown()
119
    def resetZone(self):
        """Delete the test zone and create it again.

        The tests call this after a failed assertion, presumably so that any
        record left behind by the failed step cannot affect later steps.
        """
        self.deleteZone()
        self.addZone()
123
124     def addZone(self):
125         self.zone = "zone"
126         result, out, err = self.runsubcmd("dns",
127                                           "zonecreate",
128                                           os.environ["SERVER"],
129                                           self.zone,
130                                           self.creds_string)
131         self.assertCmdSuccess(result, out, err)
132
133     def deleteZone(self):
134         result, out, err = self.runsubcmd("dns",
135                                           "zonedelete",
136                                           os.environ["SERVER"],
137                                           self.zone,
138                                           self.creds_string)
139         self.assertCmdSuccess(result, out, err)
140
141     def get_record_from_db(self, zone_name, record_name):
142         zones = self.samdb.search(base="DC=DomainDnsZones,%s"
143                                   % self.samdb.get_default_basedn(),
144                                   scope=ldb.SCOPE_SUBTREE,
145                                   expression="(objectClass=dnsZone)",
146                                   attrs=["cn"])
147
148         for zone in zones:
149             if zone_name in str(zone.dn):
150                 zone_dn = zone.dn
151                 break
152
153         records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
154                                     expression="(objectClass=dnsNode)",
155                                     attrs=["dnsRecord"])
156
157         for old_packed_record in records:
158             if record_name in str(old_packed_record.dn):
159                 return (old_packed_record.dn,
160                         ndr_unpack(dnsp.DnssrvRpcRecord,
161                                    old_packed_record["dnsRecord"][0]))
162
163     def test_rank_none(self):
164         record_str = "192.168.50.50"
165         record_type_str = "A"
166
167         result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
168                                           self.zone, "testrecord", record_type_str,
169                                           record_str, self.creds_string)
170         self.assertCmdSuccess(result, out, err,
171                               "Failed to add record '%s' with type %s."
172                               % (record_str, record_type_str))
173
174         dn, record = self.get_record_from_db(self.zone, "testrecord")
175         record.rank = 0  # DNS_RANK_NONE
176         res = self.samdb.dns_replace_by_dn(dn, [record])
177         if res is not None:
178             self.fail("Unable to update dns record to have DNS_RANK_NONE.")
179
180         errors = []
181
182         # The record should still exist
183         result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
184                                           self.zone, "testrecord", record_type_str,
185                                           self.creds_string)
186         try:
187             self.assertCmdSuccess(result, out, err,
188                                   "Failed to query for a record" \
189                                   "which had DNS_RANK_NONE.")
190             self.assertTrue("testrecord" in out and record_str in out,
191                             "Query for a record which had DNS_RANK_NONE" \
192                             "succeeded but produced no resulting records.")
193         except AssertionError as e:
194             # Windows produces no resulting records
195             pass
196
197         # We should not be able to add a duplicate
198         result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
199                                           self.zone, "testrecord", record_type_str,
200                                           record_str, self.creds_string)
201         try:
202             self.assertCmdFail(result, "Successfully added duplicate record" \
203                                "of one which had DNS_RANK_NONE.")
204         except AssertionError as e:
205             errors.append(e)
206
207         # We should be able to delete it
208         result, out, err = self.runsubcmd("dns", "delete", os.environ["SERVER"],
209                                           self.zone, "testrecord", record_type_str,
210                                           record_str, self.creds_string)
211         try:
212             self.assertCmdSuccess(result, out, err, "Failed to delete record" \
213                                   "which had DNS_RANK_NONE.")
214         except AssertionError as e:
215             errors.append(e)
216
217         # Now the record should not exist
218         result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
219                                           self.zone, "testrecord",
220                                           record_type_str, self.creds_string)
221         try:
222             self.assertCmdFail(result, "Successfully queried for deleted record" \
223                                "which had DNS_RANK_NONE.")
224         except AssertionError as e:
225             errors.append(e)
226
227         if len(errors) > 0:
228             err_str = "Failed appropriate behaviour with DNS_RANK_NONE:"
229             for error in errors:
230                 err_str = err_str + "\n" + str(error)
231             raise AssertionError(err_str)
232
233     def test_accept_valid_commands(self):
234         """
235         For all good records, attempt to add, query and delete them.
236         """
237         num_failures = 0
238         failure_msgs = []
239         for dnstype in self.good_records:
240             for record in self.good_records[dnstype]:
241                 try:
242                     result, out, err = self.runsubcmd("dns", "add",
243                                                       os.environ["SERVER"],
244                                                       self.zone, "testrecord",
245                                                       dnstype, record,
246                                                       self.creds_string)
247                     self.assertCmdSuccess(result, out, err, "Failed to add" \
248                                           "record %s with type %s."
249                                           % (record, dnstype))
250
251                     result, out, err = self.runsubcmd("dns", "query",
252                                                       os.environ["SERVER"],
253                                                       self.zone, "testrecord",
254                                                       dnstype,
255                                                       self.creds_string)
256                     self.assertCmdSuccess(result, out, err, "Failed to query" \
257                                           "record %s with qualifier %s."
258                                           % (record, dnstype))
259
260                     result, out, err = self.runsubcmd("dns", "delete",
261                                                       os.environ["SERVER"],
262                                                       self.zone, "testrecord",
263                                                       dnstype, record,
264                                                       self.creds_string)
265                     self.assertCmdSuccess(result, out, err, "Failed to remove" \
266                                           "record %s with type %s."
267                                           % (record, dnstype))
268                 except AssertionError as e:
269                     num_failures = num_failures + 1
270                     failure_msgs.append(e)
271
272         if num_failures > 0:
273             for msg in failure_msgs:
274                 print(msg)
275             self.fail("Failed to accept valid commands. %d total failures." \
276                       "Errors above." % num_failures)
277
278     def test_reject_invalid_commands(self):
279         """
280         For all bad records, attempt to add them and update to them,
281         making sure that both operations fail.
282         """
283         num_failures = 0
284         failure_msgs = []
285
286         # Add invalid records and make sure they fail to be added
287         for dnstype in self.bad_records:
288             for record in self.bad_records[dnstype]:
289                 try:
290                     result, out, err = self.runsubcmd("dns", "add",
291                                                       os.environ["SERVER"],
292                                                       self.zone, "testrecord",
293                                                       dnstype, record,
294                                                       self.creds_string)
295                     self.assertCmdFail(result, "Successfully added invalid" \
296                                        "record '%s' of type '%s'."
297                                        % (record, dnstype))
298                 except AssertionError as e:
299                     num_failures = num_failures + 1
300                     failure_msgs.append(e)
301                     self.resetZone()
302                 try:
303                     result, out, err = self.runsubcmd("dns", "delete",
304                                                       os.environ["SERVER"],
305                                                       self.zone, "testrecord",
306                                                       dnstype, record,
307                                                       self.creds_string)
308                     self.assertCmdFail(result, "Successfully deleted invalid" \
309                                        "record '%s' of type '%s' which" \
310                                        "shouldn't exist." % (record, dnstype))
311                 except AssertionError as e:
312                     num_failures = num_failures + 1
313                     failure_msgs.append(e)
314                     self.resetZone()
315
316         # Update valid records to invalid ones and make sure they
317         # fail to be updated
318         for dnstype in self.bad_records:
319             for bad_record in self.bad_records[dnstype]:
320                 good_record = self.good_records[dnstype][0]
321
322                 try:
323                     result, out, err = self.runsubcmd("dns", "add",
324                                                       os.environ["SERVER"],
325                                                       self.zone, "testrecord",
326                                                       dnstype, good_record,
327                                                       self.creds_string)
328                     self.assertCmdSuccess(result, out, err, "Failed to add " \
329                                           "record '%s' with type %s."
330                                           % (record, dnstype))
331
332                     result, out, err = self.runsubcmd("dns", "update",
333                                                       os.environ["SERVER"],
334                                                       self.zone, "testrecord",
335                                                       dnstype, good_record,
336                                                       bad_record,
337                                                       self.creds_string)
338                     self.assertCmdFail(result, "Successfully updated valid " \
339                                        "record '%s' of type '%s' to invalid " \
340                                        "record '%s' of the same type."
341                                        % (good_record, dnstype, bad_record))
342
343                     result, out, err = self.runsubcmd("dns", "delete",
344                                                       os.environ["SERVER"],
345                                                       self.zone, "testrecord",
346                                                       dnstype, good_record,
347                                                       self.creds_string)
348                     self.assertCmdSuccess(result, out, err, "Could not delete " \
349                                           "valid record '%s' of type '%s'."
350                                           % (good_record, dnstype))
351                 except AssertionError as e:
352                     num_failures = num_failures + 1
353                     failure_msgs.append(e)
354                     self.resetZone()
355
356         if num_failures > 0:
357             for msg in failure_msgs:
358                 print(msg)
359             self.fail("Failed to reject invalid commands. %d total failures. " \
360                       "Errors above." % num_failures)
361
362     def test_update_invalid_type(self):
363         """
364         Make sure that a record can't be updated to one of a different type.
365         """
366         for dnstype1 in self.good_records:
367             record1 = self.good_records[dnstype1][0]
368             result, out, err = self.runsubcmd("dns", "add",
369                                               os.environ["SERVER"],
370                                               self.zone, "testrecord",
371                                               dnstype1, record1,
372                                               self.creds_string)
373             self.assertCmdSuccess(result, out, err, "Failed to add " \
374                                   "record %s with type %s."
375                                   % (record1, dnstype1))
376
377             for dnstype2 in self.good_records:
378                 record2 = self.good_records[dnstype2][0]
379
380                 # Make sure that record2 isn't a valid entry of dnstype1.
381                 # For example, any A-type will also be a valid TXT-type.
382                 result, out, err = self.runsubcmd("dns", "add",
383                                                   os.environ["SERVER"],
384                                                   self.zone, "testrecord",
385                                                   dnstype1, record2,
386                                                   self.creds_string)
387                 try:
388                     self.assertCmdFail(result)
389                 except AssertionError:
390                     continue  # Don't check this one, because record2 _is_ a valid entry of dnstype1.
391
392                 # Check both ways: Give the current type and try to update,
393                 # and give the new type and try to update.
394                 result, out, err = self.runsubcmd("dns", "update",
395                                                   os.environ["SERVER"],
396                                                   self.zone, "testrecord",
397                                                   dnstype1, record1,
398                                                   record2, self.creds_string)
399                 self.assertCmdFail(result, "Successfully updated record '%s' " \
400                                    "to '%s', even though the latter is of " \
401                                    "type '%s' where '%s' was expected."
402                                    % (record1, record2, dnstype2, dnstype1))
403
404                 result, out, err = self.runsubcmd("dns", "update",
405                                                   os.environ["SERVER"],
406                                                   self.zone, "testrecord",
407                                                   dnstype2, record1, record2,
408                                                   self.creds_string)
409                 self.assertCmdFail(result, "Successfully updated record " \
410                                    "'%s' to '%s', even though the former " \
411                                    "is of type '%s' where '%s' was expected."
412                                    % (record1, record2, dnstype1, dnstype2))
413
414     def test_update_valid_type(self):
415         for dnstype in self.good_records:
416             for record in self.good_records[dnstype]:
417                 result, out, err = self.runsubcmd("dns", "add",
418                                                   os.environ["SERVER"],
419                                                   self.zone, "testrecord",
420                                                   dnstype, record,
421                                                   self.creds_string)
422                 self.assertCmdSuccess(result, out, err, "Failed to add " \
423                                       "record %s with type %s."
424                                       % (record, dnstype))
425
426                 # Update the record to be the same.
427                 result, out, err = self.runsubcmd("dns", "update",
428                                                   os.environ["SERVER"],
429                                                   self.zone, "testrecord",
430                                                   dnstype, record, record,
431                                                   self.creds_string)
432                 self.assertCmdFail(result, "Successfully updated record " \
433                                    "'%s' to be exactly the same." % record)
434
435                 result, out, err = self.runsubcmd("dns", "delete",
436                                                   os.environ["SERVER"],
437                                                   self.zone, "testrecord",
438                                                   dnstype, record,
439                                                   self.creds_string)
440                 self.assertCmdSuccess(result, out, err, "Could not delete " \
441                                       "valid record '%s' of type '%s'."
442                                       % (record, dnstype))
443
444         for record in self.good_records["SRV"]:
445             result, out, err = self.runsubcmd("dns", "add",
446                                               os.environ["SERVER"],
447                                               self.zone, "testrecord",
448                                               "SRV", record,
449                                               self.creds_string)
450             self.assertCmdSuccess(result, out, err, "Failed to add " \
451                                   "record %s with type 'SRV'." % record)
452
453             split = record.split(' ')
454             new_bit = str(int(split[3]) + 1)
455             new_record = '%s %s %s %s' % (split[0], split[1], split[2], new_bit)
456
457             result, out, err = self.runsubcmd("dns", "update",
458                                               os.environ["SERVER"],
459                                               self.zone, "testrecord",
460                                               "SRV", record,
461                                               new_record, self.creds_string)
462             self.assertCmdSuccess(result, out, err, "Failed to update record " \
463                                   "'%s' of type '%s' to '%s'."
464                                   % (record, "SRV", new_record))
465
466             result, out, err = self.runsubcmd("dns", "query",
467                                               os.environ["SERVER"],
468                                               self.zone, "testrecord",
469                                               "SRV", self.creds_string)
470             self.assertCmdSuccess(result, out, err, "Failed to query for " \
471                                   "record '%s' of type '%s'."
472                                   % (new_record, "SRV"))
473
474             result, out, err = self.runsubcmd("dns", "delete",
475                                               os.environ["SERVER"],
476                                               self.zone, "testrecord",
477                                               "SRV", new_record,
478                                               self.creds_string)
479             self.assertCmdSuccess(result, out, err, "Could not delete " \
480                                   "valid record '%s' of type '%s'."
481                                   % (new_record, "SRV"))
482
483         # Since 'dns update' takes the current value as a parameter, make sure
484         # we can't enter the wrong current value for a given record.
485         for dnstype in self.good_records:
486             if len(self.good_records[dnstype]) < 3:
487                 continue  # Not enough records of this type to do this test
488
489             used_record = self.good_records[dnstype][0]
490             unused_record = self.good_records[dnstype][1]
491             new_record = self.good_records[dnstype][2]
492
493             result, out, err = self.runsubcmd("dns", "add",
494                                               os.environ["SERVER"],
495                                               self.zone, "testrecord",
496                                               dnstype, used_record,
497                                               self.creds_string)
498             self.assertCmdSuccess(result, out, err, "Failed to add record %s " \
499                                   "with type %s." % (used_record, dnstype))
500
501             result, out, err = self.runsubcmd("dns", "update",
502                                               os.environ["SERVER"],
503                                               self.zone, "testrecord",
504                                               dnstype, unused_record,
505                                               new_record,
506                                               self.creds_string)
507             self.assertCmdFail(result, "Successfully updated record '%s' " \
508                                "from '%s' to '%s', even though the given " \
509                                "source record is incorrect."
510                                % (used_record, unused_record, new_record))
511
512     def test_invalid_types(self):
513         result, out, err = self.runsubcmd("dns", "add",
514                                           os.environ["SERVER"],
515                                           self.zone, "testrecord",
516                                           "SOA", "test",
517                                           self.creds_string)
518         self.assertCmdFail(result, "Successfully added record of type SOA, " \
519                            "when this type should not be available.")
520         self.assertTrue("type SOA is not supported" in err,
521                         "Invalid error message '%s' when attempting to " \
522                         "add record of type SOA." % err)
523
524     def test_add_overlapping_different_type(self):
525         """
526         Make sure that we can add an entry with the same name as an existing one but a different type.
527         """
528
529         i = 0
530         for dnstype1 in self.good_records:
531             record1 = self.good_records[dnstype1][0]
532             for dnstype2 in self.good_records:
533                 # Only do some subset of dns types, otherwise it takes a long time.
534                 i += 1
535                 if i % 4 != 0:
536                     continue
537
538                 if dnstype1 == dnstype2:
539                     continue
540
541                 record2 = self.good_records[dnstype2][0]
542
543                 result, out, err = self.runsubcmd("dns", "add",
544                                                   os.environ["SERVER"],
545                                                   self.zone, "testrecord",
546                                                   dnstype1, record1,
547                                                   self.creds_string)
548                 self.assertCmdSuccess(result, out, err, "Failed to add record " \
549                                       "'%s' of type '%s'." % (record1, dnstype1))
550
551                 result, out, err = self.runsubcmd("dns", "add",
552                                                   os.environ["SERVER"],
553                                                   self.zone, "testrecord",
554                                                   dnstype2, record2,
555                                                   self.creds_string)
556                 self.assertCmdSuccess(result, out, err, "Failed to add record " \
557                                       "'%s' of type '%s' when a record '%s' " \
558                                       "of type '%s' with the same name exists."
559                                       % (record1, dnstype1, record2, dnstype2))
560
561                 result, out, err = self.runsubcmd("dns", "query",
562                                                   os.environ["SERVER"],
563                                                   self.zone, "testrecord",
564                                                   dnstype1, self.creds_string)
565                 self.assertCmdSuccess(result, out, err, "Failed to query for " \
566                                       "record '%s' of type '%s' when a new " \
567                                       "record '%s' of type '%s' with the same " \
568                                       "name was added."
569                                       % (record1, dnstype1, record2, dnstype2))
570
571                 result, out, err = self.runsubcmd("dns", "query",
572                                                   os.environ["SERVER"],
573                                                   self.zone, "testrecord",
574                                                   dnstype2, self.creds_string)
575                 self.assertCmdSuccess(result, out, err, "Failed to query " \
576                                       "record '%s' of type '%s' which should " \
577                                       "have been added with the same name as " \
578                                       "record '%s' of type '%s'."
579                                       % (record2, dnstype2, record1, dnstype1))
580
581                 result, out, err = self.runsubcmd("dns", "delete",
582                                                   os.environ["SERVER"],
583                                                   self.zone, "testrecord",
584                                                   dnstype1, record1,
585                                                   self.creds_string)
586                 self.assertCmdSuccess(result, out, err, "Failed to delete " \
587                                       "record '%s' of type '%s'."
588                                       % (record1, dnstype1))
589
590                 result, out, err = self.runsubcmd("dns", "delete",
591                                                   os.environ["SERVER"],
592                                                   self.zone, "testrecord",
593                                                   dnstype2, record2,
594                                                   self.creds_string)
595                 self.assertCmdSuccess(result, out, err, "Failed to delete " \
596                                       "record '%s' of type '%s'."
597                                       % (record2, dnstype2))
598
599     def test_query_deleted_record(self):
600         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
601                        "testrecord", "A", self.testip, self.creds_string)
602         self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
603                        "testrecord", "A", self.testip, self.creds_string)
604
605         result, out, err = self.runsubcmd("dns", "query",
606                                           os.environ["SERVER"],
607                                           self.zone, "testrecord",
608                                           "A", self.creds_string)
609         self.assertCmdFail(result)
610
611     def test_add_duplicate_record(self):
612         for record_type in self.good_records:
613             result, out, err = self.runsubcmd("dns", "add",
614                                               os.environ["SERVER"],
615                                               self.zone, "testrecord",
616                                               record_type,
617                                               self.good_records[record_type][0],
618                                               self.creds_string)
619             self.assertCmdSuccess(result, out, err)
620             result, out, err = self.runsubcmd("dns", "add",
621                                               os.environ["SERVER"],
622                                               self.zone, "testrecord",
623                                               record_type,
624                                               self.good_records[record_type][0],
625                                               self.creds_string)
626             self.assertCmdFail(result)
627             result, out, err = self.runsubcmd("dns", "query",
628                                               os.environ["SERVER"],
629                                               self.zone, "testrecord",
630                                               record_type, self.creds_string)
631             self.assertCmdSuccess(result, out, err)
632             result, out, err = self.runsubcmd("dns", "delete",
633                                               os.environ["SERVER"],
634                                               self.zone, "testrecord",
635                                               record_type,
636                                               self.good_records[record_type][0],
637                                               self.creds_string)
638             self.assertCmdSuccess(result, out, err)
639
640     def test_remove_deleted_record(self):
641         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
642                        "testrecord", "A", self.testip, self.creds_string)
643         self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
644                        "testrecord", "A", self.testip, self.creds_string)
645
646         # Attempting to delete a record that has already been deleted or has never existed should fail
647         result, out, err = self.runsubcmd("dns", "delete",
648                                           os.environ["SERVER"],
649                                           self.zone, "testrecord",
650                                           "A", self.testip, self.creds_string)
651         self.assertCmdFail(result)
652         result, out, err = self.runsubcmd("dns", "query",
653                                           os.environ["SERVER"],
654                                           self.zone, "testrecord",
655                                           "A", self.creds_string)
656         self.assertCmdFail(result)
657         result, out, err = self.runsubcmd("dns", "delete",
658                                           os.environ["SERVER"],
659                                           self.zone, "testrecord2",
660                                           "A", self.testip, self.creds_string)
661         self.assertCmdFail(result)
662
663     def test_cleanup_record(self):
664         """
665         Test dns cleanup command is working fine.
666         """
667
668         # add a A record
669         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
670                        'testa', "A", self.testip, self.creds_string)
671
672         # the above A record points to this host
673         dnshostname = '{}.{}'.format('testa', self.zone.lower())
674
675         # add a CNAME record points to above host
676         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
677                        'testcname', "CNAME", dnshostname, self.creds_string)
678
679         # add a NS record
680         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
681                        'testns', "NS", dnshostname, self.creds_string)
682
683         # add a PTR record points to above host
684         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
685                        'testptr', "PTR", dnshostname, self.creds_string)
686
687         # add a SRV record points to above host
688         srv_record = "{} 65530 65530 65530".format(dnshostname)
689         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
690                        'testsrv', "SRV", srv_record, self.creds_string)
691
692         # cleanup record for this dns host
693         self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
694                        dnshostname, self.creds_string)
695
696         # all records should be marked as dNSTombstoned
697         for record_name in ['testa', 'testcname', 'testns', 'testptr', 'testsrv']:
698
699             records = self.samdb.search(
700                 base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
701                 scope=ldb.SCOPE_SUBTREE,
702                 expression="(&(objectClass=dnsNode)(name={}))".format(record_name),
703                 attrs=["dNSTombstoned"])
704
705             self.assertEqual(len(records), 1)
706             for record in records:
707                 self.assertEqual(str(record['dNSTombstoned']), 'TRUE')
708
709     def test_cleanup_record_no_A_record(self):
710         """
711         Test dns cleanup command works with no A record.
712         """
713
714         # add a A record
715         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
716                        'notesta', "A", self.testip, self.creds_string)
717
718         # the above A record points to this host
719         dnshostname = '{}.{}'.format('testa', self.zone.lower())
720
721         # add a CNAME record points to above host
722         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
723                        'notestcname', "CNAME", dnshostname, self.creds_string)
724
725         # add a NS record
726         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
727                        'notestns', "NS", dnshostname, self.creds_string)
728
729         # add a PTR record points to above host
730         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
731                        'notestptr', "PTR", dnshostname, self.creds_string)
732
733         # add a SRV record points to above host
734         srv_record = "{} 65530 65530 65530".format(dnshostname)
735         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
736                        'notestsrv', "SRV", srv_record, self.creds_string)
737
738         # Remove the initial A record (leading to hanging references)
739         self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
740                        'notesta', "A", self.testip, self.creds_string)
741
742         # cleanup record for this dns host
743         self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
744                        dnshostname, self.creds_string)
745
746         # all records should be marked as dNSTombstoned
747         for record_name in ['notestcname', 'notestns', 'notestptr', 'notestsrv']:
748
749             records = self.samdb.search(
750                 base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
751                 scope=ldb.SCOPE_SUBTREE,
752                 expression="(&(objectClass=dnsNode)(name={}))".format(record_name),
753                 attrs=["dNSTombstoned"])
754
755             self.assertEqual(len(records), 1)
756             for record in records:
757                 self.assertEqual(str(record['dNSTombstoned']), 'TRUE')
758
759     def test_cleanup_multi_srv_record(self):
760         """
761         Test dns cleanup command for multi-valued SRV record.
762
763         Steps:
764         - Add 2 A records host1 and host2
765         - Add a SRV record srv1 and points to both host1 and host2
766         - Run cleanup command for host1
767         - Check records for srv1, data for host1 should be gone and host2 is kept.
768         """
769
770         hosts = ['host1', 'host2']  # A record names
771         srv_name = 'srv1'
772
773         # add A records
774         for host in hosts:
775             self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
776                            host, "A", self.testip, self.creds_string)
777
778             # the above A record points to this host
779             dnshostname = '{}.{}'.format(host, self.zone.lower())
780
781             # add a SRV record points to above host
782             srv_record = "{} 65530 65530 65530".format(dnshostname)
783             self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
784                            srv_name, "SRV", srv_record, self.creds_string)
785
786         records = self.samdb.search(
787             base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
788             scope=ldb.SCOPE_SUBTREE,
789             expression="(&(objectClass=dnsNode)(name={}))".format(srv_name),
790             attrs=['dnsRecord'])
791         # should have 2 records here
792         self.assertEqual(len(records[0]['dnsRecord']), 2)
793
794         # cleanup record for dns host1
795         dnshostname1 = 'host1.{}'.format(self.zone.lower())
796         self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
797                        dnshostname1, self.creds_string)
798
799         records = self.samdb.search(
800             base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
801             scope=ldb.SCOPE_SUBTREE,
802             expression="(&(objectClass=dnsNode)(name={}))".format(srv_name),
803             attrs=['dnsRecord', 'dNSTombstoned'])
804
805         # dnsRecord for host1 should be deleted
806         self.assertEqual(len(records[0]['dnsRecord']), 1)
807
808         # unpack data
809         dns_record_bin = records[0]['dnsRecord'][0]
810         dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord, dns_record_bin)
811
812         # dnsRecord for host2 is still there and is the only one
813         dnshostname2 = 'host2.{}'.format(self.zone.lower())
814         self.assertEqual(dns_record_obj.data.nameTarget, dnshostname2)
815
816         # assert that the record isn't spuriously tombstoned
817         self.assertTrue('dNSTombstoned' not in records[0] or
818                         str(record['dNSTombstoned']) == 'FALSE')
819
820     def test_dns_wildcards(self):
821         """
822         Ensure that DNS wild card entries can be added deleted and queried
823         """
824         num_failures = 0
825         failure_msgs = []
826         records = [("*.", "MISS", "A", "1.1.1.1"),
827                    ("*.SAMDOM", "MISS.SAMDOM", "A", "1.1.1.2")]
828         for (name, miss, dnstype, record) in records:
829             try:
830                 result, out, err = self.runsubcmd("dns", "add",
831                                                   os.environ["SERVER"],
832                                                   self.zone, name,
833                                                   dnstype, record,
834                                                   self.creds_string)
835                 self.assertCmdSuccess(
836                     result,
837                     out,
838                     err,
839                     ("Failed to add record %s (%s) with type %s."
840                      % (name, record, dnstype)))
841
842                 result, out, err = self.runsubcmd("dns", "query",
843                                                   os.environ["SERVER"],
844                                                   self.zone, name,
845                                                   dnstype,
846                                                   self.creds_string)
847                 self.assertCmdSuccess(
848                     result,
849                     out,
850                     err,
851                     ("Failed to query record %s with qualifier %s."
852                      % (record, dnstype)))
853
854                 # dns tool does not perform dns wildcard search if the name
855                 # does not match
856                 result, out, err = self.runsubcmd("dns", "query",
857                                                   os.environ["SERVER"],
858                                                   self.zone, miss,
859                                                   dnstype,
860                                                   self.creds_string)
861                 self.assertCmdFail(
862                     result,
863                     ("Failed to query record %s with qualifier %s."
864                      % (record, dnstype)))
865
866                 result, out, err = self.runsubcmd("dns", "delete",
867                                                   os.environ["SERVER"],
868                                                   self.zone, name,
869                                                   dnstype, record,
870                                                   self.creds_string)
871                 self.assertCmdSuccess(
872                     result,
873                     out,
874                     err,
875                     ("Failed to remove record %s with type %s."
876                      % (record, dnstype)))
877             except AssertionError as e:
878                 num_failures = num_failures + 1
879                 failure_msgs.append(e)
880
881         if num_failures > 0:
882             for msg in failure_msgs:
883                 print(msg)
884             self.fail("Failed to accept valid commands. %d total failures."
885                       "Errors above." % num_failures)