Merge commit 'release-4-0-0alpha15' into master4-tmp
[nivanova/samba-autobuild/.git] / source4 / torture / drs / python / getnc_exop.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various schema replication scenarios
5 #
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 #
23 # Usage:
24 #  export DC1=dc1_dns_name
25 #  export DC2=dc2_dns_name
26 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
28 #
29
30 import drs_base
31 import samba.tests
32
33 from ldb import SCOPE_BASE
34
35 from samba.dcerpc import drsuapi, misc, drsblobs
36 from samba.drs_utils import drs_DsBind
37
38
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
    """Semi-black box test case for the DsGetNCChanges implementation
       of extended operations.

       It exercises how DsGetNCChanges handles different input
       parameters (mostly invalid ones). The final goal is to make
       DsGetNCChanges as binary compatible with the Windows
       implementation as possible."""

    def setUp(self):
        super(DrsReplicaSyncTestCase, self).setUp()

    def tearDown(self):
        super(DrsReplicaSyncTestCase, self).tearDown()

    def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop):
        """Build a DsGetNCChangesRequest8 for an extended operation.

        dest_dsa: value converted with misc.GUID() and stored as the
                  destination DSA guid
        invocation_id: value converted with misc.GUID() and stored as
                       the source DSA invocation id
        nc_dn_str: DN of the naming context, converted with unicode()
        exop: value stored in the request's extended_op field

        Returns the populated request; every other field is zeroed or
        set to None, except max_ndr_size."""
        req8 = drsuapi.DsGetNCChangesRequest8()

        req8.destination_dsa_guid = misc.GUID(dest_dsa)
        req8.source_dsa_invocation_id = misc.GUID(invocation_id)
        req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
        req8.naming_context.dn = unicode(nc_dn_str)
        # start from an all-zero high water mark
        req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
        req8.highwatermark.tmp_highest_usn = 0
        req8.highwatermark.reserved_usn = 0
        req8.highwatermark.highest_usn = 0
        req8.uptodateness_vector = None
        req8.replica_flags = 0
        req8.max_object_count = 0
        req8.max_ndr_size = 402116
        req8.extended_op = exop
        req8.fsmo_info = 0
        req8.partial_attribute_set = None
        req8.partial_attribute_set_ex = None
        # send an empty mapping container
        req8.mapping_ctr.num_mappings = 0
        req8.mapping_ctr.mappings = None

        return req8

    def _ds_bind(self, server_name):
        """Open a drsuapi connection to server_name and bind to it.

        Returns a (drs, drs_handle) pair: the drsuapi connection and
        the handle obtained from drs_DsBind()."""
        binding_str = "ncacn_ip_tcp:%s[print,seal]" % server_name

        drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
        # second element of the drs_DsBind() result is not needed here
        (drs_handle, _unused_extensions) = drs_DsBind(drs)
        return (drs, drs_handle)

    def _determine_fSMORoleOwner(self, fsmo_obj_dn):
        """Returns (owner, not_owner) pair where:
             owner: info dict for the DC owning the FSMO role
             not_owner: info dict for the DC not owning the FSMO role
           Each dict has "dns_name", "invocation_id" and "ntds_guid"
           keys. The owner is read from the fSMORoleOwner attribute of
           fsmo_obj_dn as seen on DC1."""
        # collect info to return later
        fsmo_info_1 = {"dns_name": self.dnsname_dc1,
                       "invocation_id": self.ldb_dc1.get_invocation_id(),
                       "ntds_guid": self.ldb_dc1.get_ntds_GUID()}
        fsmo_info_2 = {"dns_name": self.dnsname_dc2,
                       "invocation_id": self.ldb_dc2.get_invocation_id(),
                       "ntds_guid": self.ldb_dc2.get_ntds_GUID()}
        # determine the owner dc
        res = self.ldb_dc1.search(fsmo_obj_dn,
                                  scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
        # use the unittest assertion so the check is not stripped under -O
        self.assertEqual(len(res), 1,
                         "Only one fSMORoleOwner value expected for %s!" % fsmo_obj_dn)
        fsmo_owner = res[0]["fSMORoleOwner"][0]
        if fsmo_owner == self.info_dc1["dsServiceName"][0]:
            return (fsmo_info_1, fsmo_info_2)
        # anything else is treated as "DC2 owns the role"
        return (fsmo_info_2, fsmo_info_1)

    def _check_exop_failed(self, ctr6, expected_failure):
        """Assert that the reply ctr6 reports a failed extended operation.

        ctr6: reply container returned by DsGetNCChanges
        expected_failure: value expected in ctr6.extended_ret

        Besides the extended return code, the reply must carry no more
        data, zero object/linked attribute counts, no linked attributes
        and a zero first element in drs_error."""
        self.assertEqual(ctr6.extended_ret, expected_failure)
        # NOTE(review): the two checks below are disabled in the original;
        # the reason is not visible from this file
        #self.assertEqual(ctr6.object_count, 0)
        #self.assertEqual(ctr6.first_object, None)
        self.assertEqual(ctr6.more_data, False)
        self.assertEqual(ctr6.nc_object_count, 0)
        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
        self.assertEqual(ctr6.linked_attributes_count, 0)
        self.assertEqual(ctr6.linked_attributes, None)
        self.assertEqual(ctr6.drs_error[0], 0)

    def test_FSMONotOwner(self):
        """Test role transfer against a DC that is not the owner of the role"""
        fsmo_dn = self.ldb_dc1.get_schema_basedn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        # the request is addressed to the DC that does NOT hold the role
        req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
                               invocation_id=fsmo_not_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)

        (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self.assertEqual(level, 6, "Expected level 6 response!")
        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_FSMO_NOT_OWNER)
        # the reply must identify the DC we actually talked to
        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))

    def test_InvalidDestDSA(self):
        """Test role transfer with invalid destination DSA guid"""
        fsmo_dn = self.ldb_dc1.get_schema_basedn()
        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)

        # hard-coded GUID used as the invalid destination DSA
        req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
                               invocation_id=fsmo_owner["invocation_id"],
                               nc_dn_str=fsmo_dn,
                               exop=drsuapi.DRSUAPI_EXOP_FSMO_REQ_ROLE)

        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
        self.assertEqual(level, 6, "Expected level 6 response!")
        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
        # the reply must identify the role owner we talked to
        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))