From d2536b31200106ad0bbea60abd3e654c23540e6d Mon Sep 17 00:00:00 2001 From: Amitay Isaacs Date: Thu, 25 Aug 2011 17:10:23 +1000 Subject: [PATCH] py-samba3: Use passdb/param wrapper for samba3 module Instead of parsing samba3 database files (password, group mapping, account policy, secrets), use passdb python wrapper. Similarly for parsing configuration, use samba3 param python wrapper. Other databases (idmap, registry, wins) are still parsed in python. Signed-off-by: Andrew Bartlett --- .../scripting/python/samba/samba3/__init__.py | 454 +----------------- .../scripting/python/samba/tests/samba3.py | 230 ++++----- 2 files changed, 117 insertions(+), 567 deletions(-) diff --git a/source4/scripting/python/samba/samba3/__init__.py b/source4/scripting/python/samba/samba3/__init__.py index 385d9331ec0..dd2f927aa4a 100644 --- a/source4/scripting/python/samba/samba3/__init__.py +++ b/source4/scripting/python/samba/samba3/__init__.py @@ -26,6 +26,9 @@ import os import struct import tdb +import passdb +import param as s3param + def fetch_uint32(tdb, key): try: @@ -125,74 +128,6 @@ class Registry(TdbDatabase): return ret -class PolicyDatabase(TdbDatabase): - """Samba 3 Account Policy database reader.""" - def __init__(self, file): - """Open a policy database - - :param file: Path to the file to open. - """ - super(PolicyDatabase, self).__init__(file) - self.min_password_length = fetch_uint32(self.tdb, "min password length\x00") - self.password_history = fetch_uint32(self.tdb, "password history\x00") - self.user_must_logon_to_change_password = fetch_uint32(self.tdb, "user must logon to change pasword\x00") - self.maximum_password_age = fetch_uint32(self.tdb, "maximum password age\x00") - self.minimum_password_age = fetch_uint32(self.tdb, "minimum password age\x00") - self.lockout_duration = fetch_uint32(self.tdb, "lockout duration\x00") - self.reset_count_minutes = fetch_uint32(self.tdb, "reset count minutes\x00") - self.bad_lockout_minutes = fetch_uint32(self.tdb, "bad lockout minutes\x00") - self.disconnect_time = fetch_int32(self.tdb, "disconnect time\x00") - self.refuse_machine_password_change = fetch_uint32(self.tdb, "refuse machine password change\x00") - - # FIXME: Read privileges as well - - -GROUPDB_DATABASE_VERSION_V1 = 1 # native byte format. -GROUPDB_DATABASE_VERSION_V2 = 2 # le format. - -GROUP_PREFIX = "UNIXGROUP/" - -# Alias memberships are stored reverse, as memberships. The performance -# critical operation is to determine the aliases a SID is member of, not -# listing alias members. So we store a list of alias SIDs a SID is member of -# hanging of the member as key. -MEMBEROF_PREFIX = "MEMBEROF/" - -class GroupMappingDatabase(TdbDatabase): - """Samba 3 group mapping database reader.""" - def _check_version(self): - assert fetch_int32(self.tdb, "INFO/version\x00") in (GROUPDB_DATABASE_VERSION_V1, GROUPDB_DATABASE_VERSION_V2) - - def groupsids(self): - """Retrieve the SIDs for the groups in this database. - - :return: List with sids as strings. - """ - for k in self.tdb.iterkeys(): - if k.startswith(GROUP_PREFIX): - yield k[len(GROUP_PREFIX):].rstrip("\0") - - def get_group(self, sid): - """Retrieve the group mapping information for a particular group. - - :param sid: SID of the group - :return: None if the group can not be found, otherwise - a tuple with gid, sid_name_use, the NT name and comment. - """ - data = self.tdb.get("%s%s\0" % (GROUP_PREFIX, sid)) - if data is None: - return data - (gid, sid_name_use) = struct.unpack(" 0: - (bad_password_time, data) = unpack_int32(data) - if bad_password_time != 0: - user.bad_password_time = bad_password_time - (pass_last_set_time, data) = unpack_int32(data) - (pass_can_change_time, data) = unpack_int32(data) - (pass_must_change_time, data) = unpack_int32(data) - - if logon_time != 0: - user.logon_time = logon_time - user.logoff_time = logoff_time - user.kickoff_time = kickoff_time - if pass_last_set_time != 0: - user.pass_last_set_time = pass_last_set_time - user.pass_can_change_time = pass_can_change_time - - (user.username, data) = unpack_string(data) - (user.domain, data) = unpack_string(data) - (user.nt_username, data) = unpack_string(data) - (user.fullname, data) = unpack_string(data) - (user.homedir, data) = unpack_string(data) - (user.dir_drive, data) = unpack_string(data) - (user.logon_script, data) = unpack_string(data) - (user.profile_path, data) = unpack_string(data) - (user.acct_desc, data) = unpack_string(data) - (user.workstations, data) = unpack_string(data) - (user.unknown_str, data) = unpack_string(data) - (user.munged_dial, data) = unpack_string(data) - - (user.user_rid, data) = unpack_int32(data) - (user.group_rid, data) = unpack_int32(data) - - (user.lm_password, data) = unpack_string(data) - (user.nt_password, data) = unpack_string(data) - - if self.version > 1: - (user.nt_password_history, data) = unpack_string(data) - - (user.acct_ctrl, data) = unpack_uint16(data) - (_, data) = unpack_uint32(data) # remove_me field - (user.logon_divs, data) = unpack_uint16(data) - (hours, data) = unpack_string(data) - user.hours = [] - for entry in hours: - for i in range(8): - user.hours.append(ord(entry) & (2 ** i) == (2 ** i)) - (user.bad_password_count, data) = unpack_uint16(data) - (user.logon_count, data) = unpack_uint16(data) - (user.unknown_6, data) = unpack_uint32(data) - assert len(data) == 0 - return user - - def shellsplit(text): """Very simple shell-like line splitting. @@ -675,139 +355,51 @@ class WinsDatabase(object): pass -class ParamFile(object): - """Simple smb.conf-compatible file parser - - Does not use a parameter table, unlike the "normal". - """ - - def __init__(self, sections=None): - self._sections = sections or {} - - def _sanitize_name(self, name): - return name.strip().lower().replace(" ","") - - def __repr__(self): - return "ParamFile(%r)" % self._sections - - def read(self, filename): - """Read a file. - - :param filename: Path to the file - """ - section = None - for i, l in enumerate(open(filename, 'r').xreadlines()): - l = l.strip() - if not l or l[0] == '#' or l[0] == ';': - continue - if l[0] == "[" and l[-1] == "]": - section = self._sanitize_name(l[1:-1]) - self._sections.setdefault(section, {}) - elif "=" in l: - (k, v) = l.split("=", 1) - self._sections[section][self._sanitize_name(k)] = v - else: - raise Exception("Unable to parser line %d: %r" % (i+1,l)) - - def get(self, param, section=None): - """Return the value of a parameter. - - :param param: Parameter name - :param section: Section name, defaults to "global" - :return: parameter value as string if found, None otherwise. - """ - if section is None: - section = "global" - section = self._sanitize_name(section) - if not section in self._sections: - return None - param = self._sanitize_name(param) - if not param in self._sections[section]: - return None - return self._sections[section][param].strip() - - def __getitem__(self, section): - return self._sections[section] - - def get_section(self, section): - return self._sections.get(section) - - def add_section(self, section): - self._sections[self._sanitize_name(section)] = {} - - def set_string(self, name, value): - self._sections["global"][name] = value - - def get_string(self, name): - return self._sections["global"].get(name) - - class Samba3(object): """Samba 3 configuration and state data reader.""" - def __init__(self, libdir, smbconfpath): + def __init__(self, smbconfpath, s3_lp_ctx=None): """Open the configuration and data for a Samba 3 installation. - :param libdir: Library directory :param smbconfpath: Path to the smb.conf file. + :param s3_lp_ctx: Samba3 Loadparm context """ self.smbconfpath = smbconfpath - self.libdir = libdir - self.lp = ParamFile() - self.lp.read(self.smbconfpath) - self.privatedir = self.lp.get("private dir") or libdir + if s3_lp_ctx: + self.lp = s3_lp_ctx + else: + self.lp = s3param.get_context() + self.lp.load(smbconfpath) - def libdir_path(self, path): + def statedir_path(self, path): if path[0] == "/" or path[0] == ".": return path - return os.path.join(self.libdir, path) + return os.path.join(self.lp.get("state directory"), path) def privatedir_path(self, path): if path[0] == "/" or path[0] == ".": return path - return os.path.join(self.privatedir, path) + return os.path.join(self.lp.get("private dir"), path) def get_conf(self): return self.lp def get_sam_db(self): - lp = self.get_conf() - backends = (lp.get("passdb backend") or "").split(" ") - if ":" in backends[0]: - (name, location) = backends[0].split(":", 2) - else: - name = backends[0] - location = None - if name == "smbpasswd": - return SmbpasswdFile(self.libdir_path(location or "smbpasswd")) - elif name == "tdbsam": - return TdbSam(self.libdir_path(location or "passdb.tdb")) - elif name == "ldapsam": - if location is not None: - return LdapSam("ldap:%s" % location) - return LdapSam(lp.get("ldap server")) - else: - raise NotImplementedError("unsupported passdb backend %s" % backends[0]) - - def get_policy_db(self): - return PolicyDatabase(self.libdir_path("account_policy.tdb")) + return passdb.PDB(self.lp.get('passdb backend')) def get_registry(self): - return Registry(self.libdir_path("registry.tdb")) + return Registry(self.statedir_path("registry.tdb")) def get_secrets_db(self): return SecretsDatabase(self.privatedir_path("secrets.tdb")) def get_shareinfo_db(self): - return ShareInfoDatabase(self.libdir_path("share_info.tdb")) + return ShareInfoDatabase(self.statedir_path("share_info.tdb")) def get_idmap_db(self): - return IdmapDatabase(self.libdir_path("winbindd_idmap.tdb")) + return IdmapDatabase(self.statedir_path("winbindd_idmap.tdb")) def get_wins_db(self): - return WinsDatabase(self.libdir_path("wins.dat")) + return WinsDatabase(self.statedir_path("wins.dat")) def get_shares(self): return Shares(self.get_conf(), self.get_shareinfo_db()) - - def get_groupmapping_db(self): - return GroupMappingDatabase(self.libdir_path("group_mapping.tdb")) diff --git a/source4/scripting/python/samba/tests/samba3.py b/source4/scripting/python/samba/tests/samba3.py index 3a4b851c75e..03f19021427 100644 --- a/source4/scripting/python/samba/tests/samba3.py +++ b/source4/scripting/python/samba/tests/samba3.py @@ -19,11 +19,12 @@ """Tests for samba.samba3.""" -from samba.samba3 import (GroupMappingDatabase, Registry, PolicyDatabase, - SecretsDatabase, TdbSam) -from samba.samba3 import (WinsDatabase, SmbpasswdFile, ACB_NORMAL, - IdmapDatabase, SAMUser, ParamFile) +from samba.samba3 import (Registry, SecretsDatabase) +from samba.samba3 import (WinsDatabase, IdmapDatabase) +from samba.samba3 import passdb +from samba.samba3 import param as s3param from samba.tests import TestCase +from samba.dcerpc.security import dom_sid import os for p in [ "../../../../../testdata/samba3", "../../../../testdata/samba3" ]: @@ -57,108 +58,111 @@ class RegistryTestCase(TestCase): self.registry.values("HKLM/SYSTEM/CURRENTCONTROLSET/SERVICES/EVENTLOG")) -class PolicyTestCase(TestCase): +class PassdbTestCase(TestCase): def setUp(self): - super(PolicyTestCase, self).setUp() - self.policy = PolicyDatabase(os.path.join(DATADIR, "account_policy.tdb")) + super (PassdbTestCase, self).setUp() + self.lp = s3param.get_context() + self.lp.load(os.path.join(DATADIR, "smb.conf")) + self.lp.set("private dir", DATADIR) + self.lp.set("state directory", DATADIR) + passdb.set_secrets_dir(DATADIR) + self.pdb = passdb.PDB("tdbsam") - def test_policy(self): - self.assertEquals(self.policy.min_password_length, 5) - self.assertEquals(self.policy.minimum_password_age, 0) - self.assertEquals(self.policy.maximum_password_age, 999999999) - self.assertEquals(self.policy.refuse_machine_password_change, 0) - self.assertEquals(self.policy.reset_count_minutes, 0) - self.assertEquals(self.policy.disconnect_time, -1) - self.assertEquals(self.policy.user_must_logon_to_change_password, None) - self.assertEquals(self.policy.password_history, 0) - self.assertEquals(self.policy.lockout_duration, 0) - self.assertEquals(self.policy.bad_lockout_minutes, None) + def tearDown(self): + self.lp = [] + self.pdb = [] + super(PassdbTestCase, self).tearDown() + def test_param(self): + self.assertEquals("BEDWYR", self.lp.get("netbios name")) + self.assertEquals("SAMBA", self.lp.get("workgroup")) + self.assertEquals("USER", self.lp.get("security")) -class GroupsTestCase(TestCase): + def test_policy(self): + policy = self.pdb.get_account_policy() + self.assertEquals(0, policy['bad lockout attempt']) + self.assertEquals(4294967295, policy['disconnect time']) + self.assertEquals(0, policy['lockout duration']) + self.assertEquals(999999999, policy['maximum password age']) + self.assertEquals(0, policy['minimum password age']) + self.assertEquals(5, policy['min password length']) + self.assertEquals(0, policy['password history']) + self.assertEquals(0, policy['refuse machine password change']) + self.assertEquals(0, policy['reset count minutes']) + self.assertEquals(0, policy['user must logon to change password']) - def setUp(self): - super(GroupsTestCase, self).setUp() - self.groupdb = GroupMappingDatabase(os.path.join(DATADIR, "group_mapping.tdb")) + def test_get_sid(self): + domain_sid = passdb.get_global_sam_sid() + self.assertEquals(dom_sid("S-1-5-21-2470180966-3899876309-2637894779"), domain_sid) - def tearDown(self): - self.groupdb.close() - super(GroupsTestCase, self).tearDown() + def test_usernames(self): + userlist = self.pdb.search_users(0) + self.assertEquals(3, len(userlist)) + + def test_getuser(self): + user = self.pdb.getsampwnam("root") + + self.assertEquals(16, user.acct_ctrl) + self.assertEquals("", user.acct_desc) + self.assertEquals(0, user.bad_password_count) + self.assertEquals(0, user.bad_password_time) + self.assertEquals(0, user.code_page) + self.assertEquals(0, user.country_code) + self.assertEquals("", user.dir_drive) + self.assertEquals("BEDWYR", user.domain) + self.assertEquals("root", user.full_name) + self.assertEquals(dom_sid('S-1-5-21-2470180966-3899876309-2637894779-513'), user.group_sid) + self.assertEquals("\\\\BEDWYR\\root", user.home_dir) + self.assertEquals([-1 for i in range(21)], user.hours) + self.assertEquals(21, user.hours_len) + self.assertEquals(9223372036854775807, user.kickoff_time) + self.assertEquals(None, user.lanman_passwd) + self.assertEquals(9223372036854775807, user.logoff_time) + self.assertEquals(0, user.logon_count) + self.assertEquals(168, user.logon_divs) + self.assertEquals("", user.logon_script) + self.assertEquals(0, user.logon_time) + self.assertEquals("", user.munged_dial) + self.assertEquals('\x87\x8d\x80\x14`l\xda)gzD\xef\xa15?\xc7', user.nt_passwd) + self.assertEquals("", user.nt_username) + self.assertEquals(1125418267, user.pass_can_change_time) + self.assertEquals(1125418267, user.pass_last_set_time) + self.assertEquals(2125418266, user.pass_must_change_time) + self.assertEquals(None, user.plaintext_passwd) + self.assertEquals("\\\\BEDWYR\\root\\profile", user.profile_path) + self.assertEquals(None, user.pw_history) + self.assertEquals(dom_sid("S-1-5-21-2470180966-3899876309-2637894779-1000"), user.user_sid) + self.assertEquals("root", user.username) + self.assertEquals("", user.workstations) def test_group_length(self): - self.assertEquals(13, len(list(self.groupdb.groupsids()))) + grouplist = self.pdb.enum_group_mapping() + self.assertEquals(13, len(grouplist)) def test_get_group(self): - self.assertEquals((-1, 5L, 'Administrators', ''), self.groupdb.get_group("S-1-5-32-544")) + group = self.pdb.getgrsid(dom_sid("S-1-5-32-544")) + self.assertEquals("Administrators", group.nt_name) + self.assertEquals(4294967295, group.gid) + self.assertEquals(5, group.sid_name_use) def test_groupsids(self): - sids = list(self.groupdb.groupsids()) + grouplist = self.pdb.enum_group_mapping() + sids = [] + for g in grouplist: + sids.append(str(g.sid)) self.assertTrue("S-1-5-32-544" in sids) + self.assertTrue("S-1-5-32-545" in sids) + self.assertTrue("S-1-5-32-546" in sids) + self.assertTrue("S-1-5-32-548" in sids) + self.assertTrue("S-1-5-32-549" in sids) + self.assertTrue("S-1-5-32-550" in sids) + self.assertTrue("S-1-5-32-551" in sids) def test_alias_length(self): - self.assertEquals(0, len(list(self.groupdb.aliases()))) - - -class SecretsDbTestCase(TestCase): - - def setUp(self): - super(SecretsDbTestCase, self).setUp() - self.secretsdb = SecretsDatabase(os.path.join(DATADIR, "secrets.tdb")) - - def tearDown(self): - self.secretsdb.close() - super(SecretsDbTestCase, self).tearDown() - - def test_get_sid(self): - self.assertTrue(self.secretsdb.get_sid("BEDWYR") is not None) - - -class TdbSamTestCase(TestCase): - - def setUp(self): - super(TdbSamTestCase, self).setUp() - self.samdb = TdbSam(os.path.join(DATADIR, "passdb.tdb")) - - def tearDown(self): - self.samdb.close() - super(TdbSamTestCase, self).tearDown() - - def test_usernames(self): - self.assertEquals(3, len(list(self.samdb.usernames()))) - - def test_getuser(self): - user = SAMUser("root") - user.logoff_time = 2147483647 - user.kickoff_time = 2147483647 - user.pass_can_change_time = 1125418267 - user.username = "root" - user.uid = None - user.lm_password = 'U)\x02\x03\x1b\xed\xe9\xef\xaa\xd3\xb45\xb5\x14\x04\xee' - user.nt_password = '\x87\x8d\x80\x14`l\xda)gzD\xef\xa15?\xc7' - user.acct_ctrl = 16 - user.pass_last_set_time = 1125418267 - user.fullname = "root" - user.nt_username = "" - user.logoff_time = 2147483647 - user.acct_desc = "" - user.group_rid = 1001 - user.logon_count = 0 - user.bad_password_count = 0 - user.domain = "BEDWYR" - user.munged_dial = "" - user.workstations = "" - user.user_rid = 1000 - user.kickoff_time = 2147483647 - user.logoff_time = 2147483647 - user.unknown_6 = 1260L - user.logon_divs = 0 - user.hours = [True for i in range(168)] - other = self.samdb["root"] - for name in other.__dict__: - if other.__dict__[name] != user.__dict__[name]: - print "%s: %r != %r" % (name, other.__dict__[name], user.__dict__[name]) - self.assertEquals(user, other) + aliaslist = self.pdb.search_aliases() + self.assertEquals(1, len(aliaslist)) + self.assertEquals("Jelmers NT Group", aliaslist[0]['account_name']) class WinsDatabaseTestCase(TestCase): @@ -178,29 +182,6 @@ class WinsDatabaseTestCase(TestCase): super(WinsDatabaseTestCase, self).tearDown() -class SmbpasswdTestCase(TestCase): - - def setUp(self): - super(SmbpasswdTestCase, self).setUp() - self.samdb = SmbpasswdFile(os.path.join(DATADIR, "smbpasswd")) - - def test_length(self): - self.assertEquals(3, len(self.samdb)) - - def test_get_user(self): - user = SAMUser("rootpw") - user.lm_password = "552902031BEDE9EFAAD3B435B51404EE" - user.nt_password = "878D8014606CDA29677A44EFA1353FC7" - user.acct_ctrl = ACB_NORMAL - user.pass_last_set_time = int(1125418267) - user.uid = 0 - self.assertEquals(user, self.samdb["rootpw"]) - - def tearDown(self): - self.samdb.close() - super(SmbpasswdTestCase, self).tearDown() - - class IdmapDbTestCase(TestCase): def setUp(self): @@ -229,26 +210,3 @@ class IdmapDbTestCase(TestCase): def tearDown(self): self.idmapdb.close() super(IdmapDbTestCase, self).tearDown() - - -class ParamTestCase(TestCase): - - def test_init(self): - file = ParamFile() - self.assertTrue(file is not None) - - def test_add_section(self): - file = ParamFile() - file.add_section("global") - self.assertTrue(file["global"] is not None) - - def test_set_param_string(self): - file = ParamFile() - file.add_section("global") - file.set_string("data", "bar") - self.assertEquals("bar", file.get_string("data")) - - def test_get_section(self): - file = ParamFile() - self.assertEquals(None, file.get_section("unknown")) - self.assertRaises(KeyError, lambda: file["unknown"]) -- 2.34.1