traffic: new version of model with packet_rate, version number
[samba.git] / python / samba / tests / domain_backup.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org>
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 from samba import provision, param
18 import tarfile
19 import os
20 import shutil
21 from samba.tests import (env_loadparm, create_test_ou, BlackboxProcessError,
22                          BlackboxTestCase, connect_samdb)
23 import ldb
24 from samba.samdb import SamDB
25 from samba.auth import system_session
26 from samba import Ldb, dn_from_dns_name
27 from samba.netcmd.fsmo import get_fsmo_roleowner
28 import re
29 from samba import sites
30 from samba.dsdb import _dsdb_load_udv_v2
31
32
def get_prim_dom(secrets_path, lp):
    """Returns the kerberosSecret records under 'CN=Primary Domains'.

    Opens the secrets database at secrets_path (with the system session and
    the supplied loadparm) and does a subtree search for the records.
    """
    wanted_attrs = ['objectClass', 'samAccountName',
                    'secret', 'msDS-KeyVersionNumber']
    secrets_db = Ldb(secrets_path, session_info=system_session(), lp=lp)
    matches = secrets_db.search(base="CN=Primary Domains",
                                attrs=wanted_attrs,
                                scope=ldb.SCOPE_SUBTREE,
                                expression="(objectClass=kerberosSecret)")
    return matches
