PEP8: fix E303: too many blank lines (2)
[garming/samba-autobuild/.git] / python / samba / tests / dcerpc / dnsserver.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19 """Tests for samba.dcerpc.dnsserver"""
20
21 import os
22 import ldb
23
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.ndr import ndr_unpack, ndr_pack
27 from samba.dcerpc import dnsp, dnsserver, security
28 from samba.tests import RpcInterfaceTestCase, env_get_var_value
29 from samba.netcmd.dns import ARecord, AAAARecord, PTRRecord, CNameRecord, NSRecord, MXRecord, SRVRecord, TXTRecord
30 from samba import sd_utils, descriptor
31
32
33 class DnsserverTests(RpcInterfaceTestCase):
34
35     @classmethod
36     def setUpClass(cls):
37         good_dns = ["SAMDOM.EXAMPLE.COM",
38                     "1.EXAMPLE.COM",
39                     "%sEXAMPLE.COM" % ("1." * 100),
40                     "EXAMPLE",
41                     "\n.COM",
42                     "!@#$%^&*()_",
43                     "HIGH\xFFBYTE",
44                     "@.EXAMPLE.COM",
45                     "."]
46         bad_dns = ["...",
47                    ".EXAMPLE.COM",
48                    ".EXAMPLE.",
49                    "",
50                    "SAMDOM..EXAMPLE.COM"]
51
52         good_mx = ["SAMDOM.EXAMPLE.COM 65535"]
53         bad_mx = []
54
55         good_srv = ["SAMDOM.EXAMPLE.COM 65535 65535 65535"]
56         bad_srv = []
57
58         for bad_dn in bad_dns:
59             bad_mx.append("%s 1" % bad_dn)
60             bad_srv.append("%s 0 0 0" % bad_dn)
61         for good_dn in good_dns:
62             good_mx.append("%s 1" % good_dn)
63             good_srv.append("%s 0 0 0" % good_dn)
64
65         cls.good_records = {
66             "A": ["192.168.0.1",
67                   "255.255.255.255"],
68             "AAAA": ["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
69                      "0000:0000:0000:0000:0000:0000:0000:0000",
70                      "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
71                      "1234:1234:1234::",
72                      "1234:1234:1234:1234:1234::",
73                      "1234:5678:9ABC:DEF0::",
74                      "0000:0000::0000",
75                      "1234::5678:9ABC:0000:0000:0000:0000",
76                      "::1",
77                      "::",
78                      "1:1:1:1:1:1:1:1"],
79             "PTR": good_dns,
80             "CNAME": good_dns,
81             "NS": good_dns,
82             "MX": good_mx,
83             "SRV": good_srv,
84             "TXT": ["text", "", "@#!", "\n"]
85         }
86
87         cls.bad_records = {
88             "A": ["192.168.0.500",
89                   "255.255.255.255/32"],
90             "AAAA": ["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
91                      "0000:0000:0000:0000:0000:0000:0000:0000/1",
92                      "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
93                      "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
94                      "1234:5678:9ABC:DEF0:1234:5678:9ABC",
95                      "1111::1111::1111"],
96             "PTR": bad_dns,
97             "CNAME": bad_dns,
98             "NS": bad_dns,
99             "MX": bad_mx,
100             "SRV": bad_srv
101         }
102
103         # Because we use uint16_t for these numbers, we can't
104         # actually create these records.
105         invalid_mx = ["SAMDOM.EXAMPLE.COM -1",
106                       "SAMDOM.EXAMPLE.COM 65536",
107                       "%s 1" % "A" *256]
108         invalid_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
109                        "SAMDOM.EXAMPLE.COM 0 0 65536",
110                        "SAMDOM.EXAMPLE.COM 65536 0 0"]
111         cls.invalid_records = {
112             "MX": invalid_mx,
113             "SRV": invalid_srv
114         }
115
116     def setUp(self):
117         super(DnsserverTests, self).setUp()
118         self.server = os.environ["DC_SERVER"]
119         self.zone = env_get_var_value("REALM").lower()
120         self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
121                                         self.get_loadparm(),
122                                         self.get_credentials())
123
124         self.samdb = SamDB(url="ldap://%s" % os.environ["DC_SERVER_IP"],
125                            lp=self.get_loadparm(),
126                            session_info=system_session(),
127                            credentials=self.get_credentials())
128
129         self.custom_zone = "zone"
130         zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
131         zone_create_info.pszZoneName = self.custom_zone
132         zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
133         zone_create_info.fAging = 0
134         zone_create_info.fDsIntegrated = 1
135         zone_create_info.fLoadExisting = 1
136         zone_create_info.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
137
138         self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
139                                    0,
140                                    self.server,
141                                    None,
142                                    0,
143                                    'ZoneCreate',
144                                    dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
145                                    zone_create_info)
146
147     def tearDown(self):
148         self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
149                                    0,
150                                    self.server,
151                                    self.custom_zone,
152                                    0,
153                                    'DeleteZoneFromDs',
154                                    dnsserver.DNSSRV_TYPEID_NULL,
155                                    None)
156         super(DnsserverTests, self).tearDown()
157
158     # This test fails against Samba (but passes against Windows),
159     # because Samba does not return the record when we enum records.
160     # Records can be given DNS_RANK_NONE when the zone they are in
161     # does not have DNS_ZONE_TYPE_PRIMARY. Since such records can be
162     # deleted, however, we do not consider this urgent to fix and
163     # so this test is a knownfail.
164     def test_rank_none(self):
165         """
166         See what happens when we set a record's rank to
167         DNS_RANK_NONE.
168         """
169
170         record_str = "192.168.50.50"
171         record_type_str = "A"
172         self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
173
174         dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
175         record.rank = 0  # DNS_RANK_NONE
176         res = self.samdb.dns_replace_by_dn(dn, [record])
177         if res is not None:
178             self.fail("Unable to update dns record to have DNS_RANK_NONE.")
179
180         self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
181         self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
182         self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
183         self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0)
184
185     def test_dns_tombstoned(self):
186         """
187         See what happens when we set a record to be tombstoned.
188         """
189
190         record_str = "192.168.50.50"
191         record_type_str = "A"
192         self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
193
194         dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
195         record.wType = dnsp.DNS_TYPE_TOMBSTONE
196         res = self.samdb.dns_replace_by_dn(dn, [record])
197         if res is not None:
198             self.fail("Unable to update dns record to be tombstoned.")
199
200         self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
201         self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
202         self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0)
203
204     def get_record_from_db(self, zone_name, record_name):
205         """
206         Returns (dn of record, record)
207         """
208
209         zones = self.samdb.search(base="DC=DomainDnsZones,%s" % self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
210                                   expression="(objectClass=dnsZone)",
211                                   attrs=["cn"])
212
213         zone_dn = None
214         for zone in zones:
215             if "DC=%s," % zone_name in str(zone.dn):
216                 zone_dn = zone.dn
217                 break
218
219         if zone_dn is None:
220             raise AssertionError("Couldn't find zone '%s'." % zone_name)
221
222         records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
223                                     expression="(objectClass=dnsNode)",
224                                     attrs=["dnsRecord"])
225
226         for old_packed_record in records:
227             if record_name in str(old_packed_record.dn):
228                 rec = ndr_unpack(dnsp.DnssrvRpcRecord, old_packed_record["dnsRecord"][0])
229                 return (old_packed_record.dn, rec)
230
    def test_duplicate_matching(self):
        """
        Make sure that records which should be distinct from each other or duplicate
        to each other behave as expected.

        Two tables drive the test: `duplicates` and `distinct`, each mapping a
        record type name to a list of tuples of record strings.
        """

        # Name-valued tuples; they are assigned to the PTR, CNAME and NS
        # entries of both tables further down.
        distinct_dns = [("SAMDOM.EXAMPLE.COM",
                         "SAMDOM.EXAMPLE.CO",
                         "EXAMPLE.COM", "SAMDOM.EXAMPLE")]
        duplicate_dns = [("SAMDOM.EXAMPLE.COM", "samdom.example.com", "SAMDOM.example.COM"),
                         ("EXAMPLE.", "EXAMPLE")]

        # Every tuple has entries which should be considered duplicate to one another.
        duplicates = {
            "AAAA": [("AAAA::", "aaaa::"),
                     ("AAAA::", "AAAA:0000::"),
                     ("AAAA::", "AAAA:0000:0000:0000:0000:0000:0000:0000"),
                     ("AAAA::", "AAAA:0:0:0:0:0:0:0"),
                     ("0123::", "123::"),
                     ("::", "::0", "0000:0000:0000:0000:0000:0000:0000:0000")],
        }

        # Every tuple has entries which should be considered distinct from one another.
        distinct = {
            "A": [("192.168.1.0", "192.168.1.1", "192.168.2.0", "192.169.1.0", "193.168.1.0")],
            "AAAA": [("AAAA::1234:5678:9ABC", "::AAAA:1234:5678:9ABC"),
                     ("1000::", "::1000"),
                     ("::1", "::11", "::1111"),
                     ("1234::", "0234::")],
            "SRV": [("SAMDOM.EXAMPLE.COM 1 1 1", "SAMDOM.EXAMPLE.COM 1 1 0", "SAMDOM.EXAMPLE.COM 1 0 1",
                     "SAMDOM.EXAMPLE.COM 0 1 1", "SAMDOM.EXAMPLE.COM 2 1 0", "SAMDOM.EXAMPLE.COM 2 2 2")],
            "MX": [("SAMDOM.EXAMPLE.COM 1", "SAMDOM.EXAMPLE.COM 0")],
            "TXT": [("A RECORD", "B RECORD", "a record")]
        }

        for record_type_str in ("PTR", "CNAME", "NS"):
            distinct[record_type_str] = distinct_dns
            duplicates[record_type_str] = duplicate_dns

        for record_type_str in duplicates:
            for duplicate_tuple in duplicates[record_type_str]:
                # Attempt to add duplicates and make sure that all after the first fails
                self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
                # The loop includes duplicate_tuple[0] itself, so re-adding the
                # identical value is checked as well.
                for record in duplicate_tuple:
                    self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
                    self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
                self.delete_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])

                # Repeatedly: add the first duplicate, and attempt to remove all of the others, making sure this succeeds
                for record in duplicate_tuple:
                    self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
                    self.delete_record(self.custom_zone, "testrecord", record_type_str, record)

        for record_type_str in distinct:
            for distinct_tuple in distinct[record_type_str]:
                # Attempt to add distinct and make sure that they all succeed within a tuple
                # `i` counts how many records of this tuple have been added so far.
                i = 0
                for record in distinct_tuple:
                    i = i + 1
                    try:
                        self.add_record(self.custom_zone, "testrecord", record_type_str, record)
                        # All records should have been added.
                        self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=i)
                    except AssertionError as e:
                        # Re-raise with the offending value and the whole tuple for context.
                        raise AssertionError("Failed to add %s, which should be distinct from all others in the set. "
                                             "Original error: %s\nDistinct set: %s." % (record, e, distinct_tuple))
                for record in distinct_tuple:
                    self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
                    # CNAMEs should not have been added, since they conflict.
                    # NOTE(review): this `continue` is the last statement of the
                    # loop body, so it currently has no effect.
                    if record_type_str == 'CNAME':
                        continue

                # Add the first distinct and attempt to remove all of the others, making sure this fails
                # Windows fails this test. This is probably due to weird tombstoning behavior.
                self.add_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
                for record in distinct_tuple:
                    if record == distinct_tuple[0]:
                        continue
                    try:
                        self.delete_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
                    except AssertionError as e:
                        raise AssertionError("Managed to remove %s by attempting to remove %s. Original error: %s"
                                             % (distinct_tuple[0], record, e))
                self.delete_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
