s4-dsdb-test: Implement samdb_connect_env() to rely solely on environment
[samba.git] / python / samba / tests / __init__.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Samba Python tests."""
19
20 import os
21 import ldb
22 import samba
23 import samba.auth
24 from samba import param
25 from samba.samdb import SamDB
26 from samba import credentials
27 import subprocess
28 import tempfile
29
30 samba.ensure_external_module("mimeparse", "mimeparse")
31 samba.ensure_external_module("extras", "extras")
32 samba.ensure_external_module("testtools", "testtools")
33
34 # Other modules import these two classes from here, for convenience:
35 from testtools.testcase import (
36     TestCase as TesttoolsTestCase,
37     TestSkipped,
38     )
39
40
41 class TestCase(TesttoolsTestCase):
42     """A Samba test case."""
43
44     def setUp(self):
45         super(TestCase, self).setUp()
46         test_debug_level = os.getenv("TEST_DEBUG_LEVEL")
47         if test_debug_level is not None:
48             test_debug_level = int(test_debug_level)
49             self._old_debug_level = samba.get_debug_level()
50             samba.set_debug_level(test_debug_level)
51             self.addCleanup(samba.set_debug_level, test_debug_level)
52
53     def get_loadparm(self):
54         return env_loadparm()
55
56     def get_credentials(self):
57         return cmdline_credentials
58
59
60 class LdbTestCase(TesttoolsTestCase):
61     """Trivial test case for running tests against a LDB."""
62
63     def setUp(self):
64         super(LdbTestCase, self).setUp()
65         self.filename = os.tempnam()
66         self.ldb = samba.Ldb(self.filename)
67
68     def set_modules(self, modules=[]):
69         """Change the modules for this Ldb."""
70         m = ldb.Message()
71         m.dn = ldb.Dn(self.ldb, "@MODULES")
72         m["@LIST"] = ",".join(modules)
73         self.ldb.add(m)
74         self.ldb = samba.Ldb(self.filename)
75
76
77 class TestCaseInTempDir(TestCase):
78
79     def setUp(self):
80         super(TestCaseInTempDir, self).setUp()
81         self.tempdir = tempfile.mkdtemp()
82         self.addCleanup(self._remove_tempdir)
83
84     def _remove_tempdir(self):
85         self.assertEquals([], os.listdir(self.tempdir))
86         os.rmdir(self.tempdir)
87         self.tempdir = None
88
89
90 def env_loadparm():
91     lp = param.LoadParm()
92     try:
93         lp.load(os.environ["SMB_CONF_PATH"])
94     except KeyError:
95         raise KeyError("SMB_CONF_PATH not set")
96     return lp
97
98
99 def env_get_var_value(var_name):
100     """Returns value for variable in os.environ
101
102     Function throws AssertionError if variable is defined.
103     Unit-test based python tests require certain input params
104     to be set in environment, otherwise they can't be run
105     """
106     assert var_name in os.environ.keys(), "Please supply %s in environment" % var_name
107     return os.environ[var_name]
108
109
110 cmdline_credentials = None
111
112 class RpcInterfaceTestCase(TestCase):
113     """DCE/RPC Test case."""
114
115
116 class ValidNetbiosNameTests(TestCase):
117
118     def test_valid(self):
119         self.assertTrue(samba.valid_netbios_name("FOO"))
120
121     def test_too_long(self):
122         self.assertFalse(samba.valid_netbios_name("FOO"*10))
123
124     def test_invalid_characters(self):
125         self.assertFalse(samba.valid_netbios_name("*BLA"))
126
127
128 class BlackboxProcessError(Exception):
129     """This is raised when check_output() process returns a non-zero exit status
130
131     Exception instance should contain the exact exit code (S.returncode),
132     command line (S.cmd), process output (S.stdout) and process error stream
133     (S.stderr)
134     """
135
136     def __init__(self, returncode, cmd, stdout, stderr):
137         self.returncode = returncode
138         self.cmd = cmd
139         self.stdout = stdout
140         self.stderr = stderr
141
142     def __str__(self):
143         return "Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" % (self.cmd, self.returncode,
144                                                                              self.stdout, self.stderr)
145
146 class BlackboxTestCase(TestCase):
147     """Base test case for blackbox tests."""
148
149     def _make_cmdline(self, line):
150         bindir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../bin"))
151         parts = line.split(" ")
152         if os.path.exists(os.path.join(bindir, parts[0])):
153             parts[0] = os.path.join(bindir, parts[0])
154         line = " ".join(parts)
155         return line
156
157     def check_run(self, line):
158         line = self._make_cmdline(line)
159         p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
160         retcode = p.wait()
161         if retcode:
162             raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read())
163
164     def check_output(self, line):
165         line = self._make_cmdline(line)
166         p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
167         retcode = p.wait()
168         if retcode:
169             raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read())
170         return p.stdout.read()
171
172
173 def connect_samdb(samdb_url, lp=None, session_info=None, credentials=None,
174                   flags=0, ldb_options=None, ldap_only=False, global_schema=True):
175     """Create SamDB instance and connects to samdb_url database.
176
177     :param samdb_url: Url for database to connect to.
178     :param lp: Optional loadparm object
179     :param session_info: Optional session information
180     :param credentials: Optional credentials, defaults to anonymous.
181     :param flags: Optional LDB flags
182     :param ldap_only: If set, only remote LDAP connection will be created.
183     :param global_schema: Whether to use global schema.
184
185     Added value for tests is that we have a shorthand function
186     to make proper URL for ldb.connect() while using default
187     parameters for connection based on test environment
188     """
189     if not "://" in samdb_url:
190         if not ldap_only and os.path.isfile(samdb_url):
191             samdb_url = "tdb://%s" % samdb_url
192         else:
193             samdb_url = "ldap://%s" % samdb_url
194     # use 'paged_search' module when connecting remotely
195     if samdb_url.startswith("ldap://"):
196         ldb_options = ["modules:paged_searches"]
197     elif ldap_only:
198         raise AssertionError("Trying to connect to %s while remote "
199                              "connection is required" % samdb_url)
200
201     # set defaults for test environment
202     if lp is None:
203         lp = env_loadparm()
204     if session_info is None:
205         session_info = samba.auth.system_session(lp)
206     if credentials is None:
207         credentials = cmdline_credentials
208
209     return SamDB(url=samdb_url,
210                  lp=lp,
211                  session_info=session_info,
212                  credentials=credentials,
213                  flags=flags,
214                  options=ldb_options,
215                  global_schema=global_schema)
216
217
218 def connect_samdb_ex(samdb_url, lp=None, session_info=None, credentials=None,
219                      flags=0, ldb_options=None, ldap_only=False):
220     """Connects to samdb_url database
221
222     :param samdb_url: Url for database to connect to.
223     :param lp: Optional loadparm object
224     :param session_info: Optional session information
225     :param credentials: Optional credentials, defaults to anonymous.
226     :param flags: Optional LDB flags
227     :param ldap_only: If set, only remote LDAP connection will be created.
228     :return: (sam_db_connection, rootDse_record) tuple
229     """
230     sam_db = connect_samdb(samdb_url, lp, session_info, credentials,
231                            flags, ldb_options, ldap_only)
232     # fetch RootDse
233     res = sam_db.search(base="", expression="", scope=ldb.SCOPE_BASE,
234                         attrs=["*"])
235     return (sam_db, res[0])
236
237
238 def connect_samdb_env(env_url, env_username, env_password, lp=None):
239     """Connect to SamDB by getting URL and Credentials from environment
240
241     :param env_url: Environment variable name to get lsb url from
242     :param env_username: Username environment variable
243     :param env_password: Password environment variable
244     :return: sam_db_connection
245     """
246     samdb_url = env_get_var_value(env_url)
247     creds = credentials.Credentials()
248     if lp is None:
249         # guess Credentials parameters here. Otherwise workstation
250         # and domain fields are NULL and gencache code segfalts
251         lp = param.LoadParm()
252         creds.guess(lp)
253     creds.set_username(env_get_var_value(env_username))
254     creds.set_password(env_get_var_value(env_password))
255     return connect_samdb(samdb_url, credentials=creds, lp=lp)
256
257
258 def delete_force(samdb, dn):
259     try:
260         samdb.delete(dn)
261     except ldb.LdbError, (num, errstr):
262         assert num == ldb.ERR_NO_SUCH_OBJECT, "ldb.delete() failed: %s" % errstr