40
41 # The backup tests require that a completely clean LoadParm object gets used
42 # for the restore. Otherwise the same global LP gets re-used, and the LP
43 # settings can bleed from one test case to another.
44 # To do this, these tests should use check_output(), which executes the command
45 # in a separate process (as opposed to runcmd(), runsubcmd()).
46 # So although this is a samba-tool test, we don't inherit from SambaToolCmdTest
# so that we never use .runcmd() by accident.
class DomainBackupBase(BlackboxTestCase):
    """Setup, helpers and test bodies shared by the backup test-cases.

    Subclasses set self.base_cmd in their setUp() to the samba-tool backup
    sub-command under test, and then invoke the _test_*() methods below.
    """

    def setUp(self):
        """Connects to the testenv DC and sets the per-test defaults"""
        super(DomainBackupBase, self).setUp()

        server = os.environ["DC_SERVER"]
        self.user_auth = "-U%s%%%s" % (os.environ["DC_USERNAME"],
                                       os.environ["DC_PASSWORD"])

        # LDB connection to the original server being backed up
        self.ldb = connect_samdb("ldap://%s" % server)
        self.new_server = "BACKUPSERV"
        self.server = server.upper()

        # the sub-class fills in the actual backup command to run
        self.base_cmd = None
        self.backup_markers = ['sidForRestore', 'backupDate']
        self.restore_domain = os.environ["DOMAIN"]
        self.restore_realm = os.environ["REALM"]

        # None means the backup command's default backend gets used
        self.backend = None

    def use_backend(self, backend):
        """Explicitly set the DB backend that the backup should use"""
        self.backend = backend
        self.base_cmd += ["--backend-store=" + backend]

    def get_expected_partitions(self, samdb):
        """Returns the DNs of the partitions we expect samdb to contain"""
        basedn = str(samdb.get_default_basedn())
        config_dn = "CN=Configuration,%s" % basedn
        return [basedn, config_dn, "CN=Schema,%s" % config_dn,
                "DC=DomainDnsZones,%s" % basedn,
                "DC=ForestDnsZones,%s" % basedn]

    def assert_partitions_present(self, samdb):
        """Asserts all expected partitions are present in the backup samdb"""
        res = samdb.search(base="", scope=ldb.SCOPE_BASE,
                           attrs=['namingContexts'])
        actual_ncs = [str(r) for r in res[0].get('namingContexts')]

        expected_ncs = self.get_expected_partitions(samdb)

        for nc in expected_ncs:
            self.assertIn(nc, actual_ncs,
                          "%s not in %s" % (nc, str(actual_ncs)))

    def assert_repl_uptodate_vector(self, samdb):
        """Asserts an replUpToDateVector entry exists for the original DC"""
        orig_invoc_id = self.ldb.get_invocation_id()
        expected_ncs = self.get_expected_partitions(samdb)

        # loop through the partitions and check the upToDateness vector
        for nc in expected_ncs:
            found = any(orig_invoc_id == str(cursor.source_dsa_invocation_id)
                        for cursor in _dsdb_load_udv_v2(samdb, nc))
            self.assertTrue(found, "Couldn't find UDTV for original DC")

    def assert_dcs_present(self, samdb, expected_server, expected_count=None):
        """Checks that the expected server is present in the restored DB

        If expected_count is given, also checks that exactly that many
        server objects (with a serverReference) were found.
        """
        search_expr = "(&(objectClass=Server)(serverReference=*))"
        res = samdb.search(samdb.get_config_basedn(),
                           scope=ldb.SCOPE_SUBTREE,
                           expression=search_expr)
        server_found = any(expected_server in str(msg.dn) for msg in res)

        self.assertTrue(server_found,
                        "Could not find %s server" % expected_server)

        # compare against None so that an explicit count is never skipped
        if expected_count is not None:
            self.assertEqual(len(res), expected_count,
                             "Expected %d server(s) but found %d" %
                             (expected_count, len(res)))

    def restore_dir(self):
        """Returns the directory we restore into (created on first use)"""
        extract_dir = os.path.join(self.tempdir, 'tree')
        if not os.path.exists(extract_dir):
            os.mkdir(extract_dir)
            self.addCleanup(shutil.rmtree, extract_dir)
        return extract_dir

    def untar_backup(self, backup_file):
        """Untar the backup file's raw contents (i.e. not a proper restore)"""
        extract_dir = self.restore_dir()
        with tarfile.open(backup_file) as tf:
            tf.extractall(extract_dir)

    def _test_backup_untar(self, primary_domain_secrets=0):
        """Creates a backup, untars the raw files, and sanity-checks the DB"""
        backup_file = self.create_backup()
        self.untar_backup(backup_file)

        private_dir = os.path.join(self.restore_dir(), "private")
        samdb_path = os.path.join(private_dir, "sam.ldb")
        lp = env_loadparm()
        samdb = SamDB(url=samdb_path, session_info=system_session(), lp=lp)

        # check that backup markers were added to the DB
        res = samdb.search(base=ldb.Dn(samdb, "@SAMBA_DSDB"),
                           scope=ldb.SCOPE_BASE,
                           attrs=self.backup_markers)
        self.assertEqual(len(res), 1)
        for marker in self.backup_markers:
            self.assertIsNotNone(res[0].get(marker),
                                 "%s backup marker missing" % marker)

        # check the secrets.ldb entry for the primary domain. (Online/clone
        # backups shouldn't have this, as they never got it during the backup)
        secrets_path = os.path.join(private_dir, "secrets.ldb")
        res = get_prim_dom(secrets_path, lp)
        self.assertEqual(len(res), primary_domain_secrets)

        # sanity-check that all the partitions got backed up
        self.assert_partitions_present(samdb)

    def _test_backup_restore(self):
        """Does a backup/restore, with specific checks of the resulting DB"""
        backup_file = self.create_backup()
        self.restore_backup(backup_file)
        lp = self.check_restored_smbconf()
        self.check_restored_database(lp)

    def _test_backup_restore_no_secrets(self):
        """Does a backup/restore with secrets excluded from the resulting DB"""

        # exclude secrets when we create the backup
        backup_file = self.create_backup(extra_args=["--no-secrets"])
        self.restore_backup(backup_file)
        lp = self.check_restored_smbconf()

        # assert that we don't find user secrets in the DB
        self.check_restored_database(lp, expect_secrets=False)

    def _test_backup_restore_into_site(self):
        """Does a backup and restores into a non-default site"""

        # create a new non-default site
        sitename = "Test-Site-For-Backups"
        sites.create_site(self.ldb, self.ldb.get_config_basedn(), sitename)
        self.addCleanup(sites.delete_site, self.ldb,
                        self.ldb.get_config_basedn(), sitename)

        # restore the backup DC into the site we just created
        backup_file = self.create_backup()
        self.restore_backup(backup_file, ["--site=" + sitename])

        lp = self.check_restored_smbconf()
        restored_ldb = self.check_restored_database(lp)

        # check the restored DC was added to the site we created, i.e. there's
        # an entry matching the new DC sitting underneath the site DN
        site_dn = "CN={0},CN=Sites,{1}".format(sitename,
                                               restored_ldb.get_config_basedn())
        match_server = "(&(objectClass=server)(cn={0}))".format(self.new_server)
        res = restored_ldb.search(site_dn, scope=ldb.SCOPE_SUBTREE,
                                  expression=match_server)
        self.assertEqual(len(res), 1,
                         "Failed to find new DC under site")

    def create_smbconf(self, settings):
        """Creates a very basic smb.conf to pass to the restore tool"""

        # without the testenv config's settings, the NTACL backup_restore()
        # operation will fail (because we're not root). So first suck in all
        # testenv's settings, so we retain these in the new config. Note we
        # use a non-global LP so that these settings don't leak into other
        # places we use LoadParms
        testenv_conf = os.environ["SMB_CONF_PATH"]
        local_lp = param.LoadParm(filename_for_non_global_lp=testenv_conf)

        # add the new settings to the LP, then write the settings to file
        for key, val in settings.items():
            local_lp.set(key, val)

        new_smbconf = os.path.join(self.tempdir, "smb.conf")
        local_lp.dump(False, new_smbconf)

        self.addCleanup(os.remove, new_smbconf)
        return new_smbconf

    def _test_backup_restore_with_conf(self):
        """Checks smb.conf values passed to the restore are retained"""
        backup_file = self.create_backup()

        # create an smb.conf that we pass to the restore. The netbios/state
        # dir should get overridden by the restore, the other settings should
        # trickle through into the restored dir's smb.conf
        settings = {'state directory': '/var/run',
                    'netbios name': 'FOOBAR',
                    'workgroup': 'NOTMYDOMAIN',
                    'realm': 'NOT.MY.REALM'}
        assert_settings = {'drs: max link sync': '275',
                           'prefork children': '7'}
        settings.update(assert_settings)
        smbconf = self.create_smbconf(settings)

        self.restore_backup(backup_file, ["--configfile=" + smbconf])

        # this will check netbios name/state dir
        lp = self.check_restored_smbconf()
        self.check_restored_database(lp)

        # check the remaining settings are still intact
        for key, val in assert_settings.items():
            self.assertEqual(str(lp.get(key)), val,
                             "'%s' was '%s' in smb.conf" % (key, lp.get(key)))

    def check_restored_smbconf(self):
        """Sanity-check important smb.conf values are restored correctly

        Returns the (non-global) LoadParm loaded from the restored smb.conf.
        """
        restore_dir = self.restore_dir()
        smbconf = os.path.join(restore_dir, "etc", "smb.conf")
        bkp_lp = param.LoadParm(filename_for_non_global_lp=smbconf)
        self.assertEqual(bkp_lp.get('netbios name'), self.new_server)
        self.assertEqual(bkp_lp.get('workgroup'), self.restore_domain)
        self.assertEqual(bkp_lp.get('realm'), self.restore_realm.upper())

        # we restore with a fixed directory structure, so we can sanity-check
        # that the core filepaths settings are what we expect them to be
        private_dir = os.path.join(restore_dir, "private")
        self.assertEqual(bkp_lp.get('private dir'), private_dir)
        state_dir = os.path.join(restore_dir, "state")
        self.assertEqual(bkp_lp.get('state directory'), state_dir)
        return bkp_lp

    def check_restored_database(self, bkp_lp, expect_secrets=True):
        """Runs the common checks over the restored DB, which gets returned"""
        paths = provision.provision_paths_from_lp(bkp_lp, bkp_lp.get("realm"))

        # the restored secrets.ldb should hold an entry for the new DC
        bkp_pd = get_prim_dom(paths.secrets, bkp_lp)
        self.assertEqual(len(bkp_pd), 1)
        acn = bkp_pd[0].get('samAccountName')
        self.assertIsNotNone(acn)
        self.assertEqual(str(acn[0]), self.new_server + '$')
        self.assertIsNotNone(bkp_pd[0].get('secret'))

        samdb = SamDB(url=paths.samdb, session_info=system_session(),
                      lp=bkp_lp, credentials=self.get_credentials())

        # check that the backup markers have been removed from the restored DB
        res = samdb.search(base=ldb.Dn(samdb, "@SAMBA_DSDB"),
                           scope=ldb.SCOPE_BASE,
                           attrs=self.backup_markers)
        self.assertEqual(len(res), 1)
        for marker in self.backup_markers:
            self.assertIsNone(res[0].get(marker),
                              "%s backup-marker left behind" % marker)

        # check that the repsFrom and repsTo values have been removed
        # from the restored DB
        res = samdb.search(base=samdb.get_default_basedn(),
                           scope=ldb.SCOPE_BASE,
                           attrs=['repsFrom', 'repsTo'])
        self.assertEqual(len(res), 1)
        self.assertIsNone(res[0].get('repsFrom'))
        self.assertIsNone(res[0].get('repsTo'))

        res = samdb.search(base=samdb.get_config_basedn(),
                           scope=ldb.SCOPE_BASE,
                           attrs=['repsFrom', 'repsTo'])
        self.assertEqual(len(res), 1)
        self.assertIsNone(res[0].get('repsFrom'))
        self.assertIsNone(res[0].get('repsTo'))

        # check the DB is using the backend we supplied
        if self.backend:
            res = samdb.search(base="@PARTITION", scope=ldb.SCOPE_BASE,
                               attrs=["backendStore"])
            backend = str(res[0].get("backendStore"))
            self.assertEqual(backend, self.backend)

        # check the restored DB has the expected partitions/DC/FSMO roles
        self.assert_partitions_present(samdb)
        self.assert_dcs_present(samdb, self.new_server, expected_count=1)
        self.assert_fsmo_roles(samdb, self.new_server, self.server)
        self.assert_secrets(samdb, expect_secrets=expect_secrets)

        # check we still have an uptodateness vector for the original DC
        self.assert_repl_uptodate_vector(samdb)
        return samdb

    def assert_user_secrets(self, samdb, username, expect_secrets):
        """Asserts that a user has/doesn't have secrets as expected"""
        basedn = str(samdb.get_default_basedn())
        user_dn = "CN=%s,CN=users,%s" % (username, basedn)

        if expect_secrets:
            self.assertIsNotNone(samdb.searchone("unicodePwd", user_dn))
        else:
            # the search should throw an exception because the secrets
            # attribute isn't actually there
            self.assertRaises(KeyError, samdb.searchone, "unicodePwd", user_dn)

    def assert_secrets(self, samdb, expect_secrets):
        """Check the user secrets in the restored DB match what's expected"""

        # check secrets for the built-in testenv users match what's expected
        test_users = ["alice", "bob", "jane"]
        for user in test_users:
            self.assert_user_secrets(samdb, user, expect_secrets)

    def assert_fsmo_roles(self, samdb, server, exclude_server):
        """Asserts the expected server is the FSMO role owner

        Every FSMO role must be owned by server, and none of them may be
        owned by exclude_server.
        """
        domain_dn = samdb.domain_dn()
        forest_dn = dn_from_dns_name(samdb.forest_dns_name())
        fsmos = {'infrastructure': "CN=Infrastructure," + domain_dn,
                 'naming': "CN=Partitions,%s" % samdb.get_config_basedn(),
                 'schema': str(samdb.get_schema_basedn()),
                 'rid': "CN=RID Manager$,CN=System," + domain_dn,
                 'pdc': domain_dn,
                 'domaindns':
                     "CN=Infrastructure,DC=DomainDnsZones," + domain_dn,
                 'forestdns':
                     "CN=Infrastructure,DC=ForestDnsZones," + forest_dn}
        for role, dn in fsmos.items():
            owner = get_fsmo_roleowner(samdb, ldb.Dn(samdb, dn), role)
            owner_str = owner.extended_str()
            self.assertIn("CN={0},".format(server), owner_str,
                          "Expected %s to own FSMO role %s" % (server, role))
            # report the server that should NOT have been the role owner
            self.assertNotIn("CN={0},".format(exclude_server), owner_str,
                             "%s found as FSMO %s role owner" %
                             (exclude_server, role))

    def cleanup_tempdir(self):
        """Removes all the files and directories left in self.tempdir"""
        for filename in os.listdir(self.tempdir):
            filepath = os.path.join(self.tempdir, filename)
            # shutil.rmtree() only works on real directories. Anything else
            # (e.g. a backup tar-file, smb.conf, or a symlink) would make it
            # throw an exception, so delete those with os.remove() instead
            if os.path.isdir(filepath) and not os.path.islink(filepath):
                shutil.rmtree(filepath)
            else:
                os.remove(filepath)

    def run_cmd(self, args):
        """Executes a samba-tool backup/restore command"""

        cmd = " ".join(args)
        print("Executing: samba-tool %s" % cmd)
        try:
            # note: it's important we run the cmd in a separate process here
            out = self.check_output("samba-tool " + cmd)
        except BlackboxProcessError as e:
            # if the command failed, it may have left behind temporary files.
            # We're going to fail the test, but first cleanup any temp files so
            # that we skip the TestCaseInTempDir._remove_tempdir() assertions
            self.cleanup_tempdir()
            self.fail("Error calling samba-tool: %s" % e)
        print(out)

    def create_backup(self, extra_args=None):
        """Runs the backup cmd to produce a backup file for the testenv DC

        Returns the full path of the backup tar-file that was generated.
        """
        # Run the backup command and check we got one backup tar file
        args = self.base_cmd + ["--targetdir=" + self.tempdir]
        if extra_args:
            args += extra_args

        self.run_cmd(args)

        # find the filename of the backup-file generated
        tar_files = [fn for fn in os.listdir(self.tempdir)
                     if fn.startswith("samba-backup-") and
                     fn.endswith(".tar.bz2")]

        self.assertEqual(len(tar_files), 1,
                         "Domain backup created %u tar files" % len(tar_files))

        # clean up the backup file once the test finishes
        backup_file = os.path.join(self.tempdir, tar_files[0])
        self.addCleanup(os.remove, backup_file)
        return backup_file

    def restore_backup(self, backup_file, extra_args=None):
        """Restores the samba directory files from a given backup"""
        # Run the restore command
        extract_dir = self.restore_dir()
        args = ["domain", "backup", "restore", "--backup-file=" + backup_file,
                "--targetdir=" + extract_dir,
                "--newservername=" + self.new_server]
        if extra_args:
            args += extra_args

        self.run_cmd(args)

        # sanity-check the restore doesn't modify the original DC by mistake
        self.assert_partitions_present(self.ldb)
        self.assert_dcs_present(self.ldb, self.server)
        self.assert_fsmo_roles(self.ldb, self.server, self.new_server)
