import os, re
from subprocess import Popen, PIPE
-from samba.gp.gpclass import gp_pol_ext, drop_privileges, gp_file_applier
-from hashlib import blake2b
+from samba.gp.gpclass import gp_pol_ext, drop_privileges, gp_file_applier, \
+ gp_misc_applier
from tempfile import NamedTemporaryFile
+from samba.gp.gp_scripts_ext import fetch_crontab, install_crontab, \
+ install_user_crontab
intro = '''
### autogenerated by samba
output[str(self)].append(e.data)
return output
-def fetch_crontab(username):
- p = Popen(['crontab', '-l', '-u', username], stdout=PIPE, stderr=PIPE)
- out, err = p.communicate()
- if p.returncode != 0:
- raise RuntimeError('Failed to read the crontab: %s' % err)
- m = re.findall('%s(.*)%s' % (intro, end), out.decode(), re.DOTALL)
- if len(m) == 1:
- entries = m[0].strip().split('\n')
- else:
- entries = []
- m = re.findall('(.*)%s.*%s(.*)' % (intro, end), out.decode(), re.DOTALL)
- if len(m) == 1:
- others = '\n'.join([l.strip() for l in m[0]])
- else:
- others = out.decode()
- return others, entries
+class gp_user_centrify_crontab_ext(gp_centrify_crontab_ext, gp_misc_applier):
+ def unapply(self, guid, attribute, entry):
+ others, entries = fetch_crontab(self.username)
+ if entry in entries:
+ entries.remove(entry)
+ install_user_crontab(self.username, others, entries)
+ self.cache_remove_attribute(guid, attribute)
-def install_crontab(fname, username):
- p = Popen(['crontab', fname, '-u', username], stdout=PIPE, stderr=PIPE)
- _, err = p.communicate()
- if p.returncode != 0:
- raise RuntimeError('Failed to install crontab: %s' % err)
+ def apply(self, guid, attribute, entry):
+ old_val = self.cache_get_attribute_value(guid, attribute)
+ others, entries = fetch_crontab(self.username)
+ if not old_val or entry not in entries:
+ entries.append(entry)
+ install_user_crontab(self.username, others, entries)
+ self.cache_add_attribute(guid, attribute, entry)
-class gp_user_centrify_crontab_ext(gp_centrify_crontab_ext):
def process_group_policy(self, deleted_gpo_list, changed_gpo_list):
for guid, settings in deleted_gpo_list:
- self.gp_db.set_guid(guid)
if str(self) in settings:
- others, entries = fetch_crontab(self.username)
for attribute, entry in settings[str(self)].items():
- if entry in entries:
- entries.remove(entry)
- self.gp_db.delete(str(self), attribute)
- with NamedTemporaryFile() as f:
- if len(entries) > 0:
- f.write('\n'.join([others, intro,
- '\n'.join(entries), end]).encode())
- else:
- f.write(others.encode())
- f.flush()
- install_crontab(f.name, self.username)
- self.gp_db.commit()
+ self.unapply(guid, attribute, entry)
for gpo in changed_gpo_list:
if gpo.file_sys_path:
section = \
'Software\\Policies\\Centrify\\UnixSettings\\CrontabEntries'
- self.gp_db.set_guid(gpo.name)
pol_file = 'USER/Registry.pol'
path = os.path.join(gpo.file_sys_path, pol_file)
pol_conf = drop_privileges('root', self.parse, path)
if not pol_conf:
continue
+ attrs = []
for e in pol_conf.entries:
if e.keyname == section and e.data.strip():
- attribute = blake2b(e.data.encode()).hexdigest()
- old_val = self.gp_db.retrieve(str(self), attribute)
- others, entries = fetch_crontab(self.username)
- if not old_val or e.data not in entries:
- entries.append(e.data)
- with NamedTemporaryFile() as f:
- f.write('\n'.join([others, intro,
- '\n'.join(entries), end]).encode())
- f.flush()
- install_crontab(f.name, self.username)
- self.gp_db.store(str(self), attribute, e.data)
- self.gp_db.commit()
+ attribute = self.generate_attribute(e.data)
+ attrs.append(attribute)
+ self.apply(gpo.name, attribute, e.data)
+ self.clean(gpo.name, keep=attrs)
def rsop(self, gpo):
return super().rsop(gpo, target='USER')
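Review bot: In `process_group_policy`, the `if not pol_conf: continue` path runs before `attrs = []` and `self.clean(gpo.name, keep=attrs)`, so a changed GPO whose `USER/Registry.pol` yields no policy never reaches the cleanup. Assuming `self.parse` returns a falsy value for a missing or empty file (not shown here), crontab entries applied earlier for that GPO stay installed for the user. If this is meant as a fail-safe against a transient read error, say so in a comment at the `continue`; otherwise call `self.clean(gpo.name, keep=[])` before continuing so that revoked jobs stop running. Separately, `apply` tests `entry not in entries` against the list from `fetch_crontab`, which in the version removed from this file was split on newlines. If the relocated helper still does that, an `e.data` value with an embedded newline would never match and would be re-appended on every refresh, so it seems worth rejecting multi-line values before `self.apply(...)`, for example `if '\n' in e.data: continue` with a log message.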