PEP8: fix E305: expected 2 blank lines after class or function definition, found 1
[kai/samba-autobuild/.git] / python / samba / tests / upgradeprovisionneeddc.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2008
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for samba.upgradeprovision that need a DC."""
19
20 import os
21 import re
22 import shutil
23
24 from samba import param
25 from samba.credentials import Credentials
26 from samba.auth import system_session
27 from samba.provision import getpolicypath, find_provision_key_parameters
28 from samba.upgradehelpers import (get_paths, get_ldbs,
29                                   identic_rename,
30                                   updateOEMInfo, getOEMInfo, update_gpo,
31                                   delta_update_basesamdb,
32                                   update_dns_account_password,
33                                   search_constructed_attrs_stored,
34                                   increment_calculated_keyversion_number)
35 from samba.tests import env_loadparm, TestCaseInTempDir
36 from samba.tests.provision import create_dummy_secretsdb
37 import ldb
38
39
40 def dummymessage(a=None, b=None):
41     pass
42
43
44 smb_conf_path = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "ad_dc_ntvfs", "etc/smb.conf")
45
46
47 class UpgradeProvisionBasicLdbHelpersTestCase(TestCaseInTempDir):
48     """Some simple tests for individual functions in the provisioning code.
49     """
50
51     def test_get_ldbs(self):
52         paths = get_paths(param, None, smb_conf_path)
53         creds = Credentials()
54         lp = env_loadparm()
55         creds.guess(lp)
56         get_ldbs(paths, creds, system_session(), lp)
57
58     def test_find_key_param(self):
59         paths = get_paths(param, None, smb_conf_path)
60         creds = Credentials()
61         lp = env_loadparm()
62         creds.guess(lp)
63         rootdn = "dc=samba,dc=example,dc=com"
64         ldbs = get_ldbs(paths, creds, system_session(), lp)
65         names = find_provision_key_parameters(ldbs.sam, ldbs.secrets, ldbs.idmap,
66                                               paths, smb_conf_path, lp)
67         self.assertEquals(names.realm, "SAMBA.EXAMPLE.COM")
68         self.assertEquals(str(names.rootdn).lower(), rootdn.lower())
69         self.assertNotEquals(names.policyid_dc, None)
70         self.assertNotEquals(names.ntdsguid, "")
71
72
73 class UpgradeProvisionWithLdbTestCase(TestCaseInTempDir):
74
75     def _getEmptyDbName(self):
76         return os.path.join(self.tempdir, "sam.ldb")
77
78     def setUp(self):
79         super(UpgradeProvisionWithLdbTestCase, self).setUp()
80         paths = get_paths(param, None, smb_conf_path)
81         self.creds = Credentials()
82         self.lp = env_loadparm()
83         self.creds.guess(self.lp)
84         self.paths = paths
85         self.ldbs = get_ldbs(paths, self.creds, system_session(), self.lp)
86         self.names = find_provision_key_parameters(self.ldbs.sam,
87                                                    self.ldbs.secrets, self.ldbs.idmap, paths, smb_conf_path,
88                                                    self.lp)
89         self.referencedb = create_dummy_secretsdb(
90             os.path.join(self.tempdir, "ref.ldb"))
91
92     def test_search_constructed_attrs_stored(self):
93         hashAtt = search_constructed_attrs_stored(self.ldbs.sam,
94                                                   self.names.rootdn,
95                                                   ["msds-KeyVersionNumber"])
96         self.assertFalse("msds-KeyVersionNumber" in hashAtt)
97
98     def test_increment_calculated_keyversion_number(self):
99         dn = "CN=Administrator,CN=Users,%s" % self.names.rootdn
100         # We conctruct a simple hash for the user administrator
101         hash = {}
102         # And we want the version to be 140
103         hash[dn.lower()] = 140
104
105         increment_calculated_keyversion_number(self.ldbs.sam,
106                                                self.names.rootdn,
107                                                hash)
108         self.assertEqual(self.ldbs.sam.get_attribute_replmetadata_version(dn,
109                                                                           "unicodePwd"),
110                             140)
111         # This function should not decrement the version
112         hash[dn.lower()] = 130
113
114         increment_calculated_keyversion_number(self.ldbs.sam,
115                                                self.names.rootdn,
116                                                hash)
117         self.assertEqual(self.ldbs.sam.get_attribute_replmetadata_version(dn,
118                                                                           "unicodePwd"),
119                             140)
120
121     def test_identic_rename(self):
122         rootdn = "DC=samba,DC=example,DC=com"
123
124         guestDN = ldb.Dn(self.ldbs.sam, "CN=Guest,CN=Users,%s" % rootdn)
125         identic_rename(self.ldbs.sam, guestDN)
126         res = self.ldbs.sam.search(expression="(name=Guest)", base=rootdn,
127                                    scope=ldb.SCOPE_SUBTREE, attrs=["dn"])
128         self.assertEquals(len(res), 1)
129         self.assertEquals(str(res[0]["dn"]), "CN=Guest,CN=Users,%s" % rootdn)
130
131     def test_delta_update_basesamdb(self):
132         dummysampath = self._getEmptyDbName()
133         delta_update_basesamdb(self.paths.samdb, dummysampath,
134                                self.creds, system_session(), self.lp,
135                                dummymessage)
136
137     def test_update_gpo_simple(self):
138         dir = getpolicypath(self.paths.sysvol, self.names.dnsdomain,
139                             self.names.policyid)
140         shutil.rmtree(dir)
141         self.assertFalse(os.path.isdir(dir))
142         update_gpo(self.paths, self.ldbs.sam, self.names, self.lp, dummymessage)
143         self.assertTrue(os.path.isdir(dir))
144
145     def test_update_gpo_acl(self):
146         path = os.path.join(self.tempdir, "testupdategpo")
147         save = self.paths.sysvol
148         self.paths.sysvol = path
149         os.mkdir(path)
150         os.mkdir(os.path.join(path, self.names.dnsdomain))
151         os.mkdir(os.path.join(os.path.join(path, self.names.dnsdomain),
152                               "Policies"))
153         update_gpo(self.paths, self.ldbs.sam, self.names, self.lp, dummymessage)
154         shutil.rmtree(path)
155         self.paths.sysvol = save
156
157     def test_getOEMInfo(self):
158         realm = self.lp.get("realm")
159         basedn = "DC=%s" % realm.replace(".", ", DC=")
160         oem = getOEMInfo(self.ldbs.sam, basedn)
161         self.assertNotEquals(oem, "")
162
163     def test_update_dns_account(self):
164         update_dns_account_password(self.ldbs.sam, self.ldbs.secrets,
165                                     self.names)
166
167     def test_updateOEMInfo(self):
168         realm = self.lp.get("realm")
169         basedn = "DC=%s" % realm.replace(".", ", DC=")
170         oem = getOEMInfo(self.ldbs.sam, basedn)
171         updateOEMInfo(self.ldbs.sam, basedn)
172         oem2 = getOEMInfo(self.ldbs.sam, basedn)
173         self.assertNotEquals(str(oem), str(oem2))
174         self.assertTrue(re.match(".*upgrade to.*", str(oem2)))
175
176     def tearDown(self):
177         for name in ["ref.ldb", "secrets.ldb", "secrets.tdb", "secrets.tdb.bak", "secrets.ntdb", "sam.ldb"]:
178             path = os.path.join(self.tempdir, name)
179             if os.path.exists(path):
180                 os.unlink(path)
181         super(UpgradeProvisionWithLdbTestCase, self).tearDown()