pycredentials: add function to return the netr_Authenticator
authorGary Lockyer <gary@catalyst.net.nz>
Thu, 15 Jun 2017 03:55:43 +0000 (15:55 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Thu, 22 Jun 2017 06:56:22 +0000 (08:56 +0200)
Add method new_client_authenticator that returns data to allow a
netr_Authenticator to be constructed.
Allows python to make netr_LogonSamLogonWithFlags,
netr_LogonGetDomainInfo and similar calls

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
auth/credentials/pycredentials.c
python/samba/tests/py_credentials.py [new file with mode: 0644]
source4/selftest/tests.py

index fee9556b180c763c09dc5deb75e9a97e23c94a2e..30698d49dc5e7d0b01f5bb661401cb49762ede7a 100644 (file)
@@ -26,6 +26,8 @@
 #include "libcli/util/pyerrors.h"
 #include "param/pyparam.h"
 #include <tevent.h>
+#include "libcli/auth/libcli_auth.h"
+#include "auth/credentials/credentials_internal.h"
 
 void initcredentials(void);
 
@@ -584,6 +586,39 @@ static PyObject *py_creds_get_gensec_features(PyObject *self, PyObject *args)
        return PyInt_FromLong(gensec_features);
 }
 
+static PyObject *py_creds_new_client_authenticator(PyObject *self,
+                                                  PyObject *args)
+{
+       struct netr_Authenticator auth;
+       struct cli_credentials *creds = NULL;
+       struct netlogon_creds_CredentialState *nc = NULL;
+       PyObject *ret = NULL;
+
+       creds = PyCredentials_AsCliCredentials(self);
+       if (creds == NULL) {
+               PyErr_SetString(PyExc_RuntimeError,
+                               "Failed to get credentials from python");
+               return NULL;
+       }
+
+       nc = creds->netlogon_creds;
+       if (nc == NULL) {
+               PyErr_SetString(PyExc_ValueError,
+                               "No netlogon credentials cannot make "
+                               "client authenticator");
+               return NULL;
+       }
+
+       netlogon_creds_client_authenticator(
+               nc,
+               &auth);
+       ret = Py_BuildValue("{ss#si}",
+                           "credential",
+                           (const char *) &auth.cred, sizeof(auth.cred),
+                           "timestamp", auth.timestamp);
+       return ret;
+}
+
 static PyObject *py_creds_set_secure_channel_type(PyObject *self, PyObject *args)
 {
        unsigned int channel_type;
@@ -700,6 +735,11 @@ static PyMethodDef py_creds_methods[] = {
        { "set_forced_sasl_mech", py_creds_set_forced_sasl_mech, METH_VARARGS,
                "S.set_forced_sasl_mech(name) -> None\n"
                "Set forced SASL mechanism." },
+       { "new_client_authenticator",
+               py_creds_new_client_authenticator,
+               METH_NOARGS,
+               "S.new_client_authenticator() -> Authenticator\n"
+               "Get a new client NETLOGON_AUTHENTICATOR"},
        { "set_secure_channel_type", py_creds_set_secure_channel_type,
          METH_VARARGS, NULL },
        { NULL }
diff --git a/python/samba/tests/py_credentials.py b/python/samba/tests/py_credentials.py
new file mode 100644 (file)
index 0000000..fd9853a
--- /dev/null
@@ -0,0 +1,241 @@
+# Integration tests for pycredentials
+#
+# Copyright (C) Catalyst IT Ltd. 2017
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+from samba.tests import TestCase, delete_force
+import os
+
+import samba
+from samba.auth import system_session
+from samba.credentials import Credentials, CLI_CRED_NTLMv2_AUTH
+from samba.dcerpc import netlogon, ntlmssp
+from samba.dcerpc.netlogon import netr_Authenticator, netr_WorkstationInformation
+from samba.dcerpc.misc import SEC_CHAN_WKSTA
+from samba.dsdb import (
+    UF_WORKSTATION_TRUST_ACCOUNT,
+    UF_PASSWD_NOTREQD,
+    UF_NORMAL_ACCOUNT)
+from samba.ndr import ndr_pack
+from samba.samdb import SamDB
+"""
+Integration tests for pycredentials
+"""
+
+MACHINE_NAME = "PCTM"
+USER_NAME    = "PCTU"
+
+class PyCredentialsTests(TestCase):
+
+    def setUp(self):
+        super(PyCredentialsTests, self).setUp()
+
+        self.server      = os.environ["SERVER"]
+        self.domain      = os.environ["DOMAIN"]
+        self.host        = os.environ["SERVER_IP"]
+        self.lp          = self.get_loadparm()
+
+        self.credentials = self.get_credentials()
+
+        self.session     = system_session()
+        self.ldb = SamDB(url="ldap://%s" % self.host,
+                         session_info=self.session,
+                         credentials=self.credentials,
+                         lp=self.lp)
+
+        self.create_machine_account()
+        self.create_user_account()
+
+
+    def tearDown(self):
+        super(PyCredentialsTests, self).tearDown()
+        delete_force(self.ldb, self.machine_dn)
+        delete_force(self.ldb, self.user_dn)
+
+    # Until a successful netlogon connection has been established there will
+    # not be a valid authenticator associated with the credentials
+    # and new_client_authenticator should throw a ValueError
+    def test_no_netlogon_connection(self):
+        self.assertRaises(ValueError,
+                          self.machine_creds.new_client_authenticator)
+
+    # Once a netlogon connection has been established,
+    # new_client_authenticator should return a value
+    #
+    def test_have_netlogon_connection(self):
+        c = self.get_netlogon_connection()
+        a = self.machine_creds.new_client_authenticator()
+        self.assertIsNotNone(a)
+
+    # Get an authenticator and use it on a sequence of operations requiring
+    # an authenticator
+    def test_client_authenticator(self):
+        c = self.get_netlogon_connection()
+        (authenticator, subsequent) = self.get_authenticator(c)
+        self.do_NetrLogonSamLogonWithFlags(c, authenticator, subsequent)
+        (authenticator, subsequent) = self.get_authenticator(c)
+        self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
+        (authenticator, subsequent) = self.get_authenticator(c)
+        self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
+        (authenticator, subsequent) = self.get_authenticator(c)
+        self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
+
+
+
+    #
+    # Establish aealed schannel netlogon connection over TCP/IP
+    def get_netlogon_connection(self):
+        return netlogon.netlogon("ncacn_ip_tcp:%s[schannel,seal]" % self.server,
+                                 self.lp,
+                                 self.machine_creds)
+
+    #
+    # Create the machine account
+    def create_machine_account(self):
+        self.machine_pass = samba.generate_random_password(32, 32)
+        self.machine_name = MACHINE_NAME
+        self.machine_dn = "cn=%s,%s" % (self.machine_name, self.ldb.domain_dn())
+
+        # remove the account if it exists, this will happen if a previous test
+        # run failed
+        delete_force(self.ldb, self.machine_dn)
+
+        utf16pw = unicode(
+            '"' + self.machine_pass.encode('utf-8') + '"', 'utf-8'
+        ).encode('utf-16-le')
+        self.ldb.add({
+            "dn": self.machine_dn,
+            "objectclass": "computer",
+            "sAMAccountName": "%s$" % self.machine_name,
+            "userAccountControl":
+                str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
+            "unicodePwd": utf16pw})
+
+        self.machine_creds = Credentials()
+        self.machine_creds.guess(self.get_loadparm())
+        self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
+        self.machine_creds.set_password(self.machine_pass)
+        self.machine_creds.set_username(self.machine_name + "$")
+
+    #
+    # Create a test user account
+    def create_user_account(self):
+        self.user_pass = samba.generate_random_password(32, 32)
+        self.user_name = USER_NAME
+        self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
+
+        # remove the account if it exists, this will happen if a previous test
+        # run failed
+        delete_force(self.ldb, self.user_dn)
+
+        utf16pw = unicode(
+            '"' + self.user_pass.encode('utf-8') + '"', 'utf-8'
+        ).encode('utf-16-le')
+        self.ldb.add({
+            "dn": self.user_dn,
+            "objectclass": "user",
+            "sAMAccountName": "%s" % self.user_name,
+            "userAccountControl": str(UF_NORMAL_ACCOUNT),
+            "unicodePwd": utf16pw})
+
+        self.user_creds = Credentials()
+        self.user_creds.guess(self.get_loadparm())
+        self.user_creds.set_password(self.user_pass)
+        self.user_creds.set_username(self.user_name)
+        pass
+
+    #
+    # Get the authenticator from the machine creds.
+    def get_authenticator(self, c):
+        auth = self.machine_creds.new_client_authenticator();
+        current  = netr_Authenticator()
+        current.cred.data = [ord(x) for x in auth["credential"]]
+        current.timestamp = auth["timestamp"]
+
+        subsequent = netr_Authenticator()
+        return (current, subsequent)
+
+    def do_NetrLogonSamLogonWithFlags(self, c, current, subsequent):
+        logon = samlogon_logon_info(self.domain,
+                                    self.machine_name,
+                                    self.user_creds)
+
+        logon_level = netlogon.NetlogonNetworkTransitiveInformation
+        validation_level = netlogon.NetlogonValidationSamInfo4
+        netr_flags = 0
+        c.netr_LogonSamLogonWithFlags(self.server,
+                                      self.user_creds.get_workstation(),
+                                      current,
+                                      subsequent,
+                                      logon_level,
+                                      logon,
+                                      validation_level,
+                                      netr_flags)
+
+    def do_NetrLogonGetDomainInfo(self, c, current, subsequent):
+        query = netr_WorkstationInformation()
+
+        c.netr_LogonGetDomainInfo(self.server,
+                                  self.user_creds.get_workstation(),
+                                  current,
+                                  subsequent,
+                                  2,
+                                  query)
+
+#
+# Build the logon data required by NetrLogonSamLogonWithFlags
+def samlogon_logon_info(domain_name, computer_name, creds):
+
+    target_info_blob = samlogon_target(domain_name, computer_name)
+
+    challenge = b"abcdefgh"
+    # User account under test
+    response = creds.get_ntlm_response(flags=CLI_CRED_NTLMv2_AUTH,
+                                       challenge=challenge,
+                                       target_info=target_info_blob)
+
+    logon = netlogon.netr_NetworkInfo()
+
+    logon.challenge     = [ord(x) for x in challenge]
+    logon.nt            = netlogon.netr_ChallengeResponse()
+    logon.nt.length     = len(response["nt_response"])
+    logon.nt.data       = [ord(x) for x in response["nt_response"]]
+    logon.identity_info = netlogon.netr_IdentityInfo()
+
+    (username, domain)  = creds.get_ntlm_username_domain()
+    logon.identity_info.domain_name.string  = domain
+    logon.identity_info.account_name.string = username
+    logon.identity_info.workstation.string  = creds.get_workstation()
+
+    return logon
+
+#
+# Build the samlogon target info.
+def samlogon_target(domain_name, computer_name):
+    target_info = ntlmssp.AV_PAIR_LIST()
+    target_info.count = 3
+    computername = ntlmssp.AV_PAIR()
+    computername.AvId = ntlmssp.MsvAvNbComputerName
+    computername.Value = computer_name
+
+    domainname = ntlmssp.AV_PAIR()
+    domainname.AvId = ntlmssp.MsvAvNbDomainName
+    domainname.Value = domain_name
+
+    eol = ntlmssp.AV_PAIR()
+    eol.AvId = ntlmssp.MsvAvEOL
+    target_info.pair = [domainname, computername, eol]
+
+    return ndr_pack(target_info)
index b70faafa5e5524a2addef1fb1841664f5991b3e3..049009fe125fd1a89c4411de04ec6297da48b5a5 100755 (executable)
@@ -659,6 +659,9 @@ planoldpythontestsuite("ad_dc",
                        extra_args=['-U"$USERNAME%$PASSWORD"'])
 
 planpythontestsuite("ad_dc_ntvfs:local", "samba.tests.lsa_string")
+planoldpythontestsuite("ad_dc_ntvfs",
+                       "samba.tests.py_credentials",
+                       extra_args=['-U"$USERNAME%$PASSWORD"'])
 
 plantestsuite_loadlist("samba4.ldap.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
 plantestsuite_loadlist("samba4.tokengroups.krb5.python(ad_dc_ntvfs)", "ad_dc_ntvfs:local", [python, os.path.join(samba4srcdir, "dsdb/tests/python/token_group.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '-k', 'yes', '$LOADLIST', '$LISTOPT'])