1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 """Tests for samba.dcerpc.dnsserver"""
24 from samba.auth import system_session
25 from samba.samdb import SamDB
26 from samba.ndr import ndr_unpack, ndr_pack
27 from samba.dcerpc import dnsp, dnsserver, security
28 from samba.tests import RpcInterfaceTestCase, env_get_var_value
29 from samba.netcmd.dns import ARecord, AAAARecord, PTRRecord, CNameRecord, NSRecord, MXRecord, SRVRecord, TXTRecord
30 from samba import sd_utils, descriptor
32 class DnsserverTests(RpcInterfaceTestCase):
36 good_dns = ["SAMDOM.EXAMPLE.COM",
38 "%sEXAMPLE.COM" % ("1." * 100),
49 "SAMDOM..EXAMPLE.COM"]
51 good_mx = ["SAMDOM.EXAMPLE.COM 65535"]
54 good_srv = ["SAMDOM.EXAMPLE.COM 65535 65535 65535"]
57 for bad_dn in bad_dns:
58 bad_mx.append("%s 1" % bad_dn)
59 bad_srv.append("%s 0 0 0" % bad_dn)
60 for good_dn in good_dns:
61 good_mx.append("%s 1" % good_dn)
62 good_srv.append("%s 0 0 0" % good_dn)
67 "AAAA": ["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
68 "0000:0000:0000:0000:0000:0000:0000:0000",
69 "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
71 "1234:1234:1234:1234:1234::",
72 "1234:5678:9ABC:DEF0::",
74 "1234::5678:9ABC:0000:0000:0000:0000",
83 "TXT": ["text", "", "@#!", "\n"]
87 "A": ["192.168.0.500",
88 "255.255.255.255/32"],
89 "AAAA": ["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
90 "0000:0000:0000:0000:0000:0000:0000:0000/1",
91 "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
92 "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
93 "1234:5678:9ABC:DEF0:1234:5678:9ABC",
102 # Because we use uint16_t for these numbers, we can't
103 # actually create these records.
104 invalid_mx = ["SAMDOM.EXAMPLE.COM -1",
105 "SAMDOM.EXAMPLE.COM 65536",
107 invalid_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
108 "SAMDOM.EXAMPLE.COM 0 0 65536",
109 "SAMDOM.EXAMPLE.COM 65536 0 0"]
110 cls.invalid_records = {
116 super(DnsserverTests, self).setUp()
117 self.server = os.environ["DC_SERVER"]
118 self.zone = env_get_var_value("REALM").lower()
119 self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
121 self.get_credentials())
123 self.samdb = SamDB(url="ldap://%s" % os.environ["DC_SERVER_IP"],
124 lp=self.get_loadparm(),
125 session_info=system_session(),
126 credentials=self.get_credentials())
129 self.custom_zone = "zone"
130 zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
131 zone_create_info.pszZoneName = self.custom_zone
132 zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
133 zone_create_info.fAging = 0
134 zone_create_info.fDsIntegrated = 1
135 zone_create_info.fLoadExisting = 1
136 zone_create_info.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
138 self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
144 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
148 self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
154 dnsserver.DNSSRV_TYPEID_NULL,
156 super(DnsserverTests, self).tearDown()
158 # This test fails against Samba (but passes against Windows),
159 # because Samba does not return the record when we enum records.
160 # Records can be given DNS_RANK_NONE when the zone they are in
161 # does not have DNS_ZONE_TYPE_PRIMARY. Since such records can be
162 # deleted, however, we do not consider this urgent to fix and
163 # so this test is a knownfail.
164 def test_rank_none(self):
166 See what happens when we set a record's rank to
170 record_str = "192.168.50.50"
171 record_type_str = "A"
172 self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
174 dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
175 record.rank = 0 # DNS_RANK_NONE
176 res = self.samdb.dns_replace_by_dn(dn, [record])
178 self.fail("Unable to update dns record to have DNS_RANK_NONE.")
180 self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
181 self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
182 self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
183 self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0)
185 def test_dns_tombstoned(self):
187 See what happens when we set a record to be tombstoned.
190 record_str = "192.168.50.50"
191 record_type_str = "A"
192 self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
194 dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
195 record.wType = dnsp.DNS_TYPE_TOMBSTONE
196 res = self.samdb.dns_replace_by_dn(dn, [record])
198 self.fail("Unable to update dns record to be tombstoned.")
200 self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
201 self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
202 self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0)
204 def get_record_from_db(self, zone_name, record_name):
206 Returns (dn of record, record)
209 zones = self.samdb.search(base="DC=DomainDnsZones,%s" % self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
210 expression="(objectClass=dnsZone)",
215 if "DC=%s," % zone_name in str(zone.dn):
220 raise AssertionError("Couldn't find zone '%s'." % zone_name)
222 records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
223 expression="(objectClass=dnsNode)",
226 for old_packed_record in records:
227 if record_name in str(old_packed_record.dn):
228 rec = ndr_unpack(dnsp.DnssrvRpcRecord, old_packed_record["dnsRecord"][0])
229 return (old_packed_record.dn, rec)
231 def test_duplicate_matching(self):
233 Make sure that records which should be distinct from each other or duplicate
234 to each other behave as expected.
237 distinct_dns = [("SAMDOM.EXAMPLE.COM",
239 "EXAMPLE.COM", "SAMDOM.EXAMPLE")]
240 duplicate_dns = [("SAMDOM.EXAMPLE.COM", "samdom.example.com", "SAMDOM.example.COM"),
241 ("EXAMPLE.", "EXAMPLE")]
243 # Every tuple has entries which should be considered duplicate to one another.
245 "AAAA": [("AAAA::", "aaaa::"),
246 ("AAAA::", "AAAA:0000::"),
247 ("AAAA::", "AAAA:0000:0000:0000:0000:0000:0000:0000"),
248 ("AAAA::", "AAAA:0:0:0:0:0:0:0"),
250 ("::", "::0", "0000:0000:0000:0000:0000:0000:0000:0000")],
253 # Every tuple has entries which should be considered distinct from one another.
255 "A": [("192.168.1.0", "192.168.1.1", "192.168.2.0", "192.169.1.0", "193.168.1.0")],
256 "AAAA": [("AAAA::1234:5678:9ABC", "::AAAA:1234:5678:9ABC"),
257 ("1000::", "::1000"),
258 ("::1", "::11", "::1111"),
259 ("1234::", "0234::")],
260 "SRV": [("SAMDOM.EXAMPLE.COM 1 1 1", "SAMDOM.EXAMPLE.COM 1 1 0", "SAMDOM.EXAMPLE.COM 1 0 1",
261 "SAMDOM.EXAMPLE.COM 0 1 1", "SAMDOM.EXAMPLE.COM 2 1 0", "SAMDOM.EXAMPLE.COM 2 2 2")],
262 "MX": [("SAMDOM.EXAMPLE.COM 1", "SAMDOM.EXAMPLE.COM 0")],
263 "TXT": [("A RECORD", "B RECORD", "a record")]
266 for record_type_str in ("PTR", "CNAME", "NS"):
267 distinct[record_type_str] = distinct_dns
268 duplicates[record_type_str] = duplicate_dns
270 for record_type_str in duplicates:
271 for duplicate_tuple in duplicates[record_type_str]:
272 # Attempt to add duplicates and make sure that all after the first fails
273 self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
274 for record in duplicate_tuple:
275 self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
276 self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
277 self.delete_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
279 # Repeatedly: add the first duplicate, and attempt to remove all of the others, making sure this succeeds
280 for record in duplicate_tuple:
281 self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
282 self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
284 for record_type_str in distinct:
285 for distinct_tuple in distinct[record_type_str]:
286 # Attempt to add distinct and make sure that they all succeed within a tuple
288 for record in distinct_tuple:
291 self.add_record(self.custom_zone, "testrecord", record_type_str, record)
292 # All records should have been added.
293 self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=i)
294 except AssertionError as e:
295 raise AssertionError("Failed to add %s, which should be distinct from all others in the set. "
296 "Original error: %s\nDistinct set: %s." % (record, e, distinct_tuple))
297 for record in distinct_tuple:
298 self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
299 # CNAMEs should not have been added, since they conflict.
300 if record_type_str == 'CNAME':
303 # Add the first distinct and attempt to remove all of the others, making sure this fails
304 # Windows fails this test. This is probably due to weird tombstoning behavior.
305 self.add_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
306 for record in distinct_tuple:
307 if record == distinct_tuple[0]:
310 self.delete_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
311 except AssertionError as e:
312 raise AssertionError("Managed to remove %s by attempting to remove %s. Original error: %s"
313 % (distinct_tuple[0], record, e))
314 self.delete_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
316 def test_accept_valid_commands(self):
318 Make sure that we can add, update and delete a variety
321 for record_type_str in self.good_records:
322 for record_str in self.good_records[record_type_str]:
323 self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
324 self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
325 self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
327 def check_params(self, wDataLength, rank, flags, dwTtlSeconds, dwReserved, data,
328 wType, dwTimeStamp=0, zone="zone", rec_name="testrecord"):
329 res = self.get_record_from_db(zone, rec_name)
330 self.assertIsNotNone(res, "Expected record %s but was not found over LDAP." % data)
332 self.assertEqual(wDataLength, rec.wDataLength, "Unexpected data length for record %s. Got %s, expected %s." % (data, rec.wDataLength, wDataLength))
333 self.assertEqual(rank, rec.rank, "Unexpected rank for record %s. Got %s, expected %s." % (data, rec.rank, rank))
334 self.assertEqual(flags, rec.flags, "Unexpected flags for record %s. Got %s, expected %s." % (data, rec.flags, flags))
335 self.assertEqual(dwTtlSeconds, rec.dwTtlSeconds, "Unexpected time to live for record %s. Got %s, expected %s." % (data, rec.dwTtlSeconds, dwTtlSeconds))
336 self.assertEqual(dwReserved, rec.dwReserved, "Unexpected dwReserved for record %s. Got %s, expected %s." % (data, rec.dwReserved, dwReserved))
337 self.assertEqual(data.lower(), rec.data.lower(), "Unexpected data for record %s. Got %s, expected %s." % (data, rec.data.lower(), data.lower()))
338 self.assertEqual(wType, rec.wType, "Unexpected wType for record %s. Got %s, expected %s." % (data, rec.wType, wType))
339 self.assertEqual(dwTimeStamp, rec.dwTimeStamp, "Unexpected timestamp for record %s. Got %s, expected %s." % (data, rec.dwTimeStamp, dwTimeStamp))
341 def test_record_params(self):
343 Make sure that, when we add records to the database,
344 they're added with reasonable parameters.
346 self.add_record(self.custom_zone, "testrecord", "A", "192.168.50.50")
347 self.check_params(4, 240, 0, 900, 0, "192.168.50.50", 1)
348 self.delete_record(self.custom_zone, "testrecord", "A", "192.168.50.50")
349 self.add_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::")
350 self.check_params(16, 240, 0, 900, 0, "AAAA:AAAA:0000:0000:0000:0000:0000:0000", 28)
351 self.delete_record(self.custom_zone, "testrecord", "AAAA", "AAAA:AAAA::")
352 self.add_record(self.custom_zone, "testrecord", "CNAME", "cnamedest")
353 self.check_params(13, 240, 0, 900, 0, "cnamedest", 5)
354 self.delete_record(self.custom_zone, "testrecord", "CNAME", "cnamedest")
356 def test_reject_invalid_commands(self):
358 Make sure that we can't add a variety of invalid records,
359 and that we can't update valid records to invalid ones.
362 for record_type_str in self.bad_records:
363 for record_str in self.bad_records[record_type_str]:
364 # Attempt to add the bad record, which should fail. Then, attempt to query for and delete
365 # it. Since it shouldn't exist, these should fail too.
367 self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
368 self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
369 self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
370 except AssertionError as e:
372 num_failures = num_failures + 1
374 # Also try to update valid records to invalid ones, making sure this fails
375 for record_type_str in self.bad_records:
376 for record_str in self.bad_records[record_type_str]:
377 good_record_str = self.good_records[record_type_str][0]
378 self.add_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
380 self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
381 except AssertionError as e:
383 num_failures = num_failures + 1
384 self.delete_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
386 self.assertTrue(num_failures == 0, "Failed to reject invalid commands. Total failures: %d." % num_failures)
388 def test_add_duplicate_different_type(self):
390 Attempt to add some values which have the same name as
391 existing ones, just a different type.
394 for record_type_str_1 in self.good_records:
395 record1 = self.good_records[record_type_str_1][0]
396 self.add_record(self.custom_zone, "testrecord", record_type_str_1, record1)
397 for record_type_str_2 in self.good_records:
398 if record_type_str_1 == record_type_str_2:
401 record2 = self.good_records[record_type_str_2][0]
403 has_a = record_type_str_1 == 'A' or record_type_str_2 == 'A'
404 has_aaaa = record_type_str_1 == 'AAAA' or record_type_str_2 == 'AAAA'
405 has_cname = record_type_str_1 == 'CNAME' or record_type_str_2 == 'CNAME'
406 has_ptr = record_type_str_1 == 'PTR' or record_type_str_2 == 'PTR'
407 has_mx = record_type_str_1 == 'MX' or record_type_str_2 == 'MX'
408 has_srv = record_type_str_1 == 'SRV' or record_type_str_2 == 'SRV'
409 has_txt = record_type_str_1 == 'TXT' or record_type_str_2 == 'TXT'
411 # If we attempt to add any record except A or AAAA when we already have an NS record,
412 # the add should fail.
414 if record_type_str_1 == 'NS' and not has_a and not has_aaaa:
416 # If we attempt to add a CNAME when an A, PTR or MX record exists, the add should fail.
417 if record_type_str_2 == 'CNAME' and (has_ptr or has_mx or has_a or has_aaaa):
419 # If we have a CNAME, adding an A, AAAA, SRV or TXT record should fail.
420 # If we have an A, AAAA, SRV or TXT record, adding a CNAME should fail.
421 if has_cname and (has_a or has_aaaa or has_srv or has_txt):
425 self.add_record(self.custom_zone, "testrecord", record_type_str_2, record2)
427 num_failures = num_failures + 1
428 print("Expected error when adding %s while a %s existed."
429 % (record_type_str_2, record_type_str_1))
430 except AssertionError as e:
432 num_failures = num_failures + 1
433 print("Didn't expect error when adding %s while a %s existed."
434 % (record_type_str_2, record_type_str_1))
437 # In the "normal" case, we expect the add to work and us to have one of each type of record afterwards.
438 expected_num_type_1 = 1
439 expected_num_type_2 = 1
441 # If we have an MX record, a PTR record should replace it when added.
442 # If we have a PTR record, an MX record should replace it when added.
443 if has_ptr and has_mx:
444 expected_num_type_1 = 0
446 # If we have a CNAME, SRV or TXT record, a PTR or MX record should replace it when added.
447 if (has_cname or has_srv or has_txt) and (record_type_str_2 == 'PTR' or record_type_str_2 == 'MX'):
448 expected_num_type_1 = 0
450 if (record_type_str_1 == 'NS' and (has_a or has_aaaa)):
451 expected_num_type_2 = 0
454 self.assert_num_records(self.custom_zone, "testrecord", record_type_str_1, expected_num=expected_num_type_1)
455 except AssertionError as e:
456 num_failures = num_failures + 1
457 print("Expected %s %s records after adding a %s record and a %s record already existed."
458 % (expected_num_type_1, record_type_str_1, record_type_str_2, record_type_str_1))
460 self.assert_num_records(self.custom_zone, "testrecord", record_type_str_2, expected_num=expected_num_type_2)
461 except AssertionError as e:
462 num_failures = num_failures + 1
463 print("Expected %s %s records after adding a %s record and a %s record already existed."
464 % (expected_num_type_2, record_type_str_2, record_type_str_2, record_type_str_1))
467 self.delete_record(self.custom_zone, "testrecord", record_type_str_2, record2)
468 except AssertionError as e:
471 self.delete_record(self.custom_zone, "testrecord", record_type_str_1, record1)
473 self.assertTrue(num_failures == 0, "Failed collision and replacement behavior. Total failures: %d." % num_failures)
475 # Windows fails this test in the same way we do.
476 def _test_cname(self):
478 Test some special properties of CNAME records.
481 # RFC 1912: When there is a CNAME record, there must not be any other records with the same alias
482 cname_record = self.good_records["CNAME"][1]
483 self.add_record(self.custom_zone, "testrecord", "CNAME", cname_record)
485 for record_type_str in self.good_records:
486 other_record = self.good_records[record_type_str][0]
487 self.add_record(self.custom_zone, "testrecord", record_type_str, other_record, assertion=False)
488 self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
490 # RFC 2181: MX & NS records must not be allowed to point to a CNAME alias
491 mx_record = "testrecord 1"
492 ns_record = "testrecord"
494 self.add_record(self.custom_zone, "mxrec", "MX", mx_record, assertion=False)
495 self.add_record(self.custom_zone, "nsrec", "NS", ns_record, assertion=False)
497 self.delete_record(self.custom_zone, "testrecord", "CNAME", cname_record)
499 def test_add_duplicate_value(self):
501 Make sure that we can't add duplicate values of any type.
503 for record_type_str in self.good_records:
504 record = self.good_records[record_type_str][0]
506 self.add_record(self.custom_zone, "testrecord", record_type_str, record)
507 self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
508 self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
509 self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
511 def test_add_similar_value(self):
513 Attempt to add values with the same name and type in the same
514 zone. This should work, and should result in both values
515 existing (except with some types).
517 for record_type_str in self.good_records:
518 for i in range(1, len(self.good_records[record_type_str])):
519 record1 = self.good_records[record_type_str][i - 1]
520 record2 = self.good_records[record_type_str][i]
522 if record_type_str == 'CNAME':
524 # We expect CNAME records to override one another, as
525 # an alias can only map to one CNAME record.
526 # Also, on Windows, when the empty string is added and
527 # another record is added afterwards, the empty string
528 # will be silently overridden by the new one, so it
529 # fails this test for the empty string.
530 expected_num = 1 if record_type_str == 'CNAME' else 2
532 self.add_record(self.custom_zone, "testrecord", record_type_str, record1)
533 self.add_record(self.custom_zone, "testrecord", record_type_str, record2)
534 self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=expected_num)
535 self.delete_record(self.custom_zone, "testrecord", record_type_str, record1)
536 self.delete_record(self.custom_zone, "testrecord", record_type_str, record2)
538 def assert_record(self, zone, name, record_type_str, expected_record_str,
539 assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
541 Asserts whether or not the given record with the given type exists in the
545 _, result = self.query_records(zone, name, record_type_str)
546 except RuntimeError as e:
548 raise AssertionError("Record '%s' of type '%s' was not present when it should have been."
549 % (expected_record_str, record_type_str))
554 for record in result.rec[0].records:
555 if record.data == expected_record_str:
559 if found and not assertion:
560 raise AssertionError("Record '%s' of type '%s' was present when it shouldn't have been." % (expected_record_str, record_type_str))
561 elif not found and assertion:
562 raise AssertionError("Record '%s' of type '%s' was not present when it should have been." % (expected_record_str, record_type_str))
564 def assert_num_records(self, zone, name, record_type_str, expected_num=1,
565 client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
567 Asserts that there are a given amount of records with the given type in
571 _, result = self.query_records(zone, name, record_type_str)
572 num_results = len(result.rec[0].records)
573 if not num_results == expected_num:
574 raise AssertionError("There were %d records of type '%s' with the name '%s' when %d were expected."
575 % (num_results, record_type_str, name, expected_num))
577 if not expected_num == 0:
578 raise AssertionError("There were no records of type '%s' with the name '%s' when %d were expected."
579 % (record_type_str, name, expected_num))
581 def query_records(self, zone, name, record_type_str, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
582 return self.conn.DnssrvEnumRecords2(client_version,
588 self.record_type_int(record_type_str),
589 dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA | dnsserver.DNS_RPC_VIEW_NO_CHILDREN,
593 def record_obj_from_str(self, record_type_str, record_str):
594 if record_type_str == 'A':
595 return ARecord(record_str)
596 elif record_type_str == 'AAAA':
597 return AAAARecord(record_str)
598 elif record_type_str == 'PTR':
599 return PTRRecord(record_str)
600 elif record_type_str == 'CNAME':
601 return CNameRecord(record_str)
602 elif record_type_str == 'NS':
603 return NSRecord(record_str)
604 elif record_type_str == 'MX':
605 split = record_str.split(' ')
606 return MXRecord(split[0], int(split[1]))
607 elif record_type_str == 'SRV':
608 split = record_str.split(' ')
611 priority = int(split[2])
612 weight = int(split[3])
613 return SRVRecord(target, port, priority, weight)
614 elif record_type_str == 'TXT':
615 return TXTRecord(record_str)
617 def record_type_int(self, record_type_str):
618 if record_type_str == 'A':
619 return dnsp.DNS_TYPE_A
620 elif record_type_str == 'AAAA':
621 return dnsp.DNS_TYPE_AAAA
622 elif record_type_str == 'PTR':
623 return dnsp.DNS_TYPE_PTR
624 elif record_type_str == 'CNAME':
625 return dnsp.DNS_TYPE_CNAME
626 elif record_type_str == 'NS':
627 return dnsp.DNS_TYPE_NS
628 elif record_type_str == 'MX':
629 return dnsp.DNS_TYPE_MX
630 elif record_type_str == 'SRV':
631 return dnsp.DNS_TYPE_SRV
632 elif record_type_str == 'TXT':
633 return dnsp.DNS_TYPE_TXT
635 def add_record(self, zone, name, record_type_str, record_str,
636 assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
638 Attempts to add a map from the given name to a record of the given type,
640 Also asserts whether or not the add was successful.
641 This can also update existing records if they have the same name.
643 record = self.record_obj_from_str(record_type_str, record_str)
644 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
645 add_rec_buf.rec = record
648 self.conn.DnssrvUpdateRecord2(client_version,
656 raise AssertionError("Successfully added record '%s' of type '%s', which should have failed."
657 % (record_str, record_type_str))
658 except RuntimeError as e:
660 raise AssertionError("Failed to add record '%s' of type '%s', which should have succeeded. Error was '%s'."
661 % (record_str, record_type_str, str(e)))
663 def delete_record(self, zone, name, record_type_str, record_str,
664 assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
666 Attempts to delete a record with the given name, record and record type
668 Also asserts whether or not the deletion was successful.
670 record = self.record_obj_from_str(record_type_str, record_str)
671 del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
672 del_rec_buf.rec = record
675 self.conn.DnssrvUpdateRecord2(client_version,
683 raise AssertionError("Successfully deleted record '%s' of type '%s', which should have failed." % (record_str, record_type_str))
684 except RuntimeError as e:
686 raise AssertionError("Failed to delete record '%s' of type '%s', which should have succeeded. Error was '%s'." % (record_str, record_type_str, str(e)))
688 def test_query2(self):
689 typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_W2K,
694 self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K, typeid)
696 typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_DOTNET,
701 self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET, typeid)
703 typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
708 self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO, typeid)
710 def test_operation2(self):
711 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
712 rev_zone = '1.168.192.in-addr.arpa'
714 zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
715 zone_create.pszZoneName = rev_zone
716 zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
717 zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
718 zone_create.fAging = 0
719 zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
722 self.conn.DnssrvOperation2(client_version,
728 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
731 request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
732 dnsserver.DNS_ZONE_REQUEST_PRIMARY)
733 _, zones = self.conn.DnssrvComplexOperation2(client_version,
738 dnsserver.DNSSRV_TYPEID_DWORD,
740 self.assertEquals(1, zones.dwZoneCount)
743 self.conn.DnssrvOperation2(client_version,
749 dnsserver.DNSSRV_TYPEID_NULL,
752 typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
757 dnsserver.DNSSRV_TYPEID_DWORD,
759 self.assertEquals(0, zones.dwZoneCount)
762 def test_complexoperation2(self):
763 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
764 request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD |
765 dnsserver.DNS_ZONE_REQUEST_PRIMARY)
767 typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
772 dnsserver.DNSSRV_TYPEID_DWORD,
774 self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
775 self.assertEquals(3, zones.dwZoneCount)
777 request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
778 dnsserver.DNS_ZONE_REQUEST_PRIMARY)
779 typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
784 dnsserver.DNSSRV_TYPEID_DWORD,
786 self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
787 self.assertEquals(0, zones.dwZoneCount)
789 def test_enumrecords2(self):
790 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
791 record_type = dnsp.DNS_TYPE_NS
792 select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA |
793 dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA)
794 _, roothints = self.conn.DnssrvEnumRecords2(client_version,
804 self.assertEquals(14, roothints.count) # 1 NS + 13 A records (a-m)
806 def test_updaterecords2(self):
807 client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
808 record_type = dnsp.DNS_TYPE_A
809 select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA
812 rec = ARecord('1.2.3.4')
813 rec2 = ARecord('5.6.7.8')
816 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
817 add_rec_buf.rec = rec
818 self.conn.DnssrvUpdateRecord2(client_version,
826 _, result = self.conn.DnssrvEnumRecords2(client_version,
836 self.assertEquals(1, result.count)
837 self.assertEquals(1, result.rec[0].wRecordCount)
838 self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
839 self.assertEquals('1.2.3.4', result.rec[0].records[0].data)
842 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
843 add_rec_buf.rec = rec2
844 del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
845 del_rec_buf.rec = rec
846 self.conn.DnssrvUpdateRecord2(client_version,
854 buflen, result = self.conn.DnssrvEnumRecords2(client_version,
864 self.assertEquals(1, result.count)
865 self.assertEquals(1, result.rec[0].wRecordCount)
866 self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
867 self.assertEquals('5.6.7.8', result.rec[0].records[0].data)
870 del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
871 del_rec_buf.rec = rec2
872 self.conn.DnssrvUpdateRecord2(client_version,
880 self.assertRaises(RuntimeError, self.conn.DnssrvEnumRecords2,
892 # The following tests do not pass against Samba because the owner and
893 # group are not consistent with Windows, as well as some ACEs.
895 # The following ACE are also required for 2012R2:
897 # (OA;CIIO;WP;ea1b7b93-5e48-46d5-bc6c-4df4fda78a35;bf967a86-0de6-11d0-a285-00aa003049e2;PS)
898 # (OA;OICI;RPWP;3f78c3e5-f79a-46bd-a0b8-9d18116ddc79;;PS)"
900 # [TPM + Allowed-To-Act-On-Behalf-Of-Other-Identity]
901 def test_security_descriptor_msdcs_zone(self):
903 Make sure that security descriptors of the msdcs zone is
907 zones = self.samdb.search(base="DC=ForestDnsZones,%s" % self.samdb.get_default_basedn(),
908 scope=ldb.SCOPE_SUBTREE,
909 expression="(&(objectClass=dnsZone)(name=_msdcs*))",
910 attrs=["nTSecurityDescriptor", "objectClass"])
911 self.assertEqual(len(zones), 1)
912 self.assertTrue("nTSecurityDescriptor" in zones[0])
913 tmp = zones[0]["nTSecurityDescriptor"][0]
914 utils = sd_utils.SDUtils(self.samdb)
915 sd = ndr_unpack(security.descriptor, tmp)
917 domain_sid = security.dom_sid(self.samdb.get_domain_sid())
919 res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
920 expression="(sAMAccountName=DnsAdmins)",
923 dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))
925 packed_sd = descriptor.sddl2binary("O:SYG:BA" \
926 "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" \
928 "(A;;RPLCLORC;;;WD)" \
929 "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
930 "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
931 domain_sid, {"DnsAdmins": dns_admin})
932 expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))
934 diff = descriptor.get_diff_sds(expected_sd, sd, domain_sid)
935 self.assertEqual(diff, '', "SD of msdcs zone different to expected.\n"
936 "Difference was:\n%s\nExpected: %s\nGot: %s" %
937 (diff, expected_sd.as_sddl(utils.domain_sid),
938 sd.as_sddl(utils.domain_sid)))
940 def test_security_descriptor_forest_zone(self):
942 Make sure that security descriptors of forest dns zones are
945 forest_zone = "test_forest_zone"
946 zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
947 zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
948 zone_create_info.fAging = 0
949 zone_create_info.fDsIntegrated = 1
950 zone_create_info.fLoadExisting = 1
952 zone_create_info.pszZoneName = forest_zone
953 zone_create_info.dwDpFlags = dnsserver.DNS_DP_FOREST_DEFAULT
955 self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
961 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
964 partition_dn = self.samdb.get_default_basedn()
965 partition_dn.add_child("DC=ForestDnsZones")
966 zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE,
967 expression="(name=%s)" % forest_zone,
968 attrs=["nTSecurityDescriptor"])
969 self.assertEqual(len(zones), 1)
970 current_dn = zones[0].dn
971 self.assertTrue("nTSecurityDescriptor" in zones[0])
972 tmp = zones[0]["nTSecurityDescriptor"][0]
973 utils = sd_utils.SDUtils(self.samdb)
974 sd = ndr_unpack(security.descriptor, tmp)
976 domain_sid = security.dom_sid(self.samdb.get_domain_sid())
978 res = self.samdb.search(base=self.samdb.get_default_basedn(),
979 scope=ldb.SCOPE_SUBTREE,
980 expression="(sAMAccountName=DnsAdmins)",
983 dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))
985 packed_sd = descriptor.sddl2binary("O:DAG:DA" \
986 "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" \
988 "(A;;RPLCLORC;;;WD)" \
989 "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
990 "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
991 domain_sid, {"DnsAdmins": dns_admin})
992 expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))
994 packed_msdns = descriptor.get_dns_forest_microsoft_dns_descriptor(domain_sid,
995 {"DnsAdmins": dns_admin})
996 expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns))
998 packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid)
999 expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor,
1002 msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn))
1003 security_desc_dict = [(current_dn.get_linearized(), expected_sd),
1004 (msdns_dn.get_linearized(), expected_msdns_sd),
1005 (partition_dn.get_linearized(), expected_part_sd)]
1007 for (key, sec_desc) in security_desc_dict:
1008 zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE,
1009 attrs=["nTSecurityDescriptor"])
1010 self.assertTrue("nTSecurityDescriptor" in zones[0])
1011 tmp = zones[0]["nTSecurityDescriptor"][0]
1012 utils = sd_utils.SDUtils(self.samdb)
1014 sd = ndr_unpack(security.descriptor, tmp)
1015 diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid)
1017 self.assertEqual(diff, '', "Security descriptor of forest DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
1018 % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))
1021 self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1027 dnsserver.DNSSRV_TYPEID_NULL,
1030 def test_security_descriptor_domain_zone(self):
1032 Make sure that security descriptors of domain dns zones are
1036 partition_dn = self.samdb.get_default_basedn()
1037 partition_dn.add_child("DC=DomainDnsZones")
1038 zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE,
1039 expression="(name=%s)" % self.custom_zone,
1040 attrs=["nTSecurityDescriptor"])
1041 self.assertEqual(len(zones), 1)
1042 current_dn = zones[0].dn
1043 self.assertTrue("nTSecurityDescriptor" in zones[0])
1044 tmp = zones[0]["nTSecurityDescriptor"][0]
1045 utils = sd_utils.SDUtils(self.samdb)
1046 sd = ndr_unpack(security.descriptor, tmp)
1047 sddl = sd.as_sddl(utils.domain_sid)
1049 domain_sid = security.dom_sid(self.samdb.get_domain_sid())
1051 res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
1052 expression="(sAMAccountName=DnsAdmins)",
1053 attrs=["objectSid"])
1055 dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))
1057 packed_sd = descriptor.sddl2binary("O:DAG:DA" \
1058 "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" \
1060 "(A;;RPLCLORC;;;WD)" \
1061 "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" \
1062 "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
1063 domain_sid, {"DnsAdmins": dns_admin})
1064 expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))
1066 packed_msdns = descriptor.get_dns_domain_microsoft_dns_descriptor(domain_sid,
1067 {"DnsAdmins": dns_admin})
1068 expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns))
1070 packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid)
1071 expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor,
1074 msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn))
1075 security_desc_dict = [(current_dn.get_linearized(), expected_sd),
1076 (msdns_dn.get_linearized(), expected_msdns_sd),
1077 (partition_dn.get_linearized(), expected_part_sd)]
1079 for (key, sec_desc) in security_desc_dict:
1080 zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE,
1081 attrs=["nTSecurityDescriptor"])
1082 self.assertTrue("nTSecurityDescriptor" in zones[0])
1083 tmp = zones[0]["nTSecurityDescriptor"][0]
1084 utils = sd_utils.SDUtils(self.samdb)
1086 sd = ndr_unpack(security.descriptor, tmp)
1087 diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid)
1089 self.assertEqual(diff, '', "Security descriptor of domain DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
1090 % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))