s3:utils: let smbstatus report anonymous signing/encryption explicitly
[samba.git] / python / samba / tests / dcerpc / dnsserver.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for samba.dcerpc.dnsserver"""
19
20 import os
21 import ldb
22
23 from samba.auth import system_session
24 from samba.samdb import SamDB
25 from samba.ndr import ndr_unpack
26 from samba.dcerpc import dnsp, dnsserver, security
27 from samba.tests import RpcInterfaceTestCase, env_get_var_value
28 from samba.dnsserver import record_from_string, flag_from_string, ARecord
29 from samba import sd_utils, descriptor
30 from samba import WERRORError, werror
31
32
33 class DnsserverTests(RpcInterfaceTestCase):
34
35     @classmethod
36     def setUpClass(cls):
37         super().setUpClass()
38
39         good_dns = ["SAMDOM.EXAMPLE.COM",
40                     "1.EXAMPLE.COM",
41                     "%sEXAMPLE.COM" % ("1." * 100),
42                     "EXAMPLE",
43                     "\n.COM",
44                     "!@#$%^&*()_",
45                     "HIGH\xFFBYTE",
46                     "@.EXAMPLE.COM",
47                     "."]
48         bad_dns = ["...",
49                    ".EXAMPLE.COM",
50                    ".EXAMPLE.",
51                    "",
52                    "SAMDOM..EXAMPLE.COM"]
53
54         good_mx = ["SAMDOM.EXAMPLE.COM 65535"]
55         bad_mx = []
56
57         good_srv = ["SAMDOM.EXAMPLE.COM 65535 65535 65535"]
58         bad_srv = []
59
60         for bad_dn in bad_dns:
61             bad_mx.append("%s 1" % bad_dn)
62             bad_srv.append("%s 0 0 0" % bad_dn)
63         for good_dn in good_dns:
64             good_mx.append("%s 1" % good_dn)
65             good_srv.append("%s 0 0 0" % good_dn)
66
67         cls.good_records = {
68             "A": ["192.168.0.1",
69                   "255.255.255.255"],
70             "AAAA": ["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
71                      "0000:0000:0000:0000:0000:0000:0000:0000",
72                      "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
73                      "1234:1234:1234::",
74                      "1234:1234:1234:1234:1234::",
75                      "1234:5678:9ABC:DEF0::",
76                      "0000:0000::0000",
77                      "1234::5678:9ABC:0000:0000:0000:0000",
78                      "::1",
79                      "::",
80                      "1:1:1:1:1:1:1:1"],
81             "PTR": good_dns,
82             "CNAME": good_dns,
83             "NS": good_dns,
84             "MX": good_mx,
85             "SRV": good_srv,
86             "TXT": ["text", "", "@#!", "\n"]
87         }
88
89         cls.bad_records = {
90             "A": ["192.168.0.500",
91                   "255.255.255.255/32"],
92             "AAAA": ["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
93                      "0000:0000:0000:0000:0000:0000:0000:0000/1",
94                      "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
95                      "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
96                      "1234:5678:9ABC:DEF0:1234:5678:9ABC",
97                      "1111::1111::1111"],
98             "PTR": bad_dns,
99             "CNAME": bad_dns,
100             "NS": bad_dns,
101             "MX": bad_mx,
102             "SRV": bad_srv
103         }
104
105         # Because we use uint16_t for these numbers, we can't
106         # actually create these records.
107         invalid_mx = ["SAMDOM.EXAMPLE.COM -1",
108                       "SAMDOM.EXAMPLE.COM 65536",
109                       "%s 1" % ("A" * 256)]
110         invalid_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
111                        "SAMDOM.EXAMPLE.COM 0 0 65536",
112                        "SAMDOM.EXAMPLE.COM 65536 0 0"]
113         cls.invalid_records = {
114             "MX": invalid_mx,
115             "SRV": invalid_srv
116         }
117
118     def setUp(self):
119         super().setUp()
120         self.server = os.environ["DC_SERVER"]
121         self.zone = env_get_var_value("REALM").lower()
122         self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
123                                         self.get_loadparm(),
124                                         self.get_credentials())
125
126         self.samdb = SamDB(url="ldap://%s" % os.environ["DC_SERVER_IP"],
127                            lp=self.get_loadparm(),
128                            session_info=system_session(),
129                            credentials=self.get_credentials())
130
131         self.custom_zone = "zone"
132         zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
133         zone_create_info.pszZoneName = self.custom_zone
134         zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
135         zone_create_info.fAging = 0
136         zone_create_info.fDsIntegrated = 1
137         zone_create_info.fLoadExisting = 1
138         zone_create_info.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
139
140         self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
141                                    0,
142                                    self.server,
143                                    None,
144                                    0,
145                                    'ZoneCreate',
146                                    dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
147                                    zone_create_info)
148
149     def tearDown(self):
150         self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
151                                    0,
152                                    self.server,
153                                    self.custom_zone,
154                                    0,
155                                    'DeleteZoneFromDs',
156                                    dnsserver.DNSSRV_TYPEID_NULL,
157                                    None)
158         super().tearDown()
159
160     def test_enum_is_sorted(self):
161         """
162         Confirm the zone is sorted
163         """
164
165         record_str = "192.168.50.50"
166         record_type_str = "A"
167         self.add_record(self.custom_zone, "atestrecord-1", record_type_str, record_str)
168         self.add_record(self.custom_zone, "atestrecord-2", record_type_str, record_str)
169         self.add_record(self.custom_zone, "atestrecord-3", record_type_str, record_str)
170         self.add_record(self.custom_zone, "atestrecord-4", record_type_str, record_str)
171         self.add_record(self.custom_zone, "atestrecord-0", record_type_str, record_str)
172
173         # This becomes an extra A on the zone itself by server-side magic
174         self.add_record(self.custom_zone, self.custom_zone, record_type_str, record_str)
175
176         _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
177                                                  0,
178                                                  self.server,
179                                                  self.custom_zone,
180                                                  "@",
181                                                  None,
182                                                  flag_from_string(record_type_str),
183                                                  dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
184                                                  None,
185                                                  None)
186
187         self.assertEqual(len(result.rec), 6)
188         self.assertEqual(result.rec[0].dnsNodeName.str, "")
189         self.assertEqual(result.rec[1].dnsNodeName.str, "atestrecord-0")
190         self.assertEqual(result.rec[2].dnsNodeName.str, "atestrecord-1")
191         self.assertEqual(result.rec[3].dnsNodeName.str, "atestrecord-2")
192         self.assertEqual(result.rec[4].dnsNodeName.str, "atestrecord-3")
193         self.assertEqual(result.rec[5].dnsNodeName.str, "atestrecord-4")
194
195     def test_enum_is_sorted_with_zone_dup(self):
196         """
197         Confirm the zone is sorted
198         """
199
200         record_str = "192.168.50.50"
201         record_type_str = "A"
202         self.add_record(self.custom_zone, "atestrecord-1", record_type_str, record_str)
203         self.add_record(self.custom_zone, "atestrecord-2", record_type_str, record_str)
204         self.add_record(self.custom_zone, "atestrecord-3", record_type_str, record_str)
205         self.add_record(self.custom_zone, "atestrecord-4", record_type_str, record_str)
206         self.add_record(self.custom_zone, "atestrecord-0", record_type_str, record_str)
207
208         # This triggers a bug in old Samba
209         self.add_record(self.custom_zone, self.custom_zone + "1", record_type_str, record_str)
210
211         dn, record = self.get_record_from_db(self.custom_zone, self.custom_zone + "1")
212
213         new_dn = ldb.Dn(self.samdb, str(dn))
214         new_dn.set_component(0, "dc", self.custom_zone)
215         self.samdb.rename(dn, new_dn)
216
217         _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
218                                                  0,
219                                                  self.server,
220                                                  self.custom_zone,
221                                                  "@",
222                                                  None,
223                                                  flag_from_string(record_type_str),
224                                                  dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
225                                                  None,
226                                                  None)
227
228         self.assertEqual(len(result.rec), 7)
229         self.assertEqual(result.rec[0].dnsNodeName.str, "")
230         self.assertEqual(result.rec[1].dnsNodeName.str, "atestrecord-0")
231         self.assertEqual(result.rec[2].dnsNodeName.str, "atestrecord-1")
232         self.assertEqual(result.rec[3].dnsNodeName.str, "atestrecord-2")
233         self.assertEqual(result.rec[4].dnsNodeName.str, "atestrecord-3")
234         self.assertEqual(result.rec[5].dnsNodeName.str, "atestrecord-4")
235
236         # Windows doesn't reload the zone fast enough, but doesn't
237         # have the bug anyway, it will sort last on both names (where
238         # it should)
239         if result.rec[6].dnsNodeName.str != (self.custom_zone + "1"):
240             self.assertEqual(result.rec[6].dnsNodeName.str, self.custom_zone)
241
242     def test_enum_is_sorted_children_prefix_first(self):
243         """
244         Confirm the zone returns the selected prefix first but no more
245         as Samba is flappy for the full sort
246         """
247
248         record_str = "192.168.50.50"
249         record_type_str = "A"
250         self.add_record(self.custom_zone, "atestrecord-1.a.b", record_type_str, record_str)
251         self.add_record(self.custom_zone, "atestrecord-2.a.b", record_type_str, record_str)
252         self.add_record(self.custom_zone, "atestrecord-3.a.b", record_type_str, record_str)
253         self.add_record(self.custom_zone, "atestrecord-4.a.b", record_type_str, record_str)
254         self.add_record(self.custom_zone, "atestrecord-0.a.b", record_type_str, record_str)
255
256         # Not expected to be returned
257         self.add_record(self.custom_zone, "atestrecord-0.b.b", record_type_str, record_str)
258
259         _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
260                                                  0,
261                                                  self.server,
262                                                  self.custom_zone,
263                                                  "a.b",
264                                                  None,
265                                                  flag_from_string(record_type_str),
266                                                  dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
267                                                  None,
268                                                  None)
269
270         self.assertEqual(len(result.rec), 6)
271         self.assertEqual(result.rec[0].dnsNodeName.str, "")
272
273     def test_enum_is_sorted_children(self):
274         """
275         Confirm the zone is sorted
276         """
277
278         record_str = "192.168.50.50"
279         record_type_str = "A"
280         self.add_record(self.custom_zone, "atestrecord-1.a.b", record_type_str, record_str)
281         self.add_record(self.custom_zone, "atestrecord-2.a.b", record_type_str, record_str)
282         self.add_record(self.custom_zone, "atestrecord-3.a.b", record_type_str, record_str)
283         self.add_record(self.custom_zone, "atestrecord-4.a.b", record_type_str, record_str)
284         self.add_record(self.custom_zone, "atestrecord-0.a.b", record_type_str, record_str)
285
286         # Not expected to be returned
287         self.add_record(self.custom_zone, "atestrecord-0.b.b", record_type_str, record_str)
288
289         _, result = self.conn.DnssrvEnumRecords2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
290                                                  0,
291                                                  self.server,
292                                                  self.custom_zone,
293                                                  "a.b",
294                                                  None,
295                                                  flag_from_string(record_type_str),
296                                                  dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
297                                                  None,
298                                                  None)
299
300         self.assertEqual(len(result.rec), 6)
301         self.assertEqual(result.rec[0].dnsNodeName.str, "")
302         self.assertEqual(result.rec[1].dnsNodeName.str, "atestrecord-0")
303         self.assertEqual(result.rec[2].dnsNodeName.str, "atestrecord-1")
304         self.assertEqual(result.rec[3].dnsNodeName.str, "atestrecord-2")
305         self.assertEqual(result.rec[4].dnsNodeName.str, "atestrecord-3")
306         self.assertEqual(result.rec[5].dnsNodeName.str, "atestrecord-4")
307
308     # This test fails against Samba (but passes against Windows),
309     # because Samba does not return the record when we enum records.
310     # Records can be given DNS_RANK_NONE when the zone they are in
311     # does not have DNS_ZONE_TYPE_PRIMARY. Since such records can be
312     # deleted, however, we do not consider this urgent to fix and
313     # so this test is a knownfail.
314     def test_rank_none(self):
315         """
316         See what happens when we set a record's rank to
317         DNS_RANK_NONE.
318         """
319
320         record_str = "192.168.50.50"
321         record_type_str = "A"
322         self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
323
324         dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
325         record.rank = 0  # DNS_RANK_NONE
326         res = self.samdb.dns_replace_by_dn(dn, [record])
327         if res is not None:
328             self.fail("Unable to update dns record to have DNS_RANK_NONE.")
329
330         self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
331         self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
332         self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
333         self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0)
334
335     def test_dns_tombstoned_zero_timestamp(self):
336         """What happens with a zero EntombedTime tombstone?"""
337         # A zero-timestamp tombstone record has a special meaning for
338         # dns_common_replace(), which is the function exposed by
339         # samdb.dns_replace_by_dn(), and which is *NOT* a general
340         # purpose record replacement function but a specialised part
341         # of the dns update mechanism (for both DLZ and internal).
342         #
343         # In the earlier stages of handling updates, a record that
344         # needs to be deleted is set to be a tombstone with a zero
345         # timestamp. dns_common_replace() notices this specific
346         # marker, and if there are no other records, marks the node as
347         # tombstoned, in the process adding a "real" tombstone.
348         #
349         # If the tombstone has a non-zero timestamp, as you'll see in
350         # the next test, dns_common_replace will decide that the node
351         # is already tombstoned, and that no action needs to be taken.
352         #
353         # This test has worked historically, entirely by accident, as
354         # changing the wType appears to
355
356         record_str = "192.168.50.50"
357         self.add_record(self.custom_zone, "testrecord", 'A', record_str)
358
359         dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
360         record.wType = dnsp.DNS_TYPE_TOMBSTONE
361         record.data = 0
362         self.samdb.dns_replace_by_dn(dn, [record])
363
364         # there should be no A record, and one TOMBSTONE record.
365         self.assert_num_records(self.custom_zone, "testrecord", 'A', 0)
366         # we can't make assertions about the tombstone count based on
367         # RPC calls, as there are no tombstones in RPCs (there is
368         # "DNS_TYPE_ZERO" instead). Nor do tombstones show up if we
369         # use DNS_TYPE_ALL.
370         self.assert_num_records(self.custom_zone, "testrecord", 'ALL', 0)
371
372         # But we can use LDAP:
373         records = self.ldap_get_records(self.custom_zone, "testrecord")
374         self.assertEqual(len(records), 1)
375         r = records[0]
376         self.assertEqual(r.wType, dnsp.DNS_TYPE_TOMBSTONE)
377         self.assertGreater(r.data, 1e17) # ~ October 1916
378
379         # this should fail, because no A records.
380         self.delete_record(self.custom_zone, "testrecord", 'A', record_str,
381                            assertion=False)
382
383     def test_dns_tombstoned_nonzero_timestamp(self):
384         """See what happens when we set a record to be tombstoned with an
385         EntombedTime timestamp.
386         """
387         # Because this tombstone has a non-zero EntombedTime,
388         # dns_common_replace() will decide the node was already
389         # tombstoned and there is nothing to be done, leaving the A
390         # record where it was.
391
392         record_str = "192.168.50.50"
393         self.add_record(self.custom_zone, "testrecord", 'A', record_str)
394
395         dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
396         record.wType = dnsp.DNS_TYPE_TOMBSTONE
397         record.data = 0x123456789A
398         self.samdb.dns_replace_by_dn(dn, [record])
399
400         # there should be the A record and no TOMBSTONE
401         self.assert_num_records(self.custom_zone, "testrecord", 'A', 1)
402         self.assert_num_records(self.custom_zone, "testrecord", 'TOMBSTONE', 0)
403         # this should succeed
404         self.delete_record(self.custom_zone, "testrecord", 'A', record_str,
405                            assertion=True)
406         self.assert_num_records(self.custom_zone, "testrecord", 'TOMBSTONE', 0)
407         self.assert_num_records(self.custom_zone, "testrecord", 'A', 0)
408
409     def get_record_from_db(self, zone_name, record_name):
410         """
411         Returns (dn of record, record)
412         """
413
414         zones = self.samdb.search(base="DC=DomainDnsZones,%s" % self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
415                                   expression="(objectClass=dnsZone)",
416                                   attrs=["cn"])
417
418         zone_dn = None
419         for zone in zones:
420             if "DC=%s," % zone_name in str(zone.dn):
421                 zone_dn = zone.dn
422                 break
423
424         if zone_dn is None:
425             raise AssertionError("Couldn't find zone '%s'." % zone_name)
426
427         records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
428                                     expression="(objectClass=dnsNode)",
429                                     attrs=["dnsRecord"])
430
431         for old_packed_record in records:
432             if record_name in str(old_packed_record.dn):
433                 rec = ndr_unpack(dnsp.DnssrvRpcRecord, old_packed_record["dnsRecord"][0])
434                 return (old_packed_record.dn, rec)
435
436     def ldap_get_records(self, zone, name):
437         zone_dn = (f"DC={zone},CN=MicrosoftDNS,DC=DomainDNSZones,"
438                    f"{self.samdb.get_default_basedn()}")
439
440         expr = f"(&(objectClass=dnsNode)(name={name}))"
441         nodes = self.samdb.search(base=zone_dn,
442                                   scope=ldb.SCOPE_SUBTREE,
443                                   expression=expr,
444                                   attrs=["dnsRecord"])
445
446         records = nodes[0].get('dnsRecord')
447         return [ndr_unpack(dnsp.DnssrvRpcRecord, r) for r in records]
448
449     def test_duplicate_matching(self):
450         """
451         Make sure that records which should be distinct from each other or duplicate
452         to each other behave as expected.
453         """
454
455         distinct_dns = [("SAMDOM.EXAMPLE.COM",
456                          "SAMDOM.EXAMPLE.CO",
457                          "EXAMPLE.COM", "SAMDOM.EXAMPLE")]
458         duplicate_dns = [("SAMDOM.EXAMPLE.COM", "samdom.example.com", "SAMDOM.example.COM"),
459                          ("EXAMPLE.", "EXAMPLE")]
460
461         # Every tuple has entries which should be considered duplicate to one another.
462         duplicates = {
463             "AAAA": [("AAAA::", "aaaa::"),
464                      ("AAAA::", "AAAA:0000::"),
465                      ("AAAA::", "AAAA:0000:0000:0000:0000:0000:0000:0000"),
466                      ("AAAA::", "AAAA:0:0:0:0:0:0:0"),
467                      ("0123::", "123::"),
468                      ("::", "::0", "0000:0000:0000:0000:0000:0000:0000:0000")],
469         }
470
471         # Every tuple has entries which should be considered distinct from one another.
472         distinct = {
473             "A": [("192.168.1.0", "192.168.1.1", "192.168.2.0", "192.169.1.0", "193.168.1.0")],
474             "AAAA": [("AAAA::1234:5678:9ABC", "::AAAA:1234:5678:9ABC"),
475                      ("1000::", "::1000"),
476                      ("::1", "::11", "::1111"),
477                      ("1234::", "0234::")],
478             "SRV": [("SAMDOM.EXAMPLE.COM 1 1 1", "SAMDOM.EXAMPLE.COM 1 1 0", "SAMDOM.EXAMPLE.COM 1 0 1",
479                      "SAMDOM.EXAMPLE.COM 0 1 1", "SAMDOM.EXAMPLE.COM 2 1 0", "SAMDOM.EXAMPLE.COM 2 2 2")],
480             "MX": [("SAMDOM.EXAMPLE.COM 1", "SAMDOM.EXAMPLE.COM 0")],
481             "TXT": [("A RECORD", "B RECORD", "a record")]
482         }
483
484         for record_type_str in ("PTR", "CNAME", "NS"):
485             distinct[record_type_str] = distinct_dns
486             duplicates[record_type_str] = duplicate_dns
487
488         for record_type_str in duplicates:
489             for duplicate_tuple in duplicates[record_type_str]:
490                 # Attempt to add duplicates and make sure that all after the first fails
491                 self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
492                 for record in duplicate_tuple:
493                     self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
494                     self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
495                 self.delete_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
496
497                 # Repeatedly: add the first duplicate, and attempt to remove all of the others, making sure this succeeds
498                 for record in duplicate_tuple:
499                     self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
500                     self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
501
502         for record_type_str in distinct:
503             for distinct_tuple in distinct[record_type_str]:
504                 # Attempt to add distinct and make sure that they all succeed within a tuple
505                 i = 0
506                 for record in distinct_tuple:
507                     i = i + 1
508                     try:
509                         self.add_record(self.custom_zone, "testrecord", record_type_str, record)
510                         # All records should have been added.
511                         self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=i)
512                     except AssertionError as e:
513                         raise AssertionError("Failed to add %s, which should be distinct from all others in the set. "
514                                              "Original error: %s\nDistinct set: %s." % (record, e, distinct_tuple))
515                 for record in distinct_tuple:
516                     self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
517                     # CNAMEs should not have been added, since they conflict.
518                     if record_type_str == 'CNAME':
519                         continue
520
521                 # Add the first distinct and attempt to remove all of the others, making sure this fails
522                 # Windows fails this test. This is probably due to weird tombstoning behavior.
523                 self.add_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
524                 for record in distinct_tuple:
525                     if record == distinct_tuple[0]:
526                         continue
527                     try:
528                         self.delete_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
529                     except AssertionError as e:
530                         raise AssertionError("Managed to remove %s by attempting to remove %s. Original error: %s"
531                                              % (distinct_tuple[0], record, e))
532                 self.delete_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
533
534     def test_accept_valid_commands(self):
535         """
536         Make sure that we can add, update and delete a variety
537         of valid records.
538         """
539         for record_type_str in self.good_records:
540             for record_str in self.good_records[record_type_str]:
541                 self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
542                 self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
543                 self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
544
545     def check_params(self, wDataLength, rank, flags, dwTtlSeconds, dwReserved, data,
546                      wType, dwTimeStamp=0, zone="zone", rec_name="testrecord"):
547         res = self.get_record_from_db(zone, rec_name)
548         self.assertIsNotNone(res, "Expected record %s but was not found over LDAP." % data)
549         (rec_dn, rec) = res
550         self.assertEqual(wDataLength, rec.wDataLength, "Unexpected data length for record %s. Got %s, expected %s." % (data, rec.wDataLength, wDataLength))
551         self.assertEqual(rank, rec.rank, "Unexpected rank for record %s. Got %s, expected %s." % (data, rec.rank, rank))
552         self.assertEqual(flags, rec.flags, "Unexpected flags for record %s. Got %s, expected %s." % (data, rec.flags, flags))
553         self.assertEqual(dwTtlSeconds, rec.dwTtlSeconds, "Unexpected time to live for record %s. Got %s, expected %s." % (data, rec.dwTtlSeconds, dwTtlSeconds))
554         self.assertEqual(dwReserved, rec.dwReserved, "Unexpected dwReserved for record %s. Got %s, expected %s." % (data, rec.dwReserved, dwReserved))
555         self.assertEqual(data.lower(), rec.data.lower(), "Unexpected data for record %s. Got %s, expected %s." % (data, rec.data.lower(), data.lower()))
556         self.assertEqual(wType, rec.wType, "Unexpected wType for record %s. Got %s, expected %s." % (data, rec.wType, wType))
557         self.assertEqual(dwTimeStamp, rec.dwTimeStamp, "Unexpected timestamp for record %s. Got %s, expected %s." % (data, rec.dwTimeStamp, dwTimeStamp))
558
559     def test_record_params(self):
560         """
561         Make sure that, when we add records to the database,
562         they're added with reasonable parameters.
563         """
564         self.add_record(self.custom_zone, "testrecord", "A", "192.168.50.50")
565         self.check_params(4, 240, 0, 900, 0, "192.168.50.50", 1)
566         self.delete_record(self.custom_zone, "testrecord", "A", "192.168.50.50")
567         self.add_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::")
568         self.check_params(16, 240, 0, 900, 0, "AAAA:AAAA:0000:0000:0000:0000:0000:0000", 28)
569         self.delete_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::")
570         self.add_record(self.custom_zone, "testrecord", "CNAME", "cnamedest")
571         self.check_params(13, 240, 0, 900, 0, "cnamedest", 5)
572         self.delete_record(self.custom_zone, "testrecord", "CNAME", "cnamedest")
573
574     def test_reject_invalid_commands(self):
575         """
576         Make sure that we can't add a variety of invalid records,
577         and that we can't update valid records to invalid ones.
578         """
579         num_failures = 0
580         for record_type_str in self.bad_records:
581             for record_str in self.bad_records[record_type_str]:
582                 # Attempt to add the bad record, which should fail. Then, attempt to query for and delete
583                 # it. Since it shouldn't exist, these should fail too.
584                 try:
585                     self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
586                     self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
587                     self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
588                 except AssertionError as e:
589                     print(e)
590                     num_failures = num_failures + 1
591
592         # Also try to update valid records to invalid ones, making sure this fails
593         for record_type_str in self.bad_records:
594             for record_str in self.bad_records[record_type_str]:
595                 good_record_str = self.good_records[record_type_str][0]
596                 self.add_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
597                 try:
598                     self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
599                 except AssertionError as e:
600                     print(e)
601                     num_failures = num_failures + 1
602                 self.delete_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
603
604         self.assertTrue(num_failures == 0, "Failed to reject invalid commands. Total failures: %d." % num_failures)
605
606     def test_add_duplicate_different_type(self):
607         """
608         Attempt to add some values which have the same name as
609         existing ones, just a different type.
610         """
611         num_failures = 0
612         for record_type_str_1 in self.good_records:
613             record1 = self.good_records[record_type_str_1][0]
614             self.add_record(self.custom_zone, "testrecord", record_type_str_1, record1)
615             for record_type_str_2 in self.good_records:
616                 if record_type_str_1 == record_type_str_2:
617                     continue
618
619                 record2 = self.good_records[record_type_str_2][0]
620
621                 has_a = record_type_str_1 == 'A' or record_type_str_2 == 'A'
622                 has_aaaa = record_type_str_1 == 'AAAA' or record_type_str_2 == 'AAAA'
623                 has_cname = record_type_str_1 == 'CNAME' or record_type_str_2 == 'CNAME'
624                 has_ptr = record_type_str_1 == 'PTR' or record_type_str_2 == 'PTR'
625                 has_mx = record_type_str_1 == 'MX' or record_type_str_2 == 'MX'
626                 has_srv = record_type_str_1 == 'SRV' or record_type_str_2 == 'SRV'
627                 has_txt = record_type_str_1 == 'TXT' or record_type_str_2 == 'TXT'
628
629                 # If we attempt to add any record except A or AAAA when we already have an NS record,
630                 # the add should fail.
631                 add_error_ok = False
632                 if record_type_str_1 == 'NS' and not has_a and not has_aaaa:
633                     add_error_ok = True
634                 # If we attempt to add a CNAME when an A, PTR or MX record exists, the add should fail.
635                 if record_type_str_2 == 'CNAME' and (has_ptr or has_mx or has_a or has_aaaa):
636                     add_error_ok = True
637                 # If we have a CNAME, adding an A, AAAA, SRV or TXT record should fail.
638                 # If we have an A, AAAA, SRV or TXT record, adding a CNAME should fail.
639                 if has_cname and (has_a or has_aaaa or has_srv or has_txt):
640                     add_error_ok = True
641
642                 try:
643                     self.add_record(self.custom_zone, "testrecord", record_type_str_2, record2)
644                     if add_error_ok:
645                         num_failures = num_failures + 1
646                         print("Expected error when adding %s while a %s existed."
647                               % (record_type_str_2, record_type_str_1))
648                 except AssertionError as e:
649                     if not add_error_ok:
650                         num_failures = num_failures + 1
651                         print("Didn't expect error when adding %s while a %s existed."
652                               % (record_type_str_2, record_type_str_1))
653
654                 if not add_error_ok:
655                     # In the "normal" case, we expect the add to work and us to have one of each type of record afterwards.
656                     expected_num_type_1 = 1
657                     expected_num_type_2 = 1
658
659                     # If we have an MX record, a PTR record should replace it when added.
660                     # If we have a PTR record, an MX record should replace it when added.
661                     if has_ptr and has_mx:
662                         expected_num_type_1 = 0
663
664                     # If we have a CNAME, SRV or TXT record, a PTR or MX record should replace it when added.
665                     if (has_cname or has_srv or has_txt) and (record_type_str_2 == 'PTR' or record_type_str_2 == 'MX'):
666                         expected_num_type_1 = 0
667
668                     if (record_type_str_1 == 'NS' and (has_a or has_aaaa)):
669                         expected_num_type_2 = 0
670
671                     try:
672                         self.assert_num_records(self.custom_zone, "testrecord", record_type_str_1, expected_num=expected_num_type_1)
673                     except AssertionError as e:
674                         num_failures = num_failures + 1
675                         print("Expected %s %s records after adding a %s record and a %s record already existed."
676                               % (expected_num_type_1, record_type_str_1, record_type_str_2, record_type_str_1))
677                     try:
678                         self.assert_num_records(self.custom_zone, "testrecord", record_type_str_2, expected_num=expected_num_type_2)
679                     except AssertionError as e:
680                         num_failures = num_failures + 1
681                         print("Expected %s %s records after adding a %s record and a %s record already existed."
682                               % (expected_num_type_2, record_type_str_2, record_type_str_2, record_type_str_1))
683
684                 try:
685                     self.delete_record(self.custom_zone, "testrecord", record_type_str_2, record2)
686                 except AssertionError as e:
687                     pass
688
689             self.delete_record(self.custom_zone, "testrecord", record_type_str_1, record1)
690
691         self.assertTrue(num_failures == 0, "Failed collision and replacement behavior. Total failures: %d." % num_failures)
692
693     # Windows fails this test in the same way we do.
694     def _test_cname(self):
695         """
696         Test some special properties of CNAME records.
697         """
698
699         # RFC 1912: When there is a CNAME record, there must not be any other records with the same alias
700         cname_record = self.good_records["CNAME"][1]
701         self.add_record(self.custom_zone, "testrecord", "CNAME", cname_record)
702
703         for record_type_str in self.good_records:
704             other_record = self.good_records[record_type_str][0]
705             self.add_record(self.custom_zone, "testrecord", record_type_str, other_record, assertion=False)
706             self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
707
708         # RFC 2181: MX & NS records must not be allowed to point to a CNAME alias
709         mx_record = "testrecord 1"
710         ns_record = "testrecord"
711
712         self.add_record(self.custom_zone, "mxrec", "MX", mx_record, assertion=False)
713         self.add_record(self.custom_zone, "nsrec", "NS", ns_record, assertion=False)
714
715         self.delete_record(self.custom_zone, "testrecord", "CNAME", cname_record)
716
717     def test_add_duplicate_value(self):
718         """
719         Make sure that we can't add duplicate values of any type.
720         """
721         for record_type_str in self.good_records:
722             record = self.good_records[record_type_str][0]
723
724             self.add_record(self.custom_zone, "testrecord", record_type_str, record)
725             self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
726             self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
727             self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
728
729     def test_add_similar_value(self):
730         """
731         Attempt to add values with the same name and type in the same
732         zone. This should work, and should result in both values
733         existing (except with some types).
734         """
735         for record_type_str in self.good_records:
736             for i in range(1, len(self.good_records[record_type_str])):
737                 record1 = self.good_records[record_type_str][i - 1]
738                 record2 = self.good_records[record_type_str][i]
739
740                 if record_type_str == 'CNAME':
741                     continue
742                 # We expect CNAME records to override one another, as
743                 # an alias can only map to one CNAME record.
744                 # Also, on Windows, when the empty string is added and
745                 # another record is added afterwards, the empty string
746                 # will be silently overridden by the new one, so it
747                 # fails this test for the empty string.
748                 expected_num = 1 if record_type_str == 'CNAME' else 2
749
750                 self.add_record(self.custom_zone, "testrecord", record_type_str, record1)
751                 self.add_record(self.custom_zone, "testrecord", record_type_str, record2)
752                 self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=expected_num)
753                 self.delete_record(self.custom_zone, "testrecord", record_type_str, record1)
754                 self.delete_record(self.custom_zone, "testrecord", record_type_str, record2)
755
756     def assert_record(self, zone, name, record_type_str, expected_record_str,
757                       assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
758         """
759         Asserts whether or not the given record with the given type exists in the
760         given zone.
761         """
762         try:
763             _, result = self.query_records(zone, name, record_type_str)
764         except RuntimeError as e:
765             if assertion:
766                 raise AssertionError("Record '%s' of type '%s' was not present when it should have been."
767                                      % (expected_record_str, record_type_str))
768             else:
769                 return
770
771         found = False
772         for record in result.rec[0].records:
773             if record.data == expected_record_str:
774                 found = True
775                 break
776
777         if found and not assertion:
778             raise AssertionError("Record '%s' of type '%s' was present when it shouldn't have been." % (expected_record_str, record_type_str))
779         elif not found and assertion:
780             raise AssertionError("Record '%s' of type '%s' was not present when it should have been." % (expected_record_str, record_type_str))
781
782     def assert_num_records(self, zone, name, record_type_str, expected_num=1,
783                            client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
784         """
785         Asserts that there are a given amount of records with the given type in
786         the given zone.
787         """
788         try:
789             _, result = self.query_records(zone, name, record_type_str)
790             num_results = len(result.rec[0].records)
791             if not num_results == expected_num:
792                 raise AssertionError("There were %d records of type '%s' with the name '%s' when %d were expected."
793                                      % (num_results, record_type_str, name, expected_num))
794         except RuntimeError:
795             if not expected_num == 0:
796                 raise AssertionError("There were no records of type '%s' with the name '%s' when %d were expected."
797                                      % (record_type_str, name, expected_num))
798
799     def query_records(self, zone, name, record_type_str, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
800         return self.conn.DnssrvEnumRecords2(client_version,
801                                             0,
802                                             self.server,
803                                             zone,
804                                             name,
805                                             None,
806                                             flag_from_string(record_type_str),
807                                             dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA | dnsserver.DNS_RPC_VIEW_NO_CHILDREN,
808                                             None,
809                                             None)
810
811     def add_record(self, zone, name, record_type_str, record_str,
812                    assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
813         """
814         Attempts to add a map from the given name to a record of the given type,
815         in the given zone.
816         Also asserts whether or not the add was successful.
817         This can also update existing records if they have the same name.
818         """
819         record = record_from_string(record_type_str, record_str, sep=' ')
820         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
821         add_rec_buf.rec = record
822
823         try:
824             self.conn.DnssrvUpdateRecord2(client_version,
825                                           0,
826                                           self.server,
827                                           zone,
828                                           name,
829                                           add_rec_buf,
830                                           None)
831             if not assertion:
832                 raise AssertionError("Successfully added record '%s' of type '%s', which should have failed."
833                                      % (record_str, record_type_str))
834         except RuntimeError as e:
835             if assertion:
836                 raise AssertionError("Failed to add record '%s' of type '%s', which should have succeeded. Error was '%s'."
837                                      % (record_str, record_type_str, str(e)))
838
839     def delete_record(self, zone, name, record_type_str, record_str,
840                       assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
841         """
842         Attempts to delete a record with the given name, record and record type
843         from the given zone.
844         Also asserts whether or not the deletion was successful.
845         """
846         record = record_from_string(record_type_str, record_str, sep=' ')
847         del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
848         del_rec_buf.rec = record
849
850         try:
851             self.conn.DnssrvUpdateRecord2(client_version,
852                                           0,
853                                           self.server,
854                                           zone,
855                                           name,
856                                           None,
857                                           del_rec_buf)
858             if not assertion:
859                 raise AssertionError("Successfully deleted record '%s' of type '%s', which should have failed." % (record_str, record_type_str))
860         except RuntimeError as e:
861             if assertion:
862                 raise AssertionError("Failed to delete record '%s' of type '%s', which should have succeeded. Error was '%s'." % (record_str, record_type_str, str(e)))
863
864     def test_query2(self):
865         typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_W2K,
866                                                 0,
867                                                 self.server,
868                                                 None,
869                                                 'ServerInfo')
870         self.assertEqual(dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K, typeid)
871
872         typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_DOTNET,
873                                                 0,
874                                                 self.server,
875                                                 None,
876                                                 'ServerInfo')
877         self.assertEqual(dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET, typeid)
878
879         typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
880                                                 0,
881                                                 self.server,
882                                                 None,
883                                                 'ServerInfo')
884         self.assertEqual(dnsserver.DNSSRV_TYPEID_SERVER_INFO, typeid)
885
886
887     # This test is to confirm that we do not support multizone operations,
888     # which are designated by a non-zero dwContext value (the 3rd argument
889     # to DnssrvOperation).
890     def test_operation_invalid(self):
891         non_zone = 'a-zone-that-does-not-exist'
892         typeid = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
893         name_and_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
894         name_and_param.pszNodeName = 'AllowUpdate'
895         name_and_param.dwParam = dnsp.DNS_ZONE_UPDATE_SECURE
896         try:
897             res = self.conn.DnssrvOperation(self.server,
898                                             non_zone,
899                                             1,
900                                             'ResetDwordProperty',
901                                             typeid,
902                                             name_and_param)
903         except WERRORError as e:
904             if e.args[0] == werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
905                 return
906
907         # We should always encounter a DOES_NOT_EXIST error.
908         self.fail()
909
910     # This test is to confirm that we do not support multizone operations,
911     # which are designated by a non-zero dwContext value (the 5th argument
912     # to DnssrvOperation2).
913     def test_operation2_invalid(self):
914         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
915         non_zone = 'a-zone-that-does-not-exist'
916         typeid = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
917         name_and_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
918         name_and_param.pszNodeName = 'AllowUpdate'
919         name_and_param.dwParam = dnsp.DNS_ZONE_UPDATE_SECURE
920         try:
921             res = self.conn.DnssrvOperation2(client_version,
922                                              0,
923                                              self.server,
924                                              non_zone,
925                                              1,
926                                              'ResetDwordProperty',
927                                              typeid,
928                                              name_and_param)
929         except WERRORError as e:
930             if e.args[0] == werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
931                 return
932
933         # We should always encounter a DOES_NOT_EXIST error.
934         self.fail()
935
936     def test_operation2(self):
937         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
938         rev_zone = '1.168.192.in-addr.arpa'
939
940         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
941         zone_create.pszZoneName = rev_zone
942         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
943         zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
944         zone_create.fAging = 0
945         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
946
947         # Create zone
948         self.conn.DnssrvOperation2(client_version,
949                                    0,
950                                    self.server,
951                                    None,
952                                    0,
953                                    'ZoneCreate',
954                                    dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
955                                    zone_create)
956
957         request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
958                           dnsserver.DNS_ZONE_REQUEST_PRIMARY)
959         _, zones = self.conn.DnssrvComplexOperation2(client_version,
960                                                      0,
961                                                      self.server,
962                                                      None,
963                                                      'EnumZones',
964                                                      dnsserver.DNSSRV_TYPEID_DWORD,
965                                                      request_filter)
966         self.assertEqual(1, zones.dwZoneCount)
967
968         # Delete zone
969         self.conn.DnssrvOperation2(client_version,
970                                    0,
971                                    self.server,
972                                    rev_zone,
973                                    0,
974                                    'DeleteZoneFromDs',
975                                    dnsserver.DNSSRV_TYPEID_NULL,
976                                    None)
977
978         typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
979                                                           0,
980                                                           self.server,
981                                                           None,
982                                                           'EnumZones',
983                                                           dnsserver.DNSSRV_TYPEID_DWORD,
984                                                           request_filter)
985         self.assertEqual(0, zones.dwZoneCount)
986
987     def test_complexoperation2(self):
988         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
989         request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD |
990                           dnsserver.DNS_ZONE_REQUEST_PRIMARY)
991
992         typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
993                                                           0,
994                                                           self.server,
995                                                           None,
996                                                           'EnumZones',
997                                                           dnsserver.DNSSRV_TYPEID_DWORD,
998                                                           request_filter)
999         self.assertEqual(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
1000         self.assertEqual(3, zones.dwZoneCount)
1001
1002         request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
1003                           dnsserver.DNS_ZONE_REQUEST_PRIMARY)
1004         typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
1005                                                           0,
1006                                                           self.server,
1007                                                           None,
1008                                                           'EnumZones',
1009                                                           dnsserver.DNSSRV_TYPEID_DWORD,
1010                                                           request_filter)
1011         self.assertEqual(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
1012         self.assertEqual(0, zones.dwZoneCount)
1013
1014     def test_enumrecords2(self):
1015         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1016         record_type = dnsp.DNS_TYPE_NS
1017         select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA |
1018                         dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA)
1019         _, roothints = self.conn.DnssrvEnumRecords2(client_version,
1020                                                     0,
1021                                                     self.server,
1022                                                     '..RootHints',
1023                                                     '.',
1024                                                     None,
1025                                                     record_type,
1026                                                     select_flags,
1027                                                     None,
1028                                                     None)
1029         self.assertEqual(14, roothints.count)  # 1 NS + 13 A records (a-m)
1030
1031     def test_updaterecords2(self):
1032         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
1033         record_type = dnsp.DNS_TYPE_A
1034         select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA
1035         name = 'dummy'
1036         rec = ARecord('1.2.3.4')
1037         rec2 = ARecord('5.6.7.8')
1038
1039         # Add record
1040         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1041         add_rec_buf.rec = rec
1042         self.conn.DnssrvUpdateRecord2(client_version,
1043                                       0,
1044                                       self.server,
1045                                       self.zone,
1046                                       name,
1047                                       add_rec_buf,
1048                                       None)
1049
1050         _, result = self.conn.DnssrvEnumRecords2(client_version,
1051                                                  0,
1052                                                  self.server,
1053                                                  self.zone,
1054                                                  name,
1055                                                  None,
1056                                                  record_type,
1057                                                  select_flags,
1058                                                  None,
1059                                                  None)
1060         self.assertEqual(1, result.count)
1061         self.assertEqual(1, result.rec[0].wRecordCount)
1062         self.assertEqual(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
1063         self.assertEqual('1.2.3.4', result.rec[0].records[0].data)
1064
1065         # Update record
1066         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1067         add_rec_buf.rec = rec2
1068         del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1069         del_rec_buf.rec = rec
1070         self.conn.DnssrvUpdateRecord2(client_version,
1071                                       0,
1072                                       self.server,
1073                                       self.zone,
1074                                       name,
1075                                       add_rec_buf,
1076                                       del_rec_buf)
1077
1078         buflen, result = self.conn.DnssrvEnumRecords2(client_version,
1079                                                       0,
1080                                                       self.server,
1081                                                       self.zone,
1082                                                       name,
1083                                                       None,
1084                                                       record_type,
1085                                                       select_flags,
1086                                                       None,
1087                                                       None)
1088         self.assertEqual(1, result.count)
1089         self.assertEqual(1, result.rec[0].wRecordCount)
1090         self.assertEqual(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
1091         self.assertEqual('5.6.7.8', result.rec[0].records[0].data)
1092
1093         # Delete record
1094         del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1095         del_rec_buf.rec = rec2
1096         self.conn.DnssrvUpdateRecord2(client_version,
1097                                       0,
1098                                       self.server,
1099                                       self.zone,
1100                                       name,
1101                                       None,
1102                                       del_rec_buf)
1103
1104         self.assertRaises(RuntimeError, self.conn.DnssrvEnumRecords2,
1105                           client_version,
1106                           0,
1107                           self.server,
1108                           self.zone,
1109                           name,
1110                           None,
1111                           record_type,
1112                           select_flags,
1113                           None,
1114                           None)
1115
1116     # The following tests do not pass against Samba because the owner and
1117     # group are not consistent with Windows, as well as some ACEs.
1118     #
1119     # The following ACE are also required for 2012R2:
1120     #
1121     # (OA;CIIO;WP;ea1b7b93-5e48-46d5-bc6c-4df4fda78a35;bf967a86-0de6-11d0-a285-00aa003049e2;PS)
1122     # (OA;OICI;RPWP;3f78c3e5-f79a-46bd-a0b8-9d18116ddc79;;PS)"
1123     #
1124     # [TPM + Allowed-To-Act-On-Behalf-Of-Other-Identity]
1125     def test_security_descriptor_msdcs_zone(self):
1126         """
1127         Make sure that security descriptors of the msdcs zone is
1128         as expected.
1129         """
1130
1131         zones = self.samdb.search(base="DC=ForestDnsZones,%s" % self.samdb.get_default_basedn(),
1132                                   scope=ldb.SCOPE_SUBTREE,
1133                                   expression="(&(objectClass=dnsZone)(name=_msdcs*))",
1134                                   attrs=["nTSecurityDescriptor", "objectClass"])
1135         self.assertEqual(len(zones), 1)
1136         self.assertIn("nTSecurityDescriptor", zones[0])
1137         tmp = zones[0]["nTSecurityDescriptor"][0]
1138         utils = sd_utils.SDUtils(self.samdb)
1139         sd = ndr_unpack(security.descriptor, tmp)
1140
1141         domain_sid = security.dom_sid(self.samdb.get_domain_sid())
1142
1143         res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
1144                                 expression="(sAMAccountName=DnsAdmins)",
1145                                 attrs=["objectSid"])
1146
1147         dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))
1148
1149         packed_sd = descriptor.sddl2binary("O:SYG:BA"
1150                                            "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)"
1151                                            "(A;;CC;;;AU)"
1152                                            "(A;;RPLCLORC;;;WD)"
1153                                            "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
1154                                            "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
1155                                            domain_sid, {"DnsAdmins": dns_admin})
1156         expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))
1157
1158         diff = descriptor.get_diff_sds(expected_sd, sd, domain_sid)
1159         self.assertEqual(diff, '', "SD of msdcs zone different to expected.\n"
1160                          "Difference was:\n%s\nExpected: %s\nGot: %s" %
1161                          (diff, expected_sd.as_sddl(utils.domain_sid),
1162                           sd.as_sddl(utils.domain_sid)))
1163
1164     def test_security_descriptor_forest_zone(self):
1165         """
1166         Make sure that security descriptors of forest dns zones are
1167         as expected.
1168         """
1169         forest_zone = "test_forest_zone"
1170         zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
1171         zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
1172         zone_create_info.fAging = 0
1173         zone_create_info.fDsIntegrated = 1
1174         zone_create_info.fLoadExisting = 1
1175
1176         zone_create_info.pszZoneName = forest_zone
1177         zone_create_info.dwDpFlags = dnsserver.DNS_DP_FOREST_DEFAULT
1178
1179         self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1180                                    0,
1181                                    self.server,
1182                                    None,
1183                                    0,
1184                                    'ZoneCreate',
1185                                    dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
1186                                    zone_create_info)
1187
1188         partition_dn = self.samdb.get_default_basedn()
1189         partition_dn.add_child("DC=ForestDnsZones")
1190         zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE,
1191                                   expression="(name=%s)" % forest_zone,
1192                                   attrs=["nTSecurityDescriptor"])
1193         self.assertEqual(len(zones), 1)
1194         current_dn = zones[0].dn
1195         self.assertIn("nTSecurityDescriptor", zones[0])
1196         tmp = zones[0]["nTSecurityDescriptor"][0]
1197         utils = sd_utils.SDUtils(self.samdb)
1198         sd = ndr_unpack(security.descriptor, tmp)
1199
1200         domain_sid = security.dom_sid(self.samdb.get_domain_sid())
1201
1202         res = self.samdb.search(base=self.samdb.get_default_basedn(),
1203                                 scope=ldb.SCOPE_SUBTREE,
1204                                 expression="(sAMAccountName=DnsAdmins)",
1205                                 attrs=["objectSid"])
1206
1207         dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))
1208
1209         packed_sd = descriptor.sddl2binary("O:DAG:DA"
1210                                            "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)"
1211                                            "(A;;CC;;;AU)"
1212                                            "(A;;RPLCLORC;;;WD)"
1213                                            "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
1214                                            "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
1215                                            domain_sid, {"DnsAdmins": dns_admin})
1216         expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))
1217
1218         packed_msdns = descriptor.get_dns_forest_microsoft_dns_descriptor(domain_sid,
1219                                                                           {"DnsAdmins": dns_admin})
1220         expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns))
1221
1222         packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid)
1223         expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor,
1224                                                               packed_part_sd))
1225         try:
1226             msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn))
1227             security_desc_dict = [(current_dn.get_linearized(), expected_sd),
1228                                   (msdns_dn.get_linearized(), expected_msdns_sd),
1229                                   (partition_dn.get_linearized(), expected_part_sd)]
1230
1231             for (key, sec_desc) in security_desc_dict:
1232                 zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE,
1233                                           attrs=["nTSecurityDescriptor"])
1234                 self.assertIn("nTSecurityDescriptor", zones[0])
1235                 tmp = zones[0]["nTSecurityDescriptor"][0]
1236                 utils = sd_utils.SDUtils(self.samdb)
1237
1238                 sd = ndr_unpack(security.descriptor, tmp)
1239                 diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid)
1240
1241                 self.assertEqual(diff, '', "Security descriptor of forest DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
1242                                  % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))
1243
1244         finally:
1245             self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1246                                        0,
1247                                        self.server,
1248                                        forest_zone,
1249                                        0,
1250                                        'DeleteZoneFromDs',
1251                                        dnsserver.DNSSRV_TYPEID_NULL,
1252                                        None)
1253
1254     def test_security_descriptor_domain_zone(self):
1255         """
1256         Make sure that security descriptors of domain dns zones are
1257         as expected.
1258         """
1259
1260         partition_dn = self.samdb.get_default_basedn()
1261         partition_dn.add_child("DC=DomainDnsZones")
1262         zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE,
1263                                   expression="(name=%s)" % self.custom_zone,
1264                                   attrs=["nTSecurityDescriptor"])
1265         self.assertEqual(len(zones), 1)
1266         current_dn = zones[0].dn
1267         self.assertIn("nTSecurityDescriptor", zones[0])
1268         tmp = zones[0]["nTSecurityDescriptor"][0]
1269         utils = sd_utils.SDUtils(self.samdb)
1270         sd = ndr_unpack(security.descriptor, tmp)
1271         sddl = sd.as_sddl(utils.domain_sid)
1272
1273         domain_sid = security.dom_sid(self.samdb.get_domain_sid())
1274
1275         res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
1276                                 expression="(sAMAccountName=DnsAdmins)",
1277                                 attrs=["objectSid"])
1278
1279         dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))
1280
1281         packed_sd = descriptor.sddl2binary("O:DAG:DA"
1282                                            "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)"
1283                                            "(A;;CC;;;AU)"
1284                                            "(A;;RPLCLORC;;;WD)"
1285                                            "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
1286                                            "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
1287                                            domain_sid, {"DnsAdmins": dns_admin})
1288         expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))
1289
1290         packed_msdns = descriptor.get_dns_domain_microsoft_dns_descriptor(domain_sid,
1291                                                                           {"DnsAdmins": dns_admin})
1292         expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns))
1293
1294         packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid)
1295         expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor,
1296                                                               packed_part_sd))
1297
1298         msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn))
1299         security_desc_dict = [(current_dn.get_linearized(), expected_sd),
1300                               (msdns_dn.get_linearized(), expected_msdns_sd),
1301                               (partition_dn.get_linearized(), expected_part_sd)]
1302
1303         for (key, sec_desc) in security_desc_dict:
1304             zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE,
1305                                       attrs=["nTSecurityDescriptor"])
1306             self.assertIn("nTSecurityDescriptor", zones[0])
1307             tmp = zones[0]["nTSecurityDescriptor"][0]
1308             utils = sd_utils.SDUtils(self.samdb)
1309
1310             sd = ndr_unpack(security.descriptor, tmp)
1311             diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid)
1312
1313             self.assertEqual(diff, '', "Security descriptor of domain DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
1314                              % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))