gpo: Extract Kerberos policy from Security extension
[samba.git] / python / samba / tests / gpo.py
index 92403a56d006fa0197b4130425fc7dcf8b3a47f2..f2927373dc004862f8e2939a73d5d861664245fa 100644 (file)
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import os
+import errno
 from samba import gpo, tests
 from samba.gpclass import register_gp_extension, list_gp_extensions, \
-    unregister_gp_extension, gp_log, GPOStorage
+    unregister_gp_extension, GPOStorage
 from samba.param import LoadParm
 from samba.gpclass import check_refresh_gpo_list, check_safe_path, \
     check_guid, parse_gpext_conf, atomic_write_conf, get_deleted_gpos_list
 from subprocess import Popen, PIPE
-from tempfile import NamedTemporaryFile
-
-poldir = r'\\addom.samba.example.com\sysvol\addom.samba.example.com\Policies'
-dspath = 'CN=Policies,CN=System,DC=addom,DC=samba,DC=example,DC=com'
+from tempfile import NamedTemporaryFile, TemporaryDirectory
+from samba.gp_sec_ext import gp_krb_ext, gp_sec_ext
+from samba.gp_scripts_ext import gp_scripts_ext
+from samba.gp_sudoers_ext import gp_sudoers_ext
+from samba.gpclass import gp_inf_ext
+import logging
+from samba.credentials import Credentials
+from samba.compat import get_bytes
+from samba.dcerpc import preg
+from samba.ndr import ndr_pack
+import codecs
+
+realm = os.environ.get('REALM')
+policies = realm + '/POLICIES'
+realm = realm.lower()
+poldir = r'\\{0}\sysvol\{0}\Policies'.format(realm)
+# the first part of the base DN varies by testenv. Work it out from the realm
+base_dn = 'DC={0},DC=samba,DC=example,DC=com'.format(realm.split('.')[0])
+dspath = 'CN=Policies,CN=System,' + base_dn
 gpt_data = '[General]\nVersion=%d'
 
 def days2rel_nttime(val):
@@ -58,7 +74,7 @@ def stage_file(path, data):
     if os.path.exists(path):
         os.rename(path, '%s.bak' % path)
     with NamedTemporaryFile(delete=False, dir=os.path.dirname(path)) as f:
-        f.write(data)
+        f.write(get_bytes(data))
         os.rename(f.name, path)
         os.chmod(path, 0o644)
     return True
@@ -74,6 +90,7 @@ class GPOTests(tests.TestCase):
     def setUp(self):
         super(GPOTests, self).setUp()
         self.server = os.environ["SERVER"]
+        self.dc_account = self.server.upper() + '$'
         self.lp = LoadParm()
         self.lp.load_default()
         self.creds = self.insta_creds(template=self.get_credentials())
@@ -91,11 +108,11 @@ class GPOTests(tests.TestCase):
         file_sys_paths = [None, '%s\\%s' % (poldir, guid)]
         ds_paths = [None, 'CN=%s,%s' % (guid, dspath)]
         for i in range(0, len(gpos)):