315
316     def test_accept_valid_commands(self):
317         """
318         Make sure that we can add, update and delete a variety
319         of valid records.
320         """
321         for record_type_str in self.good_records:
322             for record_str in self.good_records[record_type_str]:
323                 self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
324                 self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
325                 self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
326
327     def check_params(self, wDataLength, rank, flags, dwTtlSeconds, dwReserved, data,
328                      wType, dwTimeStamp=0, zone="zone", rec_name="testrecord"):
329         res = self.get_record_from_db(zone, rec_name)
330         self.assertIsNotNone(res, "Expected record %s but was not found over LDAP." % data)
331         (rec_dn, rec) = res
332         self.assertEqual(wDataLength, rec.wDataLength, "Unexpected data length for record %s. Got %s, expected %s." % (data, rec.wDataLength, wDataLength))
333         self.assertEqual(rank, rec.rank, "Unexpected rank for record %s. Got %s, expected %s." % (data, rec.rank, rank))
334         self.assertEqual(flags, rec.flags, "Unexpected flags for record %s. Got %s, expected %s." % (data, rec.flags, flags))
335         self.assertEqual(dwTtlSeconds, rec.dwTtlSeconds, "Unexpected time to live for record %s. Got %s, expected %s." % (data, rec.dwTtlSeconds, dwTtlSeconds))
336         self.assertEqual(dwReserved, rec.dwReserved, "Unexpected dwReserved for record %s. Got %s, expected %s." % (data, rec.dwReserved, dwReserved))
337         self.assertEqual(data.lower(), rec.data.lower(), "Unexpected data for record %s. Got %s, expected %s." % (data, rec.data.lower(), data.lower()))
338         self.assertEqual(wType, rec.wType, "Unexpected wType for record %s. Got %s, expected %s." % (data, rec.wType, wType))
339         self.assertEqual(dwTimeStamp, rec.dwTimeStamp, "Unexpected timestamp for record %s. Got %s, expected %s." % (data, rec.dwTimeStamp, dwTimeStamp))
340
341     def test_record_params(self):
342         """
343         Make sure that, when we add records to the database,
344         they're added with reasonable parameters.
345         """
346         self.add_record(self.custom_zone, "testrecord", "A", "192.168.50.50")
347         self.check_params(4, 240, 0, 900, 0, "192.168.50.50", 1)
348         self.delete_record(self.custom_zone, "testrecord", "A", "192.168.50.50")
349         self.add_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::")
350         self.check_params(16, 240, 0, 900, 0, "AAAA:AAAA:0000:0000:0000:0000:0000:0000", 28)
351         self.delete_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::")
352         self.add_record(self.custom_zone, "testrecord", "CNAME", "cnamedest")
353         self.check_params(13, 240, 0, 900, 0, "cnamedest", 5)
354         self.delete_record(self.custom_zone, "testrecord", "CNAME", "cnamedest")
355
356     def test_reject_invalid_commands(self):
357         """
358         Make sure that we can't add a variety of invalid records,
359         and that we can't update valid records to invalid ones.
360         """
361         num_failures = 0
362         for record_type_str in self.bad_records:
363             for record_str in self.bad_records[record_type_str]:
364                 # Attempt to add the bad record, which should fail. Then, attempt to query for and delete
365                 # it. Since it shouldn't exist, these should fail too.
366                 try:
367                     self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
368                     self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
369                     self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
370                 except AssertionError as e:
371                     print(e)
372                     num_failures = num_failures + 1
373
374         # Also try to update valid records to invalid ones, making sure this fails
375         for record_type_str in self.bad_records:
376             for record_str in self.bad_records[record_type_str]:
377                 good_record_str = self.good_records[record_type_str][0]
378                 self.add_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
379                 try:
380                     self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
381                 except AssertionError as e:
382                     print(e)
383                     num_failures = num_failures + 1
384                 self.delete_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
385
386         self.assertTrue(num_failures == 0, "Failed to reject invalid commands. Total failures: %d." % num_failures)
387
    def test_add_duplicate_different_type(self):
        """
        Attempt to add some values which have the same name as
        existing ones, just a different type.

        For every ordered pair of distinct record types (type 1, type 2),
        a type 1 record is added first and a type 2 record is then added
        under the same name. Mismatches against the expected outcome are
        counted and printed; the test fails at the end if any occurred.
        """
        num_failures = 0
        for record_type_str_1 in self.good_records:
            record1 = self.good_records[record_type_str_1][0]
            self.add_record(self.custom_zone, "testrecord", record_type_str_1, record1)
            for record_type_str_2 in self.good_records:
                if record_type_str_1 == record_type_str_2:
                    continue

                record2 = self.good_records[record_type_str_2][0]

                # Each has_* flag is true when EITHER member of the pair has that type.
                has_a = record_type_str_1 == 'A' or record_type_str_2 == 'A'
                has_aaaa = record_type_str_1 == 'AAAA' or record_type_str_2 == 'AAAA'
                has_cname = record_type_str_1 == 'CNAME' or record_type_str_2 == 'CNAME'
                has_ptr = record_type_str_1 == 'PTR' or record_type_str_2 == 'PTR'
                has_mx = record_type_str_1 == 'MX' or record_type_str_2 == 'MX'
                has_srv = record_type_str_1 == 'SRV' or record_type_str_2 == 'SRV'
                has_txt = record_type_str_1 == 'TXT' or record_type_str_2 == 'TXT'

                # If we attempt to add any record except A or AAAA when we already have an NS record,
                # the add should fail.
                add_error_ok = False
                if record_type_str_1 == 'NS' and not has_a and not has_aaaa:
                    add_error_ok = True
                # If we attempt to add a CNAME when an A, AAAA, PTR or MX record exists, the add should fail.
                if record_type_str_2 == 'CNAME' and (has_ptr or has_mx or has_a or has_aaaa):
                    add_error_ok = True
                # If we have a CNAME, adding an A, AAAA, SRV or TXT record should fail.
                # If we have an A, AAAA, SRV or TXT record, adding a CNAME should fail.
                if has_cname and (has_a or has_aaaa or has_srv or has_txt):
                    add_error_ok = True

                # add_error_ok means the second add is expected to fail: a
                # success is then counted as a failure of this test, and
                # vice versa.
                try:
                    self.add_record(self.custom_zone, "testrecord", record_type_str_2, record2)
                    if add_error_ok:
                        num_failures = num_failures + 1
                        print("Expected error when adding %s while a %s existed."
                              % (record_type_str_2, record_type_str_1))
                except AssertionError as e:
                    if not add_error_ok:
                        num_failures = num_failures + 1
                        print("Didn't expect error when adding %s while a %s existed."
                              % (record_type_str_2, record_type_str_1))

                if not add_error_ok:
                    # In the "normal" case, we expect the add to work and us to have one of each type of record afterwards.
                    expected_num_type_1 = 1
                    expected_num_type_2 = 1

                    # If we have an MX record, a PTR record should replace it when added.
                    # If we have a PTR record, an MX record should replace it when added.
                    if has_ptr and has_mx:
                        expected_num_type_1 = 0

                    # If we have a CNAME, SRV or TXT record, a PTR or MX record should replace it when added.
                    if (has_cname or has_srv or has_txt) and (record_type_str_2 == 'PTR' or record_type_str_2 == 'MX'):
                        expected_num_type_1 = 0

                    # With an existing NS record, an added A or AAAA record is
                    # expected not to be listed afterwards.
                    if (record_type_str_1 == 'NS' and (has_a or has_aaaa)):
                        expected_num_type_2 = 0

                    try:
                        self.assert_num_records(self.custom_zone, "testrecord", record_type_str_1, expected_num=expected_num_type_1)
                    except AssertionError as e:
                        num_failures = num_failures + 1
                        print("Expected %s %s records after adding a %s record and a %s record already existed."
                              % (expected_num_type_1, record_type_str_1, record_type_str_2, record_type_str_1))
                    try:
                        self.assert_num_records(self.custom_zone, "testrecord", record_type_str_2, expected_num=expected_num_type_2)
                    except AssertionError as e:
                        num_failures = num_failures + 1
                        print("Expected %s %s records after adding a %s record and a %s record already existed."
                              % (expected_num_type_2, record_type_str_2, record_type_str_2, record_type_str_1))

                # Best-effort cleanup: the type 2 record may legitimately not
                # exist, so a failed delete is ignored here.
                try:
                    self.delete_record(self.custom_zone, "testrecord", record_type_str_2, record2)
                except AssertionError as e:
                    pass

            self.delete_record(self.custom_zone, "testrecord", record_type_str_1, record1)

        self.assertTrue(num_failures == 0, "Failed collision and replacement behavior. Total failures: %d." % num_failures)
