samba-tool: rewrite dsacl.py to use the new sd_utils helpers
authorStefan Metzmacher <metze@samba.org>
Thu, 16 Mar 2023 17:32:49 +0000 (18:32 +0100)
committerStefan Metzmacher <metze@samba.org>
Wed, 22 Mar 2023 15:57:15 +0000 (15:57 +0000)
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Autobuild-User(master): Stefan Metzmacher <metze@samba.org>
Autobuild-Date(master): Wed Mar 22 15:57:15 UTC 2023 on atb-devel-224

python/samba/netcmd/dsacl.py

index 02be8fd982bc0fd7b3cfee7f74d5d97b2cd4185a..527c53482b698d3536f31a74c42e3da5420e631b 100644 (file)
@@ -17,6 +17,7 @@
 #
 
 import samba.getopt as options
+from samba import sd_utils
 from samba.dcerpc import security
 from samba.samdb import SamDB
 from samba.ndr import ndr_unpack, ndr_pack
@@ -42,38 +43,6 @@ from samba.netcmd import (
     Option,
 )
 
-def find_trustee_sid(samdb, trusteedn):
-    res = samdb.search(base=trusteedn, expression="(objectClass=*)",
-                       scope=SCOPE_BASE)
-    assert(len(res) == 1)
-    return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
-
-
-def modify_descriptor(samdb, object_dn, desc, controls=None):
-    assert(isinstance(desc, security.descriptor))
-    m = ldb.Message()
-    m.dn = ldb.Dn(samdb, object_dn)
-    m["nTSecurityDescriptor"] = ldb.MessageElement(
-            (ndr_pack(desc)), ldb.FLAG_MOD_REPLACE,
-            "nTSecurityDescriptor")
-    samdb.modify(m)
-
-
-def read_descriptor(samdb, object_dn):
-    res = samdb.search(base=object_dn, scope=SCOPE_BASE,
-                       attrs=["nTSecurityDescriptor"])
-    # we should theoretically always have an SD
-    assert(len(res) == 1)
-    desc = res[0]["nTSecurityDescriptor"][0]
-    return ndr_unpack(security.descriptor, desc)
-
-
-def get_domain_sid(samdb):
-    res = samdb.search(base=samdb.domain_dn(),
-                       expression="(objectClass=*)", scope=SCOPE_BASE)
-    return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
-
-
 class cmd_dsacl_base(Command):
     """Base class for DSACL commands."""
 
@@ -85,9 +54,8 @@ class cmd_dsacl_base(Command):
         "versionopts": options.VersionOptions,
     }
 
-    def print_acl(self, samdb, object_dn, prefix=''):
-        desc = read_descriptor(samdb, object_dn)
-        desc_sddl = desc.as_sddl(get_domain_sid(samdb))
+    def print_acl(self, sd_helper, object_dn, prefix=''):
+        desc_sddl = sd_helper.get_sd_as_sddl(object_dn)
         self.outf.write("%sdescriptor for %s:\n" % (prefix, object_dn))
         self.outf.write(desc_sddl + "\n")
 
@@ -124,26 +92,21 @@ class cmd_dsacl_set(cmd_dsacl_base):
                type="string"),
     ]
 
-    def add_ace(self, samdb, object_dn, new_ace):
+    def find_trustee_sid(self, samdb, trusteedn):
+        res = samdb.search(base=trusteedn, expression="(objectClass=*)",
+                           scope=SCOPE_BASE)
+        assert(len(res) == 1)
+        return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
+
+    def add_ace(self, sd_helper, object_dn, new_ace):
         """Add new ace explicitly."""
-        desc = read_descriptor(samdb, object_dn)
-        new_ace = security.descriptor.from_sddl("D:" + new_ace, get_domain_sid(samdb))
-        new_ace_list = re.findall(r"\(.*?\)",new_ace.as_sddl())
-        for new_ace in new_ace_list:
-            desc_sddl = desc.as_sddl(get_domain_sid(samdb))
-            # TODO add bindings for descriptor manipulation and get rid of this
-            desc_aces = re.findall(r"\(.*?\)", desc_sddl)
-            for ace in desc_aces:
-                if ("ID" in ace):
-                    desc_sddl = desc_sddl.replace(ace, "")
-            if new_ace in desc_sddl:
-                continue
-            if desc_sddl.find("(") >= 0:
-                desc_sddl = desc_sddl[:desc_sddl.index("(")] + new_ace + desc_sddl[desc_sddl.index("("):]
-            else:
-                desc_sddl = desc_sddl + new_ace
-            desc = security.descriptor.from_sddl(desc_sddl, get_domain_sid(samdb))
-            modify_descriptor(samdb, object_dn, desc)
+        ai,ii = sd_helper.dacl_prepend_aces(object_dn, new_ace)
+        for ace in ii:
+            sddl = ace.as_sddl(sd_helper.domain_sid)
+            self.outf.write("WARNING: ignored INHERITED_ACE (%s).\n" % sddl)
+        for ace in ai:
+            sddl = ace.as_sddl(sd_helper.domain_sid)
+            self.outf.write("WARNING: (%s) was already found in the current security descriptor.\n" % sddl)
 
     def run(self, car, action, objectdn, trusteedn, sddl,
             H=None, credopts=None, sambaopts=None, versionopts=None):
