54f89597eabd1d0b1c84701cacf6a03460fb73d6
[samba.git] / source4 / dsdb / tests / python / ldap_modify_order.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 #
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008-2011
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19 import optparse
20 import sys
21 import os
22 from itertools import permutations
23 import traceback
24
25 sys.path.insert(0, "bin/python")
26 import samba
27 from samba.tests.subunitrun import SubunitOptions, TestProgram
28 import samba.getopt as options
29
30 from samba.auth import system_session
31 from ldb import SCOPE_BASE, LdbError
32 from ldb import Message, MessageElement, Dn
33 from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
34 from samba.samdb import SamDB
35
36 from samba.tests import delete_force
37
38 TEST_DATA_DIR = os.path.join(
39     os.path.dirname(__file__),
40     'testdata')
41
42 LDB_STRERR = {}
43 def _build_ldb_strerr():
44     import ldb
45     for k, v in vars(ldb).items():
46         if k.startswith('ERR_') and isinstance(v, int):
47             LDB_STRERR[v] = k
48
49 _build_ldb_strerr()
50
51
52 class ModifyOrderTests(samba.tests.TestCase):
53
54     def setUp(self):
55         super().setUp()
56         self.admin_dsdb = get_dsdb(admin_creds)
57         self.base_dn = self.admin_dsdb.domain_dn()
58
59     def delete_object(self, dn):
60         delete_force(self.admin_dsdb, dn)
61
62     def get_user_dn(self, name):
63         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
64
65     def _test_modify_order(self,
66                            start_attrs,
67                            mod_attrs,
68                            extra_search_attrs=(),
69                            name=None):
70         if name is None:
71             name = traceback.extract_stack()[-2][2][5:]
72
73         if opts.normal_user:
74             name += '-non-admin'
75             username = "user123"
76             password = "pass123@#$@#"
77             self.admin_dsdb.newuser(username, password)
78             self.addCleanup(self.delete_object, self.get_user_dn(username))
79             mod_creds = self.insta_creds(template=admin_creds,
80                                          username=username,
81                                          userpass=password)
82         else:
83             mod_creds = admin_creds
84
85         mod_dsdb = get_dsdb(mod_creds)
86         sig = []
87         op_lut = ['', 'add', 'replace', 'delete']
88
89         search_attrs = set(extra_search_attrs)
90         lines = [name, "initial attrs:"]
91         for k, v in start_attrs:
92             lines.append("%20s: %r" % (k, v))
93             search_attrs.add(k)
94
95         for k, v, op in mod_attrs:
96             search_attrs.add(k)
97
98         search_attrs = sorted(search_attrs)
99         header = "\n".join(lines)
100         sig.append(header)
101
102         clusters = {}
103         for i, attrs in enumerate(permutations(mod_attrs)):
104             # for each permuation we construct a string describing the
105             # requested operations, and a string describing the result
106             # (which may be an exception). The we cluster the
107             # attribute strings by their results.
108             dn = "cn=ldaptest_%s_%d,cn=users,%s" % (name, i, self.base_dn)
109             m = Message()
110             m.dn = Dn(self.admin_dsdb, dn)
111
112             # We are using Message objects here for add (rather than the
113             # more convenient dict) because we maybe care about the order
114             # in which attributes are added.
115
116             for k, v in start_attrs:
117                 m[k] = MessageElement(v, 0, k)
118
119             self.admin_dsdb.add(m)
120             self.addCleanup(self.delete_object, dn)
121
122             m = Message()
123             m.dn = Dn(mod_dsdb, dn)
124
125             attr_lines = []
126             for k, v, op in attrs:
127                 if v is None:
128                     v = dn
129                 m[k] = MessageElement(v, op, k)
130                 attr_lines.append("%16s %-8s %s" % (k, op_lut[op], v))
131
132             attr_str = '\n'.join(attr_lines)
133
134             try:
135                 mod_dsdb.modify(m)
136             except LdbError as e:
137                 err, _ = e.args
138                 s = LDB_STRERR.get(err, "unknown error")
139                 result_str = "%s (%d)" % (s, err)
140             else:
141                 res = self.admin_dsdb.search(base=dn, scope=SCOPE_BASE,
142                                              attrs=search_attrs)
143
144                 lines = []
145                 for k, v in sorted(dict(res[0]).items()):
146                     if k != "dn" or k in extra_search_attrs:
147                         lines.append("%20s: %r" % (k, sorted(v)))
148
149                 result_str = '\n'.join(lines)
150
151             clusters.setdefault(result_str, []).append(attr_str)
152
153         for s, attrs in sorted(clusters.items()):
154             sig.extend([
155                 "== result ===[%3d]=======================" % len(attrs),
156                 s,
157                 "-- operations ---------------------------"])
158             for a in attrs:
159                 sig.append(a)
160                 sig.append("-" * 34)
161
162         sig = '\n'.join(sig).replace(self.base_dn, "{base dn}")
163
164         if opts.verbose:
165             print(sig)
166
167         if opts.rewrite_ground_truth:
168             f = open(os.path.join(TEST_DATA_DIR, name + '.expected'), 'w')
169             f.write(sig)
170             f.close()
171         f = open(os.path.join(TEST_DATA_DIR, name + '.expected'))
172         expected = f.read()
173         f.close()
174
175         self.assertStringsEqual(sig, expected)
176
177     def test_modify_order_mixed(self):
178         start_attrs = [("objectclass", "user"),
179                        ("carLicense", ["1", "2", "3"]),
180                        ("otherTelephone", "123")]
181
182         mod_attrs = [("carLicense", "3", FLAG_MOD_DELETE),
183                      ("carLicense", "4", FLAG_MOD_ADD),
184                      ("otherTelephone", "4", FLAG_MOD_REPLACE),
185                      ("otherTelephone", "123", FLAG_MOD_DELETE)]
186         self._test_modify_order(start_attrs, mod_attrs)
187
188     def test_modify_order_mixed2(self):
189         start_attrs = [("objectclass", "user"),
190                        ("carLicense", ["1", "2", "3"]),
191                        ("ipPhone", "123")]
192
193         mod_attrs = [("carLicense", "3", FLAG_MOD_DELETE),
194                      ("carLicense", "4", FLAG_MOD_ADD),
195                      ("ipPhone", "4", FLAG_MOD_REPLACE),
196                      ("ipPhone", "123", FLAG_MOD_DELETE)]
197         self._test_modify_order(start_attrs, mod_attrs)
198
199     def test_modify_order_telephone(self):
200         start_attrs = [("objectclass", "user"),
201                        ("otherTelephone", "123")]
202
203         mod_attrs = [("carLicense", "3", FLAG_MOD_REPLACE),
204                      ("carLicense", "4", FLAG_MOD_ADD),
205                      ("otherTelephone", "4", FLAG_MOD_REPLACE),
206                      ("otherTelephone", "4", FLAG_MOD_ADD),
207                      ("otherTelephone", "123", FLAG_MOD_DELETE)]
208         self._test_modify_order(start_attrs, mod_attrs)
209
210     def test_modify_order_telephone_delete_delete(self):
211         start_attrs = [("objectclass", "user"),
212                        ("otherTelephone", "123")]
213
214         mod_attrs = [("carLicense", "3", FLAG_MOD_REPLACE),
215                      ("carLicense", "4", FLAG_MOD_DELETE),
216                      ("otherTelephone", "4", FLAG_MOD_REPLACE),
217                      ("otherTelephone", "4", FLAG_MOD_DELETE),
218                      ("otherTelephone", "123", FLAG_MOD_DELETE)]
219         self._test_modify_order(start_attrs, mod_attrs)
220
221     def test_modify_order_objectclass(self):
222         start_attrs = [("objectclass", "user"),
223                        ("otherTelephone", "123")]
224
225         mod_attrs = [("objectclass", "computer", FLAG_MOD_REPLACE),
226                      ("objectclass", "user", FLAG_MOD_DELETE),
227                      ("objectclass", "person", FLAG_MOD_DELETE)]
228         self._test_modify_order(start_attrs, mod_attrs)
229
230     def test_modify_order_objectclass2(self):
231         start_attrs = [("objectclass", "user")]
232
233         mod_attrs = [("objectclass", "computer", FLAG_MOD_REPLACE),
234                      ("objectclass", "user", FLAG_MOD_ADD),
235                      ("objectclass", "attributeSchema", FLAG_MOD_REPLACE),
236                      ("objectclass", "inetOrgPerson", FLAG_MOD_ADD),
237                      ("objectclass", "person", FLAG_MOD_DELETE)]
238         self._test_modify_order(start_attrs, mod_attrs)
239
240     def test_modify_order_singlevalue(self):
241         start_attrs = [("objectclass", "user"),
242                        ("givenName", "a")]
243
244         mod_attrs = [("givenName", "a", FLAG_MOD_REPLACE),
245                      ("givenName", ["b", "a"], FLAG_MOD_REPLACE),
246                      ("givenName", "b", FLAG_MOD_DELETE),
247                      ("givenName", "a", FLAG_MOD_DELETE),
248                      ("givenName", "c", FLAG_MOD_ADD)]
249         self._test_modify_order(start_attrs, mod_attrs)
250
251     def test_modify_order_inapplicable(self):
252         #attrbutes that don't go on a user
253         start_attrs = [("objectclass", "user"),
254                        ("givenName", "a")]
255
256         mod_attrs = [("dhcpSites", "b", FLAG_MOD_REPLACE),
257                      ("dhcpSites", "b", FLAG_MOD_DELETE),
258                      ("dhcpSites", "c", FLAG_MOD_ADD)]
259         self._test_modify_order(start_attrs, mod_attrs)
260
261     def test_modify_order_sometimes_inapplicable(self):
262         # attributes that don't go on a user, but do on a computer,
263         # which we sometimes change into.
264         start_attrs = [("objectclass", "user"),
265                        ("givenName", "a")]
266
267         mod_attrs = [("objectclass", "computer", FLAG_MOD_REPLACE),
268                      ("objectclass", "person", FLAG_MOD_DELETE),
269                      ("dnsHostName", "b", FLAG_MOD_ADD),
270                      ("dnsHostName", "c", FLAG_MOD_REPLACE)]
271         self._test_modify_order(start_attrs, mod_attrs)
272
273     def test_modify_order_account_locality_device(self):
274         # account, locality, and device all take l (locality name) but
275         # only device takes owner. We shouldn't be able to change
276         # objectclass at all.
277         start_attrs = [("objectclass", "account"),
278                        ("l", "a")]
279
280         mod_attrs = [("objectclass", ["device", "top"], FLAG_MOD_REPLACE),
281                      ("l", "a", FLAG_MOD_DELETE),
282                      ("owner", "c", FLAG_MOD_ADD)
283         ]
284         self._test_modify_order(start_attrs, mod_attrs)
285
286     def test_modify_order_container_flags_multivalue(self):
287         # account, locality, and device all take l (locality name)
288         # but only device takes owner
289         start_attrs = [("objectclass", "container"),
290                        ("wWWHomePage", "a")]
291
292         mod_attrs = [("flags", ["0", "1"], FLAG_MOD_ADD),
293                      ("flags", "65355", FLAG_MOD_ADD),
294                      ("flags", "65355", FLAG_MOD_DELETE),
295                      ("flags", ["2", "101"], FLAG_MOD_REPLACE),
296         ]
297         self._test_modify_order(start_attrs, mod_attrs)
298
299     def test_modify_order_container_flags(self):
300         #flags should be an integer
301         start_attrs = [("objectclass", "container")]
302
303         mod_attrs = [("flags", "0x6", FLAG_MOD_ADD),
304                      ("flags", "5", FLAG_MOD_ADD),
305                      ("flags", "101", FLAG_MOD_REPLACE),
306                      ("flags", "c", FLAG_MOD_DELETE),
307         ]
308         self._test_modify_order(start_attrs, mod_attrs)
309
310     def test_modify_order_member(self):
311         name = "modify_order_member_other_group"
312
313         dn2 = "cn=%s,%s" % (name, self.base_dn)
314         m = Message()
315         m.dn = Dn(self.admin_dsdb, dn2)
316         self.admin_dsdb.add({"dn": dn2, "objectclass": "group"})
317         self.addCleanup(self.delete_object, dn2)
318
319         start_attrs = [("objectclass", "group"),
320                        ("member", dn2)]
321
322         mod_attrs = [("member", None, FLAG_MOD_DELETE),
323                      ("member", None, FLAG_MOD_REPLACE),
324                      ("member", dn2, FLAG_MOD_DELETE),
325                      ("member", None, FLAG_MOD_ADD),
326         ]
327         self._test_modify_order(start_attrs, mod_attrs, ["memberOf"])
328
329
330 def get_dsdb(creds=None):
331     if creds is None:
332         creds = admin_creds
333     dsdb = SamDB(host,
334                  credentials=creds,
335                  session_info=system_session(lp),
336                  lp=lp)
337     return dsdb
338
339
340 parser = optparse.OptionParser("ldap_modify_order.py [options] <host>")
341 sambaopts = options.SambaOptions(parser)
342 parser.add_option_group(sambaopts)
343 parser.add_option_group(options.VersionOptions(parser))
344 credopts = options.CredentialsOptions(parser)
345 parser.add_option_group(credopts)
346 subunitopts = SubunitOptions(parser)
347 parser.add_option_group(subunitopts)
348 parser.add_option("--rewrite-ground-truth", action="store_true",
349                   help="write expected values")
350 parser.add_option("-v", "--verbose", action="store_true")
351 parser.add_option("--normal-user", action="store_true")
352
353 opts, args = parser.parse_args()
354
355 if len(args) < 1:
356     parser.print_usage()
357     sys.exit(1)
358
359 host = args[0]
360
361 lp = sambaopts.get_loadparm()
362 admin_creds = credopts.get_credentials(lp)
363
364 if "://" not in host:
365     if os.path.isfile(host):
366         host = "tdb://%s" % host
367     else:
368         host = "ldap://%s" % host
369
370
371 TestProgram(module=__name__, opts=subunitopts)