474
475     # Windows fails this test in the same way we do.
476     def _test_cname(self):
477         """
478         Test some special properties of CNAME records.
479         """
480
481         # RFC 1912: When there is a CNAME record, there must not be any other records with the same alias
482         cname_record = self.good_records["CNAME"][1]
483         self.add_record(self.custom_zone, "testrecord", "CNAME", cname_record)
484
485         for record_type_str in self.good_records:
486             other_record = self.good_records[record_type_str][0]
487             self.add_record(self.custom_zone, "testrecord", record_type_str, other_record, assertion=False)
488             self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
489
490         # RFC 2181: MX & NS records must not be allowed to point to a CNAME alias
491         mx_record = "testrecord 1"
492         ns_record = "testrecord"
493
494         self.add_record(self.custom_zone, "mxrec", "MX", mx_record, assertion=False)
495         self.add_record(self.custom_zone, "nsrec", "NS", ns_record, assertion=False)
496
497         self.delete_record(self.custom_zone, "testrecord", "CNAME", cname_record)
498
499     def test_add_duplicate_value(self):
500         """
501         Make sure that we can't add duplicate values of any type.
502         """
503         for record_type_str in self.good_records:
504             record = self.good_records[record_type_str][0]
505
506             self.add_record(self.custom_zone, "testrecord", record_type_str, record)
507             self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
508             self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
509             self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
510
511     def test_add_similar_value(self):
512         """
513         Attempt to add values with the same name and type in the same
514         zone. This should work, and should result in both values
515         existing (except with some types).
516         """
517         for record_type_str in self.good_records:
518             for i in range(1, len(self.good_records[record_type_str])):
519                 record1 = self.good_records[record_type_str][i - 1]
520                 record2 = self.good_records[record_type_str][i]
521
522                 if record_type_str == 'CNAME':
523                     continue
524                 # We expect CNAME records to override one another, as
525                 # an alias can only map to one CNAME record.
526                 # Also, on Windows, when the empty string is added and
527                 # another record is added afterwards, the empty string
528                 # will be silently overridden by the new one, so it
529                 # fails this test for the empty string.
530                 expected_num = 1 if record_type_str == 'CNAME' else 2
531
532                 self.add_record(self.custom_zone, "testrecord", record_type_str, record1)
533                 self.add_record(self.custom_zone, "testrecord", record_type_str, record2)
534                 self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=expected_num)
535                 self.delete_record(self.custom_zone, "testrecord", record_type_str, record1)
536                 self.delete_record(self.custom_zone, "testrecord", record_type_str, record2)
537
538     def assert_record(self, zone, name, record_type_str, expected_record_str,
539                       assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
540         """
541         Asserts whether or not the given record with the given type exists in the
542         given zone.
543         """
544         try:
545             _, result = self.query_records(zone, name, record_type_str)
546         except RuntimeError as e:
547             if assertion:
548                 raise AssertionError("Record '%s' of type '%s' was not present when it should have been."
549                                      % (expected_record_str, record_type_str))
550             else:
551                 return
552
553         found = False
554         for record in result.rec[0].records:
555             if record.data == expected_record_str:
556                 found = True
557                 break
558
559         if found and not assertion:
560             raise AssertionError("Record '%s' of type '%s' was present when it shouldn't have been." % (expected_record_str, record_type_str))
561         elif not found and assertion:
562             raise AssertionError("Record '%s' of type '%s' was not present when it should have been." % (expected_record_str, record_type_str))
563
564     def assert_num_records(self, zone, name, record_type_str, expected_num=1,
565                            client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
566         """
567         Asserts that there are a given amount of records with the given type in
568         the given zone.
569         """
570         try:
571             _, result = self.query_records(zone, name, record_type_str)
572             num_results = len(result.rec[0].records)
573             if not num_results == expected_num:
574                 raise AssertionError("There were %d records of type '%s' with the name '%s' when %d were expected."
575                                      % (num_results, record_type_str, name, expected_num))
576         except RuntimeError:
577             if not expected_num == 0:
578                 raise AssertionError("There were no records of type '%s' with the name '%s' when %d were expected."
579                                      % (record_type_str, name, expected_num))
580
581     def query_records(self, zone, name, record_type_str, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
582         return self.conn.DnssrvEnumRecords2(client_version,
583                                             0,
584                                             self.server,
585                                             zone,
586                                             name,
587                                             None,
588                                             self.record_type_int(record_type_str),
589                                             dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA | dnsserver.DNS_RPC_VIEW_NO_CHILDREN,
590                                             None,
591                                             None)
592
593     def record_obj_from_str(self, record_type_str, record_str):
594         if record_type_str == 'A':
595             return ARecord(record_str)
596         elif record_type_str == 'AAAA':
597             return AAAARecord(record_str)
598         elif record_type_str == 'PTR':
599             return PTRRecord(record_str)
600         elif record_type_str == 'CNAME':
601             return CNameRecord(record_str)
602         elif record_type_str == 'NS':
603             return NSRecord(record_str)
604         elif record_type_str == 'MX':
605             split = record_str.split(' ')
606             return MXRecord(split[0], int(split[1]))
607         elif record_type_str == 'SRV':
608             split = record_str.split(' ')
609             target = split[0]
610             port = int(split[1])
611             priority = int(split[2])
612             weight = int(split[3])
613             return SRVRecord(target, port, priority, weight)
614         elif record_type_str == 'TXT':
615             return TXTRecord(record_str)
616
617     def record_type_int(self, record_type_str):
618         if record_type_str == 'A':
619             return dnsp.DNS_TYPE_A
620         elif record_type_str == 'AAAA':
621             return dnsp.DNS_TYPE_AAAA
622         elif record_type_str == 'PTR':
623             return dnsp.DNS_TYPE_PTR
624         elif record_type_str == 'CNAME':
625             return dnsp.DNS_TYPE_CNAME
626         elif record_type_str == 'NS':
627             return dnsp.DNS_TYPE_NS
628         elif record_type_str == 'MX':
629             return dnsp.DNS_TYPE_MX
630         elif record_type_str == 'SRV':
631             return dnsp.DNS_TYPE_SRV
632         elif record_type_str == 'TXT':
633             return dnsp.DNS_TYPE_TXT
634
635     def add_record(self, zone, name, record_type_str, record_str,
636                    assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
637         """
638         Attempts to add a map from the given name to a record of the given type,
639         in the given zone.
640         Also asserts whether or not the add was successful.
641         This can also update existing records if they have the same name.
642         """
643         record = self.record_obj_from_str(record_type_str, record_str)
644         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
645         add_rec_buf.rec = record
646
647         try:
648             self.conn.DnssrvUpdateRecord2(client_version,
649                                           0,
650                                           self.server,
651                                           zone,
652                                           name,
653                                           add_rec_buf,
654                                           None)
655             if not assertion:
656                 raise AssertionError("Successfully added record '%s' of type '%s', which should have failed."
657                                      % (record_str, record_type_str))
658         except RuntimeError as e:
659             if assertion:
660                 raise AssertionError("Failed to add record '%s' of type '%s', which should have succeeded. Error was '%s'."
661                                      % (record_str, record_type_str, str(e)))
662
663     def delete_record(self, zone, name, record_type_str, record_str,
664                       assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
665         """
666         Attempts to delete a record with the given name, record and record type
667         from the given zone.
668         Also asserts whether or not the deletion was successful.
669         """
670         record = self.record_obj_from_str(record_type_str, record_str)
671         del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
672         del_rec_buf.rec = record
673
674         try:
675             self.conn.DnssrvUpdateRecord2(client_version,
676                                           0,
677                                           self.server,
678                                           zone,
679                                           name,
680                                           None,
681                                           del_rec_buf)
682             if not assertion:
683                 raise AssertionError("Successfully deleted record '%s' of type '%s', which should have failed." % (record_str, record_type_str))
684         except RuntimeError as e:
685             if assertion:
686                 raise AssertionError("Failed to delete record '%s' of type '%s', which should have succeeded. Error was '%s'." % (record_str, record_type_str, str(e)))
687
688     def test_query2(self):
689         typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_W2K,
690                                                 0,
691                                                 self.server,
692                                                 None,
693                                                 'ServerInfo')
694         self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K, typeid)
695
696         typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_DOTNET,
697                                                 0,
698                                                 self.server,
699                                                 None,
700                                                 'ServerInfo')
701         self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET, typeid)
702
703         typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
704                                                 0,
705                                                 self.server,
706                                                 None,
707                                                 'ServerInfo')
708         self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO, typeid)
709
710     def test_operation2(self):
711         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
712         rev_zone = '1.168.192.in-addr.arpa'
713
714         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
715         zone_create.pszZoneName = rev_zone
716         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
717         zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
718         zone_create.fAging = 0
719         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
720
721         # Create zone
722         self.conn.DnssrvOperation2(client_version,
723                                    0,
724                                    self.server,
725                                    None,
726                                    0,
727                                    'ZoneCreate',
728                                    dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
729                                    zone_create)
730
731         request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
732                           dnsserver.DNS_ZONE_REQUEST_PRIMARY)
733         _, zones = self.conn.DnssrvComplexOperation2(client_version,
734                                                      0,
735                                                      self.server,
736                                                      None,
737                                                      'EnumZones',
738                                                      dnsserver.DNSSRV_TYPEID_DWORD,
739                                                      request_filter)
740         self.assertEquals(1, zones.dwZoneCount)
741
742         # Delete zone
743         self.conn.DnssrvOperation2(client_version,
744                                    0,
745                                    self.server,
746                                    rev_zone,
747                                    0,
748                                    'DeleteZoneFromDs',
749                                    dnsserver.DNSSRV_TYPEID_NULL,
750                                    None)
751
752         typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
753                                                           0,
754                                                           self.server,
755                                                           None,
756                                                           'EnumZones',
757                                                           dnsserver.DNSSRV_TYPEID_DWORD,
758                                                           request_filter)
759         self.assertEquals(0, zones.dwZoneCount)
760
761     def test_complexoperation2(self):
762         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
763         request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD |
764                           dnsserver.DNS_ZONE_REQUEST_PRIMARY)
765
766         typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
767                                                           0,
768                                                           self.server,
769                                                           None,
770                                                           'EnumZones',
771                                                           dnsserver.DNSSRV_TYPEID_DWORD,
772                                                           request_filter)
773         self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
774         self.assertEquals(3, zones.dwZoneCount)
775
776         request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
777                           dnsserver.DNS_ZONE_REQUEST_PRIMARY)
778         typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
779                                                           0,
780                                                           self.server,
781                                                           None,
782                                                           'EnumZones',
783                                                           dnsserver.DNSSRV_TYPEID_DWORD,
784                                                           request_filter)
785         self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
786         self.assertEquals(0, zones.dwZoneCount)
787
788     def test_enumrecords2(self):
789         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
790         record_type = dnsp.DNS_TYPE_NS
791         select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA |
792                         dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA)
793         _, roothints = self.conn.DnssrvEnumRecords2(client_version,
794                                                     0,
795                                                     self.server,
796                                                     '..RootHints',
797                                                     '.',
798                                                     None,
799                                                     record_type,
800                                                     select_flags,
801                                                     None,
802                                                     None)
803         self.assertEquals(14, roothints.count)  # 1 NS + 13 A records (a-m)
804
805     def test_updaterecords2(self):
806         client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
807         record_type = dnsp.DNS_TYPE_A
808         select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA
809
810         name = 'dummy'
811         rec = ARecord('1.2.3.4')
812         rec2 = ARecord('5.6.7.8')
813
814         # Add record
815         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
816         add_rec_buf.rec = rec
817         self.conn.DnssrvUpdateRecord2(client_version,
818                                       0,
819                                       self.server,
820                                       self.zone,
821                                       name,
822                                       add_rec_buf,
823                                       None)
824
825         _, result = self.conn.DnssrvEnumRecords2(client_version,
826                                                  0,
827                                                  self.server,
828                                                  self.zone,
829                                                  name,
830                                                  None,
831                                                  record_type,
832                                                  select_flags,
833                                                  None,
834                                                  None)
835         self.assertEquals(1, result.count)
836         self.assertEquals(1, result.rec[0].wRecordCount)
837         self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
838         self.assertEquals('1.2.3.4', result.rec[0].records[0].data)
839
840         # Update record
841         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
842         add_rec_buf.rec = rec2
843         del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
844         del_rec_buf.rec = rec
845         self.conn.DnssrvUpdateRecord2(client_version,
846                                       0,
847                                       self.server,
848                                       self.zone,
849                                       name,
850                                       add_rec_buf,
851                                       del_rec_buf)
852
853         buflen, result = self.conn.DnssrvEnumRecords2(client_version,
854                                                       0,
855                                                       self.server,
856                                                       self.zone,
857                                                       name,
858                                                       None,
859                                                       record_type,
860                                                       select_flags,
861                                                       None,
862                                                       None)
863         self.assertEquals(1, result.count)
864         self.assertEquals(1, result.rec[0].wRecordCount)
865         self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
866         self.assertEquals('5.6.7.8', result.rec[0].records[0].data)
867
868         # Delete record
869         del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
870         del_rec_buf.rec = rec2
871         self.conn.DnssrvUpdateRecord2(client_version,
872                                       0,
873                                       self.server,
874                                       self.zone,
875                                       name,
876                                       None,
877                                       del_rec_buf)
878
879         self.assertRaises(RuntimeError, self.conn.DnssrvEnumRecords2,
880                           client_version,
881                           0,
882                           self.server,
883                           self.zone,
884                           name,
885                           None,
886                           record_type,
887                           select_flags,
888                           None,
889                           None)
890
    # The following tests do not pass against Samba because the owner and
    # group, as well as some ACEs, are not consistent with Windows.
    #
    # The following ACEs are also required for 2012R2:
    #
    # (OA;CIIO;WP;ea1b7b93-5e48-46d5-bc6c-4df4fda78a35;bf967a86-0de6-11d0-a285-00aa003049e2;PS)
    # (OA;OICI;RPWP;3f78c3e5-f79a-46bd-a0b8-9d18116ddc79;;PS)
    #
    # [TPM + Allowed-To-Act-On-Behalf-Of-Other-Identity]
