2 # -*- coding: utf-8 -*-
3 # test tokengroups attribute against internal token calculation
5 from __future__ import print_function
10 sys.path.insert(0, "bin/python")
13 from samba.tests.subunitrun import SubunitOptions, TestProgram
15 import samba.getopt as options
17 from samba.auth import system_session
18 from samba import ldb, dsdb
19 from samba.samdb import SamDB
20 from samba.auth import AuthContext
21 from samba.ndr import ndr_unpack
22 from samba import gensec
23 from samba.credentials import Credentials, DONT_USE_KERBEROS
24 from samba.dsdb import GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP
26 from samba.tests import delete_force
27 from samba.dcerpc import samr, security
28 from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES, AUTH_SESSION_INFO_NTLM
31 parser = optparse.OptionParser("token_group.py [options] <host>")
32 sambaopts = options.SambaOptions(parser)
33 parser.add_option_group(sambaopts)
34 parser.add_option_group(options.VersionOptions(parser))
35 # use command line creds if available
36 credopts = options.CredentialsOptions(parser)
37 parser.add_option_group(credopts)
38 subunitopts = SubunitOptions(parser)
39 parser.add_option_group(subunitopts)
40 opts, args = parser.parse_args()
48 lp = sambaopts.get_loadparm()
49 creds = credopts.get_credentials(lp)
50 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
53 def closure(vSet, wSet, aSet):
57 if end not in wSet and end in vSet:
59 closure(vSet, wSet, aSet)
62 class StaticTokenTest(samba.tests.TestCase):
65 super(StaticTokenTest, self).setUp()
66 self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
67 self.base_dn = self.ldb.domain_dn()
69 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
70 self.assertEquals(len(res), 1)
72 self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]))
74 session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS |
75 AUTH_SESSION_INFO_AUTHENTICATED |
76 AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
77 if creds.get_kerberos_state() == DONT_USE_KERBEROS:
78 session_info_flags |= AUTH_SESSION_INFO_NTLM
80 session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn,
81 session_info_flags=session_info_flags)
83 token = session.security_token
86 self.user_sids.append(str(s))
88 def test_rootDSE_tokenGroups(self):
89 """Testing rootDSE tokengroups against internal calculation"""
90 if not url.startswith("ldap"):
91 self.fail(msg="This test is only valid on ldap")
93 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
94 self.assertEquals(len(res), 1)
96 print("Getting tokenGroups from rootDSE")
98 for sid in res[0]['tokenGroups']:
99 tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
101 sidset1 = set(tokengroups)
102 sidset2 = set(self.user_sids)
103 if len(sidset1.difference(sidset2)):
104 print("token sids don't match")
105 print("tokengroups: %s" % tokengroups)
106 print("calculated : %s" % self.user_sids)
107 print("difference : %s" % sidset1.difference(sidset2))
108 self.fail(msg="calculated groups don't match against rootDSE tokenGroups")
110 def test_dn_tokenGroups(self):
111 print("Getting tokenGroups from user DN")
112 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
113 self.assertEquals(len(res), 1)
116 for sid in res[0]['tokenGroups']:
117 dn_tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
119 sidset1 = set(dn_tokengroups)
120 sidset2 = set(self.user_sids)
121 if len(sidset1.difference(sidset2)):
122 print("token sids don't match")
123 print("difference : %s" % sidset1.difference(sidset2))
124 self.fail(msg="calculated groups don't match against user DN tokenGroups")
126 def test_pac_groups(self):
127 if creds.get_kerberos_state() == DONT_USE_KERBEROS:
128 self.skipTest("Kerberos disabled, skipping PAC test")
131 settings["lp_ctx"] = lp
132 settings["target_hostname"] = lp.get("netbios name")
134 gensec_client = gensec.Security.start_client(settings)
135 gensec_client.set_credentials(creds)
136 gensec_client.want_feature(gensec.FEATURE_SEAL)
137 gensec_client.start_mech_by_sasl_name("GSSAPI")
139 auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[])
141 gensec_server = gensec.Security.start_server(settings, auth_context)
142 machine_creds = Credentials()
143 machine_creds.guess(lp)
144 machine_creds.set_machine_account(lp)
145 gensec_server.set_credentials(machine_creds)
147 gensec_server.want_feature(gensec.FEATURE_SEAL)
148 gensec_server.start_mech_by_sasl_name("GSSAPI")
150 client_finished = False
151 server_finished = False
152 server_to_client = b""
154 # Run the actual call loop.
155 while client_finished == False and server_finished == False:
156 if not client_finished:
157 print("running client gensec_update")
158 (client_finished, client_to_server) = gensec_client.update(server_to_client)
159 if not server_finished:
160 print("running server gensec_update")
161 (server_finished, server_to_client) = gensec_server.update(client_to_server)
163 session = gensec_server.session_info()
165 token = session.security_token
168 pac_sids.append(str(s))
170 sidset1 = set(pac_sids)
171 sidset2 = set(self.user_sids)
172 if len(sidset1.difference(sidset2)):
173 print("token sids don't match")
174 print("difference : %s" % sidset1.difference(sidset2))
175 self.fail(msg="calculated groups don't match against user PAC tokenGroups")
178 class DynamicTokenTest(samba.tests.TestCase):
180 def get_creds(self, target_username, target_password):
181 creds_tmp = Credentials()
182 creds_tmp.set_username(target_username)
183 creds_tmp.set_password(target_password)
184 creds_tmp.set_domain(creds.get_domain())
185 creds_tmp.set_realm(creds.get_realm())
186 creds_tmp.set_workstation(creds.get_workstation())
187 creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
188 | gensec.FEATURE_SEAL)
191 def get_ldb_connection(self, target_username, target_password):
192 creds_tmp = self.get_creds(target_username, target_password)
193 ldb_target = SamDB(url=url, credentials=creds_tmp, lp=lp)
197 super(DynamicTokenTest, self).setUp()
198 self.admin_ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
200 self.base_dn = self.admin_ldb.domain_dn()
202 self.test_user = "tokengroups_user1"
203 self.test_user_pass = "samba123@"
204 self.admin_ldb.newuser(self.test_user, self.test_user_pass)
205 self.test_group0 = "tokengroups_group0"
206 self.admin_ldb.newgroup(self.test_group0, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)
207 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group0, self.base_dn),
208 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
209 self.test_group0_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
211 self.admin_ldb.add_remove_group_members(self.test_group0, [self.test_user],
212 add_members_operation=True)
214 self.test_group1 = "tokengroups_group1"
215 self.admin_ldb.newgroup(self.test_group1, grouptype=dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
216 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group1, self.base_dn),
217 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
218 self.test_group1_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
220 self.admin_ldb.add_remove_group_members(self.test_group1, [self.test_user],
221 add_members_operation=True)
223 self.test_group2 = "tokengroups_group2"
224 self.admin_ldb.newgroup(self.test_group2, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP)
226 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group2, self.base_dn),
227 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
228 self.test_group2_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
230 self.admin_ldb.add_remove_group_members(self.test_group2, [self.test_user],
231 add_members_operation=True)
233 self.test_group3 = "tokengroups_group3"
234 self.admin_ldb.newgroup(self.test_group3, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP)
236 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group3, self.base_dn),
237 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
238 self.test_group3_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
240 self.admin_ldb.add_remove_group_members(self.test_group3, [self.test_group1],
241 add_members_operation=True)
243 self.test_group4 = "tokengroups_group4"
244 self.admin_ldb.newgroup(self.test_group4, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP)
246 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group4, self.base_dn),
247 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
248 self.test_group4_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
250 self.admin_ldb.add_remove_group_members(self.test_group4, [self.test_group3],
251 add_members_operation=True)
253 self.test_group5 = "tokengroups_group5"
254 self.admin_ldb.newgroup(self.test_group5, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)
256 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group5, self.base_dn),
257 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
258 self.test_group5_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
260 self.admin_ldb.add_remove_group_members(self.test_group5, [self.test_group4],
261 add_members_operation=True)
263 self.test_group6 = "tokengroups_group6"
264 self.admin_ldb.newgroup(self.test_group6, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)
266 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group6, self.base_dn),
267 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
268 self.test_group6_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
270 self.admin_ldb.add_remove_group_members(self.test_group6, [self.test_user],
271 add_members_operation=True)
273 self.ldb = self.get_ldb_connection(self.test_user, self.test_user_pass)
275 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
276 self.assertEquals(len(res), 1)
278 self.user_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0])
279 self.user_sid_dn = "<SID=%s>" % str(self.user_sid)
281 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=[])
282 self.assertEquals(len(res), 1)
284 self.test_user_dn = res[0].dn
286 session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS |
287 AUTH_SESSION_INFO_AUTHENTICATED |
288 AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
290 if creds.get_kerberos_state() == DONT_USE_KERBEROS:
291 session_info_flags |= AUTH_SESSION_INFO_NTLM
293 session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn,
294 session_info_flags=session_info_flags)
296 token = session.security_token
299 self.user_sids.append(str(s))
302 super(DynamicTokenTest, self).tearDown()
303 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
304 (self.test_user, "cn=users", self.base_dn))
305 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
306 (self.test_group0, "cn=users", self.base_dn))
307 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
308 (self.test_group1, "cn=users", self.base_dn))
309 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
310 (self.test_group2, "cn=users", self.base_dn))
311 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
312 (self.test_group3, "cn=users", self.base_dn))
313 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
314 (self.test_group4, "cn=users", self.base_dn))
315 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
316 (self.test_group5, "cn=users", self.base_dn))
317 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
318 (self.test_group6, "cn=users", self.base_dn))
320 def test_rootDSE_tokenGroups(self):
321 """Testing rootDSE tokengroups against internal calculation"""
322 if not url.startswith("ldap"):
323 self.fail(msg="This test is only valid on ldap")
325 res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
326 self.assertEquals(len(res), 1)
328 print("Getting tokenGroups from rootDSE")
330 for sid in res[0]['tokenGroups']:
331 tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
333 sidset1 = set(tokengroups)
334 sidset2 = set(self.user_sids)
335 if len(sidset1.difference(sidset2)):
336 print("token sids don't match")
337 print("tokengroups: %s" % tokengroups)
338 print("calculated : %s" % self.user_sids)
339 print("difference : %s" % sidset1.difference(sidset2))
340 self.fail(msg="calculated groups don't match against rootDSE tokenGroups")
342 def test_dn_tokenGroups(self):
343 print("Getting tokenGroups from user DN")
344 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
345 self.assertEquals(len(res), 1)
348 for sid in res[0]['tokenGroups']:
349 dn_tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
351 sidset1 = set(dn_tokengroups)
352 sidset2 = set(self.user_sids)
354 # The SIDs on the DN do not include the NTLM authentication SID
355 sidset2.discard(samba.dcerpc.security.SID_NT_NTLM_AUTHENTICATION)
357 if len(sidset1.difference(sidset2)):
358 print("token sids don't match")
359 print("difference : %s" % sidset1.difference(sidset2))
360 self.fail(msg="calculated groups don't match against user DN tokenGroups")
362 def test_pac_groups(self):
364 settings["lp_ctx"] = lp
365 settings["target_hostname"] = lp.get("netbios name")
367 gensec_client = gensec.Security.start_client(settings)
368 gensec_client.set_credentials(self.get_creds(self.test_user, self.test_user_pass))
369 gensec_client.want_feature(gensec.FEATURE_SEAL)
370 gensec_client.start_mech_by_sasl_name("GSSAPI")
372 auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[])
374 gensec_server = gensec.Security.start_server(settings, auth_context)
375 machine_creds = Credentials()
376 machine_creds.guess(lp)
377 machine_creds.set_machine_account(lp)
378 gensec_server.set_credentials(machine_creds)
380 gensec_server.want_feature(gensec.FEATURE_SEAL)
381 gensec_server.start_mech_by_sasl_name("GSSAPI")
383 client_finished = False
384 server_finished = False
385 server_to_client = b""
387 # Run the actual call loop.
388 while client_finished == False and server_finished == False:
389 if not client_finished:
390 print("running client gensec_update")
391 (client_finished, client_to_server) = gensec_client.update(server_to_client)
392 if not server_finished:
393 print("running server gensec_update")
394 (server_finished, server_to_client) = gensec_server.update(client_to_server)
396 session = gensec_server.session_info()
398 token = session.security_token
401 pac_sids.append(str(s))
403 sidset1 = set(pac_sids)
404 sidset2 = set(self.user_sids)
405 if len(sidset1.difference(sidset2)):
406 print("token sids don't match")
407 print("difference : %s" % sidset1.difference(sidset2))
408 self.fail(msg="calculated groups don't match against user PAC tokenGroups")
410 def test_tokenGroups_manual(self):
411 # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
412 # and compare the result
413 res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
414 expression="(|(objectclass=user)(objectclass=group))",
420 if "memberOf" in obj:
421 for dn in obj["memberOf"]:
422 first = obj.dn.get_casefold()
423 second = ldb.Dn(self.admin_ldb, dn.decode('utf8')).get_casefold()
424 aSet.add((first, second))
425 aSetR.add((second, first))
429 res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
430 expression="(objectclass=user)",
431 attrs=["primaryGroupID"])
433 if "primaryGroupID" in obj:
434 sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0]))
435 res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
437 first = obj.dn.get_casefold()
438 second = res2[0].dn.get_casefold()
440 aSet.add((first, second))
441 aSetR.add((second, first))
446 wSet.add(self.test_user_dn.get_casefold())
447 closure(vSet, wSet, aSet)
448 wSet.remove(self.test_user_dn.get_casefold())
450 tokenGroupsSet = set()
452 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
453 self.assertEquals(len(res), 1)
456 for sid in res[0]['tokenGroups']:
457 sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
458 res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
460 tokenGroupsSet.add(res3[0].dn.get_casefold())
462 if len(wSet.difference(tokenGroupsSet)):
463 self.fail(msg="additional calculated: %s" % wSet.difference(tokenGroupsSet))
465 if len(tokenGroupsSet.difference(wSet)):
466 self.fail(msg="additional tokenGroups: %s" % tokenGroupsSet.difference(wSet))
468 def filtered_closure(self, wSet, filter_grouptype):
469 res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
470 expression="(|(objectclass=user)(objectclass=group))",
476 vSet.add(obj.dn.get_casefold())
477 if "memberOf" in obj:
478 for dn in obj["memberOf"]:
479 first = obj.dn.get_casefold()
480 second = ldb.Dn(self.admin_ldb, dn.decode('utf8')).get_casefold()
481 aSet.add((first, second))
482 aSetR.add((second, first))
486 res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
487 expression="(objectclass=user)",
488 attrs=["primaryGroupID"])
490 if "primaryGroupID" in obj:
491 sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0]))
492 res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
494 first = obj.dn.get_casefold()
495 second = res2[0].dn.get_casefold()
497 aSet.add((first, second))
498 aSetR.add((second, first))
504 res_group = self.admin_ldb.search(base=v, scope=ldb.SCOPE_BASE,
506 expression="objectClass=group")
507 if len(res_group) == 1:
508 if hex(int(res_group[0]["groupType"][0]) & 0x00000000FFFFFFFF) == hex(filter_grouptype):
513 closure(uSet, wSet, aSet)
515 def test_tokenGroupsGlobalAndUniversal_manual(self):
516 # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
517 # and compare the result
519 # The variable names come from MS-ADTS May 15, 2014
522 S.add(self.test_user_dn.get_casefold())
524 self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP)
527 # Not really a SID, we do this on DNs...
531 self.filtered_closure(X, GTYPE_SECURITY_UNIVERSAL_GROUP)
535 T.remove(self.test_user_dn.get_casefold())
537 tokenGroupsSet = set()
539 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"])
540 self.assertEquals(len(res), 1)
543 for sid in res[0]['tokenGroupsGlobalAndUniversal']:
544 sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
545 res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
547 tokenGroupsSet.add(res3[0].dn.get_casefold())
549 if len(T.difference(tokenGroupsSet)):
550 self.fail(msg="additional calculated: %s" % T.difference(tokenGroupsSet))
552 if len(tokenGroupsSet.difference(T)):
553 self.fail(msg="additional tokenGroupsGlobalAndUniversal: %s" % tokenGroupsSet.difference(T))
555 def test_samr_GetGroupsForUser(self):
556 # Confirm that we get the correct results against SAMR also
557 if not url.startswith("ldap://"):
558 self.fail(msg="This test is only valid on ldap (so we an find the hostname and use SAMR)")
559 host = url.split("://")[1]
560 (domain_sid, user_rid) = self.user_sid.split()
561 samr_conn = samba.dcerpc.samr.samr("ncacn_ip_tcp:%s[seal]" % host, lp, creds)
562 samr_handle = samr_conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
563 samr_domain = samr_conn.OpenDomain(samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED,
565 user_handle = samr_conn.OpenUser(samr_domain, security.SEC_FLAG_MAXIMUM_ALLOWED, user_rid)
566 rids = samr_conn.GetGroupsForUser(user_handle)
568 for rid in rids.rids:
569 self.assertEqual(rid.attributes, security.SE_GROUP_MANDATORY | security.SE_GROUP_ENABLED_BY_DEFAULT | security.SE_GROUP_ENABLED)
570 sid = "%s-%d" % (domain_sid, rid.rid)
571 res = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
573 samr_dns.add(res[0].dn.get_casefold())
575 user_info = samr_conn.QueryUserInfo(user_handle, 1)
576 self.assertEqual(rids.rids[0].rid, user_info.primary_gid)
578 tokenGroupsSet = set()
579 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"])
580 for sid in res[0]['tokenGroupsGlobalAndUniversal']:
581 sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
582 res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
584 expression="(&(|(grouptype=%d)(grouptype=%d))(objectclass=group))"
585 % (GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP))
587 tokenGroupsSet.add(res3[0].dn.get_casefold())
589 if len(samr_dns.difference(tokenGroupsSet)):
590 self.fail(msg="additional samr_GetUserGroups over tokenGroups: %s" % samr_dns.difference(tokenGroupsSet))
593 # Add the primary group
594 primary_group_sid = "%s-%d" % (domain_sid, user_info.primary_gid)
595 res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
598 memberOf.add(res2[0].dn.get_casefold())
599 res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["memberOf"])
600 for dn in res[0]['memberOf']:
601 res3 = self.admin_ldb.search(base=dn, scope=ldb.SCOPE_BASE,
603 expression="(&(|(grouptype=%d)(grouptype=%d))(objectclass=group))"
604 % (GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP))
606 memberOf.add(res3[0].dn.get_casefold())
608 if len(memberOf.difference(samr_dns)):
609 self.fail(msg="additional memberOf over samr_GetUserGroups: %s" % memberOf.difference(samr_dns))
611 if len(samr_dns.difference(memberOf)):
612 self.fail(msg="additional samr_GetUserGroups over memberOf: %s" % samr_dns.difference(memberOf))
615 S.add(self.test_user_dn.get_casefold())
617 self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP)
618 self.filtered_closure(S, GTYPE_SECURITY_UNIVERSAL_GROUP)
620 # Now remove the user DN and primary group
621 S.remove(self.test_user_dn.get_casefold())
623 if len(samr_dns.difference(S)):
624 self.fail(msg="additional samr_GetUserGroups over filtered_closure: %s" % samr_dns.difference(S))
626 def test_samr_GetGroupsForUser_nomember(self):
627 # Confirm that we get the correct results against SAMR also
628 if not url.startswith("ldap://"):
629 self.fail(msg="This test is only valid on ldap (so we an find the hostname and use SAMR)")
630 host = url.split("://")[1]
632 test_user = "tokengroups_user2"
633 self.admin_ldb.newuser(test_user, self.test_user_pass)
634 res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (test_user, self.base_dn),
635 attrs=["objectSid"], scope=ldb.SCOPE_BASE)
636 user_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
638 (domain_sid, user_rid) = user_sid.split()
639 samr_conn = samba.dcerpc.samr.samr("ncacn_ip_tcp:%s[seal]" % host, lp, creds)
640 samr_handle = samr_conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED)
641 samr_domain = samr_conn.OpenDomain(samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED,
643 user_handle = samr_conn.OpenUser(samr_domain, security.SEC_FLAG_MAXIMUM_ALLOWED, user_rid)
644 rids = samr_conn.GetGroupsForUser(user_handle)
645 user_info = samr_conn.QueryUserInfo(user_handle, 1)
646 delete_force(self.admin_ldb, "CN=%s,%s,%s" %
647 (test_user, "cn=users", self.base_dn))
648 self.assertEqual(len(rids.rids), 1)
649 self.assertEqual(rids.rids[0].rid, user_info.primary_gid)
653 if os.path.isfile(url):
654 url = "tdb://%s" % url
656 url = "ldap://%s" % url
658 TestProgram(module=__name__, opts=subunitopts)