PEP8: fix E241: multiple spaces after ','
[amitay/samba.git] / python / samba / tests / dcerpc / dnsserver.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19 """Tests for samba.dcerpc.dnsserver"""
20
21 import os
22 import ldb
23
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.ndr import ndr_unpack, ndr_pack
27 from samba.dcerpc import dnsp, dnsserver, security
28 from samba.tests import RpcInterfaceTestCase, env_get_var_value
29 from samba.netcmd.dns import ARecord, AAAARecord, PTRRecord, CNameRecord, NSRecord, MXRecord, SRVRecord, TXTRecord
30 from samba import sd_utils, descriptor
31
32 class DnsserverTests(RpcInterfaceTestCase):
33
34     @classmethod
35     def setUpClass(cls):
36         good_dns = ["SAMDOM.EXAMPLE.COM",
37                     "1.EXAMPLE.COM",
38                     "%sEXAMPLE.COM" % ("1." * 100),
39                     "EXAMPLE",
40                     "\n.COM",
41                     "!@#$%^&*()_",
42                     "HIGH\xFFBYTE",
43                     "@.EXAMPLE.COM",
44                     "."]
45         bad_dns = ["...",
46                    ".EXAMPLE.COM",
47                    ".EXAMPLE.",
48                    "",
49                    "SAMDOM..EXAMPLE.COM"]
50
51         good_mx = ["SAMDOM.EXAMPLE.COM 65535"]
52         bad_mx = []
53
54         good_srv = ["SAMDOM.EXAMPLE.COM 65535 65535 65535"]
55         bad_srv = []
56
57         for bad_dn in bad_dns:
58             bad_mx.append("%s 1" % bad_dn)
59             bad_srv.append("%s 0 0 0" % bad_dn)
60         for good_dn in good_dns:
61             good_mx.append("%s 1" % good_dn)
62             good_srv.append("%s 0 0 0" % good_dn)
63
64         cls.good_records = {
65             "A": ["192.168.0.1",
66                   "255.255.255.255"],
67             "AAAA": ["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
68                      "0000:0000:0000:0000:0000:0000:0000:0000",
69                      "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
70                      "1234:1234:1234::",
71                      "1234:1234:1234:1234:1234::",
72                      "1234:5678:9ABC:DEF0::",
73                      "0000:0000::0000",
74                      "1234::5678:9ABC:0000:0000:0000:0000",
75                      "::1",
76                      "::",
77                      "1:1:1:1:1:1:1:1"],
78             "PTR": good_dns,
79             "CNAME": good_dns,
80             "NS": good_dns,
81             "MX": good_mx,
82             "SRV": good_srv,
83             "TXT": ["text", "", "@#!", "\n"]
84         }
85
86         cls.bad_records = {
87             "A": ["192.168.0.500",
88                   "255.255.255.255/32"],
89             "AAAA": ["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
90                      "0000:0000:0000:0000:0000:0000:0000:0000/1",
91                      "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
92                      "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
93                      "1234:5678:9ABC:DEF0:1234:5678:9ABC",
94                      "1111::1111::1111"],
95             "PTR": bad_dns,
96             "CNAME": bad_dns,
97             "NS": bad_dns,
98             "MX": bad_mx,
99             "SRV": bad_srv
100         }
101
102         # Because we use uint16_t for these numbers, we can't
103         # actually create these records.
104         invalid_mx = ["SAMDOM.EXAMPLE.COM -1",
105                       "SAMDOM.EXAMPLE.COM 65536",
106                       "%s 1" % "A" *256]
107         invalid_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
108                        "SAMDOM.EXAMPLE.COM 0 0 65536",
109                        "SAMDOM.EXAMPLE.COM 65536 0 0"]
110         cls.invalid_records = {
111             "MX": invalid_mx,
112             "SRV": invalid_srv
113         }
114
115     def setUp(self):
116         super(DnsserverTests, self).setUp()
117         self.server = os.environ["DC_SERVER"]
118         self.zone = env_get_var_value("REALM").lower()
119         self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
120                                         self.get_loadparm(),
121                                         self.get_credentials())
122
123         self.samdb = SamDB(url="ldap://%s" % os.environ["DC_SERVER_IP"],
124                            lp = self.get_loadparm(),
125                            session_info=system_session(),
126                            credentials=self.get_credentials())
127
128
129         self.custom_zone = "zone"
130         zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
131         zone_create_info.pszZoneName = self.custom_zone
132         zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
133         zone_create_info.fAging = 0
134         zone_create_info.fDsIntegrated = 1
135         zone_create_info.fLoadExisting = 1
136         zone_create_info.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
137
138         self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
139                                    0,
140                                    self.server,
141                                    None,
142                                    0,
143                                    'ZoneCreate',
144                                    dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
145                                    zone_create_info)
146
147     def tearDown(self):
148         self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
149                                    0,
150                                    self.server,
151                                    self.custom_zone,
152                                    0,
153                                    'DeleteZoneFromDs',
154                                    dnsserver.DNSSRV_TYPEID_NULL,
155                                    None)
156         super(DnsserverTests, self).tearDown()
157
158     # This test fails against Samba (but passes against Windows),
159     # because Samba does not return the record when we enum records.
160     # Records can be given DNS_RANK_NONE when the zone they are in
161     # does not have DNS_ZONE_TYPE_PRIMARY. Since such records can be
162     # deleted, however, we do not consider this urgent to fix and
163     # so this test is a knownfail.
164     def test_rank_none(self):
165         """
166         See what happens when we set a record's rank to
167         DNS_RANK_NONE.
168         """
169
170         record_str = "192.168.50.50"
171         record_type_str = "A"
172         self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
173
174         dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
175         record.rank = 0 # DNS_RANK_NONE
176         res = self.samdb.dns_replace_by_dn(dn, [record])
177         if res is not None:
178             self.fail("Unable to update dns record to have DNS_RANK_NONE.")
179
180         self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
181         self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
182         self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
183         self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0)
184
185     def test_dns_tombstoned(self):
186         """
187         See what happens when we set a record to be tombstoned.
188         """
189
190         record_str = "192.168.50.50"
191         record_type_str = "A"
192         self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
193
194         dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
195         record.wType = dnsp.DNS_TYPE_TOMBSTONE
196         res = self.samdb.dns_replace_by_dn(dn, [record])
197         if res is not None:
198             self.fail("Unable to update dns record to be tombstoned.")
199
200         self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
201         self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
202         self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0)
203
204     def get_record_from_db(self, zone_name, record_name):
205         """
206         Returns (dn of record, record)
207         """
208
209         zones = self.samdb.search(base="DC=DomainDnsZones,%s" % self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
210                                   expression="(objectClass=dnsZone)",
211                                   attrs=["cn"])
212
213         zone_dn = None
214         for zone in zones:
215             if "DC=%s," % zone_name in str(zone.dn):
216                 zone_dn = zone.dn
217                 break
218
219         if zone_dn is None:
220             raise AssertionError("Couldn't find zone '%s'." % zone_name)
221
222         records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
223                                     expression="(objectClass=dnsNode)",
224                                     attrs=["dnsRecord"])
225
226         for old_packed_record in records:
227             if record_name in str(old_packed_record.dn):
228                 rec = ndr_unpack(dnsp.DnssrvRpcRecord, old_packed_record["dnsRecord"][0])
229                 return (old_packed_record.dn, rec)
230
231     def test_duplicate_matching(self):
232         """
233         Make sure that records which should be distinct from each other or duplicate
234         to each other behave as expected.
235         """
236
237         distinct_dns = [("SAMDOM.EXAMPLE.COM",
238                          "SAMDOM.EXAMPLE.CO",
239                          "EXAMPLE.COM", "SAMDOM.EXAMPLE")]
240         duplicate_dns = [("SAMDOM.EXAMPLE.COM", "samdom.example.com", "SAMDOM.example.COM"),
241                          ("EXAMPLE.", "EXAMPLE")]
242
243         # Every tuple has entries which should be considered duplicate to one another.
244         duplicates = {
245             "AAAA": [("AAAA::", "aaaa::"),
246                      ("AAAA::", "AAAA:0000::"),
247                      ("AAAA::", "AAAA:0000:0000:0000:0000:0000:0000:0000"),
248                      ("AAAA::", "AAAA:0:0:0:0:0:0:0"),
249                      ("0123::", "123::"),
250                      ("::", "::0", "0000:0000:0000:0000:0000:0000:0000:0000")],
251         }
252
253         # Every tuple has entries which should be considered distinct from one another.
254         distinct = {
255             "A": [("192.168.1.0", "192.168.1.1", "192.168.2.0", "192.169.1.0", "193.168.1.0")],
256             "AAAA": [("AAAA::1234:5678:9ABC", "::AAAA:1234:5678:9ABC"),
257                      ("1000::", "::1000"),
258                      ("::1", "::11", "::1111"),
259                      ("1234::", "0234::")],
260             "SRV": [("SAMDOM.EXAMPLE.COM 1 1 1", "SAMDOM.EXAMPLE.COM 1 1 0", "SAMDOM.EXAMPLE.COM 1 0 1",
261                      "SAMDOM.EXAMPLE.COM 0 1 1", "SAMDOM.EXAMPLE.COM 2 1 0", "SAMDOM.EXAMPLE.COM 2 2 2")],
262             "MX": [("SAMDOM.EXAMPLE.COM 1", "SAMDOM.EXAMPLE.COM 0")],
263             "TXT": [("A RECORD", "B RECORD", "a record")]
264         }
265
266         for record_type_str in ("PTR", "CNAME", "NS"):
267             distinct[record_type_str] = distinct_dns
268             duplicates[record_type_str] = duplicate_dns
269
270         for record_type_str in duplicates:
271             for duplicate_tuple in duplicates[record_type_str]:
272                 # Attempt to add duplicates and make sure that all after the first fails
273                 self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
274                 for record in duplicate_tuple:
275                     self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
276                     self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
277                 self.delete_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
278
279                 # Repeatedly: add the first duplicate, and attempt to remove all of the others, making sure this succeeds
280                 for record in duplicate_tuple:
281                     self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
282                     self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
283
284         for record_type_str in distinct:
285             for distinct_tuple in distinct[record_type_str]:
286                 # Attempt to add distinct and make sure that they all succeed within a tuple
287                 i = 0
288                 for record in distinct_tuple:
289                     i = i + 1
290                     try:
291                         self.add_record(self.custom_zone, "testrecord", record_type_str, record)
292                         # All records should have been added.
293                         self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=i)
294                     except AssertionError as e:
295                         raise AssertionError("Failed to add %s, which should be distinct from all others in the set. "
296                                              "Original error: %s\nDistinct set: %s." % (record, e, distinct_tuple))
297                 for record in distinct_tuple:
298                     self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
299                     # CNAMEs should not have been added, since they conflict.
300                     if record_type_str == 'CNAME':
301                         continue
302
303                 # Add the first distinct and attempt to remove all of the others, making sure this fails
304                 # Windows fails this test. This is probably due to weird tombstoning behavior.
305                 self.add_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
306                 for record in distinct_tuple:
307                     if record == distinct_tuple[0]:
308                         continue
309                     try:
310                         self.delete_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
311                     except AssertionError as e:
312                         raise AssertionError("Managed to remove %s by attempting to remove %s. Original error: %s"
313                                              % (distinct_tuple[0], record, e))
314                 self.delete_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
315
316     def test_accept_valid_commands(self):
317         """
318         Make sure that we can add, update and delete a variety
319         of valid records.
320         """
321         for record_type_str in self.good_records:
322             for record_str in self.good_records[record_type_str]:
323                 self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
324                 self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
325                 self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
326
327     def check_params(self, wDataLength, rank, flags, dwTtlSeconds, dwReserved, data,
328                      wType, dwTimeStamp=0, zone="zone", rec_name="testrecord"):
329         res = self.get_record_from_db(zone, rec_name)
330         self.assertIsNotNone(res, "Expected record %s but was not found over LDAP." % data)
331         (rec_dn, rec) = res
332         self.assertEqual(wDataLength, rec.wDataLength, "Unexpected data length for record %s. Got %s, expected %s." % (data, rec.wDataLength, wDataLength))
333         self.assertEqual(rank, rec.rank, "Unexpected rank for record %s. Got %s, expected %s." % (data, rec.rank, rank))
334         self.assertEqual(flags, rec.flags, "Unexpected flags for record %s. Got %s, expected %s." % (data, rec.flags, flags))
335         self.assertEqual(dwTtlSeconds, rec.dwTtlSeconds, "Unexpected time to live for record %s. Got %s, expected %s." % (data, rec.dwTtlSeconds, dwTtlSeconds))
336         self.assertEqual(dwReserved, rec.dwReserved, "Unexpected dwReserved for record %s. Got %s, expected %s." % (data, rec.dwReserved, dwReserved))
337         self.assertEqual(data.lower(), rec.data.lower(), "Unexpected data for record %s. Got %s, expected %s." % (data, rec.data.lower(), data.lower()))
338         self.assertEqual(wType, rec.wType, "Unexpected wType for record %s. Got %s, expected %s." % (data, rec.wType, wType))
339         self.assertEqual(dwTimeStamp, rec.dwTimeStamp, "Unexpected timestamp for record %s. Got %s, expected %s." % (data, rec.dwTimeStamp, dwTimeStamp))
340
341     def test_record_params(self):
342         """
343         Make sure that, when we add records to the database,
344         they're added with reasonable parameters.
345         """
346         self.add_record(self.custom_zone, "testrecord", "A", "192.168.50.50")
347         self.check_params(4, 240, 0, 900, 0, "192.168.50.50", 1)
348         self.delete_record(self.custom_zone, "testrecord", "A", "192.168.50.50")
349         self.add_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::")
350         self.check_params(16, 240, 0, 900, 0, "AAAA:AAAA:0000:0000:0000:0000:0000:0000", 28)
351         self.delete_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::")
352         self.add_record(self.custom_zone, "testrecord", "CNAME", "cnamedest")
353         self.check_params(13, 240, 0, 900, 0, "cnamedest", 5)
354         self.delete_record(self.custom_zone, "testrecord", "CNAME", "cnamedest")
355
356     def test_reject_invalid_commands(self):
357         """
358         Make sure that we can't add a variety of invalid records,
359         and that we can't update valid records to invalid ones.
360         """
361         num_failures = 0
362         for record_type_str in self.bad_records:
363             for record_str in self.bad_records[record_type_str]:
364                 # Attempt to add the bad record, which should fail. Then, attempt to query for and delete
365                 # it. Since it shouldn't exist, these should fail too.
366                 try:
367                     self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
368                     self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
369                     self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
370                 except AssertionError as e:
371                     print(e)
372                     num_failures = num_failures + 1
373
374         # Also try to update valid records to invalid ones, making sure this fails
375         for record_type_str in self.bad_records:
376             for record_str in self.bad_records[record_type_str]:
377                 good_record_str = self.good_records[record_type_str][0]
378                 self.add_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
379                 try:
380                     self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
381                 except AssertionError as e:
382                     print(e)
383                     num_failures = num_failures + 1
384                 self.delete_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
385
386         self.assertTrue(num_failures == 0, "Failed to reject invalid commands. Total failures: %d." % num_failures)
387
388     def test_add_duplicate_different_type(self):
389         """
390         Attempt to add some values which have the same name as
391         existing ones, just a different type.
392         """
393         num_failures = 0
394         for record_type_str_1 in self.good_records:
395             record1 = self.good_records[record_type_str_1][0]
396             self.add_record(self.custom_zone, "testrecord", record_type_str_1, record1)
397             for record_type_str_2 in self.good_records:
398                 if record_type_str_1 == record_type_str_2:
399                     continue
400
401                 record2 = self.good_records[record_type_str_2][0]
402
403                 has_a = record_type_str_1 == 'A' or record_type_str_2 == 'A'
404                 has_aaaa = record_type_str_1 == 'AAAA' or record_type_str_2 == 'AAAA'
405                 has_cname = record_type_str_1 == 'CNAME' or record_type_str_2 == 'CNAME'
406                 has_ptr = record_type_str_1 == 'PTR' or record_type_str_2 == 'PTR'
407                 has_mx = record_type_str_1 == 'MX' or record_type_str_2 == 'MX'
408                 has_srv = record_type_str_1 == 'SRV' or record_type_str_2 == 'SRV'
409                 has_txt = record_type_str_1 == 'TXT' or record_type_str_2 == 'TXT'
410
411                 # If we attempt to add any record except A or AAAA when we already have an NS record,
412                 # the add should fail.
413                 add_error_ok = False
414                 if record_type_str_1 == 'NS' and not has_a and not has_aaaa:
415                     add_error_ok = True
416                 # If we attempt to add a CNAME when an A, PTR or MX record exists, the add should fail.
417                 if record_type_str_2 == 'CNAME' and (has_ptr or has_mx or has_a or has_aaaa):
418                     add_error_ok = True
419                 # If we have a CNAME, adding an A, AAAA, SRV or TXT record should fail.
420                 # If we have an A, AAAA, SRV or TXT record, adding a CNAME should fail.
421                 if has_cname and (has_a or has_aaaa or has_srv or has_txt):
422                     add_error_ok = True
423
424                 try:
425                     self.add_record(self.custom_zone, "testrecord", record_type_str_2, record2)
426                     if add_error_ok:
427                         num_failures = num_failures + 1
428                         print("Expected error when adding %s while a %s existed."
429                               % (record_type_str_2, record_type_str_1))
430                 except AssertionError as e:
431                     if not add_error_ok:
432                         num_failures = num_failures + 1
433                         print("Didn't expect error when adding %s while a %s existed."
434                               % (record_type_str_2, record_type_str_1))
435
436                 if not add_error_ok:
437                     # In the "normal" case, we expect the add to work and us to have one of each type of record afterwards.
438                     expected_num_type_1 = 1
439                     expected_num_type_2 = 1
440
441                     # If we have an MX record, a PTR record should replace it when added.
442                     # If we have a PTR record, an MX record should replace it when added.
443                     if has_ptr and has_mx:
444                         expected_num_type_1 = 0
445
446                     # If we have a CNAME, SRV or TXT record, a PTR or MX record should replace it when added.
447                     if (has_cname or has_srv or has_txt) and (record_type_str_2 == 'PTR' or record_type_str_2 == 'MX'):
448                         expected_num_type_1 = 0
449
450                     if (record_type_str_1 == 'NS' and (has_a or has_aaaa)):
451                         expected_num_type_2 = 0
452
453                     try:
454                         self.assert_num_records(self.custom_zone, "testrecord", record_type_str_1, expected_num=expected_num_type_1)
455                     except AssertionError as e:
456                         num_failures = num_failures + 1
457                         print("Expected %s %s records after adding a %s record and a %s record already existed."
458                               % (expected_num_type_1, record_type_str_1, record_type_str_2, record_type_str_1))
459                     try:
460                         self.assert_num_records(self.custom_zone, "testrecord", record_type_str_2, expected_num=expected_num_type_2)
461                     except AssertionError as e:
462                         num_failures = num_failures + 1
463                         print("Expected %s %s records after adding a %s record and a %s record already existed."
464                               % (expected_num_type_2, record_type_str_2, record_type_str_2, record_type_str_1))
465
466                 try:
467                     self.delete_record(self.custom_zone, "testrecord", record_type_str_2, record2)
468                 except AssertionError as e:
469                     pass
470
471             self.delete_record(self.custom_zone, "testrecord", record_type_str_1, record1)
472
473         self.assertTrue(num_failures == 0, "Failed collision and replacement behavior. Total failures: %d." % num_failures)
474
475     # Windows fails this test in the same way we do.
476     def _test_cname(self):
477         """
478         Test some special properties of CNAME records.
479         """
480
481         # RFC 1912: When there is a CNAME record, there must not be any other records with the same alias
482         cname_record = self.good_records["CNAME"][1]
483         self.add_record(self.custom_zone, "testrecord", "CNAME", cname_record)
484
485         for record_type_str in self.good_records:
486             other_record = self.good_records[record_type_str][0]
487             self.add_record(self.custom_zone, "testrecord", record_type_str, other_record, assertion=False)
488             self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
489
490         # RFC 2181: MX & NS records must not be allowed to point to a CNAME alias
491         mx_record = "testrecord 1"
492         ns_record = "testrecord"
493
494         self.add_record(self.custom_zone, "mxrec", "MX", mx_record, assertion=False)
495         self.add_record(self.custom_zone, "nsrec", "NS", ns_record, assertion=False)
496
497         self.delete_record(self.custom_zone, "testrecord", "CNAME", cname_record)
498
499     def test_add_duplicate_value(self):
500         """
501         Make sure that we can't add duplicate values of any type.
502         """
503         for record_type_str in self.good_records:
504             record = self.good_records[record_type_str][0]
505
506             self.add_record(self.custom_zone, "testrecord", record_type_str, record)
507             self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
508             self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
509             self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
510
511     def test_add_similar_value(self):
512         """
513         Attempt to add values with the same name and type in the same
514         zone. This should work, and should result in both values
515         existing (except with some types).
516         """
517         for record_type_str in self.good_records:
518             for i in range(1, len(self.good_records[record_type_str])):
519                 record1 = self.good_records[record_type_str][i - 1]
520                 record2 = self.good_records[record_type_str][i]
521
522                 if record_type_str == 'CNAME':
523                     continue
524                 # We expect CNAME records to override one another, as
525                 # an alias can only map to one CNAME record.
526                 # Also, on Windows, when the empty string is added and
527                 # another record is added afterwards, the empty string
528                 # will be silently overridden by the new one, so it
529                 # fails this test for the empty string.
530                 expected_num = 1 if record_type_str == 'CNAME' else 2
531
532                 self.add_record(self.custom_zone, "testrecord", record_type_str, record1)
533                 self.add_record(self.custom_zone, "testrecord", record_type_str, record2)
534                 self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=expected_num)
535                 self.delete_record(self.custom_zone, "testrecord", record_type_str, record1)
536                 self.delete_record(self.custom_zone, "testrecord", record_type_str, record2)
537
538     def assert_record(self, zone, name, record_type_str, expected_record_str,
539                       assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
540         """
541         Asserts whether or not the given record with the given type exists in the
542         given zone.
543         """
544         try:
545             _, result = self.query_records(zone, name, record_type_str)
546         except RuntimeError as e:
547             if assertion:
548                 raise AssertionError("Record '%s' of type '%s' was not present when it should have been."
549                                      % (expected_record_str, record_type_str))
550             else:
551                 return
552
553         found = False
554         for record in result.rec[0].records:
555             if record.data == expected_record_str:
556                 found = True
557                 break
558
559         if found and not assertion:
560             raise AssertionError("Record '%s' of type '%s' was present when it shouldn't have been." % (expected_record_str, record_type_str))
561         elif not found and assertion:
562             raise AssertionError("Record '%s' of type '%s' was not present when it should have been." % (expected_record_str, record_type_str))
563
564     def assert_num_records(self, zone, name, record_type_str, expected_num=1,
565                            client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
566         """
567         Asserts that there are a given amount of records with the given type in
568         the given zone.
569         """
570         try:
571             _, result = self.query_records(zone, name, record_type_str)
572             num_results = len(result.rec[0].records)
573             if not num_results == expected_num:
574                 raise AssertionError("There were %d records of type '%s' with the name '%s' when %d were expected."
575                                      % (num_results, record_type_str, name, expected_num))
576         except RuntimeError:
577             if not expected_num == 0:
578                 raise AssertionError("There were no records of type '%s' with the name '%s' when %d were expected."
579                                      % (record_type_str, name, expected_num))
580
581     def query_records(self, zone, name, record_type_str, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
582         return self.conn.DnssrvEnumRecords2(client_version,
583                                             0,
584                                             self.server,
585                                             zone,
586                                             name,
587                                             None,
588                                             self.record_type_int(record_type_str),
589                                             dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA | dnsserver.DNS_RPC_VIEW_NO_CHILDREN,
590                                             None,
591                                             None)
592
593     def record_obj_from_str(self, record_type_str, record_str):
594         if record_type_str == 'A':
595             return ARecord(record_str)
596         elif record_type_str == 'AAAA':
597             return AAAARecord(record_str)
598         elif record_type_str == 'PTR':
599             return PTRRecord(record_str)
600         elif record_type_str == 'CNAME':
601             return CNameRecord(record_str)
602         elif record_type_str == 'NS':
603             return NSRecord(record_str)
604         elif record_type_str == 'MX':
605             split = record_str.split(' ')
606             return MXRecord(split[0], int(split[1]))
607         elif record_type_str == 'SRV':
608             split = record_str.split(' ')
609             target = split[0]
610             port = int(split[1])
611             priority = int(split[2])
612             weight = int(split[3])
613             return SRVRecord(target, port, priority, weight)
614         elif record_type_str == 'TXT':
615             return TXTRecord(record_str)
616
617     def record_type_int(self, record_type_str):
618         if record_type_str == 'A':
619             return dnsp.DNS_TYPE_A
620         elif record_type_str == 'AAAA':
621             return dnsp.DNS_TYPE_AAAA
622         elif record_type_str == 'PTR':
623             return dnsp.DNS_TYPE_PTR
624         elif record_type_str == 'CNAME':
625             return dnsp.DNS_TYPE_CNAME
626         elif record_type_str == 'NS':
627             return dnsp.DNS_TYPE_NS
628         elif record_type_str == 'MX':
629             return dnsp.DNS_TYPE_MX
630         elif record_type_str == 'SRV':
631             return dnsp.DNS_TYPE_SRV
632         elif record_type_str == 'TXT':
633             return dnsp.DNS_TYPE_TXT
634
635     def add_record(self, zone, name, record_type_str, record_str,
636                    assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
637         """
638         Attempts to add a map from the given name to a record of the given type,
639         in the given zone.
640         Also asserts whether or not the add was successful.
641         This can also update existing records if they have the same name.
642         """
643         record = self.record_obj_from_str(record_type_str, record_str)
644         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
645         add_rec_buf.rec = record
646
647         try:
648             self.conn.DnssrvUpdateRecord2(client_version,
649                                           0,
650                                           self.server,
651                                           zone,
652                                           name,
653                                           add_rec_buf,
654                                           None)
655             if not assertion:
656                 raise AssertionError("Successfully added record '%s' of type '%s', which should have failed."
657                                      % (record_str, record_type_str))
658         except RuntimeError as e:
659             if assertion:
660                 raise AssertionError("Failed to add record '%s' of type '%s', which should have succeeded. Error was '%s'."
661                                      % (record_str, record_type_str, str(e)))
662
663     def delete_record(self, zone, name, record_type_str, record_str,
664                       assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
665         """
666         Attempts to delete a record with the given name, record and record type
667         from the given zone.
668         Also asserts whether or not the deletion was successful.
669         """
670         record = self.record_obj_from_str(record_type_str, record_str)
671         del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
672         del_rec_buf.rec = record
673
674         try:
675             self.conn.DnssrvUpdateRecord2(client_version,
676                                           0,
677                                           self.server,
678                                           zone,
679                                           name,
680                                           None,
681                                           del_rec_buf)
682             if not assertion:
683                 raise AssertionError("Successfully deleted record '%s' of type '%s', which should have failed." % (record_str, record_type_str))
684         except RuntimeError as e:
685             if assertion:
686                 raise AssertionError("Failed to delete record '%s' of type '%s', which should have succeeded. Error was '%s'." % (record_str, record_type_str, str(e)))
687
688     def test_query2(self):
689         typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_W2K,
690                                                 0,
691                                                 self.server,
692                                                 None,
693                                                 'ServerInfo')
694         self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K, typeid)
695
696         typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_DOTNET,
697                                                 0,
698                                                 self.server,
699                                                 None,
700                                                 'ServerInfo')
701         self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET, typeid)
702
703         typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
704                                                 0,
705                                                 self.server,
706                                                 None,
707                                                 'ServerInfo')
708         self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO, typeid)
709
710     def test_operation2(self):
711         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
712         rev_zone = '1.168.192.in-addr.arpa'
713
714         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
715         zone_create.pszZoneName = rev_zone
716         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
717         zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
718         zone_create.fAging = 0
719         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
720
721         # Create zone
722         self.conn.DnssrvOperation2(client_version,
723                                    0,
724                                    self.server,
725                                    None,
726                                    0,
727                                    'ZoneCreate',
728                                    dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
729                                    zone_create)
730
731         request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
732                           dnsserver.DNS_ZONE_REQUEST_PRIMARY)
733         _, zones = self.conn.DnssrvComplexOperation2(client_version,
734                                                      0,
735                                                      self.server,
736                                                      None,
737                                                      'EnumZones',
738                                                      dnsserver.DNSSRV_TYPEID_DWORD,
739                                                      request_filter)
740         self.assertEquals(1, zones.dwZoneCount)
741
742         # Delete zone
743         self.conn.DnssrvOperation2(client_version,
744                                    0,
745                                    self.server,
746                                    rev_zone,
747                                    0,
748                                    'DeleteZoneFromDs',
749                                    dnsserver.DNSSRV_TYPEID_NULL,
750                                    None)
751
752         typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
753                                                           0,
754                                                           self.server,
755                                                           None,
756                                                           'EnumZones',
757                                                           dnsserver.DNSSRV_TYPEID_DWORD,
758                                                           request_filter)
759         self.assertEquals(0, zones.dwZoneCount)
760
761
762     def test_complexoperation2(self):
763         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
764         request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD |
765                           dnsserver.DNS_ZONE_REQUEST_PRIMARY)
766
767         typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
768                                                           0,
769                                                           self.server,
770                                                           None,
771                                                           'EnumZones',
772                                                           dnsserver.DNSSRV_TYPEID_DWORD,
773                                                           request_filter)
774         self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
775         self.assertEquals(3, zones.dwZoneCount)
776
777         request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
778                           dnsserver.DNS_ZONE_REQUEST_PRIMARY)
779         typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
780                                                           0,
781                                                           self.server,
782                                                           None,
783                                                           'EnumZones',
784                                                           dnsserver.DNSSRV_TYPEID_DWORD,
785                                                           request_filter)
786         self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
787         self.assertEquals(0, zones.dwZoneCount)
788
789     def test_enumrecords2(self):
790         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
791         record_type = dnsp.DNS_TYPE_NS
792         select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA |
793                         dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA)
794         _, roothints = self.conn.DnssrvEnumRecords2(client_version,
795                                                     0,
796                                                     self.server,
797                                                     '..RootHints',
798                                                     '.',
799                                                     None,
800                                                     record_type,
801                                                     select_flags,
802                                                     None,
803                                                     None)
804         self.assertEquals(14, roothints.count)  # 1 NS + 13 A records (a-m)
805
806     def test_updaterecords2(self):
807         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
808         record_type = dnsp.DNS_TYPE_A
809         select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA
810
811         name = 'dummy'
812         rec = ARecord('1.2.3.4')
813         rec2 = ARecord('5.6.7.8')
814
815         # Add record
816         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
817         add_rec_buf.rec = rec
818         self.conn.DnssrvUpdateRecord2(client_version,
819                                       0,
820                                       self.server,
821                                       self.zone,
822                                       name,
823                                       add_rec_buf,
824                                       None)
825
826         _, result = self.conn.DnssrvEnumRecords2(client_version,
827                                                  0,
828                                                  self.server,
829                                                  self.zone,
830                                                  name,
831                                                  None,
832                                                  record_type,
833                                                  select_flags,
834                                                  None,
835                                                  None)
836         self.assertEquals(1, result.count)
837         self.assertEquals(1, result.rec[0].wRecordCount)
838         self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
839         self.assertEquals('1.2.3.4', result.rec[0].records[0].data)
840
841         # Update record
842         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
843         add_rec_buf.rec = rec2
844         del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
845         del_rec_buf.rec = rec
846         self.conn.DnssrvUpdateRecord2(client_version,
847                                       0,
848                                       self.server,
849                                       self.zone,
850                                       name,
851                                       add_rec_buf,
852                                       del_rec_buf)
853
854         buflen, result = self.conn.DnssrvEnumRecords2(client_version,
855                                                       0,
856                                                       self.server,
857                                                       self.zone,
858                                                       name,
859                                                       None,
860                                                       record_type,
861                                                       select_flags,
862                                                       None,
863                                                       None)
864         self.assertEquals(1, result.count)
865         self.assertEquals(1, result.rec[0].wRecordCount)
866         self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
867         self.assertEquals('5.6.7.8', result.rec[0].records[0].data)
868
869         # Delete record
870         del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
871         del_rec_buf.rec = rec2
872         self.conn.DnssrvUpdateRecord2(client_version,
873                                       0,
874                                       self.server,
875                                       self.zone,
876                                       name,
877                                       None,
878                                       del_rec_buf)
879
880         self.assertRaises(RuntimeError, self.conn.DnssrvEnumRecords2,
881                           client_version,
882                           0,
883                           self.server,
884                           self.zone,
885                           name,
886                           None,
887                           record_type,
888                           select_flags,
889                           None,
890                           None)
891
892     # The following tests do not pass against Samba because the owner and
893     # group are not consistent with Windows, as well as some ACEs.
894     #
895     # The following ACE are also required for 2012R2:
896     #
897     # (OA;CIIO;WP;ea1b7b93-5e48-46d5-bc6c-4df4fda78a35;bf967a86-0de6-11d0-a285-00aa003049e2;PS)
898     # (OA;OICI;RPWP;3f78c3e5-f79a-46bd-a0b8-9d18116ddc79;;PS)"
899     #
900     # [TPM + Allowed-To-Act-On-Behalf-Of-Other-Identity]
901     def test_security_descriptor_msdcs_zone(self):
902         """
903         Make sure that security descriptors of the msdcs zone is
904         as expected.
905         """
906
907         zones = self.samdb.search(base="DC=ForestDnsZones,%s" % self.samdb.get_default_basedn(),
908                                   scope=ldb.SCOPE_SUBTREE,
909                                   expression="(&(objectClass=dnsZone)(name=_msdcs*))",
910                                   attrs=["nTSecurityDescriptor", "objectClass"])
911         self.assertEqual(len(zones), 1)
912         self.assertTrue("nTSecurityDescriptor" in zones[0])
913         tmp = zones[0]["nTSecurityDescriptor"][0]
914         utils = sd_utils.SDUtils(self.samdb)
915         sd = ndr_unpack(security.descriptor, tmp)
916
917         domain_sid = security.dom_sid(self.samdb.get_domain_sid())
918
919         res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
920                                 expression="(sAMAccountName=DnsAdmins)",
921                                 attrs=["objectSid"])
922
923         dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))
924
925         packed_sd = descriptor.sddl2binary("O:SYG:BA" \
926                                            "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" \
927                                            "(A;;CC;;;AU)" \
928                                            "(A;;RPLCLORC;;;WD)" \
929                                            "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
930                                            "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
931                                            domain_sid, {"DnsAdmins": dns_admin})
932         expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))
933
934         diff = descriptor.get_diff_sds(expected_sd, sd, domain_sid)
935         self.assertEqual(diff, '', "SD of msdcs zone different to expected.\n"
936                          "Difference was:\n%s\nExpected: %s\nGot: %s" %
937                          (diff, expected_sd.as_sddl(utils.domain_sid),
938                           sd.as_sddl(utils.domain_sid)))
939
940     def test_security_descriptor_forest_zone(self):
941         """
942         Make sure that security descriptors of forest dns zones are
943         as expected.
944         """
945         forest_zone = "test_forest_zone"
946         zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
947         zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
948         zone_create_info.fAging = 0
949         zone_create_info.fDsIntegrated = 1
950         zone_create_info.fLoadExisting = 1
951
952         zone_create_info.pszZoneName = forest_zone
953         zone_create_info.dwDpFlags = dnsserver.DNS_DP_FOREST_DEFAULT
954
955         self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
956                                    0,
957                                    self.server,
958                                    None,
959                                    0,
960                                    'ZoneCreate',
961                                    dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
962                                    zone_create_info)
963
964         partition_dn = self.samdb.get_default_basedn()
965         partition_dn.add_child("DC=ForestDnsZones")
966         zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE,
967                                   expression="(name=%s)" % forest_zone,
968                                   attrs=["nTSecurityDescriptor"])
969         self.assertEqual(len(zones), 1)
970         current_dn = zones[0].dn
971         self.assertTrue("nTSecurityDescriptor" in zones[0])
972         tmp = zones[0]["nTSecurityDescriptor"][0]
973         utils = sd_utils.SDUtils(self.samdb)
974         sd = ndr_unpack(security.descriptor, tmp)
975
976         domain_sid = security.dom_sid(self.samdb.get_domain_sid())
977
978         res = self.samdb.search(base=self.samdb.get_default_basedn(),
979                                 scope=ldb.SCOPE_SUBTREE,
980                                 expression="(sAMAccountName=DnsAdmins)",
981                                 attrs=["objectSid"])
982
983         dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))
984
985         packed_sd = descriptor.sddl2binary("O:DAG:DA" \
986                                            "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" \
987                                            "(A;;CC;;;AU)" \
988                                            "(A;;RPLCLORC;;;WD)" \
989                                            "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
990                                            "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
991                                            domain_sid, {"DnsAdmins": dns_admin})
992         expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))
993
994         packed_msdns = descriptor.get_dns_forest_microsoft_dns_descriptor(domain_sid,
995                                                                           {"DnsAdmins": dns_admin})
996         expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns))
997
998         packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid)
999         expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor,
1000                                                               packed_part_sd))
1001         try:
1002             msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn))
1003             security_desc_dict = [(current_dn.get_linearized(), expected_sd),
1004                                   (msdns_dn.get_linearized(), expected_msdns_sd),
1005                                   (partition_dn.get_linearized(), expected_part_sd)]
1006
1007             for (key, sec_desc) in security_desc_dict:
1008                 zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE,
1009                                           attrs=["nTSecurityDescriptor"])
1010                 self.assertTrue("nTSecurityDescriptor" in zones[0])
1011                 tmp = zones[0]["nTSecurityDescriptor"][0]
1012                 utils = sd_utils.SDUtils(self.samdb)
1013
1014                 sd = ndr_unpack(security.descriptor, tmp)
1015                 diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid)
1016
1017                 self.assertEqual(diff, '', "Security descriptor of forest DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
1018                                  % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))
1019
1020         finally:
1021             self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1022                                        0,
1023                                        self.server,
1024                                        forest_zone,
1025                                        0,
1026                                        'DeleteZoneFromDs',
1027                                        dnsserver.DNSSRV_TYPEID_NULL,
1028                                        None)
1029
1030     def test_security_descriptor_domain_zone(self):
1031         """
1032         Make sure that security descriptors of domain dns zones are
1033         as expected.
1034         """
1035
1036         partition_dn = self.samdb.get_default_basedn()
1037         partition_dn.add_child("DC=DomainDnsZones")
1038         zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE,
1039                                   expression="(name=%s)" % self.custom_zone,
1040                                   attrs=["nTSecurityDescriptor"])
1041         self.assertEqual(len(zones), 1)
1042         current_dn = zones[0].dn
1043         self.assertTrue("nTSecurityDescriptor" in zones[0])
1044         tmp = zones[0]["nTSecurityDescriptor"][0]
1045         utils = sd_utils.SDUtils(self.samdb)
1046         sd = ndr_unpack(security.descriptor, tmp)
1047         sddl = sd.as_sddl(utils.domain_sid)
1048
1049         domain_sid = security.dom_sid(self.samdb.get_domain_sid())
1050
1051         res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
1052                                 expression="(sAMAccountName=DnsAdmins)",
1053                                 attrs=["objectSid"])
1054
1055         dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))
1056
1057         packed_sd = descriptor.sddl2binary("O:DAG:DA" \
1058                                            "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" \
1059                                            "(A;;CC;;;AU)" \
1060                                            "(A;;RPLCLORC;;;WD)" \
1061                                            "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
1062                                            "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
1063                                            domain_sid, {"DnsAdmins": dns_admin})
1064         expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))
1065
1066         packed_msdns = descriptor.get_dns_domain_microsoft_dns_descriptor(domain_sid,
1067                                                                           {"DnsAdmins": dns_admin})
1068         expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns))
1069
1070         packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid)
1071         expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor,
1072                                                               packed_part_sd))
1073
1074         msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn))
1075         security_desc_dict = [(current_dn.get_linearized(), expected_sd),
1076                               (msdns_dn.get_linearized(), expected_msdns_sd),
1077                               (partition_dn.get_linearized(), expected_part_sd)]
1078
1079         for (key, sec_desc) in security_desc_dict:
1080             zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE,
1081                                       attrs=["nTSecurityDescriptor"])
1082             self.assertTrue("nTSecurityDescriptor" in zones[0])
1083             tmp = zones[0]["nTSecurityDescriptor"][0]
1084             utils = sd_utils.SDUtils(self.samdb)
1085
1086             sd = ndr_unpack(security.descriptor, tmp)
1087             diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid)
1088
1089             self.assertEqual(diff, '', "Security descriptor of domain DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
1090                              % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))