selftest: Add tests for samdb_to_ldif_file
[obnox/samba/samba-obnox.git] / python / samba / tests / kcc / ldif_import_export.py
index d7716649a06a9f945387baeaadf7f5fd3517843e..f3352e20cbb0e01e189b49ff453cac67daf21c3c 100644 (file)
 
 import samba
 import os
-from tempfile import mkdtemp
 import time
 import shutil
+import sys
+import subprocess
 
 import samba.tests
 from samba.kcc import ldif_import_export, KCC
@@ -64,25 +65,23 @@ MULTISITE_LDIF_DSAS = (
 )
 
 
-class LdifUtilTests(samba.tests.TestCase):
+class LdifImportExportTests(samba.tests.TestCaseInTempDir):
     def setUp(self):
-        super(LdifUtilTests, self).setUp()
+        super(LdifImportExportTests, self).setUp()
         self.lp = LoadParm()
         self.creds = Credentials()
         self.creds.guess(self.lp)
-        #self.creds.set_machine_account(self.lp)
-        self.tmpdir = mkdtemp()
 
-    def tearDown(self):
-        #shutil.rmtree(self.tmpdir)
-        pass
+    def remove_files(self, *files):
+        for f in files:
+            assert(f.startswith(self.tempdir))
+            os.unlink(f)
 
     def test_write_search_url(self):
         pass
-        #write_search_result(samdb, f, res)
 
     def test_ldif_to_samdb(self):
-        dburl = os.path.join(self.tmpdir, "ldap")
+        dburl = os.path.join(self.tempdir, "ldap")
         samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
                                                  MULTISITE_LDIF)
         self.assertIsInstance(samdb, SamDB)
@@ -101,10 +100,11 @@ class LdifUtilTests(samba.tests.TestCase):
         dn = ldb.Dn(samdb,
                     service_name_res[0]["dsServiceName"][0])
         self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
+        self.remove_files(dburl)
 
     def test_ldif_to_samdb_forced_local_dsa(self):
         for dsa, site in MULTISITE_LDIF_DSAS:
-            dburl = os.path.join(self.tmpdir, "ldif-to-samba-forced-local-dsa"
+            dburl = os.path.join(self.tempdir, "ldif-to-samba-forced-local-dsa"
                                  "-%s" % dsa)
             samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
                                                      MULTISITE_LDIF,
@@ -124,31 +124,53 @@ class LdifUtilTests(samba.tests.TestCase):
             dn = ldb.Dn(samdb,
                         service_name_res[0]["dsServiceName"][0])
             self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
+            self.remove_files(dburl)
 
-    def samdb_to_ldif_file(self):
-        #samdb_to_ldif_file(samdb, dburl, lp, creds, ldif_file):
-        pass
+
+    def test_samdb_to_ldif_file(self):
+        dburl = os.path.join(self.tempdir, "ldap")
+        dburl2 = os.path.join(self.tempdir, "ldap_roundtrip")
+        ldif_file = os.path.join(self.tempdir, "ldif")
+        samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
+                                                 MULTISITE_LDIF)
+        self.assertIsInstance(samdb, SamDB)
+        ldif_import_export.samdb_to_ldif_file(samdb, dburl,
+                                              lp=self.lp, creds=None,
+                                              ldif_file=ldif_file)
+        self.assertGreater(os.path.getsize(ldif_file), 1000,
+                           "LDIF should be larger than 1000 bytes")
+        samdb = ldif_import_export.ldif_to_samdb(dburl2, self.lp,
+                                                 ldif_file)
+        self.assertIsInstance(samdb, SamDB)
+        dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
+               "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
+        res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
+                           scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
+        self.remove_files(dburl)
+        self.remove_files(dburl2)
+        self.remove_files(ldif_file)
 
 
-class KCCMultisiteLdifTests(samba.tests.TestCase):
+class KCCMultisiteLdifTests(samba.tests.TestCaseInTempDir):
     def setUp(self):
         super(KCCMultisiteLdifTests, self).setUp()
         self.lp = LoadParm()
         self.creds = Credentials()
         self.creds.guess(self.lp)
-        self.tmpdir = mkdtemp()
-        self.tmpdb = os.path.join(self.tmpdir, 'tmpdb')
 
-    def tearDown(self):
-        shutil.rmtree(self.tmpdir)
+    def remove_files(self, *files):
+        for f in files:
+            assert(f.startswith(self.tempdir))
+            os.unlink(f)
 
     def _get_kcc(self, name, readonly=False, verify=False, dot_file_dir=None):
         # Note that setting read-only to False won't affect the ldif,
         # only the temporary database that is created from it.
         my_kcc = KCC(unix_now, readonly=readonly, verify=verify,
                      dot_file_dir=dot_file_dir)
-        tmpdb = os.path.join(self.tmpdir, 'tmpdb')
+        tmpdb = os.path.join(self.tempdir, 'tmpdb')
         my_kcc.import_ldif(tmpdb, self.lp, self.creds, MULTISITE_LDIF)
+        self.remove_files(tmpdb)
         return my_kcc
 
     def test_list_dsas(self):
@@ -161,10 +183,37 @@ class KCCMultisiteLdifTests(samba.tests.TestCase):
         """Check that the KCC generates graphs that pass its own verify
         option.
         """
-        my_kcc = self._get_kcc('test-list', verify=True)
-        tmpdb = os.path.join(self.tmpdir, 'verify-tmpdb')
+        my_kcc = self._get_kcc('test-verify', verify=True)
+        tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
         my_kcc.import_ldif(tmpdb, self.lp, self.creds, MULTISITE_LDIF)
 
         my_kcc.run("ldap://%s" % tmpdb,
                    self.lp, self.creds,
                    attempt_live_connections=False)
+        self.remove_files(tmpdb)
+
+    def test_dotfiles(self):
+        """Check that KCC writes dot_files when asked.
+        """
+        my_kcc = self._get_kcc('test-dotfiles', dot_file_dir=self.tempdir)
+        tmpdb = os.path.join(self.tempdir, 'dotfile-tmpdb')
+        files = [tmpdb]
+        my_kcc.import_ldif(tmpdb, self.lp, self.creds, MULTISITE_LDIF)
+        my_kcc.run("ldap://%s" % tmpdb,
+                   self.lp, self.creds,
+                   attempt_live_connections=False)
+
+        dot = '/usr/bin/dot'
+        for fn in os.listdir(self.tempdir):
+            if fn.endswith('.dot'):
+                ffn = os.path.join(self.tempdir, fn)
+                if os.path.exists(dot) and subprocess.call([dot, '-?']) == 0:
+                    r = subprocess.call([dot, '-Tcanon', ffn])
+                    self.assertEqual(r, 0)
+
+                #even if dot is not there, at least check the file is non-empty
+                size = os.stat(ffn).st_size
+                self.assertNotEqual(size, 0)
+                files.append(ffn)
+
+        self.remove_files(*files)