import samba
import os
-from tempfile import mkdtemp
import time
import shutil
+import sys
+import subprocess
import samba.tests
from samba.kcc import ldif_import_export, KCC
)
-class LdifUtilTests(samba.tests.TestCase):
+class LdifImportExportTests(samba.tests.TestCaseInTempDir):
def setUp(self):
- super(LdifUtilTests, self).setUp()
+ super(LdifImportExportTests, self).setUp()
self.lp = LoadParm()
self.creds = Credentials()
self.creds.guess(self.lp)
- #self.creds.set_machine_account(self.lp)
- self.tmpdir = mkdtemp()
- def tearDown(self):
- #shutil.rmtree(self.tmpdir)
- pass
+ def remove_files(self, *files):
+ for f in files:
+ assert(f.startswith(self.tempdir))
+ os.unlink(f)
def test_write_search_url(self):
pass
- #write_search_result(samdb, f, res)
def test_ldif_to_samdb(self):
- dburl = os.path.join(self.tmpdir, "ldap")
+ dburl = os.path.join(self.tempdir, "ldap")
samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
MULTISITE_LDIF)
self.assertIsInstance(samdb, SamDB)
dn = ldb.Dn(samdb,
service_name_res[0]["dsServiceName"][0])
self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
+ self.remove_files(dburl)
def test_ldif_to_samdb_forced_local_dsa(self):
for dsa, site in MULTISITE_LDIF_DSAS:
- dburl = os.path.join(self.tmpdir, "ldif-to-samba-forced-local-dsa"
+ dburl = os.path.join(self.tempdir, "ldif-to-samba-forced-local-dsa"
"-%s" % dsa)
samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
MULTISITE_LDIF,
dn = ldb.Dn(samdb,
service_name_res[0]["dsServiceName"][0])
self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
+ self.remove_files(dburl)
- def samdb_to_ldif_file(self):
- #samdb_to_ldif_file(samdb, dburl, lp, creds, ldif_file):
- pass
+
+ def test_samdb_to_ldif_file(self):
+ dburl = os.path.join(self.tempdir, "ldap")
+ dburl2 = os.path.join(self.tempdir, "ldap_roundtrip")
+ ldif_file = os.path.join(self.tempdir, "ldif")
+ samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
+ MULTISITE_LDIF)
+ self.assertIsInstance(samdb, SamDB)
+ ldif_import_export.samdb_to_ldif_file(samdb, dburl,
+ lp=self.lp, creds=None,
+ ldif_file=ldif_file)
+ self.assertGreater(os.path.getsize(ldif_file), 1000,
+ "LDIF should be larger than 1000 bytes")
+ samdb = ldif_import_export.ldif_to_samdb(dburl2, self.lp,
+ ldif_file)
+ self.assertIsInstance(samdb, SamDB)
+ dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
+ "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
+ res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
+ scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
+ self.remove_files(dburl)
+ self.remove_files(dburl2)
+ self.remove_files(ldif_file)
-class KCCMultisiteLdifTests(samba.tests.TestCase):
+class KCCMultisiteLdifTests(samba.tests.TestCaseInTempDir):
def setUp(self):
super(KCCMultisiteLdifTests, self).setUp()
self.lp = LoadParm()
self.creds = Credentials()
self.creds.guess(self.lp)
- self.tmpdir = mkdtemp()
- self.tmpdb = os.path.join(self.tmpdir, 'tmpdb')
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
+ def remove_files(self, *files):
+ for f in files:
+ assert(f.startswith(self.tempdir))
+ os.unlink(f)
def _get_kcc(self, name, readonly=False, verify=False, dot_file_dir=None):
# Note that setting read-only to False won't affect the ldif,
# only the temporary database that is created from it.
my_kcc = KCC(unix_now, readonly=readonly, verify=verify,
dot_file_dir=dot_file_dir)
- tmpdb = os.path.join(self.tmpdir, 'tmpdb')
+ tmpdb = os.path.join(self.tempdir, 'tmpdb')
my_kcc.import_ldif(tmpdb, self.lp, self.creds, MULTISITE_LDIF)
+ self.remove_files(tmpdb)
return my_kcc
def test_list_dsas(self):
"""Check that the KCC generates graphs that pass its own verify
option.
"""
- my_kcc = self._get_kcc('test-list', verify=True)
- tmpdb = os.path.join(self.tmpdir, 'verify-tmpdb')
+ my_kcc = self._get_kcc('test-verify', verify=True)
+ tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
my_kcc.import_ldif(tmpdb, self.lp, self.creds, MULTISITE_LDIF)
my_kcc.run("ldap://%s" % tmpdb,
self.lp, self.creds,
attempt_live_connections=False)
+ self.remove_files(tmpdb)
+
+ def test_dotfiles(self):
+ """Check that KCC writes dot_files when asked.
+ """
+ my_kcc = self._get_kcc('test-dotfiles', dot_file_dir=self.tempdir)
+ tmpdb = os.path.join(self.tempdir, 'dotfile-tmpdb')
+ files = [tmpdb]
+ my_kcc.import_ldif(tmpdb, self.lp, self.creds, MULTISITE_LDIF)
+ my_kcc.run("ldap://%s" % tmpdb,
+ self.lp, self.creds,
+ attempt_live_connections=False)
+
+ dot = '/usr/bin/dot'
+ for fn in os.listdir(self.tempdir):
+ if fn.endswith('.dot'):
+ ffn = os.path.join(self.tempdir, fn)
+ if os.path.exists(dot) and subprocess.call([dot, '-?']) == 0:
+ r = subprocess.call([dot, '-Tcanon', ffn])
+ self.assertEqual(r, 0)
+
+ #even if dot is not there, at least check the file is non-empty
+ size = os.stat(ffn).st_size
+ self.assertNotEqual(size, 0)
+ files.append(ffn)
+
+ self.remove_files(*files)