78759e196be44bcc8c40825d54e2f19e953d60ef
[kai/samba.git] / source4 / torture / drs / python / repl_schema.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various schema replication scenarios
5 #
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2010
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 #
23 # Usage:
24 #  export DC1=dc1_dns_name
25 #  export DC2=dc2_dns_name
26 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN repl_schema -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
28 #
29
30 import sys
31 import time
32 import random
33 import os
34
35 sys.path.append("bin/python")
36
37 from samba.auth import system_session
38 from ldb import LdbError, ERR_NO_SUCH_OBJECT
39 from ldb import SCOPE_BASE, SCOPE_SUBTREE
40 from samba.samdb import SamDB
41
42 import samba.tests
43
44
45 class DrsReplSchemaTestCase(samba.tests.TestCase):
46
47     # RootDSE msg for DC1
48     info_dc1 = None
49     ldb_dc1 = None
50     # RootDSE msg for DC1
51     info_dc2 = None
52     ldb_dc2 = None
53     # prefix for all objects created
54     obj_prefix = None
55
56     def setUp(self):
57         super(DrsReplSchemaTestCase, self).setUp()
58
59         # connect to DCs singleton
60         self._dc_connect("dc1", "DC1", ldap_only=True)
61         self._dc_connect("dc2", "DC2", ldap_only=True)
62
63         # initialize objects prefix if not done yet
64         if self.obj_prefix is None:
65             t = time.strftime("%s", time.gmtime())
66             DrsReplSchemaTestCase.obj_prefix = "DrsReplSchema-%s-" % t
67
68         # cache some of RootDSE props
69         self.schema_dn = self.info_dc1["schemaNamingContext"][0]
70         self.domain_dn = self.info_dc1["defaultNamingContext"][0]
71         self.config_dn = self.info_dc1["configurationNamingContext"][0]
72         self.forest_level = int(self.info_dc1["forestFunctionality"][0])
73
74         # we will need DCs DNS names for 'samba-tool drs' command
75         self.dnsname_dc1 = self.info_dc1["dnsHostName"][0]
76         self.dnsname_dc2 = self.info_dc2["dnsHostName"][0]
77
78     def tearDown(self):
79         super(DrsReplSchemaTestCase, self).tearDown()
80
81     @classmethod
82     def _dc_connect(cls, attr_name, env_var, ldap_only=True):
83         ldb_dc = None
84         attr_name_ldb = "ldb_" + attr_name
85         if hasattr(cls, attr_name_ldb):
86             ldb_dc = getattr(cls, attr_name_ldb)
87         if ldb_dc is None:
88             url_dc = samba.tests.env_get_var_value(env_var)
89             ldb_dc = samba.tests.connect_samdb(url_dc, ldap_only=ldap_only)
90             res = ldb_dc.search(base="", expression="", scope=SCOPE_BASE, attrs=["*"])
91             info_dc = res[0]
92             setattr(cls, "ldb_" + attr_name, ldb_dc)
93             setattr(cls, "url_" + attr_name, url_dc)
94             setattr(cls, "info_" + attr_name, info_dc)
95         return ldb_dc
96
97     def _net_drs_replicate(self, DC, fromDC, nc_dn):
98         """Triggers replication cycle on 'DC' to
99            replicate from 'fromDC'. Naming context to
100            be replicated is 'nc_dn' dn"""
101         # find out where is net command
102         samba_tool_cmd = os.path.abspath("./bin/samba-tool")
103         # make command line credentials string
104         creds = samba.tests.cmdline_credentials
105         cmd_line_auth = "-U%s/%s%%%s" % (creds.get_domain(),
106                                          creds.get_username(), creds.get_password())
107         # bin/samba-tool drs replicate <Dest_DC_NAME> <Src_DC_NAME> <Naming Context>
108         cmd_line = "%s drs replicate %s %s %s %s" % (samba_tool_cmd, DC, fromDC,
109                                                      nc_dn, cmd_line_auth)
110         ret = os.system(cmd_line)
111         self.assertEquals(ret, 0, "Replicating %s from %s has failed!" % (DC, fromDC))
112
113     def _GUID_string(self, guid):
114         return self.ldb_dc1.schema_format_value("objectGUID", guid)
115
116     def _ldap_schemaUpdateNow(self, sam_db):
117         ldif = """
118 dn:
119 changetype: modify
120 add: schemaUpdateNow
121 schemaUpdateNow: 1
122 """
123         sam_db.modify_ldif(ldif)
124
125     def _make_obj_names(self, base_name):
126         '''Try to create a unique name for an object
127            that is to be added to schema'''
128         obj_name = self.obj_prefix + base_name
129         obj_dn = "CN=%s,%s" % (obj_name, self.schema_dn)
130         return (obj_name, obj_dn)
131
132     def _make_class_ldif(self, class_name, class_dn, attrs=None):
133         ldif = """
134 dn: """ + class_dn + """
135 objectClass: top
136 objectClass: classSchema
137 cn: """ + class_name + """
138 governsId: 1.2.840.""" + str(random.randint(1,100000)) + """.1.5.13
139 instanceType: 4
140 objectClassCategory: 1
141 subClassOf: organizationalPerson
142 systemOnly: FALSE
143 """
144         return ldif
145
146     def _check_object(self, obj_dn):
147         '''Check if object obj_dn exists on both DCs'''
148         res_dc1 = self.ldb_dc1.search(base=obj_dn,
149                                       scope=SCOPE_BASE,
150                                       attrs=["*"])
151         self.assertEquals(len(res_dc1), 1,
152                           "%s doesn't exists on %s" % (obj_dn, self.dnsname_dc1))
153         try:
154             res_dc2 = self.ldb_dc2.search(base=obj_dn,
155                                           scope=SCOPE_BASE,
156                                           attrs=["*"])
157         except LdbError, (ERR_NO_SUCH_OBJECT, _):
158             self.fail("%s doesn't exists on %s" % (obj_dn, self.dnsname_dc2))
159         self.assertEquals(len(res_dc2), 1,
160                           "%s doesn't exists on %s" % (obj_dn, self.dnsname_dc2))
161
162     def test_all(self):
163         """Basic plan is to create bunch of classSchema
164            and attributeSchema objects, replicate Schema NC
165            and then check all objects are replicated correctly"""
166
167         # add new classSchema object
168         (class_name, class_dn) = self._make_obj_names("cls-A")
169         ldif = self._make_class_ldif(class_name, class_dn)
170         self.ldb_dc1.add_ldif(ldif)
171         self._ldap_schemaUpdateNow(self.ldb_dc1)
172         # force replication from DC1 to DC2
173         self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, nc_dn=self.schema_dn)
174         # check object is replicated
175         self._check_object(class_dn)