2 # -*- coding: utf-8 -*-
3 # test tokengroups attribute against internal token calculation
5 from __future__ import print_function
10 sys.path.insert(0, "bin/python")
13 from samba.tests.subunitrun import SubunitOptions, TestProgram
15 import samba.getopt as options
17 from samba.auth import system_session
18 from samba import ldb, dsdb
19 from samba.samdb import SamDB
20 from samba.auth import AuthContext
21 from samba.ndr import ndr_unpack
22 from samba import gensec
23 from samba.credentials import Credentials, DONT_USE_KERBEROS
24 from samba.dsdb import GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP
26 from samba.tests import delete_force
27 from samba.dcerpc import samr, security
28 from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES, AUTH_SESSION_INFO_NTLM
31 parser = optparse.OptionParser("token_group.py [options] <host>")
32 sambaopts = options.SambaOptions(parser)
33 parser.add_option_group(sambaopts)
34 parser.add_option_group(options.VersionOptions(parser))
35 # use command line creds if available
36 credopts = options.CredentialsOptions(parser)
37 parser.add_option_group(credopts)
38 subunitopts = SubunitOptions(parser)
39 parser.add_option_group(subunitopts)
40 opts, args = parser.parse_args()
48 lp = sambaopts.get_loadparm()
49 creds = credopts.get_credentials(lp)
50 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
52 def closure(vSet, wSet, aSet):
56 if end not in wSet and end in vSet:
58 closure(vSet, wSet, aSet)
60 class StaticTokenTest(samba.tests.TestCase):
63 super(StaticTokenTest, self).setUp()
64 self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
65 self.base_dn = self.ldb.domain_dn()
67 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
68 self.assertEquals(len(res), 1)
70 self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]))
72 session_info_flags = ( AUTH_SESSION_INFO_DEFAULT_GROUPS |
73 AUTH_SESSION_INFO_AUTHENTICATED |
74 AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
75 if creds.get_kerberos_state() == DONT_USE_KERBEROS:
76 session_info_flags |= AUTH_SESSION_INFO_NTLM
78 session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn,
79 session_info_flags=session_info_flags)
81 token = session.security_token
84 self.user_sids.append(str(s))
86 def test_rootDSE_tokenGroups(self):
87 """Testing rootDSE tokengroups against internal calculation"""
88 if not url.startswith("ldap"):
89 self.fail(msg="This test is only valid on ldap")
91 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
92 self.assertEquals(len(res), 1)
94 print("Getting tokenGroups from rootDSE")
96 for sid in res[0]['tokenGroups']:
97 tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
99 sidset1 = set(tokengroups)
100 sidset2 = set(self.user_sids)
101 if len(sidset1.difference(sidset2)):
102 print("token sids don't match")
103 print("tokengroups: %s" % tokengroups)
104 print("calculated : %s" % self.user_sids)
105 print("difference : %s" % sidset1.difference(sidset2))
106 self.fail(msg="calculated groups don't match against rootDSE tokenGroups")
108 def test_dn_tokenGroups(self):
109 print("Getting tokenGroups from user DN")
110 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
111 self.assertEquals(len(res), 1)
114 for sid in res[0]['tokenGroups']:
115 dn_tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
117 sidset1 = set(dn_tokengroups)
118 sidset2 = set(self.user_sids)
119 if len(sidset1.difference(sidset2)):
120 print("token sids don't match")
121 print("difference : %s" % sidset1.difference(sidset2))
122 self.fail(msg="calculated groups don't match against user DN tokenGroups")
124 def test_pac_groups(self):
125 if creds.get_kerberos_state() == DONT_USE_KERBEROS:
126 self.skipTest("Kerberos disabled, skipping PAC test")
129 settings["lp_ctx"] = lp
130 settings["target_hostname"] = lp.get("netbios name")
132 gensec_client = gensec.Security.start_client(settings)
133 gensec_client.set_credentials(creds)
134 gensec_client.want_feature(gensec.FEATURE_SEAL)
135 gensec_client.start_mech_by_sasl_name("GSSAPI")
137 auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[])
139 gensec_server = gensec.Security.start_server(settings, auth_context)
140 machine_creds = Credentials()
141 machine_creds.guess(lp)
142 machine_creds.set_machine_account(lp)
143 gensec_server.set_credentials(machine_creds)
145 gensec_server.want_feature(gensec.FEATURE_SEAL)
146 gensec_server.start_mech_by_sasl_name("GSSAPI")
148 client_finished = False
149 server_finished = False
150 server_to_client = ""
152 # Run the actual call loop.
153 while client_finished == False and server_finished == False:
154 if not client_finished:
155 print("running client gensec_update")
156 (client_finished, client_to_server) = gensec_client.update(server_to_client)
157 if not server_finished:
158 print("running server gensec_update")
159 (server_finished, server_to_client) = gensec_server.update(client_to_server)
161 session = gensec_server.session_info()
163 token = session.security_token
166 pac_sids.append(str(s))
168 sidset1 = set(pac_sids)
169 sidset2 = set(self.user_sids)
170 if len(sidset1.difference(sidset2)):
171 print("token sids don't match")
172 print("difference : %s" % sidset1.difference(sidset2))
173 self.fail(msg="calculated groups don't match against user PAC tokenGroups")
175 class DynamicTokenTest(samba.tests.TestCase):
177 def get_creds(self, target_username, target_password):
178 creds_tmp = Credentials()
179 creds_tmp.set_username(target_username)
180 creds_tmp.set_password(target_password)
181 creds_tmp.set_domain(creds.get_domain())
182 creds_tmp.set_realm(creds.get_realm())
183 creds_tmp.set_workstation(creds.get_workstation())
184 creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
185 | gensec.FEATURE_SEAL)
188 def get_ldb_connection(self, target_username, target_password):
189 creds_tmp = self.get_creds(target_username, target_password)
190 ldb_target = SamDB(url=url, credentials=creds_tmp, lp=lp)
194 super(DynamicTokenTest, self).setUp()
195 self.admin_ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
197 self.base_dn = self.admin_ldb.domain_dn()
199 self.test_user = "tokengroups_user1"
200 self.test_user_pass = "samba123@"
201 self.admin_ldb.newuser(self.test_user, self.test_user_pass)
202 self.test_group0 = "tokengroups_group0"
203 self.admin_ldb.newgroup(self.test_group0, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)
204 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group0, self.base_dn),
205 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
206 self.test_group0_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
208 self.admin_ldb.add_remove_group_members(self.test_group0, [self.test_user],
209 add_members_operation=True)
211 self.test_group1 = "tokengroups_group1"
212 self.admin_ldb.newgroup(self.test_group1, grouptype=dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
213 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group1, self.base_dn),
214 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
215 self.test_group1_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
217 self.admin_ldb.add_remove_group_members(self.test_group1, [self.test_user],
218 add_members_operation=True)
220 self.test_group2 = "tokengroups_group2"
221 self.admin_ldb.newgroup(self.test_group2, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP)
223 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group2, self.base_dn),
224 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
225 self.test_group2_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
227 self.admin_ldb.add_remove_group_members(self.test_group2, [self.test_user],
228 add_members_operation=True)
230 self.test_group3 = "tokengroups_group3"
231 self.admin_ldb.newgroup(self.test_group3, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP)
233 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group3, self.base_dn),
234 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
235 self.test_group3_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
237 self.admin_ldb.add_remove_group_members(self.test_group3, [self.test_group1],
238 add_members_operation=True)
240 self.test_group4 = "tokengroups_group4"
241 self.admin_ldb.newgroup(self.test_group4, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP)
243 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group4, self.base_dn),
244 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
245 self.test_group4_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
247 self.admin_ldb.add_remove_group_members(self.test_group4, [self.test_group3],
248 add_members_operation=True)
250 self.test_group5 = "tokengroups_group5"
251 self.admin_ldb.newgroup(self.test_group5, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)
253 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group5, self.base_dn),
254 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
255 self.test_group5_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
257 self.admin_ldb.add_remove_group_members(self.test_group5, [self.test_group4],
258 add_members_operation=True)
260 self.test_group6 = "tokengroups_group6"
261 self.admin_ldb.newgroup(self.test_group6, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)
263 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group6, self.base_dn),
264 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
265 self.test_group6_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
267 self.admin_ldb.add_remove_group_members(self.test_group6, [self.test_user],
268 add_members_operation=True)
270 self.ldb = self.get_ldb_connection(self.test_user, self.test_user_pass)
272 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
273 self.assertEquals(len(res), 1)
275 self.user_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0])
276 self.user_sid_dn = "<SID=%s>" % str(self.user_sid)
278 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=[])
279 self.assertEquals(len(res), 1)
281 self.test_user_dn = res[0].dn
283 session_info_flags = ( AUTH_SESSION_INFO_DEFAULT_GROUPS |
284 AUTH_SESSION_INFO_AUTHENTICATED |
285 AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
287 if creds.get_kerberos_state() == DONT_USE_KERBEROS:
288 session_info_flags |= AUTH_SESSION_INFO_NTLM
290 session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn,
291 session_info_flags=session_info_flags)
293 token = session.security_token
296 self.user_sids.append(str(s))
299 super(DynamicTokenTest, self).tearDown()
300 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
301 (self.test_user, "cn=users", self.base_dn))
302 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
303 (self.test_group0, "cn=users", self.base_dn))
304 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
305 (self.test_group1, "cn=users", self.base_dn))
306 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
307 (self.test_group2, "cn=users", self.base_dn))
308 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
309 (self.test_group3, "cn=users", self.base_dn))
310 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
311 (self.test_group4, "cn=users", self.base_dn))
312 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
313 (self.test_group5, "cn=users", self.base_dn))
314 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
315 (self.test_group6, "cn=users", self.base_dn))
317 def test_rootDSE_tokenGroups(self):
318 """Testing rootDSE tokengroups against internal calculation"""
319 if not url.startswith("ldap"):
320 self.fail(msg="This test is only valid on ldap")
322 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
323 self.assertEquals(len(res), 1)
325 print("Getting tokenGroups from rootDSE")
327 for sid in res[0]['tokenGroups']:
328 tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
330 sidset1 = set(tokengroups)
331 sidset2 = set(self.user_sids)
332 if len(sidset1.difference(sidset2)):
333 print("token sids don't match")
334 print("tokengroups: %s" % tokengroups)
335 print("calculated : %s" % self.user_sids)
336 print("difference : %s" % sidset1.difference(sidset2))
337 self.fail(msg="calculated groups don't match against rootDSE tokenGroups")
339 def test_dn_tokenGroups(self):
340 print("Getting tokenGroups from user DN")
341 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
342 self.assertEquals(len(res), 1)
345 for sid in res[0]['tokenGroups']:
346 dn_tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
348 sidset1 = set(dn_tokengroups)
349 sidset2 = set(self.user_sids)
351 # The SIDs on the DN do not include the NTLM authentication SID
352 sidset2.discard(samba.dcerpc.security.SID_NT_NTLM_AUTHENTICATION)
354 if len(sidset1.difference(sidset2)):
355 print("token sids don't match")
356 print("difference : %s" % sidset1.difference(sidset2))
357 self.fail(msg="calculated groups don't match against user DN tokenGroups")
359 def test_pac_groups(self):
361 settings["lp_ctx"] = lp
362 settings["target_hostname"] = lp.get("netbios name")
364 gensec_client = gensec.Security.start_client(settings)
365 gensec_client.set_credentials(self.get_creds(self.test_user, self.test_user_pass))
366 gensec_client.want_feature(gensec.FEATURE_SEAL)
367 gensec_client.start_mech_by_sasl_name("GSSAPI")
369 auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[])
371 gensec_server = gensec.Security.start_server(settings, auth_context)
372 machine_creds = Credentials()
373 machine_creds.guess(lp)
374 machine_creds.set_machine_account(lp)
375 gensec_server.set_credentials(machine_creds)
377 gensec_server.want_feature(gensec.FEATURE_SEAL)
378 gensec_server.start_mech_by_sasl_name("GSSAPI")
380 client_finished = False
381 server_finished = False
382 server_to_client = ""
384 # Run the actual call loop.
385 while client_finished == False and server_finished == False:
386 if not client_finished:
387 print("running client gensec_update")
388 (client_finished, client_to_server) = gensec_client.update(server_to_client)
389 if not server_finished:
390 print("running server gensec_update")
391 (server_finished, server_to_client) = gensec_server.update(client_to_server)
393 session = gensec_server.session_info()
395 token = session.security_token
398 pac_sids.append(str(s))
400 sidset1 = set(pac_sids)
401 sidset2 = set(self.user_sids)
402 if len(sidset1.difference(sidset2)):
403 print("token sids don't match")
404 print("difference : %s" % sidset1.difference(sidset2))
405 self.fail(msg="calculated groups don't match against user PAC tokenGroups")
408 def test_tokenGroups_manual(self):
409 # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
410 # and compare the result
411 res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
412 expression="(|(objectclass=user)(objectclass=group))",
418 if "memberOf" in obj:
419 for dn in obj["memberOf"]:
420 first = obj.dn.get_casefold()
421 second = ldb.Dn(self.admin_ldb, dn).get_casefold()
422 aSet.add((first, second))
423 aSetR.add((second, first))
427 res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
428 expression="(objectclass=user)",
429 attrs=["primaryGroupID"])
431 if "primaryGroupID" in obj:
432 sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0]))
433 res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
435 first = obj.dn.get_casefold()
436 second = res2[0].dn.get_casefold()
438 aSet.add((first, second))
439 aSetR.add((second, first))
444 wSet.add(self.test_user_dn.get_casefold())
445 closure(vSet, wSet, aSet)
446 wSet.remove(self.test_user_dn.get_casefold())
448 tokenGroupsSet = set()
450 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
451 self.assertEquals(len(res), 1)
454 for sid in res[0]['tokenGroups']:
455 sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
456 res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
458 tokenGroupsSet.add(res3[0].dn.get_casefold())
460 if len(wSet.difference(tokenGroupsSet)):
461 self.fail(msg="additional calculated: %s" % wSet.difference(tokenGroupsSet))
463 if len(tokenGroupsSet.difference(wSet)):
464 self.fail(msg="additional tokenGroups: %s" % tokenGroupsSet.difference(wSet))
467 def filtered_closure(self, wSet, filter_grouptype):
468 res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
469 expression="(|(objectclass=user)(objectclass=group))",
475 vSet.add(obj.dn.get_casefold())
476 if "memberOf" in obj:
477 for dn in obj["memberOf"]:
478 first = obj.dn.get_casefold()
479 second = ldb.Dn(self.admin_ldb, dn).get_casefold()
480 aSet.add((first, second))
481 aSetR.add((second, first))
485 res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
486 expression="(objectclass=user)",
487 attrs=["primaryGroupID"])
489 if "primaryGroupID" in obj:
490 sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0]))
491 res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
493 first = obj.dn.get_casefold()
494 second = res2[0].dn.get_casefold()
496 aSet.add((first, second))
497 aSetR.add((second, first))
503 res_group = self.admin_ldb.search(base=v, scope=ldb.SCOPE_BASE,
505 expression="objectClass=group")
506 if len(res_group) == 1:
507 if hex(int(res_group[0]["groupType"][0]) & 0x00000000FFFFFFFF) == hex(filter_grouptype):
512 closure(uSet, wSet, aSet)
515 def test_tokenGroupsGlobalAndUniversal_manual(self):
516 # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
517 # and compare the result
519 # The variable names come from MS-ADTS May 15, 2014
522 S.add(self.test_user_dn.get_casefold())
524 self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP)
527 # Not really a SID, we do this on DNs...
531 self.filtered_closure(X, GTYPE_SECURITY_UNIVERSAL_GROUP)
535 T.remove(self.test_user_dn.get_casefold())
537 tokenGroupsSet = set()
539 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"])
540 self.assertEquals(len(res), 1)
543 for sid in res[0]['tokenGroupsGlobalAndUniversal']:
544 sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
545 res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
547 tokenGroupsSet.add(res3[0].dn.get_casefold())
549 if len(T.difference(tokenGroupsSet)):
550 self.fail(msg="additional calculated: %s" % T.difference(tokenGroupsSet))
552 if len(tokenGroupsSet.difference(T)):
553 self.fail(msg="additional tokenGroupsGlobalAndUniversal: %s" % tokenGroupsSet.difference(T))
555 def test_samr_GetGroupsForUser(self):
556 # Confirm that we get the correct results against SAMR also
557 if not url.startswith("ldap://"):
558 self.fail(msg="This test is only valid on ldap (so we an find the hostname and use SAMR)")
559 host = url.split("://")[1]
560 (domain_sid, user_rid) = self.user_sid.split()
561 samr_conn = samba.dcerpc.samr.samr("ncacn_ip_tcp:%s[seal]" % host, lp, creds)
562 samr_handle = samr_conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
563 samr_domain = samr_conn.OpenDomain(samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED,
565 user_handle = samr_conn.OpenUser(samr_domain, security.SEC_FLAG_MAXIMUM_ALLOWED, user_rid)
566 rids = samr_conn.GetGroupsForUser(user_handle)
568 for rid in rids.rids:
569 self.assertEqual(rid.attributes, samr.SE_GROUP_MANDATORY | samr.SE_GROUP_ENABLED_BY_DEFAULT| samr.SE_GROUP_ENABLED)
570 sid = "%s-%d" % (domain_sid, rid.rid)
571 res = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
573 samr_dns.add(res[0].dn.get_casefold())
575 user_info = samr_conn.QueryUserInfo(user_handle, 1)
576 self.assertEqual(rids.rids[0].rid, user_info.primary_gid)
578 tokenGroupsSet = set()
579 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"])
580 for sid in res[0]['tokenGroupsGlobalAndUniversal']:
581 sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
582 res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
584 expression="(&(|(grouptype=%d)(grouptype=%d))(objectclass=group))"
585 % (GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP))
587 tokenGroupsSet.add(res3[0].dn.get_casefold())
589 if len(samr_dns.difference(tokenGroupsSet)):
590 self.fail(msg="additional samr_GetUserGroups over tokenGroups: %s" % samr_dns.difference(tokenGroupsSet))
593 # Add the primary group
594 primary_group_sid = "%s-%d" % (domain_sid, user_info.primary_gid)
595 res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
598 memberOf.add(res2[0].dn.get_casefold())
599 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["memberOf"])
600 for dn in res[0]['memberOf']:
601 res3 = self.admin_ldb.search(base=dn, scope=ldb.SCOPE_BASE,
603 expression="(&(|(grouptype=%d)(grouptype=%d))(objectclass=group))"
604 % (GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP))
606 memberOf.add(res3[0].dn.get_casefold())
608 if len(memberOf.difference(samr_dns)):
609 self.fail(msg="additional memberOf over samr_GetUserGroups: %s" % memberOf.difference(samr_dns))
611 if len(samr_dns.difference(memberOf)):
612 self.fail(msg="additional samr_GetUserGroups over memberOf: %s" % samr_dns.difference(memberOf))
615 S.add(self.test_user_dn.get_casefold())
617 self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP)
618 self.filtered_closure(S, GTYPE_SECURITY_UNIVERSAL_GROUP)
620 # Now remove the user DN and primary group
621 S.remove(self.test_user_dn.get_casefold())
623 if len(samr_dns.difference(S)):
624 self.fail(msg="additional samr_GetUserGroups over filtered_closure: %s" % samr_dns.difference(S))
626 def test_samr_GetGroupsForUser_nomember(self):
627 # Confirm that we get the correct results against SAMR also
628 if not url.startswith("ldap://"):
629 self.fail(msg="This test is only valid on ldap (so we an find the hostname and use SAMR)")
630 host = url.split("://")[1]
632 test_user = "tokengroups_user2"
633 self.admin_ldb.newuser(test_user, self.test_user_pass)
634 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (test_user, self.base_dn),
635 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
636 user_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
638 (domain_sid, user_rid) = user_sid.split()
639 samr_conn = samba.dcerpc.samr.samr("ncacn_ip_tcp:%s[seal]" % host, lp, creds)
640 samr_handle = samr_conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
641 samr_domain = samr_conn.OpenDomain(samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED,
643 user_handle = samr_conn.OpenUser(samr_domain, security.SEC_FLAG_MAXIMUM_ALLOWED, user_rid)
644 rids = samr_conn.GetGroupsForUser(user_handle)
645 user_info = samr_conn.QueryUserInfo(user_handle, 1)
646 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
647 (test_user, "cn=users", self.base_dn))
648 self.assertEqual(len(rids.rids), 1)
649 self.assertEqual(rids.rids[0].rid, user_info.primary_gid)
652 if os.path.isfile(url):
653 url = "tdb://%s" % url
655 url = "ldap://%s" % url
657 TestProgram(module=__name__, opts=subunitopts)