struct ldb_dn *aggregate_dn;
};
+enum search_type {
+ TOKEN_GROUPS,
+ TOKEN_GROUPS_GLOBAL_AND_UNIVERSAL,
+ TOKEN_GROUPS_NO_GC_ACCEPTABLE
+};
+
/*
construct a canonical name from a message
*/
/*
construct the token groups for SAM objects from a message
*/
-static int construct_token_groups(struct ldb_module *module,
- struct ldb_message *msg, enum ldb_scope scope,
- struct ldb_request *parent)
+static int construct_generic_token_groups(struct ldb_module *module,
+ struct ldb_message *msg, enum ldb_scope scope,
+ struct ldb_request *parent,
+ const char *attribute_string,
+ enum search_type type)
{
struct ldb_context *ldb = ldb_module_get_ctx(module);
TALLOC_CTX *tmp_ctx = talloc_new(msg);
}
/* only return security groups */
- filter = talloc_asprintf(tmp_ctx, "(&(objectClass=group)(groupType:1.2.840.113556.1.4.803:=%u))",
- GROUP_TYPE_SECURITY_ENABLED);
+ switch(type) {
+ case TOKEN_GROUPS_GLOBAL_AND_UNIVERSAL:
+ filter = talloc_asprintf(tmp_ctx, "(&(objectClass=group)(groupType:1.2.840.113556.1.4.803:=%u)(|(groupType:1.2.840.113556.1.4.803:=%u)(groupType:1.2.840.113556.1.4.803:=%u)))",
+ GROUP_TYPE_SECURITY_ENABLED, GROUP_TYPE_ACCOUNT_GROUP, GROUP_TYPE_UNIVERSAL_GROUP);
+ break;
+ case TOKEN_GROUPS_NO_GC_ACCEPTABLE:
+ case TOKEN_GROUPS:
+ filter = talloc_asprintf(tmp_ctx, "(&(objectClass=group)(groupType:1.2.840.113556.1.4.803:=%u))",
+ GROUP_TYPE_SECURITY_ENABLED);
+ break;
+ }
+
if (!filter) {
talloc_free(tmp_ctx);
return ldb_oom(ldb);
}
for (i=0; i < num_groupSIDs; i++) {
- ret = samdb_msg_add_dom_sid(ldb, msg, msg, "tokenGroups", &groupSIDs[i]);
+ ret = samdb_msg_add_dom_sid(ldb, msg, msg, attribute_string, &groupSIDs[i]);
if (ret) {
talloc_free(tmp_ctx);
return ret;
return LDB_SUCCESS;
}
+static int construct_token_groups(struct ldb_module *module,
+ struct ldb_message *msg, enum ldb_scope scope,
+ struct ldb_request *parent)
+{
+ /**
+ * TODO: Add in a limiting domain when we start to support
+ * trusted domains.
+ */
+ return construct_generic_token_groups(module, msg, scope, parent,
+ "tokenGroups",
+ TOKEN_GROUPS);
+}
+
+static int construct_token_groups_no_gc(struct ldb_module *module,
+ struct ldb_message *msg, enum ldb_scope scope,
+ struct ldb_request *parent)
+{
+ /**
+ * TODO: Add in a limiting domain when we start to support
+ * trusted domains.
+ */
+ return construct_generic_token_groups(module, msg, scope, parent,
+ "tokenGroupsNoGCAcceptable",
+ TOKEN_GROUPS);
+}
+
+static int construct_global_universal_token_groups(struct ldb_module *module,
+ struct ldb_message *msg, enum ldb_scope scope,
+ struct ldb_request *parent)
+{
+ return construct_generic_token_groups(module, msg, scope, parent,
+ "tokenGroupsGlobalAndUniversal",
+ TOKEN_GROUPS_GLOBAL_AND_UNIVERSAL);
+}
/*
construct the parent GUID for an entry from a message
*/
{ "canonicalName", NULL, NULL , construct_canonical_name },
{ "primaryGroupToken", "objectClass", objectSid_attr, construct_primary_group_token },
{ "tokenGroups", "primaryGroupID", objectSid_attr, construct_token_groups },
+ { "tokenGroupsNoGCAcceptable", "primaryGroupID", objectSid_attr, construct_token_groups_no_gc},
+ { "tokenGroupsGlobalAndUniversal", "primaryGroupID", objectSid_attr, construct_global_universal_token_groups },
{ "parentGUID", NULL, NULL, construct_parent_guid },
{ "subSchemaSubEntry", NULL, NULL, construct_subschema_subentry },
{ "msDS-isRODC", "objectClass", objectCategory_attr, construct_msds_isrodc },
import samba.getopt as options
from samba.auth import system_session
-from samba import ldb
+from samba import ldb, dsdb
from samba.samdb import SamDB
from samba.auth import AuthContext
from samba.ndr import ndr_unpack
from samba import gensec
-from samba.credentials import Credentials
-
+from samba.credentials import Credentials, DONT_USE_KERBEROS
+from samba.dsdb import GTYPE_SECURITY_GLOBAL_GROUP, GTYPE_SECURITY_UNIVERSAL_GROUP
import samba.tests
+from samba.tests import delete_force
from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
+creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+
+def closure(vSet, wSet, aSet):
+ for edge in aSet:
+ start, end = edge
+ if start in wSet:
+ if end not in wSet and end in vSet:
+ wSet.add(end)
+ closure(vSet, wSet, aSet)
-class TokenTest(samba.tests.TestCase):
+class StaticTokenTest(samba.tests.TestCase):
def setUp(self):
- super(TokenTest, self).setUp()
- self.ldb = samdb
- self.base_dn = samdb.domain_dn()
+ super(StaticTokenTest, self).setUp()
+ self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
+ self.base_dn = self.ldb.domain_dn()
res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
self.assertEquals(len(res), 1)
print("difference : %s" % sidset1.difference(sidset2))
self.fail(msg="calculated groups don't match against user PAC tokenGroups")
+class DynamicTokenTest(samba.tests.TestCase):
+
+ def get_creds(self, target_username, target_password):
+ creds_tmp = Credentials()
+ creds_tmp.set_username(target_username)
+ creds_tmp.set_password(target_password)
+ creds_tmp.set_domain(creds.get_domain())
+ creds_tmp.set_realm(creds.get_realm())
+ creds_tmp.set_workstation(creds.get_workstation())
+ creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
+ | gensec.FEATURE_SEAL)
+ return creds_tmp
+
+ def get_ldb_connection(self, target_username, target_password):
+ creds_tmp = self.get_creds(target_username, target_password)
+ ldb_target = SamDB(url=url, credentials=creds_tmp, lp=lp)
+ return ldb_target
+
+ def setUp(self):
+ super(DynamicTokenTest, self).setUp()
+ self.admin_ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
+
+ self.base_dn = self.admin_ldb.domain_dn()
+
+ self.test_user = "tokengroups_user1"
+ self.test_user_pass = "samba123@"
+ self.admin_ldb.newuser(self.test_user, self.test_user_pass)
+ self.test_group0 = "tokengroups_group0"
+ self.admin_ldb.newgroup(self.test_group0, grouptype=dsdb.GTYPE_SECURITY_DOMAIN_LOCAL_GROUP)
+ res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group0, self.base_dn),
+ attrs=["objectSid"], scope=ldb.SCOPE_BASE)
+ self.test_group0_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
+
+ self.admin_ldb.add_remove_group_members(self.test_group0, [self.test_user],
+ add_members_operation=True)
+
+ self.test_group1 = "tokengroups_group1"
+ self.admin_ldb.newgroup(self.test_group1, grouptype=dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
+ res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group1, self.base_dn),
+ attrs=["objectSid"], scope=ldb.SCOPE_BASE)
+ self.test_group1_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
+
+ self.admin_ldb.add_remove_group_members(self.test_group1, [self.test_user],
+ add_members_operation=True)
+
+ self.test_group2 = "tokengroups_group2"
+ self.admin_ldb.newgroup(self.test_group2, grouptype=dsdb.GTYPE_SECURITY_UNIVERSAL_GROUP)
+
+ res = self.admin_ldb.search(base="cn=%s,cn=users,%s" % (self.test_group2, self.base_dn),
+ attrs=["objectSid"], scope=ldb.SCOPE_BASE)
+ self.test_group2_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["objectSid"][0])
+
+ self.admin_ldb.add_remove_group_members(self.test_group2, [self.test_user],
+ add_members_operation=True)
+
+ self.ldb = self.get_ldb_connection(self.test_user, self.test_user_pass)
+
+ res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
+ self.assertEquals(len(res), 1)
+
+ self.user_sid_dn = "<SID=%s>" % str(ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0]))
+
+ res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=[])
+ self.assertEquals(len(res), 1)
+
+ self.test_user_dn = res[0].dn
+
+ session_info_flags = ( AUTH_SESSION_INFO_DEFAULT_GROUPS |
+ AUTH_SESSION_INFO_AUTHENTICATED |
+ AUTH_SESSION_INFO_SIMPLE_PRIVILEGES)
+ session = samba.auth.user_session(self.ldb, lp_ctx=lp, dn=self.user_sid_dn,
+ session_info_flags=session_info_flags)
+
+ token = session.security_token
+ self.user_sids = []
+ for s in token.sids:
+ self.user_sids.append(str(s))
+
+ def tearDown(self):
+ super(DynamicTokenTest, self).tearDown()
+ delete_force(self.admin_ldb, "CN=%s,%s,%s" %
+ (self.test_user, "cn=users", self.base_dn))
+ delete_force(self.admin_ldb, "CN=%s,%s,%s" %
+ (self.test_group0, "cn=users", self.base_dn))
+ delete_force(self.admin_ldb, "CN=%s,%s,%s" %
+ (self.test_group1, "cn=users", self.base_dn))
+ delete_force(self.admin_ldb, "CN=%s,%s,%s" %
+ (self.test_group2, "cn=users", self.base_dn))
+
+ def test_rootDSE_tokenGroups(self):
+ """Testing rootDSE tokengroups against internal calculation"""
+ if not url.startswith("ldap"):
+ self.fail(msg="This test is only valid on ldap")
+
+ res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
+ self.assertEquals(len(res), 1)
+
+ print("Getting tokenGroups from rootDSE")
+ tokengroups = []
+ for sid in res[0]['tokenGroups']:
+ tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
+
+ sidset1 = set(tokengroups)
+ sidset2 = set(self.user_sids)
+ if len(sidset1.difference(sidset2)):
+ print("token sids don't match")
+ print("tokengroups: %s" % tokengroups)
+ print("calculated : %s" % self.user_sids)
+ print("difference : %s" % sidset1.difference(sidset2))
+ self.fail(msg="calculated groups don't match against rootDSE tokenGroups")
+
+ def test_dn_tokenGroups(self):
+ print("Getting tokenGroups from user DN")
+ res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
+ self.assertEquals(len(res), 1)
+
+ dn_tokengroups = []
+ for sid in res[0]['tokenGroups']:
+ dn_tokengroups.append(str(ndr_unpack(samba.dcerpc.security.dom_sid, sid)))
+
+ sidset1 = set(dn_tokengroups)
+ sidset2 = set(self.user_sids)
+ if len(sidset1.difference(sidset2)):
+ print("token sids don't match")
+ print("difference : %s" % sidset1.difference(sidset2))
+ self.fail(msg="calculated groups don't match against user DN tokenGroups")
+
+ def test_pac_groups(self):
+ settings = {}
+ settings["lp_ctx"] = lp
+ settings["target_hostname"] = lp.get("netbios name")
+
+ gensec_client = gensec.Security.start_client(settings)
+ gensec_client.set_credentials(self.get_creds(self.test_user, self.test_user_pass))
+ gensec_client.want_feature(gensec.FEATURE_SEAL)
+ gensec_client.start_mech_by_sasl_name("GSSAPI")
+
+ auth_context = AuthContext(lp_ctx=lp, ldb=self.ldb, methods=[])
+
+ gensec_server = gensec.Security.start_server(settings, auth_context)
+ machine_creds = Credentials()
+ machine_creds.guess(lp)
+ machine_creds.set_machine_account(lp)
+ gensec_server.set_credentials(machine_creds)
+
+ gensec_server.want_feature(gensec.FEATURE_SEAL)
+ gensec_server.start_mech_by_sasl_name("GSSAPI")
+
+ client_finished = False
+ server_finished = False
+ server_to_client = ""
+
+ # Run the actual call loop.
+ while client_finished == False and server_finished == False:
+ if not client_finished:
+ print "running client gensec_update"
+ (client_finished, client_to_server) = gensec_client.update(server_to_client)
+ if not server_finished:
+ print "running server gensec_update"
+ (server_finished, server_to_client) = gensec_server.update(client_to_server)
+
+ session = gensec_server.session_info()
+
+ token = session.security_token
+ pac_sids = []
+ for s in token.sids:
+ pac_sids.append(str(s))
+
+ sidset1 = set(pac_sids)
+ sidset2 = set(self.user_sids)
+ if len(sidset1.difference(sidset2)):
+ print("token sids don't match")
+ print("difference : %s" % sidset1.difference(sidset2))
+ self.fail(msg="calculated groups don't match against user PAC tokenGroups")
+
+
+ def test_tokenGroups_manual(self):
+ # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
+ # and compare the result
+ res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
+ expression="(|(objectclass=user)(objectclass=group))",
+ attrs=["memberOf"])
+ aSet = set()
+ aSetR = set()
+ vSet = set()
+ for obj in res:
+ if "memberOf" in obj:
+ for dn in obj["memberOf"]:
+ first = obj.dn.get_casefold()
+ second = ldb.Dn(self.admin_ldb, dn).get_casefold()
+ aSet.add((first, second))
+ aSetR.add((second, first))
+ vSet.add(first)
+ vSet.add(second)
+
+ res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
+ expression="(objectclass=user)",
+ attrs=["primaryGroupID"])
+ for obj in res:
+ if "primaryGroupID" in obj:
+ sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0]))
+ res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
+ attrs=[])
+ first = obj.dn.get_casefold()
+ second = res2[0].dn.get_casefold()
+
+ aSet.add((first, second))
+ aSetR.add((second, first))
+ vSet.add(first)
+ vSet.add(second)
+
+ wSet = set()
+ wSet.add(self.test_user_dn.get_casefold())
+ closure(vSet, wSet, aSet)
+ wSet.remove(self.test_user_dn.get_casefold())
+
+ tokenGroupsSet = set()
+
+ res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
+ self.assertEquals(len(res), 1)
+
+ dn_tokengroups = []
+ for sid in res[0]['tokenGroups']:
+ sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
+ res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
+ attrs=[])
+ tokenGroupsSet.add(res3[0].dn.get_casefold())
+
+ if len(wSet.difference(tokenGroupsSet)):
+ self.fail(msg="additional calculated: %s" % wSet.difference(tokenGroupsSet))
+
+ if len(tokenGroupsSet.difference(wSet)):
+ self.fail(msg="additional tokenGroups: %s" % tokenGroupsSet.difference(wSet))
+
+
+ def filtered_closure(self, wSet, filter_grouptype):
+ res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
+ expression="(|(objectclass=user)(objectclass=group))",
+ attrs=["memberOf"])
+ aSet = set()
+ aSetR = set()
+ vSet = set()
+ for obj in res:
+ vSet.add(obj.dn.get_casefold())
+ if "memberOf" in obj:
+ for dn in obj["memberOf"]:
+ first = obj.dn.get_casefold()
+ second = ldb.Dn(self.admin_ldb, dn).get_casefold()
+ aSet.add((first, second))
+ aSetR.add((second, first))
+ vSet.add(first)
+ vSet.add(second)
+
+ res = self.admin_ldb.search(base=self.base_dn, scope=ldb.SCOPE_SUBTREE,
+ expression="(objectclass=user)",
+ attrs=["primaryGroupID"])
+ for obj in res:
+ if "primaryGroupID" in obj:
+ sid = "%s-%d" % (self.admin_ldb.get_domain_sid(), int(obj["primaryGroupID"][0]))
+ res2 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
+ attrs=[])
+ first = obj.dn.get_casefold()
+ second = res2[0].dn.get_casefold()
+
+ aSet.add((first, second))
+ aSetR.add((second, first))
+ vSet.add(first)
+ vSet.add(second)
+
+ uSet = set()
+ for v in vSet:
+ res_group = self.admin_ldb.search(base=v, scope=ldb.SCOPE_BASE,
+ attrs=["groupType"],
+ expression="objectClass=group")
+ if len(res_group) == 1:
+ if hex(int(res_group[0]["groupType"][0]) & 0x00000000FFFFFFFF) == hex(filter_grouptype):
+ uSet.add(v)
+ else:
+ uSet.add(v)
+
+ closure(uSet, wSet, aSet)
+
+
+ def test_tokenGroupsGlobalAndUniversal_manual(self):
+ # Manually run the tokenGroups algorithm from MS-ADTS 3.1.1.4.5.19 and MS-DRSR 4.1.8.3
+ # and compare the result
+
+ # The variable names come from MS-ADTS May 15, 2014
+
+ S = set()
+ S.add(self.test_user_dn.get_casefold())
+
+ self.filtered_closure(S, GTYPE_SECURITY_GLOBAL_GROUP)
+
+ T = set()
+ # Not really a SID, we do this on DNs...
+ for sid in S:
+ X = set()
+ X.add(sid)
+ self.filtered_closure(X, GTYPE_SECURITY_UNIVERSAL_GROUP)
+
+ T = T.union(X)
+
+ T.remove(self.test_user_dn.get_casefold())
+
+ tokenGroupsSet = set()
+
+ res = self.ldb.search(self.user_sid_dn, scope=ldb.SCOPE_BASE, attrs=["tokenGroupsGlobalAndUniversal"])
+ self.assertEquals(len(res), 1)
+
+ dn_tokengroups = []
+ for sid in res[0]['tokenGroupsGlobalAndUniversal']:
+ sid = ndr_unpack(samba.dcerpc.security.dom_sid, sid)
+ res3 = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
+ attrs=[])
+ tokenGroupsSet.add(res3[0].dn.get_casefold())
+
+ if len(T.difference(tokenGroupsSet)):
+ self.fail(msg="additional calculated: %s" % T.difference(tokenGroupsSet))
+
+ if len(tokenGroupsSet.difference(T)):
+ self.fail(msg="additional tokenGroupsGlobalAndUniversal: %s" % tokenGroupsSet.difference(T))
if not "://" in url:
if os.path.isfile(url):
else:
url = "ldap://%s" % url
-samdb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
-
TestProgram(module=__name__, opts=subunitopts)