# -*- coding: utf-8 -*-
# This is unit with tests for LDAP access checks
-import getopt
import optparse
import sys
import os
import samba.getopt as options
-from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError
-from ldb import ERR_NO_SUCH_OBJECT, ERR_INVALID_DN_SYNTAX, ERR_UNWILLING_TO_PERFORM
-from ldb import ERR_INSUFFICIENT_ACCESS_RIGHTS
+from ldb import (
+ SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT, ERR_INSUFFICIENT_ACCESS_RIGHTS)
from samba.ndr import ndr_pack, ndr_unpack
from samba.dcerpc import security
import os
sys.path.append("bin/python")
-sys.path.append("../lib/subunit/python")
-sys.path.append("../lib/testtools")
+import samba
+samba.ensure_external_module("subunit", "subunit")
+samba.ensure_external_module("testtools", "testtools")
import samba.getopt as options
return
creds = Credentials()
creds.guess(lp)
- try:
- creds.set_machine_account(lp)
- except:
- print "Failed to set machine account"
- raise
-
+ creds.set_machine_account(lp)
(tmp_fd, ccachename) = tempfile.mkstemp()
creds.get_named_ccache(lp, ccachename)
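Review bot: `tmp_fd` returned by `tempfile.mkstemp()` is neither used nor closed in the visible lines, and the file named by `ccachename` will hold the machine account's Kerberos credential cache. mkstemp creates the file with owner-only permissions, which is appropriate for a ccache, but the descriptor should be released with `os.close(tmp_fd)` right after the call. The cache file also needs to be removed on every exit path, including when `creds.get_named_ccache(lp, ccachename)` raises. Neither the close nor the unlink is visible here, so they may exist elsewhere; the function starts outside this hunk.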
vars = {}
samdb = SamDB(url=lp.get("sam database"), session_info=system_session(),
- lp=lp)
+ lp=lp)
vars['DNSDOMAIN'] = lp.get('realm').lower()
vars['HOSTNAME'] = lp.get('netbios name').lower() + "." + vars['DNSDOMAIN']
try:
dump_denied_change(dn,att,messageEltFlagToString(msgElt.flags()),current[0][att],reference[0][att])
except:
+ # FIXME: Should catch an explicit exception here
dump_denied_change(dn,att,messageEltFlagToString(msgElt.flags()),current[0][att],None)
delta.remove(att)
delta.dn = dn
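Review bot: The bare `except:` that the new FIXME marks retries `dump_denied_change` with `None` on any error, so a failure inside `dump_denied_change` or `messageEltFlagToString` is handled the same way as a missing `reference[0][att]`, and in Python 2 it also swallows KeyboardInterrupt and SystemExit. If the intent is only to cope with an attribute absent from the reference (an assumption, since the lookup's exception type is not visible here), look the value up before the call and pass it in, or at minimum narrow the handler to `except Exception:`. The same catch-all appears in two later hunks: around the `Ldb(...)` open, where every failure becomes "Unable to read domain SID from configuration files", and around the xattr probe, where every failure is treated as missing xattr support when `posix:eadb` is unset. The enclosing block starts outside this hunk.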
creds = credopts.get_credentials(lp)
creds.set_kerberos_state(DONT_USE_KERBEROS)
try:
- ldb = Ldb(path, session_info=system_session(), credentials=creds,lp=lp)
+ ldb = Ldb(path, session_info=system_session(), credentials=creds,
+ lp=lp)
except:
# XXX: Should catch a particular exception type
raise CommandError("Unable to read domain SID from configuration files")
def getntacl(lp, file, backend=None, eadbfile=None):
checkset_backend(lp, backend, eadbfile)
eadbname = lp.get("posix:eadb")
- if eadbname != None and eadbname != "" :
+ if eadbname is not None and eadbname != "":
try:
- attribute = samba.xattr_tdb.wrap_getxattr(eadbname,file,xattr.XATTR_NTACL_NAME)
+ attribute = samba.xattr_tdb.wrap_getxattr(eadbname, file,
+ xattr.XATTR_NTACL_NAME)
except:
+ # FIXME: Don't catch all exceptions, just those related to opening
+ # xattrdb
print "Fail to open %s" % eadbname
- attribute = samba.xattr_native.wrap_getxattr(file,xattr.XATTR_NTACL_NAME)
+ attribute = samba.xattr_native.wrap_getxattr(file,
+ xattr.XATTR_NTACL_NAME)
else:
- attribute = samba.xattr_native.wrap_getxattr(file,xattr.XATTR_NTACL_NAME)
+ attribute = samba.xattr_native.wrap_getxattr(file,
+ xattr.XATTR_NTACL_NAME)
ntacl = ndr_unpack(xattr.NTACL,attribute)
return ntacl
def setntacl(lp, file, sddl, domsid, backend=None, eadbfile=None):
- checkset_backend(lp,backend,eadbfile)
+ checkset_backend(lp, backend, eadbfile)
ntacl=xattr.NTACL()
ntacl.version = 1
sid=security.dom_sid(domsid)
sd = security.descriptor.from_sddl(sddl, sid)
ntacl.info = sd
eadbname = lp.get("posix:eadb")
- if eadbname != None and eadbname != "":
+ if eadbname is not None and eadbname != "":
try:
- samba.xattr_tdb.wrap_setxattr(eadbname,file,xattr.XATTR_NTACL_NAME,ndr_pack(ntacl))
+ samba.xattr_tdb.wrap_setxattr(eadbname,
+ file,xattr.XATTR_NTACL_NAME,ndr_pack(ntacl))
except:
+ # FIXME: Don't catch all exceptions, just those related to opening
+ # xattrdb
print "Fail to open %s"%eadbname
samba.xattr_native.wrap_setxattr(file,xattr.XATTR_NTACL_NAME,ndr_pack(ntacl))
else:
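Review bot: In both `getntacl` and `setntacl` the bare `except:` around the `samba.xattr_tdb` call falls back to the native xattr on any failure, not only on a failure to open the eadb file as the printed message claims. For `setntacl` this means a security descriptor can end up in a different store than the one configured through `posix:eadb`, and if the native write succeeds the caller gets no error. `getntacl` can likewise return whatever the native xattr holds rather than the configured store's value. The added FIXME acknowledges the catch-all; until the specific exception type is known, narrowing to `except Exception:` and including the error text in the "Fail to open" message would keep interrupts from being swallowed and make the fallback diagnosable. The body of `setntacl` continues past the end of this hunk.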
except:
ldb.transaction_cancel()
raise
- ldb.transaction_commit()
+ else:
+ ldb.transaction_commit()
def provision_paths_from_lp(lp, dnsdomain):
message("Setting up sam.ldb rootDSE")
setup_samdb_rootdse(samdb, setup_path, names)
-
except:
samdb.transaction_cancel()
raise
-
- samdb.transaction_commit()
+ else:
+ samdb.transaction_commit()
def secretsdb_self_join(secretsdb, domain,
except:
self.transaction_cancel()
raise
- self.transaction_commit()
+ else:
+ self.transaction_commit()
def setpassword(self, filter, password, force_change_at_next_login=False):
"""Sets the password for a user
except:
self.transaction_cancel()
raise
- self.transaction_commit()
+ else:
+ self.transaction_commit()
def setexpiry(self, filter, expiry_seconds, no_expiry_req=False):
"""Sets the account expiry for a user
except:
self.transaction_cancel()
raise
- self.transaction_commit()
+ else:
+ self.transaction_commit()
def set_domain_sid(self, sid):
"""Change the domain SID used by this LDB.
"O:S-1-5-32G:S-1-5-32", "S-1-5-32", "native")
eadb = False
except:
- if lp.get("posix:eadb") == None:
+ # XXX: Should catch a specific exception here
+ if lp.get("posix:eadb") is None:
message("Notice: you are not root or your system do not support xattr, tdb backend for attributes has been selected")
message(" if you intend to use this provision in production you'd better rerun the script as root on a system supporting xattr")
file.close()