900     def test_security_descriptor_msdcs_zone(self):
901         """
902         Make sure that security descriptors of the msdcs zone is
903         as expected.
904         """
905
906         zones = self.samdb.search(base="DC=ForestDnsZones,%s" % self.samdb.get_default_basedn(),
907                                   scope=ldb.SCOPE_SUBTREE,
908                                   expression="(&(objectClass=dnsZone)(name=_msdcs*))",
909                                   attrs=["nTSecurityDescriptor", "objectClass"])
910         self.assertEqual(len(zones), 1)
911         self.assertTrue("nTSecurityDescriptor" in zones[0])
912         tmp = zones[0]["nTSecurityDescriptor"][0]
913         utils = sd_utils.SDUtils(self.samdb)
914         sd = ndr_unpack(security.descriptor, tmp)
915
916         domain_sid = security.dom_sid(self.samdb.get_domain_sid())
917
918         res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
919                                 expression="(sAMAccountName=DnsAdmins)",
920                                 attrs=["objectSid"])
921
922         dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))
923
924         packed_sd = descriptor.sddl2binary("O:SYG:BA" \
925                                            "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" \
926                                            "(A;;CC;;;AU)" \
927                                            "(A;;RPLCLORC;;;WD)" \
928                                            "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
929                                            "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
930                                            domain_sid, {"DnsAdmins": dns_admin})
931         expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))
932
933         diff = descriptor.get_diff_sds(expected_sd, sd, domain_sid)
934         self.assertEqual(diff, '', "SD of msdcs zone different to expected.\n"
935                          "Difference was:\n%s\nExpected: %s\nGot: %s" %
936                          (diff, expected_sd.as_sddl(utils.domain_sid),
937                           sd.as_sddl(utils.domain_sid)))
938
939     def test_security_descriptor_forest_zone(self):
940         """
941         Make sure that security descriptors of forest dns zones are
942         as expected.
943         """
944         forest_zone = "test_forest_zone"
945         zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
946         zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
947         zone_create_info.fAging = 0
948         zone_create_info.fDsIntegrated = 1
949         zone_create_info.fLoadExisting = 1
950
951         zone_create_info.pszZoneName = forest_zone
952         zone_create_info.dwDpFlags = dnsserver.DNS_DP_FOREST_DEFAULT
953
954         self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
955                                    0,
956                                    self.server,
957                                    None,
958                                    0,
959                                    'ZoneCreate',
960                                    dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
961                                    zone_create_info)
962
963         partition_dn = self.samdb.get_default_basedn()
964         partition_dn.add_child("DC=ForestDnsZones")
965         zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE,
966                                   expression="(name=%s)" % forest_zone,
967                                   attrs=["nTSecurityDescriptor"])
968         self.assertEqual(len(zones), 1)
969         current_dn = zones[0].dn
970         self.assertTrue("nTSecurityDescriptor" in zones[0])
971         tmp = zones[0]["nTSecurityDescriptor"][0]
972         utils = sd_utils.SDUtils(self.samdb)
973         sd = ndr_unpack(security.descriptor, tmp)
974
975         domain_sid = security.dom_sid(self.samdb.get_domain_sid())
976
977         res = self.samdb.search(base=self.samdb.get_default_basedn(),
978                                 scope=ldb.SCOPE_SUBTREE,
979                                 expression="(sAMAccountName=DnsAdmins)",
980                                 attrs=["objectSid"])
981
982         dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))
983
984         packed_sd = descriptor.sddl2binary("O:DAG:DA" \
985                                            "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" \
986                                            "(A;;CC;;;AU)" \
987                                            "(A;;RPLCLORC;;;WD)" \
988                                            "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
989                                            "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
990                                            domain_sid, {"DnsAdmins": dns_admin})
991         expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))
992
993         packed_msdns = descriptor.get_dns_forest_microsoft_dns_descriptor(domain_sid,
994                                                                           {"DnsAdmins": dns_admin})
995         expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns))
996
997         packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid)
998         expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor,
999                                                               packed_part_sd))
1000         try:
1001             msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn))
1002             security_desc_dict = [(current_dn.get_linearized(), expected_sd),
1003                                   (msdns_dn.get_linearized(), expected_msdns_sd),
1004                                   (partition_dn.get_linearized(), expected_part_sd)]
1005
1006             for (key, sec_desc) in security_desc_dict:
1007                 zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE,
1008                                           attrs=["nTSecurityDescriptor"])
1009                 self.assertTrue("nTSecurityDescriptor" in zones[0])
1010                 tmp = zones[0]["nTSecurityDescriptor"][0]
1011                 utils = sd_utils.SDUtils(self.samdb)
1012
1013                 sd = ndr_unpack(security.descriptor, tmp)
1014                 diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid)
1015
1016                 self.assertEqual(diff, '', "Security descriptor of forest DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
1017                                  % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))
1018
1019         finally:
1020             self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1021                                        0,
1022                                        self.server,
1023                                        forest_zone,
1024                                        0,
1025                                        'DeleteZoneFromDs',
1026                                        dnsserver.DNSSRV_TYPEID_NULL,
1027                                        None)
1028
1029     def test_security_descriptor_domain_zone(self):
1030         """
1031         Make sure that security descriptors of domain dns zones are
1032         as expected.
1033         """
1034
1035         partition_dn = self.samdb.get_default_basedn()
1036         partition_dn.add_child("DC=DomainDnsZones")
1037         zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE,
1038                                   expression="(name=%s)" % self.custom_zone,
1039                                   attrs=["nTSecurityDescriptor"])
1040         self.assertEqual(len(zones), 1)
1041         current_dn = zones[0].dn
1042         self.assertTrue("nTSecurityDescriptor" in zones[0])
1043         tmp = zones[0]["nTSecurityDescriptor"][0]
1044         utils = sd_utils.SDUtils(self.samdb)
1045         sd = ndr_unpack(security.descriptor, tmp)
1046         sddl = sd.as_sddl(utils.domain_sid)
1047
1048         domain_sid = security.dom_sid(self.samdb.get_domain_sid())
1049
1050         res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
1051                                 expression="(sAMAccountName=DnsAdmins)",
1052                                 attrs=["objectSid"])
1053
1054         dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))
1055
1056         packed_sd = descriptor.sddl2binary("O:DAG:DA" \
1057                                            "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" \
1058                                            "(A;;CC;;;AU)" \
1059                                            "(A;;RPLCLORC;;;WD)" \
1060                                            "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
1061                                            "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
1062                                            domain_sid, {"DnsAdmins": dns_admin})
1063         expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))
1064
1065         packed_msdns = descriptor.get_dns_domain_microsoft_dns_descriptor(domain_sid,
1066                                                                           {"DnsAdmins": dns_admin})
1067         expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns))
1068
1069         packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid)
1070         expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor,
1071                                                               packed_part_sd))
1072
1073         msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn))
1074         security_desc_dict = [(current_dn.get_linearized(), expected_sd),
1075                               (msdns_dn.get_linearized(), expected_msdns_sd),
1076                               (partition_dn.get_linearized(), expected_part_sd)]
1077
1078         for (key, sec_desc) in security_desc_dict:
1079             zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE,
1080                                       attrs=["nTSecurityDescriptor"])
1081             self.assertTrue("nTSecurityDescriptor" in zones[0])
1082             tmp = zones[0]["nTSecurityDescriptor"][0]
1083             utils = sd_utils.SDUtils(self.samdb)
1084
1085             sd = ndr_unpack(security.descriptor, tmp)
1086             diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid)
1087
1088             self.assertEqual(diff, '', "Security descriptor of domain DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
1089                              % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))