PEP8: fix E305: expected 2 blank lines after class or function definition, found 1
[bbaumbach/samba-autobuild/.git] / source4 / dsdb / tests / python / token_group.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # test tokengroups attribute against internal token calculation
4
5 from __future__ import print_function
6 import optparse
7 import sys
8 import os
9
10 sys.path.insert(0, "bin/python")
11 import samba
12
13 from samba.tests.subunitrun import SubunitOptions, TestProgram
14
15 import samba.getopt as options
16
17 from samba.auth import system_session
18 from samba import ldb, dsdb
19 from samba.samdb import SamDB
20 from samba.auth import AuthContext
21 from samba.ndr import ndr_unpack
22 from samba import gensec
23 from samba.credentials import Credentials, DONT_USE_KERBEROS
24 from samba.dsdb import GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP
25 import samba.tests
26 from samba.tests import delete_force
27 from samba.dcerpc import samr, security
28 from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES, AUTH_SESSION_INFO_NTLM
29
30
31 parser = optparse.OptionParser("token_group.py [options] <host>")
32 sambaopts = options.SambaOptions(parser)
33 parser.add_option_group(sambaopts)
34 parser.add_option_group(options.VersionOptions(parser))
35 # use command line creds if available
36 credopts = options.CredentialsOptions(parser)
37 parser.add_option_group(credopts)
38 subunitopts = SubunitOptions(parser)
39 parser.add_option_group(subunitopts)
40 opts, args = parser.parse_args()
41
42 if len(args) < 1:
43     parser.print_usage()
44     sys.exit(1)
45
46 url = args[0]
47
48 lp = sambaopts.get_loadparm()
49 creds = credopts.get_credentials(lp)
50 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
51
52
53 def closure(vSet, wSet, aSet):
54     for edge in aSet:
55         start, end = edge
56         if start in wSet:
57             if end not in wSet and end in vSet:
58                 wSet.add(end)
59                 closure(vSet, wSet, aSet)
60
61
62 class StaticTokenTest(samba.tests.TestCase):
63
64     def setUp(self):
65         super(StaticTokenTest, self).setUp()
66         self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
67         self.base_dn = self.ldb.domain_dn()
68
69         res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
70         self.assertEquals(len(res), 1)
71
72         self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]))
73
74         session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS |
75                                AUTH_SESSION_INFO_AUTHENTICATED |
76                                AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
77         if creds.get_kerberos_state() == DONT_USE_KERBEROS:
78             session_info_flags |= AUTH_SESSION_INFO_NTLM
79
80         session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn,
81                                           session_info_flags=session_info_flags)
82
83         token = session.security_token
84         self.user_sids = []
85         for s in token.sids:
86             self.user_sids.append(str(s))
87
88     def test_rootDSE_tokenGroups(self):
89         """Testing rootDSE tokengroups against internal calculation"""
90         if not url.startswith("ldap"):
91             self.fail(msg="This test is only valid on ldap")
92
93         res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
94         self.assertEquals(len(res), 1)
95
96         print("Getting tokenGroups from rootDSE")
97         tokengroups = []
98         for sid in res[0]['tokenGroups']:
99             tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
100
101         sidset1 = set(tokengroups)
102         sidset2 = set(self.user_sids)
103         if len(sidset1.difference(sidset2)):
104             print("token sids don't match")
105             print("tokengroups: %s" % tokengroups)
106             print("calculated : %s" % self.user_sids)
107             print("difference : %s" % sidset1.difference(sidset2))
108             self.fail(msg="calculated groups don't match against rootDSE tokenGroups")
109
110     def test_dn_tokenGroups(self):
111         print("Getting tokenGroups from user DN")
112         res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
113         self.assertEquals(len(res), 1)
114
115         dn_tokengroups = []
116         for sid in res[0]['tokenGroups']:
117             dn_tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
118
119         sidset1 = set(dn_tokengroups)
120         sidset2 = set(self.user_sids)
121         if len(sidset1.difference(sidset2)):
122             print("token sids don't match")
123             print("difference : %s" % sidset1.difference(sidset2))
124             self.fail(msg="calculated groups don't match against user DN tokenGroups")
125
126     def test_pac_groups(self):
127         if creds.get_kerberos_state() == DONT_USE_KERBEROS:
128             self.skipTest("Kerberos disabled, skipping PAC test")
129
130         settings = {}
131         settings["lp_ctx"] = lp
132         settings["target_hostname"] = lp.get("netbios name")
133
134         gensec_client = gensec.Security.start_client(settings)
135         gensec_client.set_credentials(creds)
136         gensec_client.want_feature(gensec.FEATURE_SEAL)
137         gensec_client.start_mech_by_sasl_name("GSSAPI")
138
139         auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[])
140
141         gensec_server = gensec.Security.start_server(settings, auth_context)
142         machine_creds = Credentials()
143         machine_creds.guess(lp)
144         machine_creds.set_machine_account(lp)
145         gensec_server.set_credentials(machine_creds)
146
147         gensec_server.want_feature(gensec.FEATURE_SEAL)
148         gensec_server.start_mech_by_sasl_name("GSSAPI")
149
150         client_finished = False
151         server_finished = False
152         server_to_client = ""
153
154         # Run the actual call loop.
155         while client_finished == False and server_finished == False:
156             if not client_finished:
157                 print("running client gensec_update")
158                 (client_finished, client_to_server) = gensec_client.update(server_to_client)
159             if not server_finished:
160                 print("running server gensec_update")
161                 (server_finished, server_to_client) = gensec_server.update(client_to_server)
162
163         session = gensec_server.session_info()
164
165         token = session.security_token
166         pac_sids = []
167         for s in token.sids:
168             pac_sids.append(str(s))
169
170         sidset1 = set(pac_sids)
171         sidset2 = set(self.user_sids)
172         if len(sidset1.difference(sidset2)):
173             print("token sids don't match")
174             print("difference : %s" % sidset1.difference(sidset2))
175             self.fail(msg="calculated groups don't match against user PAC tokenGroups")
176
177
178 class DynamicTokenTest(samba.tests.TestCase):
179
180     def get_creds(self, target_username, target_password):
181         creds_tmp = Credentials()
182         creds_tmp.set_username(target_username)
183         creds_tmp.set_password(target_password)
184         creds_tmp.set_domain(creds.get_domain())
185         creds_tmp.set_realm(creds.get_realm())
186         creds_tmp.set_workstation(creds.get_workstation())
187         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
188                                       | gensec.FEATURE_SEAL)
189         return creds_tmp
190
191     def get_ldb_connection(self, target_username, target_password):
192         creds_tmp = self.get_creds(target_username, target_password)
193         ldb_target = SamDB(url=url, credentials=creds_tmp, lp=lp)
194         return ldb_target
195
196     def setUp(self):
197         super(DynamicTokenTest, self).setUp()
198         self.admin_ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
199
200         self.base_dn = self.admin_ldb.domain_dn()
201
202         self.test_user = "tokengroups_user1"
203         self.test_user_pass = "samba123@"
204         self.admin_ldb.newuser(self.test_user, self.test_user_pass)
205         self.test_group0 = "tokengroups_group0"
206         self.admin_ldb.newgroup(self.test_group0, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)
207         res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group0, self.base_dn),
208                                     attrs=["objectSid"], scope=ldb.SCOPE_BASE)
209         self.test_group0_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
210
211         self.admin_ldb.add_remove_group_members(self.test_group0, [self.test_user],
212                                                 add_members_operation=True)
213
214         self.test_group1 = "tokengroups_group1"
215         self.admin_ldb.newgroup(self.test_group1, grouptype=dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
216         res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group1, self.base_dn),
217                                     attrs=["objectSid"], scope=ldb.SCOPE_BASE)
218         self.test_group1_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
219
220         self.admin_ldb.add_remove_group_members(self.test_group1, [self.test_user],
221                                                 add_members_operation=True)
222
223         self.test_group2 = "tokengroups_group2"
224         self.admin_ldb.newgroup(self.test_group2, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP)
225
226         res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group2, self.base_dn),
227                                     attrs=["objectSid"], scope=ldb.SCOPE_BASE)
228         self.test_group2_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
229
230         self.admin_ldb.add_remove_group_members(self.test_group2, [self.test_user],
231                                                 add_members_operation=True)
232
233         self.test_group3 = "tokengroups_group3"
234         self.admin_ldb.newgroup(self.test_group3, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP)
235
236         res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group3, self.base_dn),
237                                     attrs=["objectSid"], scope=ldb.SCOPE_BASE)
238         self.test_group3_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
239
240         self.admin_ldb.add_remove_group_members(self.test_group3, [self.test_group1],
241                                                 add_members_operation=True)
242
243         self.test_group4 = "tokengroups_group4"
244         self.admin_ldb.newgroup(self.test_group4, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP)
245
246         res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group4, self.base_dn),
247                                     attrs=["objectSid"], scope=ldb.SCOPE_BASE)
248         self.test_group4_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
249
250         self.admin_ldb.add_remove_group_members(self.test_group4, [self.test_group3],
251                                                 add_members_operation=True)
252
253         self.test_group5 = "tokengroups_group5"
254         self.admin_ldb.newgroup(self.test_group5, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)
255
256         res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group5, self.base_dn),
257                                     attrs=["objectSid"], scope=ldb.SCOPE_BASE)
258         self.test_group5_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
259
260         self.admin_ldb.add_remove_group_members(self.test_group5, [self.test_group4],
261                                                 add_members_operation=True)
262
263         self.test_group6 = "tokengroups_group6"
264         self.admin_ldb.newgroup(self.test_group6, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)
265
266         res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group6, self.base_dn),
267                                     attrs=["objectSid"], scope=ldb.SCOPE_BASE)
268         self.test_group6_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
269
270         self.admin_ldb.add_remove_group_members(self.test_group6, [self.test_user],
271                                                 add_members_operation=True)
272
273         self.ldb = self.get_ldb_connection(self.test_user, self.test_user_pass)
274
275         res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
276         self.assertEquals(len(res), 1)
277
278         self.user_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0])
279         self.user_sid_dn = "<SID=%s>" % str(self.user_sid)
280
281         res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=[])
282         self.assertEquals(len(res), 1)
283
284         self.test_user_dn = res[0].dn
285
286         session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS |
287                                AUTH_SESSION_INFO_AUTHENTICATED |
288                                AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
289
290         if creds.get_kerberos_state() == DONT_USE_KERBEROS:
291             session_info_flags |= AUTH_SESSION_INFO_NTLM
292
293         session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn,
294                                           session_info_flags=session_info_flags)
295
296         token = session.security_token
297         self.user_sids = []
298         for s in token.sids:
299             self.user_sids.append(str(s))
300
301     def tearDown(self):
302         super(DynamicTokenTest, self).tearDown()
303         delete_force(self.admin_ldb, "CN=%s,%s,%s" %
304                      (self.test_user, "cn=users", self.base_dn))
305         delete_force(self.admin_ldb, "CN=%s,%s,%s" %
306                      (self.test_group0, "cn=users", self.base_dn))
307         delete_force(self.admin_ldb, "CN=%s,%s,%s" %
308                      (self.test_group1, "cn=users", self.base_dn))
309         delete_force(self.admin_ldb, "CN=%s,%s,%s" %
310                      (self.test_group2, "cn=users", self.base_dn))
311         delete_force(self.admin_ldb, "CN=%s,%s,%s" %
312                      (self.test_group3, "cn=users", self.base_dn))
313         delete_force(self.admin_ldb, "CN=%s,%s,%s" %
314                      (self.test_group4, "cn=users", self.base_dn))
315         delete_force(self.admin_ldb, "CN=%s,%s,%s" %
316                      (self.test_group5, "cn=users", self.base_dn))
317         delete_force(self.admin_ldb, "CN=%s,%s,%s" %
318                      (self.test_group6, "cn=users", self.base_dn))
319
320     def test_rootDSE_tokenGroups(self):
321         """Testing rootDSE tokengroups against internal calculation"""
322         if not url.startswith("ldap"):
323             self.fail(msg="This test is only valid on ldap")
324
325         res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
326         self.assertEquals(len(res), 1)
327
328         print("Getting tokenGroups from rootDSE")
329         tokengroups = []
330         for sid in res[0]['tokenGroups']:
331             tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
332
333         sidset1 = set(tokengroups)
334         sidset2 = set(self.user_sids)
335         if len(sidset1.difference(sidset2)):
336             print("token sids don't match")
337             print("tokengroups: %s" % tokengroups)
338             print("calculated : %s" % self.user_sids)
339             print("difference : %s" % sidset1.difference(sidset2))
340             self.fail(msg="calculated groups don't match against rootDSE tokenGroups")
341
342     def test_dn_tokenGroups(self):
343         print("Getting tokenGroups from user DN")
344         res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
345         self.assertEquals(len(res), 1)
346
347         dn_tokengroups = []
348         for sid in res[0]['tokenGroups']:
349             dn_tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
350
351         sidset1 = set(dn_tokengroups)
352         sidset2 = set(self.user_sids)
353
354         # The SIDs on the DN do not include the NTLM authentication SID
355         sidset2.discard(samba.dcerpc.security.SID_NT_NTLM_AUTHENTICATION)
356
357         if len(sidset1.difference(sidset2)):
358             print("token sids don't match")
359             print("difference : %s" % sidset1.difference(sidset2))
360             self.fail(msg="calculated groups don't match against user DN tokenGroups")
361
362     def test_pac_groups(self):
363         settings = {}
364         settings["lp_ctx"] = lp
365         settings["target_hostname"] = lp.get("netbios name")
366
367         gensec_client = gensec.Security.start_client(settings)
368         gensec_client.set_credentials(self.get_creds(self.test_user, self.test_user_pass))
369         gensec_client.want_feature(gensec.FEATURE_SEAL)
370         gensec_client.start_mech_by_sasl_name("GSSAPI")
371
372         auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[])
373
374         gensec_server = gensec.Security.start_server(settings, auth_context)
375         machine_creds = Credentials()
376         machine_creds.guess(lp)
377         machine_creds.set_machine_account(lp)
378         gensec_server.set_credentials(machine_creds)
379
380         gensec_server.want_feature(gensec.FEATURE_SEAL)
381         gensec_server.start_mech_by_sasl_name("GSSAPI")
382
383         client_finished = False
384         server_finished = False
385         server_to_client = ""
386
387         # Run the actual call loop.
388         while client_finished == False and server_finished == False:
389             if not client_finished:
390                 print("running client gensec_update")
391                 (client_finished, client_to_server) = gensec_client.update(server_to_client)
392             if not server_finished:
393                 print("running server gensec_update")
394                 (server_finished, server_to_client) = gensec_server.update(client_to_server)
395
396         session = gensec_server.session_info()
397
398         token = session.security_token
399         pac_sids = []
400         for s in token.sids:
401             pac_sids.append(str(s))
402
403         sidset1 = set(pac_sids)
404         sidset2 = set(self.user_sids)
405         if len(sidset1.difference(sidset2)):
406             print("token sids don't match")
407             print("difference : %s" % sidset1.difference(sidset2))
408             self.fail(msg="calculated groups don't match against user PAC tokenGroups")
409
410     def test_tokenGroups_manual(self):
411         # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
412         # and compare the result
413         res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
414                                     expression="(|(objectclass=user)(objectclass=group))",
415                                     attrs=["memberOf"])
416         aSet = set()
417         aSetR = set()
418         vSet = set()
419         for obj in res:
420             if "memberOf" in obj:
421                 for dn in obj["memberOf"]:
422                     first = obj.dn.get_casefold()
423                     second = ldb.Dn(self.admin_ldb, dn.decode('utf8')).get_casefold()
424                     aSet.add((first, second))
425                     aSetR.add((second, first))
426                     vSet.add(first)
427                     vSet.add(second)
428
429         res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
430                                     expression="(objectclass=user)",
431                                     attrs=["primaryGroupID"])
432         for obj in res:
433             if "primaryGroupID" in obj:
434                 sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0]))
435                 res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
436                                              attrs=[])
437                 first = obj.dn.get_casefold()
438                 second = res2[0].dn.get_casefold()
439
440                 aSet.add((first, second))
441                 aSetR.add((second, first))
442                 vSet.add(first)
443                 vSet.add(second)
444
445         wSet = set()
446         wSet.add(self.test_user_dn.get_casefold())
447         closure(vSet, wSet, aSet)
448         wSet.remove(self.test_user_dn.get_casefold())
449
450         tokenGroupsSet = set()
451
452         res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
453         self.assertEquals(len(res), 1)
454
455         dn_tokengroups = []
456         for sid in res[0]['tokenGroups']:
457             sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
458             res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
459                                          attrs=[])
460             tokenGroupsSet.add(res3[0].dn.get_casefold())
461
462         if len(wSet.difference(tokenGroupsSet)):
463             self.fail(msg="additional calculated: %s" % wSet.difference(tokenGroupsSet))
464
465         if len(tokenGroupsSet.difference(wSet)):
466             self.fail(msg="additional tokenGroups: %s" % tokenGroupsSet.difference(wSet))
467
468     def filtered_closure(self, wSet, filter_grouptype):
469         res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
470                                     expression="(|(objectclass=user)(objectclass=group))",
471                                     attrs=["memberOf"])
472         aSet = set()
473         aSetR = set()
474         vSet = set()
475         for obj in res:
476             vSet.add(obj.dn.get_casefold())
477             if "memberOf" in obj:
478                 for dn in obj["memberOf"]:
479                     first = obj.dn.get_casefold()
480                     second = ldb.Dn(self.admin_ldb, dn.decode('utf8')).get_casefold()
481                     aSet.add((first, second))
482                     aSetR.add((second, first))
483                     vSet.add(first)
484                     vSet.add(second)
485
486         res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
487                                     expression="(objectclass=user)",
488                                     attrs=["primaryGroupID"])
489         for obj in res:
490             if "primaryGroupID" in obj:
491                 sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0]))
492                 res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
493                                              attrs=[])
494                 first = obj.dn.get_casefold()
495                 second = res2[0].dn.get_casefold()
496
497                 aSet.add((first, second))
498                 aSetR.add((second, first))
499                 vSet.add(first)
500                 vSet.add(second)
501
502         uSet = set()
503         for v in vSet:
504             res_group = self.admin_ldb.search(base=v, scope=ldb.SCOPE_BASE,
505                                               attrs=["groupType"],
506                                               expression="objectClass=group")
507             if len(res_group) == 1:
508                 if hex(int(res_group[0]["groupType"][0]) & 0x00000000FFFFFFFF) == hex(filter_grouptype):
509                     uSet.add(v)
510             else:
511                 uSet.add(v)
512
513         closure(uSet, wSet, aSet)
514
515     def test_tokenGroupsGlobalAndUniversal_manual(self):
516         # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
517         # and compare the result
518
519         # The variable names come from MS-ADTS May 15, 2014
520
521         S = set()
522         S.add(self.test_user_dn.get_casefold())
523
524         self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP)
525
526         T = set()
527         # Not really a SID, we do this on DNs...
528         for sid in S:
529             X = set()
530             X.add(sid)
531             self.filtered_closure(X, GTYPE_SECURITY_UNIVERSAL_GROUP)
532
533             T = T.union(X)
534
535         T.remove(self.test_user_dn.get_casefold())
536
537         tokenGroupsSet = set()
538
539         res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"])
540         self.assertEquals(len(res), 1)
541
542         dn_tokengroups = []
543         for sid in res[0]['tokenGroupsGlobalAndUniversal']:
544             sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
545             res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
546                                          attrs=[])
547             tokenGroupsSet.add(res3[0].dn.get_casefold())
548
549         if len(T.difference(tokenGroupsSet)):
550             self.fail(msg="additional calculated: %s" % T.difference(tokenGroupsSet))
551
552         if len(tokenGroupsSet.difference(T)):
553             self.fail(msg="additional tokenGroupsGlobalAndUniversal: %s" % tokenGroupsSet.difference(T))
554
555     def test_samr_GetGroupsForUser(self):
556         # Confirm that we get the correct results against SAMR also
557         if not url.startswith("ldap://"):
558             self.fail(msg="This test is only valid on ldap (so we an find the hostname and use SAMR)")
559         host = url.split("://")[1]
560         (domain_sid, user_rid) = self.user_sid.split()
561         samr_conn = samba.dcerpc.samr.samr("ncacn_ip_tcp:%s[seal]" % host, lp, creds)
562         samr_handle = samr_conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
563         samr_domain = samr_conn.OpenDomain(samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED,
564                                            domain_sid)
565         user_handle = samr_conn.OpenUser(samr_domain, security.SEC_FLAG_MAXIMUM_ALLOWED, user_rid)
566         rids = samr_conn.GetGroupsForUser(user_handle)
567         samr_dns = set()
568         for rid in rids.rids:
569             self.assertEqual(rid.attributes, samr.SE_GROUP_MANDATORY | samr.SE_GROUP_ENABLED_BY_DEFAULT | samr.SE_GROUP_ENABLED)
570             sid = "%s-%d" % (domain_sid, rid.rid)
571             res = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
572                                         attrs=[])
573             samr_dns.add(res[0].dn.get_casefold())
574
575         user_info = samr_conn.QueryUserInfo(user_handle, 1)
576         self.assertEqual(rids.rids[0].rid, user_info.primary_gid)
577
578         tokenGroupsSet = set()
579         res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"])
580         for sid in res[0]['tokenGroupsGlobalAndUniversal']:
581             sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
582             res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
583                                          attrs=[],
584                                          expression="(&(|(grouptype=%d)(grouptype=%d))(objectclass=group))"
585                                          % (GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP))
586             if len(res) == 1:
587                 tokenGroupsSet.add(res3[0].dn.get_casefold())
588
589         if len(samr_dns.difference(tokenGroupsSet)):
590             self.fail(msg="additional samr_GetUserGroups over tokenGroups: %s" % samr_dns.difference(tokenGroupsSet))
591
592         memberOf = set()
593         # Add the primary group
594         primary_group_sid = "%s-%d" % (domain_sid, user_info.primary_gid)
595         res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
596                                      attrs=[])
597
598         memberOf.add(res2[0].dn.get_casefold())
599         res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["memberOf"])
600         for dn in res[0]['memberOf']:
601             res3 = self.admin_ldb.search(base=dn, scope=ldb.SCOPE_BASE,
602                                          attrs=[],
603                                          expression="(&(|(grouptype=%d)(grouptype=%d))(objectclass=group))"
604                                          % (GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP))
605             if len(res3) == 1:
606                 memberOf.add(res3[0].dn.get_casefold())
607
608         if len(memberOf.difference(samr_dns)):
609             self.fail(msg="additional memberOf over samr_GetUserGroups: %s" % memberOf.difference(samr_dns))
610
611         if len(samr_dns.difference(memberOf)):
612             self.fail(msg="additional samr_GetUserGroups over memberOf: %s" % samr_dns.difference(memberOf))
613
614         S = set()
615         S.add(self.test_user_dn.get_casefold())
616
617         self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP)
618         self.filtered_closure(S, GTYPE_SECURITY_UNIVERSAL_GROUP)
619
620         # Now remove the user DN and primary group
621         S.remove(self.test_user_dn.get_casefold())
622
623         if len(samr_dns.difference(S)):
624             self.fail(msg="additional samr_GetUserGroups over filtered_closure: %s" % samr_dns.difference(S))
625
626     def test_samr_GetGroupsForUser_nomember(self):
627         # Confirm that we get the correct results against SAMR also
628         if not url.startswith("ldap://"):
629             self.fail(msg="This test is only valid on ldap (so we an find the hostname and use SAMR)")
630         host = url.split("://")[1]
631
632         test_user = "tokengroups_user2"
633         self.admin_ldb.newuser(test_user, self.test_user_pass)
634         res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (test_user, self.base_dn),
635                                     attrs=["objectSid"], scope=ldb.SCOPE_BASE)
636         user_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
637
638         (domain_sid, user_rid) = user_sid.split()
639         samr_conn = samba.dcerpc.samr.samr("ncacn_ip_tcp:%s[seal]" % host, lp, creds)
640         samr_handle = samr_conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
641         samr_domain = samr_conn.OpenDomain(samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED,
642                                            domain_sid)
643         user_handle = samr_conn.OpenUser(samr_domain, security.SEC_FLAG_MAXIMUM_ALLOWED, user_rid)
644         rids = samr_conn.GetGroupsForUser(user_handle)
645         user_info = samr_conn.QueryUserInfo(user_handle, 1)
646         delete_force(self.admin_ldb, "CN=%s,%s,%s" %
647                      (test_user, "cn=users", self.base_dn))
648         self.assertEqual(len(rids.rids), 1)
649         self.assertEqual(rids.rids[0].rid, user_info.primary_gid)
650
651
652 if not "://" in url:
653     if os.path.isfile(url):
654         url = "tdb://%s" % url
655     else:
656         url = "ldap://%s" % url
657
658 TestProgram(module=__name__, opts=subunitopts)