#
import samba.getopt as options
+from samba import sd_utils
from samba.dcerpc import security
from samba.samdb import SamDB
from samba.ndr import ndr_unpack, ndr_pack
Option,
)
-
-
-class cmd_dsacl_set(Command):
- """Modify access list on a directory object."""
+class cmd_dsacl_base(Command):
+ """Base class for DSACL commands."""
synopsis = "%prog [options]"
- car_help = """ The access control right to allow or deny """
takes_optiongroups = {
"sambaopts": options.SambaOptions,
"versionopts": options.VersionOptions,
}
+ def print_acl(self, sd_helper, object_dn, prefix=''):
+ desc_sddl = sd_helper.get_sd_as_sddl(object_dn)
+ self.outf.write("%sdescriptor for %s:\n" % (prefix, object_dn))
+ self.outf.write(desc_sddl + "\n")
+
+
+class cmd_dsacl_set(cmd_dsacl_base):
+ """Modify access list on a directory object."""
+
+ car_help = """ The access control right to allow or deny """
+
takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server",
type=str, metavar="URL", dest="H"),
res = samdb.search(base=trusteedn, expression="(objectClass=*)",
scope=SCOPE_BASE)
assert(len(res) == 1)
- return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
-
- def modify_descriptor(self, samdb, object_dn, desc, controls=None):
- assert(isinstance(desc, security.descriptor))
- m = ldb.Message()
- m.dn = ldb.Dn(samdb, object_dn)
- m["nTSecurityDescriptor"]= ldb.MessageElement(
- (ndr_pack(desc)), ldb.FLAG_MOD_REPLACE,
- "nTSecurityDescriptor")
- samdb.modify(m)
-
- def read_descriptor(self, samdb, object_dn):
- res = samdb.search(base=object_dn, scope=SCOPE_BASE,
- attrs=["nTSecurityDescriptor"])
- # we should theoretically always have an SD
- assert(len(res) == 1)
- desc = res[0]["nTSecurityDescriptor"][0]
- return ndr_unpack(security.descriptor, desc)
+ return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
- def get_domain_sid(self, samdb):
- res = samdb.search(base=samdb.domain_dn(),
- expression="(objectClass=*)", scope=SCOPE_BASE)
- return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
-
- def add_ace(self, samdb, object_dn, new_ace):
+ def add_ace(self, sd_helper, object_dn, new_ace):
"""Add new ace explicitly."""
- desc = self.read_descriptor(samdb, object_dn)
- desc_sddl = desc.as_sddl(self.get_domain_sid(samdb))
- #TODO add bindings for descriptor manipulation and get rid of this
- desc_aces = re.findall("\(.*?\)", desc_sddl)
- for ace in desc_aces:
- if ("ID" in ace):
- desc_sddl = desc_sddl.replace(ace, "")
- if new_ace in desc_sddl:
- return
- if desc_sddl.find("(") >= 0:
- desc_sddl = desc_sddl[:desc_sddl.index("(")] + new_ace + desc_sddl[desc_sddl.index("("):]
- else:
- desc_sddl = desc_sddl + new_ace
- desc = security.descriptor.from_sddl(desc_sddl, self.get_domain_sid(samdb))
- self.modify_descriptor(samdb, object_dn, desc)
-
- def print_new_acl(self, samdb, object_dn):
- desc = self.read_descriptor(samdb, object_dn)
- desc_sddl = desc.as_sddl(self.get_domain_sid(samdb))
- self.outf.write("new descriptor for %s:\n" % object_dn)
- self.outf.write(desc_sddl + "\n")
+ ai,ii = sd_helper.dacl_prepend_aces(object_dn, new_ace)
+ for ace in ii:
+ sddl = ace.as_sddl(sd_helper.domain_sid)
+ self.outf.write("WARNING: ignored INHERITED_ACE (%s).\n" % sddl)
+ for ace in ai:
+ sddl = ace.as_sddl(sd_helper.domain_sid)
+ self.outf.write("WARNING: (%s) was already found in the current security descriptor.\n" % sddl)
def run(self, car, action, objectdn, trusteedn, sddl,
H=None, credopts=None, sambaopts=None, versionopts=None):
samdb = SamDB(url=H, session_info=system_session(),
credentials=creds, lp=lp)
- cars = {'change-rid' : GUID_DRS_CHANGE_RID_MASTER,
- 'change-pdc' : GUID_DRS_CHANGE_PDC,
- 'change-infrastructure' : GUID_DRS_CHANGE_INFR_MASTER,
- 'change-schema' : GUID_DRS_CHANGE_SCHEMA_MASTER,
- 'change-naming' : GUID_DRS_CHANGE_DOMAIN_MASTER,
- 'allocate_rids' : GUID_DRS_ALLOCATE_RIDS,
- 'get-changes' : GUID_DRS_GET_CHANGES,
- 'get-changes-all' : GUID_DRS_GET_ALL_CHANGES,
- 'get-changes-filtered' : GUID_DRS_GET_FILTERED_ATTRIBUTES,
- 'topology-manage' : GUID_DRS_MANAGE_TOPOLOGY,
- 'topology-monitor' : GUID_DRS_MONITOR_TOPOLOGY,
- 'repl-sync' : GUID_DRS_REPL_SYNCRONIZE,
- 'ro-repl-secret-sync' : GUID_DRS_RO_REPL_SECRET_SYNC,
+ sd_helper = sd_utils.SDUtils(samdb)
+ cars = {'change-rid': GUID_DRS_CHANGE_RID_MASTER,
+ 'change-pdc': GUID_DRS_CHANGE_PDC,
+ 'change-infrastructure': GUID_DRS_CHANGE_INFR_MASTER,
+ 'change-schema': GUID_DRS_CHANGE_SCHEMA_MASTER,
+ 'change-naming': GUID_DRS_CHANGE_DOMAIN_MASTER,
+ 'allocate_rids': GUID_DRS_ALLOCATE_RIDS,
+ 'get-changes': GUID_DRS_GET_CHANGES,
+ 'get-changes-all': GUID_DRS_GET_ALL_CHANGES,
+ 'get-changes-filtered': GUID_DRS_GET_FILTERED_ATTRIBUTES,
+ 'topology-manage': GUID_DRS_MANAGE_TOPOLOGY,
+ 'topology-monitor': GUID_DRS_MONITOR_TOPOLOGY,
+ 'repl-sync': GUID_DRS_REPL_SYNCRONIZE,
+ 'ro-repl-secret-sync': GUID_DRS_RO_REPL_SECRET_SYNC,
}
sid = self.find_trustee_sid(samdb, trusteedn)
if sddl:
else:
raise CommandError("Wrong argument '%s'!" % action)
- self.print_new_acl(samdb, objectdn)
- self.add_ace(samdb, objectdn, new_ace)
- self.print_new_acl(samdb, objectdn)
+ self.print_acl(sd_helper, objectdn, prefix='old ')
+ self.add_ace(sd_helper, objectdn, new_ace)
+ self.print_acl(sd_helper, objectdn, prefix='new ')
+
+
+class cmd_dsacl_get(cmd_dsacl_base):
+ """Print access list on a directory object."""
+
+ takes_options = [
+ Option("-H", "--URL", help="LDB URL for database or target server",
+ type=str, metavar="URL", dest="H"),
+ Option("--objectdn", help="DN of the object whose SD to modify",
+ type="string"),
+ ]
+
+ def run(self, objectdn,
+ H=None, credopts=None, sambaopts=None, versionopts=None):
+ lp = sambaopts.get_loadparm()
+ creds = credopts.get_credentials(lp)
+
+ samdb = SamDB(url=H, session_info=system_session(),
+ credentials=creds, lp=lp)
+ sd_helper = sd_utils.SDUtils(samdb)
+ self.print_acl(sd_helper, objectdn)
+
+
+class cmd_dsacl_delete(cmd_dsacl_base):
+ """Delete an access list entry on a directory object."""
+
+ takes_options = [
+ Option("-H", "--URL", help="LDB URL for database or target server",
+ type=str, metavar="URL", dest="H"),
+ Option("--objectdn", help="DN of the object whose SD to modify",
+ type="string"),
+ Option("--sddl", help="An ACE or group of ACEs to be deleted from the object",
+ type="string"),
+ ]
+
+ def run(self, objectdn, sddl, H=None, credopts=None, sambaopts=None, versionopts=None):
+ lp = sambaopts.get_loadparm()
+ creds = credopts.get_credentials(lp)
+
+ if sddl is None or objectdn is None:
+ return self.usage()
+
+ samdb = SamDB(url=H, session_info=system_session(),
+ credentials=creds, lp=lp)
+ sd_helper = sd_utils.SDUtils(samdb)
+
+ self.print_acl(sd_helper, objectdn, prefix='old ')
+ self.delete_ace(sd_helper, objectdn, sddl)
+ self.print_acl(sd_helper, objectdn, prefix='new ')
+
+ def delete_ace(self, sd_helper, object_dn, delete_aces):
+ """Delete ace explicitly."""
+ di,ii = sd_helper.dacl_delete_aces(object_dn, delete_aces)
+ for ace in ii:
+ sddl = ace.as_sddl(sd_helper.domain_sid)
+ self.outf.write("WARNING: ignored INHERITED_ACE (%s).\n" % sddl)
+ for ace in di:
+ sddl = ace.as_sddl(sd_helper.domain_sid)
+ self.outf.write("WARNING: (%s) was not found in the current security descriptor.\n" % sddl)
class cmd_dsacl(SuperCommand):
subcommands = {}
subcommands["set"] = cmd_dsacl_set()
+ subcommands["get"] = cmd_dsacl_get()
+ subcommands["delete"] = cmd_dsacl_delete()