from abc import ABCMeta, abstractmethod
import xml.etree.ElementTree as etree
import re
+from samba.net import Net
+from samba.dcerpc import nbt
+from samba import smb
+import samba.gpo as gpo
+import chardet
try:
from enum import Enum
}
}
+''' Fetch the hostname of a writable DC '''
+def get_dc_hostname(creds, lp):
+ net = Net(creds=creds, lp=lp)
+ cldap_ret = net.finddc(domain=lp.get('realm'), flags=(nbt.NBT_SERVER_LDAP |
+ nbt.NBT_SERVER_DS))
+ return cldap_ret.pdc_dns_name
+
+''' Fetch a list of GUIDs for applicable GPOs '''
+def get_gpo_list(dc_hostname, creds, lp):
+ gpos = []
+ ads = gpo.ADS_STRUCT(dc_hostname, lp, creds)
+ if ads.connect():
+ gpos = ads.get_gpo_list(creds.get_username())
+ return gpos
+
+def apply_gp(lp, creds, test_ldb, logger, store, gp_extensions):
+ gp_db = store.get_gplog(creds.get_username())
+ dc_hostname = get_dc_hostname(creds, lp)
+ try:
+ conn = smb.SMB(dc_hostname, 'sysvol', lp=lp, creds=creds)
+ except:
+ logger.error('Error connecting to \'%s\' using SMB' % dc_hostname)
+ raise
+ gpos = get_gpo_list(dc_hostname, creds, lp)
+
+ for gpo_obj in gpos:
+ guid = gpo_obj.name
+ if guid == 'Local Policy':
+ continue
+ path = os.path.join(lp.get('realm').lower(), 'Policies', guid)
+ local_path = os.path.join(lp.get("path", "sysvol"), path)
+ version = int(gpo.gpo_get_sysvol_gpt_version(local_path)[1])
+ if version != store.get_int(guid):
+ logger.info('GPO %s has changed' % guid)
+ gp_db.state(GPOSTATE.APPLY)
+ else:
+ gp_db.state(GPOSTATE.ENFORCE)
+ gp_db.set_guid(guid)
+ store.start()
+ for ext in gp_extensions:
+ try:
+ ext.parse(ext.list(path), test_ldb, conn, gp_db, lp)
+ except Exception as e:
+ logger.error('Failed to parse gpo %s for extension %s' % \
+ (guid, str(ext)))
+ logger.error('Message was: ' + str(e))
+ store.cancel()
+ continue
+ store.store(guid, '%i' % version)
+ store.commit()
+
+def unapply_log(gp_db):
+ while True:
+ item = gp_db.apply_log_pop()
+ if item:
+ yield item
+ else:
+ break
+
+def unapply_gp(lp, creds, test_ldb, logger, store, gp_extensions):
+ gp_db = store.get_gplog(creds.get_username())
+ gp_db.state(GPOSTATE.UNAPPLY)
+ for gpo_guid in unapply_log(gp_db):
+ gp_db.set_guid(gpo_guid)
+ unapply_attributes = gp_db.list(gp_extensions)
+ for attr in unapply_attributes:
+ attr_obj = attr[-1](logger, test_ldb, gp_db, lp, attr[0], attr[1])
+ attr_obj.mapper()[attr[0]][0](attr[1]) # Set the old value
+ gp_db.delete(str(attr_obj), attr[0])
+ gp_db.commit()
+
from samba.samdb import SamDB
except:
SamDB = None
-from samba.gpclass import *
-from samba.net import Net
-from samba.dcerpc import nbt
-from samba import smb
-import samba.gpo as gpo
+from samba.gpclass import apply_gp, unapply_gp, GPOStorage
+from samba.gp_sec_ext import gp_sec_ext
import logging
-import chardet
-
-''' Fetch the hostname of a writable DC '''
-def get_dc_hostname(creds, lp):
- net = Net(creds=creds, lp=lp)
- cldap_ret = net.finddc(domain=lp.get('realm'), flags=(nbt.NBT_SERVER_LDAP |
- nbt.NBT_SERVER_DS))
- return cldap_ret.pdc_dns_name
-
-''' Fetch a list of GUIDs for applicable GPOs '''
-def get_gpo_list(dc_hostname, creds, lp):
- gpos = []
- ads = gpo.ADS_STRUCT(dc_hostname, lp, creds)
- if ads.connect():
- gpos = ads.get_gpo_list(creds.get_username())
- return gpos
-
-def apply_gp(lp, creds, test_ldb, logger, store, gp_extensions):
- gp_db = store.get_gplog(creds.get_username())
- dc_hostname = get_dc_hostname(creds, lp)
- try:
- conn = smb.SMB(dc_hostname, 'sysvol', lp=lp, creds=creds)
- except:
- logger.error('Error connecting to \'%s\' using SMB' % dc_hostname)
- raise
- gpos = get_gpo_list(dc_hostname, creds, lp)
-
- for gpo_obj in gpos:
- guid = gpo_obj.name
- if guid == 'Local Policy':
- continue
- path = os.path.join(lp.get('realm').lower(), 'Policies', guid)
- local_path = os.path.join(lp.get("path", "sysvol"), path)
- version = int(gpo.gpo_get_sysvol_gpt_version(local_path)[1])
- if version != store.get_int(guid):
- logger.info('GPO %s has changed' % guid)
- gp_db.state(GPOSTATE.APPLY)
- else:
- gp_db.state(GPOSTATE.ENFORCE)
- gp_db.set_guid(guid)
- store.start()
- for ext in gp_extensions:
- try:
- ext.parse(ext.list(path), test_ldb, conn, gp_db, lp)
- except Exception as e:
- logger.error('Failed to parse gpo %s for extension %s' % \
- (guid, str(ext)))
- logger.error('Message was: ' + str(e))
- store.cancel()
- continue
- store.store(guid, '%i' % version)
- store.commit()
-
-def unapply_log(gp_db):
- while True:
- item = gp_db.apply_log_pop()
- if item:
- yield item
- else:
- break
-
-def unapply_gp(lp, creds, test_ldb, logger, store, gp_extensions):
- gp_db = store.get_gplog(creds.get_username())
- gp_db.state(GPOSTATE.UNAPPLY)
- for gpo_guid in unapply_log(gp_db):
- gp_db.set_guid(gpo_guid)
- unapply_attributes = gp_db.list(gp_extensions)
- for attr in unapply_attributes:
- attr_obj = attr[-1](logger, test_ldb, gp_db, lp, attr[0], attr[1])
- attr_obj.mapper()[attr[0]][0](attr[1]) # Set the old value
- gp_db.delete(str(attr_obj), attr[0])
- gp_db.commit()
if __name__ == "__main__":
parser = optparse.OptionParser('samba_gpoupdate [options]')