-            self.assertEquals(gpos[i].name, names[i],
+            self.assertEqual(gpos[i].name, names[i],
                               'The gpo name did not match expected name %s' % gpos[i].name)
-            self.assertEquals(gpos[i].file_sys_path, file_sys_paths[i],
+            self.assertEqual(gpos[i].file_sys_path, file_sys_paths[i],
                               'file_sys_path did not match expected %s' % gpos[i].file_sys_path)
-            self.assertEquals(gpos[i].ds_path, ds_paths[i],
+            self.assertEqual(gpos[i].ds_path, ds_paths[i],
                               'ds_path did not match expected %s' % gpos[i].ds_path)
 
     def test_gpo_ads_does_not_segfault(self):
@@ -107,19 +124,18 @@ class GPOTests(tests.TestCase):
     def test_gpt_version(self):
         global gpt_data
         local_path = self.lp.cache_path('gpo_cache')
-        policies = 'ADDOM.SAMBA.EXAMPLE.COM/POLICIES'
         guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
         gpo_path = os.path.join(local_path, policies, guid)
         old_vers = gpo.gpo_get_sysvol_gpt_version(gpo_path)[1]
 
         with open(os.path.join(gpo_path, 'GPT.INI'), 'w') as gpt:
             gpt.write(gpt_data % 42)
-        self.assertEquals(gpo.gpo_get_sysvol_gpt_version(gpo_path)[1], 42,
+        self.assertEqual(gpo.gpo_get_sysvol_gpt_version(gpo_path)[1], 42,
                           'gpo_get_sysvol_gpt_version() did not return the expected version')
 
         with open(os.path.join(gpo_path, 'GPT.INI'), 'w') as gpt:
             gpt.write(gpt_data % old_vers)
-        self.assertEquals(gpo.gpo_get_sysvol_gpt_version(gpo_path)[1], old_vers,
+        self.assertEqual(gpo.gpo_get_sysvol_gpt_version(gpo_path)[1], old_vers,
                           'gpo_get_sysvol_gpt_version() did not return the expected version')
 
     def test_check_refresh_gpo_list(self):
@@ -133,7 +149,7 @@ class GPOTests(tests.TestCase):
                         'GPO cache %s was not created' % cache)
 
         guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
-        gpt_ini = os.path.join(cache, 'ADDOM.SAMBA.EXAMPLE.COM/POLICIES',
+        gpt_ini = os.path.join(cache, policies,
                                guid, 'GPT.INI')
         self.assertTrue(os.path.exists(gpt_ini),
                         'GPT.INI was not cached for %s' % guid)
@@ -147,12 +163,12 @@ class GPOTests(tests.TestCase):
         self.assertEqual(check_safe_path('\\\\etc/\\passwd'), 'etc/passwd')
 
         # there should be no backslashes used to delineate paths
-        before = 'sysvol/addom.samba.example.com\\Policies/' \
+        before = 'sysvol/' + realm + '\\Policies/' \
             '{31B2F340-016D-11D2-945F-00C04FB984F9}\\GPT.INI'
-        after = 'addom.samba.example.com/Policies/' \
+        after = realm + '/Policies/' \
             '{31B2F340-016D-11D2-945F-00C04FB984F9}/GPT.INI'
         result = check_safe_path(before)
-        self.assertEquals(result, after, 'check_safe_path() didn\'t'
+        self.assertEqual(result, after, 'check_safe_path() didn\'t'
                           ' correctly convert \\ to /')
 
     def test_gpt_ext_register(self):
@@ -167,7 +183,7 @@ class GPOTests(tests.TestCase):
         gp_exts = list_gp_extensions(self.lp.configfile)
         self.assertTrue(ext_guid in gp_exts.keys(),
                         'Failed to list gp exts')
-        self.assertEquals(gp_exts[ext_guid]['DllName'], ext_path,
+        self.assertEqual(gp_exts[ext_guid]['DllName'], ext_path,
                           'Failed to list gp exts')
 
         unregister_gp_extension(ext_guid)
@@ -187,7 +203,7 @@ class GPOTests(tests.TestCase):
         lp, parser = parse_gpext_conf(self.lp.configfile)
         self.assertTrue('test_section' in parser.sections(),
                         'test_section not found in gpext.conf')
-        self.assertEquals(parser.get('test_section', 'test_var'), ext_guid,
+        self.assertEqual(parser.get('test_section', 'test_var'), ext_guid,
                           'Failed to find test variable in gpext.conf')
         parser.remove_section('test_section')
         atomic_write_conf(lp, parser)
@@ -196,7 +212,7 @@ class GPOTests(tests.TestCase):
         local_path = self.lp.get('path', 'sysvol')
         guids = ['{31B2F340-016D-11D2-945F-00C04FB984F9}',
                  '{6AC1786C-016F-11D2-945F-00C04FB984F9}']
-        gpofile = '%s/addom.samba.example.com/Policies/%s/MACHINE/Microsoft/' \
+        gpofile = '%s/' + realm + '/Policies/%s/MACHINE/Microsoft/' \
                   'Windows NT/SecEdit/GptTmpl.inf'
         stage = '[System Access]\nMinimumPasswordAge = 998\n'
         cache_dir = self.lp.get('cache directory')
@@ -207,12 +223,12 @@ class GPOTests(tests.TestCase):
             self.assertTrue(ret, 'Could not create the target %s' % gpttmpl)
 
         ret = gpupdate_force(self.lp)
-        self.assertEquals(ret, 0, 'gpupdate force failed')
+        self.assertEqual(ret, 0, 'gpupdate force failed')
 
-        gp_db = store.get_gplog('ADDC$')
+        gp_db = store.get_gplog(self.dc_account)
 
         applied_guids = gp_db.get_applied_guids()
-        self.assertEquals(len(applied_guids), 2, 'The guids were not found')
+        self.assertEqual(len(applied_guids), 2, 'The guids were not found')
         self.assertIn(guids[0], applied_guids,
                       '%s not in applied guids' % guids[0])
         self.assertIn(guids[1], applied_guids,
@@ -235,7 +251,7 @@ class GPOTests(tests.TestCase):
 
         ads = gpo.ADS_STRUCT(self.server, self.lp, self.creds)
         if ads.connect():
-            gpos = ads.get_gpo_list('ADDC$')
+            gpos = ads.get_gpo_list(self.dc_account)
         del_gpos = get_deleted_gpos_list(gp_db, gpos[:-1])
         self.assertEqual(len(del_gpos), 1, 'Returned delete gpos is incorrect')
         self.assertEqual(guids[-1], del_gpos[0][0],
@@ -250,4 +266,251 @@ class GPOTests(tests.TestCase):
             unstage_file(gpttmpl)
 
         ret = gpupdate_unapply(self.lp)
-        self.assertEquals(ret, 0, 'gpupdate unapply failed')
+        self.assertEqual(ret, 0, 'gpupdate unapply failed')
+
+    def test_process_group_policy(self):
+        local_path = self.lp.cache_path('gpo_cache')
+        guids = ['{31B2F340-016D-11D2-945F-00C04FB984F9}',
+                 '{6AC1786C-016F-11D2-945F-00C04FB984F9}']
+        gpofile = '%s/' + policies + '/%s/MACHINE/MICROSOFT/' \
+                  'WINDOWS NT/SECEDIT/GPTTMPL.INF'
+        logger = logging.getLogger('gpo_tests')
+        cache_dir = self.lp.get('cache directory')
+        store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
+
+        machine_creds = Credentials()
+        machine_creds.guess(self.lp)
+        machine_creds.set_machine_account()
+
+        # Initialize the group policy extension
+        ext = gp_krb_ext(logger, self.lp, machine_creds, store)
+
+        ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
+        if ads.connect():
+            gpos = ads.get_gpo_list(machine_creds.get_username())
+
+        stage = '[Kerberos Policy]\nMaxTicketAge = %d\n'
+        opts = [100, 200]
+        for i in range(0, 2):
+            gpttmpl = gpofile % (local_path, guids[i])
+            ret = stage_file(gpttmpl, stage % opts[i])
+            self.assertTrue(ret, 'Could not create the target %s' % gpttmpl)
+
+        # Process all gpos
+        ext.process_group_policy([], gpos)
+
+        ret = store.get_int('kdc:user_ticket_lifetime')
+        self.assertEqual(ret, opts[1], 'Higher priority policy was not set')
+
+        # Remove policy
+        gp_db = store.get_gplog(machine_creds.get_username())
+        del_gpos = get_deleted_gpos_list(gp_db, [])
+        ext.process_group_policy(del_gpos, [])
+
+        ret = store.get_int('kdc:user_ticket_lifetime')
+        self.assertEqual(ret, None, 'MaxTicketAge should not have applied')
+
+        # Process just the first gpo
+        ext.process_group_policy([], gpos[:-1])
+
+        ret = store.get_int('kdc:user_ticket_lifetime')
+        self.assertEqual(ret, opts[0], 'Lower priority policy was not set')
+
+        # Remove policy
+        ext.process_group_policy(del_gpos, [])
+
+        for guid in guids:
+            gpttmpl = gpofile % (local_path, guid)
+            unstage_file(gpttmpl)
+
+    def test_gp_scripts(self):
+        local_path = self.lp.cache_path('gpo_cache')
+        guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
+        reg_pol = os.path.join(local_path, policies, guid,
+                               'MACHINE/REGISTRY.POL')
+        logger = logging.getLogger('gpo_tests')
+        cache_dir = self.lp.get('cache directory')
+        store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
+
+        machine_creds = Credentials()
+        machine_creds.guess(self.lp)
+        machine_creds.set_machine_account()
+
+        # Initialize the group policy extension
+        ext = gp_scripts_ext(logger, self.lp, machine_creds, store)
+
+        ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
+        if ads.connect():
+            gpos = ads.get_gpo_list(machine_creds.get_username())
+
+        reg_key = b'Software\\Policies\\Samba\\Unix Settings'
+        sections = { b'%s\\Daily Scripts' % reg_key : '.cron.daily',
+                     b'%s\\Monthly Scripts' % reg_key : '.cron.monthly',
+                     b'%s\\Weekly Scripts' % reg_key : '.cron.weekly',
+                     b'%s\\Hourly Scripts' % reg_key : '.cron.hourly' }
+        for keyname in sections.keys():
+            # Stage the Registry.pol file with test data
+            stage = preg.file()
+            e = preg.entry()
+            e.keyname = keyname
+            e.valuename = b'Software\\Policies\\Samba\\Unix Settings'
+            e.type = 1
+            e.data = b'echo hello world'
+            stage.num_entries = 1
+            stage.entries = [e]
+            ret = stage_file(reg_pol, ndr_pack(stage))
+            self.assertTrue(ret, 'Could not create the target %s' % reg_pol)
+
+            # Process all gpos, with temp output directory
+            with TemporaryDirectory(sections[keyname]) as dname:
+                ext.process_group_policy([], gpos, dname)
+                scripts = os.listdir(dname)
+                self.assertEquals(len(scripts), 1,
+                    'The %s script was not created' % keyname.decode())
+                out, _ = Popen([os.path.join(dname, scripts[0])], stdout=PIPE).communicate()
+                self.assertIn(b'hello world', out,
+                    '%s script execution failed' % keyname.decode())
+
+            # Unstage the Registry.pol file
+            unstage_file(reg_pol)
+
+    def test_gp_sudoers(self):
+        local_path = self.lp.cache_path('gpo_cache')
+        guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
+        reg_pol = os.path.join(local_path, policies, guid,
+                               'MACHINE/REGISTRY.POL')
+        logger = logging.getLogger('gpo_tests')
+        cache_dir = self.lp.get('cache directory')
+        store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
+
+        machine_creds = Credentials()
+        machine_creds.guess(self.lp)
+        machine_creds.set_machine_account()
+
+        # Initialize the group policy extension
+        ext = gp_sudoers_ext(logger, self.lp, machine_creds, store)
+
+        ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
+        if ads.connect():
+            gpos = ads.get_gpo_list(machine_creds.get_username())
+
+        # Stage the Registry.pol file with test data
+        stage = preg.file()
+        e = preg.entry()
+        e.keyname = b'Software\\Policies\\Samba\\Unix Settings\\Sudo Rights'
+        e.valuename = b'Software\\Policies\\Samba\\Unix Settings'
+        e.type = 1
+        e.data = b'fakeu  ALL=(ALL) NOPASSWD: ALL'
+        stage.num_entries = 1
+        stage.entries = [e]
+        ret = stage_file(reg_pol, ndr_pack(stage))
+        self.assertTrue(ret, 'Could not create the target %s' % reg_pol)
+
+        # Process all gpos, with temp output directory
+        with TemporaryDirectory() as dname:
+            ext.process_group_policy([], gpos, dname)
+            sudoers = os.listdir(dname)
+            self.assertEquals(len(sudoers), 1, 'The sudoer file was not created')
+            self.assertIn(e.data,
+                    open(os.path.join(dname, sudoers[0]), 'r').read(),
+                    'The sudoers entry was not applied')
+
+        # Unstage the Registry.pol file
+        unstage_file(reg_pol)
+
+    def test_gp_inf_ext_utf(self):
+        logger = logging.getLogger('gpo_tests')
+        cache_dir = self.lp.get('cache directory')
+        store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
+
+        machine_creds = Credentials()
+        machine_creds.guess(self.lp)
+        machine_creds.set_machine_account()
+
+        ext = gp_inf_ext(logger, self.lp, machine_creds, store)
+        test_data = '[Kerberos Policy]\nMaxTicketAge = 99\n'
+
+        with NamedTemporaryFile() as f:
+            with codecs.open(f.name, 'w', 'utf-16') as w:
+                w.write(test_data)
+            try:
+                inf_conf = ext.read(f.name)
+            except UnicodeDecodeError:
+                self.fail('Failed to parse utf-16')
+            self.assertIn('Kerberos Policy', inf_conf.keys(),
+                          'Kerberos Policy was not read from the file')
+            self.assertEquals(inf_conf.get('Kerberos Policy', 'MaxTicketAge'),
+                              '99', 'MaxTicketAge was not read from the file')
+
+        with NamedTemporaryFile() as f:
+            with codecs.open(f.name, 'w', 'utf-8') as w:
+                w.write(test_data)
+            inf_conf = ext.read(f.name)
+            self.assertIn('Kerberos Policy', inf_conf.keys(),
+                          'Kerberos Policy was not read from the file')
+            self.assertEquals(inf_conf.get('Kerberos Policy', 'MaxTicketAge'),
+                              '99', 'MaxTicketAge was not read from the file')
+
+    def test_rsop(self):
+        logger = logging.getLogger('gpo_tests')
+        cache_dir = self.lp.get('cache directory')
+        local_path = self.lp.cache_path('gpo_cache')
+        store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
+
+        machine_creds = Credentials()
+        machine_creds.guess(self.lp)
+        machine_creds.set_machine_account()
+
+        ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
+        if ads.connect():
+            gpos = ads.get_gpo_list(machine_creds.get_username())
+
+        gp_extensions = []
+        gp_extensions.append(gp_krb_ext(logger, self.lp, machine_creds, store))
+        gp_extensions.append(gp_scripts_ext(logger, self.lp, machine_creds,
+            store))
+
+        # Create registry stage data
+        reg_pol = os.path.join(local_path, policies, '%s/MACHINE/REGISTRY.POL')
+        reg_stage = preg.file()
+        e = preg.entry()
+        e.keyname = b'Software\\Policies\\Samba\\Unix Settings\\Daily Scripts'
+        e.valuename = b'Software\\Policies\\Samba\\Unix Settings'
+        e.type = 1
+        e.data = b'echo hello world'
+        reg_stage.num_entries = 1
+        reg_stage.entries = [e]
+
+        # Create krb stage date
+        gpofile = os.path.join(local_path, policies, '%s/MACHINE/MICROSOFT/' \
+                  'WINDOWS NT/SECEDIT/GPTTMPL.INF')
+        krb_stage = '[Kerberos Policy]\nMaxTicketAge = 99\n'
+
+        for g in [g for g in gpos if g.file_sys_path]:
+            ret = stage_file(gpofile % g.name, krb_stage)
+            self.assertTrue(ret, 'Could not create the target %s' %
+                                 (gpofile % g.name))
+            ret = stage_file(reg_pol % g.name, ndr_pack(reg_stage))
+            self.assertTrue(ret, 'Could not create the target %s' %
+                                 (reg_pol % g.name))
+            for ext in gp_extensions:
+                ret = ext.rsop(g)
+                self.assertEquals(len(ret.keys()), 1,
+                                  'A single policy should have been displayed')
+
+                # Check the Security Extension
+                if type(ext) == gp_krb_ext:
+                    self.assertIn('Kerberos Policy', ret.keys(),
+                                  'Kerberos Policy not found')
+                    self.assertIn('MaxTicketAge', ret['Kerberos Policy'],
+                                  'MaxTicketAge setting not found')
+                    self.assertEquals(ret['Kerberos Policy']['MaxTicketAge'], '99',
+                                      'MaxTicketAge was not set to 99')
+                # Check the Scripts Extension
+                elif type(ext) == gp_scripts_ext:
+                    self.assertIn('Daily Scripts', ret.keys(),
+                                  'Daily Scripts not found')
+                    self.assertIn('echo hello world', ret['Daily Scripts'],
+                                  'Daily script was not created')
+            unstage_file(gpofile % g.name)
+            unstage_file(reg_pol % g.name)