PEP8: fix E128: continuation line under-indented for visual indent
[bbaumbach/samba-autobuild/.git] / python / samba / tests / kcc / ldif_import_export.py
1 # Unix SMB/CIFS implementation. Tests for samba.kcc.ldif_import_export.
2 # Copyright (C) Andrew Bartlett 2015
3 #
4 # Written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 """Tests for samba.kcc.ldif_import_export"""
21
22 import samba
23 import os
24 import time
25 import shutil
26 import sys
27 import subprocess
28 import logging
29 import samba.tests
30 from samba.kcc import ldif_import_export, KCC
31 from samba import ldb
32 from samba.dcerpc import misc
33
34
35 from samba.param import LoadParm
36 from samba.credentials import Credentials
37 from samba.samdb import SamDB
38
39 unix_now = int(time.time())
40
41 MULTISITE_LDIF = os.path.join(os.environ['SRCDIR_ABS'],
42                               "testdata/ldif-utils-test-multisite.ldif")
43
44
45 # UNCONNECTED_LDIF is a single site, unconnected 5DC database that was
46 # created using samba-tool domain join in testenv.
47 UNCONNECTED_LDIF = os.path.join(os.environ['SRCDIR_ABS'],
48                                 "testdata/unconnected-intrasite.ldif")
49
50 MULTISITE_LDIF_DSAS = (
51     ("CN=WIN08,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
52      "Site-4"),
53     ("CN=WIN07,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
54      "Site-4"),
55     ("CN=WIN06,CN=Servers,CN=Site-3,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
56      "Site-3"),
57     ("CN=WIN09,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
58      "Site-5"),
59     ("CN=WIN10,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
60      "Site-5"),
61     ("CN=WIN02,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
62      "Site-2"),
63     ("CN=WIN04,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
64      "Site-2"),
65     ("CN=WIN03,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
66      "Site-2"),
67     ("CN=WIN05,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
68      "Site-2"),
69     ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
70      "Default-First-Site-Name"),
71 )
72
73
74 class LdifImportExportTests(samba.tests.TestCaseInTempDir):
75     def setUp(self):
76         super(LdifImportExportTests, self).setUp()
77         self.lp = LoadParm()
78         self.creds = Credentials()
79         self.creds.guess(self.lp)
80
81     def remove_files(self, *files):
82         for f in files:
83             assert(f.startswith(self.tempdir))
84             os.unlink(f)
85
86     def test_write_search_url(self):
87         pass
88
89     def test_ldif_to_samdb(self):
90         dburl = os.path.join(self.tempdir, "ldap")
91         samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
92                                                  MULTISITE_LDIF)
93         self.assertIsInstance(samdb, SamDB)
94
95         dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
96                "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
97         res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
98                            scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
99
100         ntds_guid = misc.GUID(samdb.get_ntds_GUID())
101         self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid)
102
103         service_name_res = samdb.search(base="",
104                                         scope=ldb.SCOPE_BASE,
105                                         attrs=["dsServiceName"])
106         dn = ldb.Dn(samdb,
107                     service_name_res[0]["dsServiceName"][0].decode('utf8'))
108         self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
109         self.remove_files(dburl)
110
111     def test_ldif_to_samdb_forced_local_dsa(self):
112         for dsa, site in MULTISITE_LDIF_DSAS:
113             dburl = os.path.join(self.tempdir, "ldif-to-samba-forced-local-dsa"
114                                  "-%s" % dsa)
115             samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
116                                                      MULTISITE_LDIF,
117                                                      forced_local_dsa=dsa)
118             self.assertIsInstance(samdb, SamDB)
119             self.assertEqual(samdb.server_site_name(), site)
120
121             res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
122                                scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
123
124             ntds_guid = misc.GUID(samdb.get_ntds_GUID())
125             self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid)
126
127             service_name_res = samdb.search(base="",
128                                             scope=ldb.SCOPE_BASE,
129                                             attrs=["dsServiceName"])
130             dn = ldb.Dn(samdb,
131                         service_name_res[0]["dsServiceName"][0].decode('utf8'))
132             self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
133             self.remove_files(dburl)
134
135
136     def test_samdb_to_ldif_file(self):
137         dburl = os.path.join(self.tempdir, "ldap")
138         dburl2 = os.path.join(self.tempdir, "ldap_roundtrip")
139         ldif_file = os.path.join(self.tempdir, "ldif")
140         samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
141                                                  MULTISITE_LDIF)
142         self.assertIsInstance(samdb, SamDB)
143         ldif_import_export.samdb_to_ldif_file(samdb, dburl,
144                                               lp=self.lp, creds=None,
145                                               ldif_file=ldif_file)
146         self.assertGreater(os.path.getsize(ldif_file), 1000,
147                            "LDIF should be larger than 1000 bytes")
148         samdb = ldif_import_export.ldif_to_samdb(dburl2, self.lp,
149                                                  ldif_file)
150         self.assertIsInstance(samdb, SamDB)
151         dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
152                "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
153         res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
154                            scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
155         self.remove_files(dburl)
156         self.remove_files(dburl2)
157         self.remove_files(ldif_file)
158
159
160 class KCCMultisiteLdifTests(samba.tests.TestCaseInTempDir):
161     def setUp(self):
162         super(KCCMultisiteLdifTests, self).setUp()
163         self.lp = LoadParm()
164         self.creds = Credentials()
165         self.creds.guess(self.lp)
166
167     def remove_files(self, *files):
168         for f in files:
169             assert(f.startswith(self.tempdir))
170             os.unlink(f)
171
172     def _get_kcc(self, name, readonly=False, verify=False, dot_file_dir=None):
173         # Note that setting read-only to False won't affect the ldif,
174         # only the temporary database that is created from it.
175         my_kcc = KCC(unix_now, readonly=readonly, verify=verify,
176                      dot_file_dir=dot_file_dir)
177         tmpdb = os.path.join(self.tempdir, 'tmpdb')
178         my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)
179         self.remove_files(tmpdb)
180         return my_kcc
181
182     def test_list_dsas(self):
183         my_kcc = self._get_kcc('test-list')
184         dsas = set(my_kcc.list_dsas())
185         expected_dsas = set(x[0] for x in MULTISITE_LDIF_DSAS)
186         self.assertEqual(dsas, expected_dsas)
187
188     def test_verify(self):
189         """Check that the KCC generates graphs that pass its own verify
190         option.
191         """
192         my_kcc = self._get_kcc('test-verify', verify=True)
193         tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
194         my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)
195
196         my_kcc.run(None,
197                    self.lp, self.creds,
198                    attempt_live_connections=False)
199         self.remove_files(tmpdb)
200
201     def test_unconnected_db(self):
202         """Check that the KCC generates errors on a unconnected db
203         """
204         my_kcc = self._get_kcc('test-verify', verify=True)
205         tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
206         my_kcc.import_ldif(tmpdb, self.lp, UNCONNECTED_LDIF)
207
208         try:
209             my_kcc.run(None,
210                        self.lp, self.creds,
211                        attempt_live_connections=False)
212         except samba.kcc.graph_utils.GraphError:
213             pass
214         except Exception:
215             self.fail("Did not expect this error.")
216         finally:
217             self.remove_files(tmpdb)
218
219     def test_dotfiles(self):
220         """Check that KCC writes dot_files when asked.
221         """
222         my_kcc = self._get_kcc('test-dotfiles', dot_file_dir=self.tempdir)
223         tmpdb = os.path.join(self.tempdir, 'dotfile-tmpdb')
224         files = [tmpdb]
225         my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)
226         my_kcc.run(None,
227                    self.lp, self.creds,
228                    attempt_live_connections=False)
229
230         dot = '/usr/bin/dot'
231         for fn in os.listdir(self.tempdir):
232             if fn.endswith('.dot'):
233                 ffn = os.path.join(self.tempdir, fn)
234                 if os.path.exists(dot) and subprocess.call([dot, '-?']) == 0:
235                     r = subprocess.call([dot, '-Tcanon', ffn])
236                     self.assertEqual(r, 0)
237
238                 #even if dot is not there, at least check the file is non-empty
239                 size = os.stat(ffn).st_size
240                 self.assertNotEqual(size, 0)
241                 files.append(ffn)
242
243         self.remove_files(*files)