import os
from samba import gpo, tests
from samba.gpclass import register_gp_extension, list_gp_extensions, \
- unregister_gp_extension
+ unregister_gp_extension, gp_log, GPOStorage
from samba.param import LoadParm
from samba.gpclass import check_refresh_gpo_list, check_safe_path, \
check_guid, parse_gpext_conf, atomic_write_conf
+from subprocess import Popen, PIPE
+from tempfile import NamedTemporaryFile
poldir = r'\\addom.samba.example.com\sysvol\addom.samba.example.com\Policies'
dspath = 'CN=Policies,CN=System,DC=addom,DC=samba,DC=example,DC=com'
gpt_data = '[General]\nVersion=%d'
+def days2rel_nttime(val):
+ seconds = 60
+ minutes = 60
+ hours = 24
+ sam_add = 10000000
+ return -(val * seconds * minutes * hours * sam_add)
+
+def gpupdate_force(lp):
+ gpupdate = lp.get('gpo update command')
+ gpupdate.append('--force')
+
+ return Popen(gpupdate, stdout=PIPE, stderr=PIPE).wait()
+
+def gpupdate_unapply(lp):
+ gpupdate = lp.get('gpo update command')
+ gpupdate.append('--unapply')
+
+ return Popen(gpupdate, stdout=PIPE, stderr=PIPE).wait()
+
+def stage_file(path, data):
+ dirname = os.path.dirname(path)
+ if not os.path.exists(dirname):
+ try:
+ os.makedirs(dirname)
+ except OSError as e:
+ if not (e.errno == errno.EEXIST and os.path.isdir(dirname)):
+ return False
+ if os.path.exists(path):
+ os.rename(path, '%s.bak' % path)
+ with NamedTemporaryFile(delete=False, dir=os.path.dirname(path)) as f:
+ f.write(data)
+ os.rename(f.name, path)
+ os.chmod(path, 0o644)
+ return True
+
+def unstage_file(path):
+ backup = '%s.bak' % path
+ if os.path.exists(backup):
+ os.rename(backup, path)
+ elif os.path.exists(path):
+ os.remove(path)
class GPOTests(tests.TestCase):
def setUp(self):
'Failed to find test variable in gpext.conf')
parser.remove_section('test_section')
atomic_write_conf(lp, parser)
+
+ def test_gp_log_get_applied(self):
+ local_path = self.lp.get('path', 'sysvol')
+ guids = ['{31B2F340-016D-11D2-945F-00C04FB984F9}',
+ '{6AC1786C-016F-11D2-945F-00C04FB984F9}']
+ gpofile = '%s/addom.samba.example.com/Policies/%s/MACHINE/Microsoft/' \
+ 'Windows NT/SecEdit/GptTmpl.inf'
+ stage = '[System Access]\nMinimumPasswordAge = 998\n'
+ cache_dir = self.lp.get('cache directory')
+ store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
+ for guid in guids:
+ gpttmpl = gpofile % (local_path, guid)
+ ret = stage_file(gpttmpl, stage)
+ self.assertTrue(ret, 'Could not create the target %s' % gpttmpl)
+
+ ret = gpupdate_force(self.lp)
+ self.assertEquals(ret, 0, 'gpupdate force failed')
+
+ gp_db = store.get_gplog('ADDC$')
+
+ applied_guids = gp_db.get_applied_guids()
+ self.assertEquals(len(applied_guids), 2, 'The guids were not found')
+ self.assertIn(guids[0], applied_guids,
+ '%s not in applied guids' % guids[0])
+ self.assertIn(guids[1], applied_guids,
+ '%s not in applied guids' % guids[1])
+
+ applied_settings = gp_db.get_applied_settings(applied_guids)
+ for policy in applied_settings:
+ self.assertIn('System Access', policy[1],
+ 'System Access policies not set')
+ self.assertIn('minPwdAge', policy[1]['System Access'],
+ 'minPwdAge policy not set')
+ if policy[0] == guids[0]:
+ self.assertEqual(int(policy[1]['System Access']['minPwdAge']),
+ days2rel_nttime(1),
+ 'minPwdAge policy not set')
+ elif policy[0] == guids[1]:
+ self.assertEqual(int(policy[1]['System Access']['minPwdAge']),
+ days2rel_nttime(998),
+ 'minPwdAge policy not set')
+
+ for guid in guids:
+ gpttmpl = gpofile % (local_path, guid)
+ unstage_file(gpttmpl)
+
+ ret = gpupdate_unapply(self.lp)
+ self.assertEquals(ret, 0, 'gpupdate unapply failed')