gp: Ensure centrify crontab user policy performs proper cleanup
authorDavid Mulder <dmulder@samba.org>
Tue, 25 Jul 2023 19:23:10 +0000 (13:23 -0600)
committerAndrew Bartlett <abartlet@samba.org>
Mon, 31 Jul 2023 09:58:30 +0000 (09:58 +0000)
This resolves cleanup issues for user and group
centrify compatible policies. It also ensures the
crontab policies use functions from the scripts
policy, to avoid code duplication and simplify
cleanup.

Signed-off-by: David Mulder <dmulder@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/gp/gp_centrify_crontab_ext.py
selftest/knownfail.d/gpo [deleted file]

index e8ffd8bec9fbcb358e1bd010665c0672426b711d..e532416d224d1ec7dde2caeb485d75750f61547e 100644 (file)
 
 import os, re
 from subprocess import Popen, PIPE
-from samba.gp.gpclass import gp_pol_ext, drop_privileges, gp_file_applier
-from hashlib import blake2b
+from samba.gp.gpclass import gp_pol_ext, drop_privileges, gp_file_applier, \
+    gp_misc_applier
 from tempfile import NamedTemporaryFile
+from samba.gp.gp_scripts_ext import fetch_crontab, install_crontab, \
+    install_user_crontab
 
 intro = '''
 ### autogenerated by samba
@@ -92,73 +94,44 @@ class gp_centrify_crontab_ext(gp_pol_ext, gp_file_applier):
                     output[str(self)].append(e.data)
         return output
 
-def fetch_crontab(username):
-    p = Popen(['crontab', '-l', '-u', username], stdout=PIPE, stderr=PIPE)
-    out, err = p.communicate()
-    if p.returncode != 0:
-        raise RuntimeError('Failed to read the crontab: %s' % err)
-    m = re.findall('%s(.*)%s' % (intro, end), out.decode(), re.DOTALL)
-    if len(m) == 1:
-        entries = m[0].strip().split('\n')
-    else:
-        entries = []
-    m = re.findall('(.*)%s.*%s(.*)' % (intro, end), out.decode(), re.DOTALL)
-    if len(m) == 1:
-        others = '\n'.join([l.strip() for l in m[0]])
-    else:
-        others = out.decode()
-    return others, entries
+class gp_user_centrify_crontab_ext(gp_centrify_crontab_ext, gp_misc_applier):
+    def unapply(self, guid, attribute, entry):
+        others, entries = fetch_crontab(self.username)
+        if entry in entries:
+            entries.remove(entry)
+            install_user_crontab(self.username, others, entries)
+        self.cache_remove_attribute(guid, attribute)
 
-def install_crontab(fname, username):
-    p = Popen(['crontab', fname, '-u', username], stdout=PIPE, stderr=PIPE)
-    _, err = p.communicate()
-    if p.returncode != 0:
-        raise RuntimeError('Failed to install crontab: %s' % err)
+    def apply(self, guid, attribute, entry):
+        old_val = self.cache_get_attribute_value(guid, attribute)
+        others, entries = fetch_crontab(self.username)
+        if not old_val or entry not in entries:
+            entries.append(entry)
+            install_user_crontab(self.username, others, entries)
+            self.cache_add_attribute(guid, attribute, entry)
 
-class gp_user_centrify_crontab_ext(gp_centrify_crontab_ext):
     def process_group_policy(self, deleted_gpo_list, changed_gpo_list):
         for guid, settings in deleted_gpo_list:
-            self.gp_db.set_guid(guid)
             if str(self) in settings:
-                others, entries = fetch_crontab(self.username)
                 for attribute, entry in settings[str(self)].items():
-                    if entry in entries:
-                        entries.remove(entry)
-                    self.gp_db.delete(str(self), attribute)
-                with NamedTemporaryFile() as f:
-                    if len(entries) > 0:
-                        f.write('\n'.join([others, intro,
-                                '\n'.join(entries), end]).encode())
-                    else:
-                        f.write(others.encode())
-                    f.flush()
-                    install_crontab(f.name, self.username)
-            self.gp_db.commit()
+                    self.unapply(guid, attribute, entry)
 
         for gpo in changed_gpo_list:
             if gpo.file_sys_path:
                 section = \
                     'Software\\Policies\\Centrify\\UnixSettings\\CrontabEntries'
-                self.gp_db.set_guid(gpo.name)
                 pol_file = 'USER/Registry.pol'
                 path = os.path.join(gpo.file_sys_path, pol_file)
                 pol_conf = drop_privileges('root', self.parse, path)
                 if not pol_conf:
                     continue
+                attrs = []
                 for e in pol_conf.entries:
                     if e.keyname == section and e.data.strip():
-                        attribute = blake2b(e.data.encode()).hexdigest()
-                        old_val = self.gp_db.retrieve(str(self), attribute)
-                        others, entries = fetch_crontab(self.username)
-                        if not old_val or e.data not in entries:
-                            entries.append(e.data)
-                            with NamedTemporaryFile() as f:
-                                f.write('\n'.join([others, intro,
-                                        '\n'.join(entries), end]).encode())
-                                f.flush()
-                                install_crontab(f.name, self.username)
-                            self.gp_db.store(str(self), attribute, e.data)
-                        self.gp_db.commit()
+                        attribute = self.generate_attribute(e.data)
+                        attrs.append(attribute)
+                        self.apply(gpo.name, attribute, e.data)
+                self.clean(gpo.name, keep=attrs)
 
     def rsop(self, gpo):
         return super().rsop(gpo, target='USER')
diff --git a/selftest/knownfail.d/gpo b/selftest/knownfail.d/gpo
deleted file mode 100644 (file)
index 94ba8c2..0000000
+++ /dev/null
@@ -1 +0,0 @@
-^samba.tests.gpo.samba.tests.gpo.GPOTests.test_gp_user_centrify_crontab_ext