@@ -156,6 +119,7 @@ class cmd_dsacl_set(cmd_dsacl_base):
 
         samdb = SamDB(url=H, session_info=system_session(),
                       credentials=creds, lp=lp)
+        sd_helper = sd_utils.SDUtils(samdb)
         cars = {'change-rid': GUID_DRS_CHANGE_RID_MASTER,
                 'change-pdc': GUID_DRS_CHANGE_PDC,
                 'change-infrastructure': GUID_DRS_CHANGE_INFR_MASTER,
@@ -170,7 +134,7 @@ class cmd_dsacl_set(cmd_dsacl_base):
                 'repl-sync': GUID_DRS_REPL_SYNCRONIZE,
                 'ro-repl-secret-sync': GUID_DRS_RO_REPL_SECRET_SYNC,
                 }
-        sid = find_trustee_sid(samdb, trusteedn)
+        sid = self.find_trustee_sid(samdb, trusteedn)
         if sddl:
             new_ace = sddl
         elif action == "allow":
@@ -180,9 +144,9 @@ class cmd_dsacl_set(cmd_dsacl_base):
         else:
             raise CommandError("Wrong argument '%s'!" % action)
 
-        self.print_acl(samdb, objectdn, prefix='old ')
-        self.add_ace(samdb, objectdn, new_ace)
-        self.print_acl(samdb, objectdn, prefix='new ')
+        self.print_acl(sd_helper, objectdn, prefix='old ')
+        self.add_ace(sd_helper, objectdn, new_ace)
+        self.print_acl(sd_helper, objectdn, prefix='new ')
 
 
 class cmd_dsacl_get(cmd_dsacl_base):
@@ -202,7 +166,8 @@ class cmd_dsacl_get(cmd_dsacl_base):
 
         samdb = SamDB(url=H, session_info=system_session(),
             credentials=creds, lp=lp)
-        self.print_acl(samdb, objectdn)
+        sd_helper = sd_utils.SDUtils(samdb)
+        self.print_acl(sd_helper, objectdn)
 
 
 class cmd_dsacl_delete(cmd_dsacl_base):
@@ -226,23 +191,21 @@ class cmd_dsacl_delete(cmd_dsacl_base):
 
         samdb = SamDB(url=H, session_info=system_session(),
                       credentials=creds, lp=lp)
+        sd_helper = sd_utils.SDUtils(samdb)
 
-        self.print_acl(samdb, objectdn, prefix='old ')
-        self.delete_ace(samdb, objectdn, sddl)
-        self.print_acl(samdb, objectdn, prefix='new ')
+        self.print_acl(sd_helper, objectdn, prefix='old ')
+        self.delete_ace(sd_helper, objectdn, sddl)
+        self.print_acl(sd_helper, objectdn, prefix='new ')
 
-    def delete_ace(self, samdb, object_dn, delete_aces):
+    def delete_ace(self, sd_helper, object_dn, delete_aces):
         """Delete ace explicitly."""
-        desc = read_descriptor(samdb, object_dn)
-        domsid = get_domain_sid(samdb)
-        delete_aces = security.descriptor.from_sddl("D:" + delete_aces, domsid).dacl.aces
-        for ace in delete_aces:
-            if ace in desc.dacl.aces:
-                desc.dacl_del_ace(ace)
-            else:
-                sddl = ace.as_sddl(domsid)
-                self.outf.write("WARNING: (%s) was not found in the current security descriptor.\n" % sddl)
-        modify_descriptor(samdb, object_dn, desc)
+        di,ii = sd_helper.dacl_delete_aces(object_dn, delete_aces)
+        for ace in ii:
+            sddl = ace.as_sddl(sd_helper.domain_sid)
+            self.outf.write("WARNING: ignored INHERITED_ACE (%s).\n" % sddl)
+        for ace in di:
+            sddl = ace.as_sddl(sd_helper.domain_sid)
+            self.outf.write("WARNING: (%s) was not found in the current security descriptor.\n" % sddl)
 
 
 class cmd_dsacl(SuperCommand):