427
428
class DomainBackupOnline(DomainBackupBase):
    """Runs the common test-cases using 'samba-tool domain backup online'"""

    def setUp(self):
        super(DomainBackupOnline, self).setUp()
        # the online backup gets taken from the testenv DC, using the
        # credentials the base class worked out
        server_arg = "--server=" + self.server
        self.base_cmd = ["domain", "backup", "online",
                         server_arg, self.user_auth]

    # run the common test cases above using online backups. The DB backend
    # is set explicitly, alternating between tdb and mdb

    def test_backup_untar(self):
        # no explicit backend here, so the command's default gets used
        self._test_backup_untar()

    def test_backup_restore(self):
        self.use_backend("tdb")
        self._test_backup_restore()

    def test_backup_restore_with_conf(self):
        self.use_backend("mdb")
        self._test_backup_restore_with_conf()

    def test_backup_restore_no_secrets(self):
        self.use_backend("tdb")
        self._test_backup_restore_no_secrets()

    def test_backup_restore_into_site(self):
        self.use_backend("mdb")
        self._test_backup_restore_into_site()
455
456
class DomainBackupRename(DomainBackupBase):
    """Runs the common test-cases using 'samba-tool domain backup rename'.

    The backup is renamed to a new domain/realm, so this adds extra checks
    that the DNs, NetBIOS name and DNS zones match the new names.
    """

    # run the above test cases using a rename backup
    def setUp(self):
        super(DomainBackupRename, self).setUp()
        self.new_server = "RENAMESERV"
        self.restore_domain = "NEWDOMAIN"
        self.restore_realm = "rename.test.net"
        # the base DN that corresponds to restore_realm
        self.new_basedn = "DC=rename,DC=test,DC=net"
        self.base_cmd = ["domain", "backup", "rename", self.restore_domain,
                         self.restore_realm, "--server=" + self.server,
                         self.user_auth]
        self.backup_markers += ['backupRename']

    # run the common test case code for backup-renames
    def test_backup_untar(self):
        self._test_backup_untar()

    def test_backup_restore(self):
        self.use_backend("mdb")
        self._test_backup_restore()

    def test_backup_restore_with_conf(self):
        self.use_backend("tdb")
        self._test_backup_restore_with_conf()

    def test_backup_restore_no_secrets(self):
        self.use_backend("mdb")
        self._test_backup_restore_no_secrets()

    def test_backup_restore_into_site(self):
        self.use_backend("tdb")
        self._test_backup_restore_into_site()

    def test_backup_invalid_args(self):
        """Checks that rename commands with invalid args are rejected"""

        # try a "rename" using the same realm as the DC currently has
        rename_cmd = "samba-tool domain backup rename "
        bad_cmd = "{cmd} {domain} {realm}".format(cmd=rename_cmd,
                                                  domain=self.restore_domain,
                                                  realm=os.environ["REALM"])
        self.assertRaises(BlackboxProcessError, self.check_output, bad_cmd)

        # try a "rename" using the same domain as the DC currently has
        bad_cmd = "{cmd} {domain} {realm}".format(cmd=rename_cmd,
                                                  domain=os.environ["DOMAIN"],
                                                  realm=self.restore_realm)
        self.assertRaises(BlackboxProcessError, self.check_output, bad_cmd)

    def add_link(self, attr, source, target):
        """Adds target as a value of attr on the source object (on the DC)"""
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb, source)
        m[attr] = ldb.MessageElement(target, ldb.FLAG_MOD_ADD, attr)
        self.ldb.modify(m)

    def test_one_way_links(self):
        """Sanity-check that a rename handles one-way links correctly"""

        # Do some initial setup on the DC before back it up:
        # create an OU to hold the test objects we'll create
        test_ou = create_test_ou(self.ldb, "rename_test")
        self.addCleanup(self.ldb.delete, test_ou, ["tree_delete:1"])

        # create the source and target objects and link them together.
        # We use addressBookRoots2 here because it's a one-way link
        src_dn = "CN=link_src,%s" % test_ou
        self.ldb.add({"dn": src_dn,
                      "objectclass": "msExchConfigurationContainer"})
        target_dn = "OU=link_tgt,%s" % test_ou
        self.ldb.add({"dn": target_dn, "objectclass": "organizationalunit"})
        link_attr = "addressBookRoots2"
        self.add_link(link_attr, src_dn, target_dn)

        # add a second link target that's in a different partition
        server_dn = ("CN=testrename,CN=Servers,CN=Default-First-Site-Name,"
                     "CN=Sites,%s" % str(self.ldb.get_config_basedn()))
        self.ldb.add({"dn": server_dn, "objectclass": "server"})
        self.addCleanup(self.ldb.delete, server_dn)
        self.add_link(link_attr, src_dn, server_dn)

        # do the backup/restore
        backup_file = self.create_backup()
        self.restore_backup(backup_file)
        lp = self.check_restored_smbconf()
        restored_ldb = self.check_restored_database(lp)

        # work out what the new DNs should be. The old base DN is escaped so
        # that it only ever gets matched literally, as a suffix of the DN
        old_basedn = str(self.ldb.get_default_basedn())
        basedn_suffix = re.compile(re.escape(old_basedn) + '$')
        new_target_dn = basedn_suffix.sub(self.new_basedn, target_dn)
        new_src_dn = basedn_suffix.sub(self.new_basedn, src_dn)
        new_server_dn = basedn_suffix.sub(self.new_basedn, server_dn)

        # check the links exist in the renamed DB with the correct DNs
        res = restored_ldb.search(base=new_src_dn, scope=ldb.SCOPE_BASE,
                                  attrs=[link_attr])
        self.assertEqual(len(res), 1,
                         "Failed to find renamed link source object")
        self.assertTrue(link_attr in res[0], "Missing link attribute")
        link_values = [str(x) for x in res[0][link_attr]]
        self.assertIn(new_target_dn, link_values)
        self.assertIn(new_server_dn, link_values)

    # extra checks we run on the restored DB in the rename case
    def check_restored_database(self, lp, expect_secrets=True):
        """Runs the common DB checks, plus checks specific to the rename"""
        # run the common checks over the restored DB
        common_test = super(DomainBackupRename, self)
        samdb = common_test.check_restored_database(lp, expect_secrets)

        # check we have actually renamed the DNs
        basedn = str(samdb.get_default_basedn())
        self.assertEqual(basedn, self.new_basedn)

        # check the partition and netBIOS name match the new domain
        partitions_dn = samdb.get_partitions_dn()
        nc_name = ldb.binary_encode(str(basedn))
        res = samdb.search(base=partitions_dn, scope=ldb.SCOPE_ONELEVEL,
                           attrs=["nETBIOSName", "cn"],
                           expression='ncName=%s' % nc_name)
        self.assertEqual(len(res), 1,
                         "Looking up partition's NetBIOS name failed")
        self.assertEqual(str(res[0].get("nETBIOSName")), self.restore_domain)
        self.assertEqual(str(res[0].get("cn")), self.restore_domain)

        # check the DC has the correct dnsHostname
        realm = self.restore_realm
        dn = "CN=%s,OU=Domain Controllers,%s" % (self.new_server,
                                                 self.new_basedn)
        res = samdb.search(base=dn, scope=ldb.SCOPE_BASE,
                           attrs=["dNSHostName"])
        self.assertEqual(len(res), 1,
                         "Looking up new DC's dnsHostname failed")
        expected_val = "%s.%s" % (self.new_server.lower(), realm)
        self.assertEqual(str(res[0].get("dNSHostName")), expected_val)

        # check the DNS zones for the new realm are present
        dn = "DC=%s,CN=MicrosoftDNS,DC=DomainDnsZones,%s" % (realm, basedn)
        res = samdb.search(base=dn, scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res), 1, "Lookup of new domain's DNS zone failed")

        forestdn = samdb.get_root_basedn().get_linearized()
        dn = "DC=_msdcs.%s,CN=MicrosoftDNS,DC=ForestDnsZones,%s" % (realm,
                                                                    forestdn)
        res = samdb.search(base=dn, scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res), 1, "Lookup of new domain's DNS zone failed")
        return samdb
603
604
class DomainBackupOffline(DomainBackupBase):
    """Runs the common test-cases using 'samba-tool domain backup offline'"""

    def setUp(self):
        super(DomainBackupOffline, self).setUp()
        # no server or credential arguments get passed to the offline backup
        self.base_cmd = "domain backup offline".split()

    # run the common test cases using offline backups

    def test_backup_untar(self):
        # the offline case expects one primary domain entry in secrets.ldb
        self._test_backup_untar(primary_domain_secrets=1)

    def test_backup_restore(self):
        self._test_backup_restore()

    def test_backup_restore_with_conf(self):
        self._test_backup_restore_with_conf()

    def test_backup_restore_into_site(self):
        self._test_backup_restore_into_site()