}
+/**
+ * Encrypt a data blob using the session key and the negotiated encryption
+ * algorithm
+ *
+ * @param state Credential state, contains the session key and algorithm
+ * @param data Data blob containing the data to be encrypted.
+ *
+ */
+_PUBLIC_ NTSTATUS netlogon_creds_session_encrypt(
+ struct netlogon_creds_CredentialState *state,
+ DATA_BLOB data)
+{
+ if (data.data == NULL || data.length == 0) {
+ DBG_ERR("Nothing to encrypt "
+ "data.data == NULL or data.length == 0");
+ return NT_STATUS_INVALID_PARAMETER;
+ }
+ /*
+ * Don't crypt an all-zero password it will give away the
+ * NETLOGON pipe session key .
+ */
+ if (all_zero(data.data, data.length)) {
+ DBG_ERR("Supplied data all zeros, could leak session key");
+ return NT_STATUS_INVALID_PARAMETER;
+ }
+ if (state->negotiate_flags & NETLOGON_NEG_SUPPORTS_AES) {
+ netlogon_creds_aes_encrypt(state,
+ data.data,
+ data.length);
+ } else if (state->negotiate_flags & NETLOGON_NEG_ARCFOUR) {
+ netlogon_creds_arcfour_crypt(state,
+ data.data,
+ data.length);
+ } else {
+ DBG_ERR("Unsupported encryption option negotiated");
+ return NT_STATUS_NOT_SUPPORTED;
+ }
+ return NT_STATUS_OK;
+}
+
*/
struct netlogon_creds_CredentialState *cli_credentials_get_netlogon_creds(struct cli_credentials *cred);
+NTSTATUS netlogon_creds_session_encrypt(
+ struct netlogon_creds_CredentialState *state,
+ DATA_BLOB data);
+
#endif /* __CREDENTIALS_H__ */
#include "pycredentials.h"
#include "param/param.h"
#include "lib/cmdline/credentials.h"
+#include "auth/credentials/credentials_internal.h"
#include "librpc/gen_ndr/samr.h" /* for struct samr_Password */
+#include "librpc/gen_ndr/netlogon.h"
#include "libcli/util/pyerrors.h"
+#include "libcli/auth/libcli_auth.h"
#include "param/pyparam.h"
#include <tevent.h>
#include "libcli/auth/libcli_auth.h"
Py_RETURN_NONE;
}
+static PyObject *py_creds_encrypt_netr_crypt_password(PyObject *self,
+ PyObject *args)
+{
+ DATA_BLOB data = data_blob_null;
+ struct cli_credentials *creds = NULL;
+ struct netr_CryptPassword *pwd = NULL;
+ NTSTATUS status;
+ PyObject *py_cp = Py_None;
+
+ creds = PyCredentials_AsCliCredentials(self);
+
+ if (!PyArg_ParseTuple(args, "|O", &py_cp)) {
+ return NULL;
+ }
+ pwd = pytalloc_get_type(py_cp, struct netr_CryptPassword);
+ data.length = sizeof(struct netr_CryptPassword);
+ data.data = (uint8_t *)pwd;
+ status = netlogon_creds_session_encrypt(creds->netlogon_creds, data);
+
+ PyErr_NTSTATUS_IS_ERR_RAISE(status);
+
+ Py_RETURN_NONE;
+}
static PyMethodDef py_creds_methods[] = {
{ "get_username", py_creds_get_username, METH_NOARGS,
"Get a new client NETLOGON_AUTHENTICATOR"},
{ "set_secure_channel_type", py_creds_set_secure_channel_type,
METH_VARARGS, NULL },
+ { "encrypt_netr_crypt_password",
+ py_creds_encrypt_netr_crypt_password,
+ METH_VARARGS,
+ "S.encrypt_netr_crypt_password(password) -> NTSTATUS\n"
+ "Encrypt the supplied password using the session key and\n"
+ "the negotiated encryption algorithm in place\n"
+ "i.e. it overwrites the original data"},
{ NULL }
};
(authenticator, subsequent) = self.get_authenticator(c)
self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
+ # Test Credentials.encrypt_netr_crypt_password
+ # By performing a NetrServerPasswordSet2
+ # And the logging on using the new password.
+ def test_encrypt_netr_password(self):
+ # Change the password
+ self.do_Netr_ServerPasswordSet2()
+ # Now use the new password to perform an operation
+ self.do_DsrEnumerateDomainTrusts()
+ # Change the current machine account pazssword with a
+ # netr_ServerPasswordSet2 call.
+
+ def do_Netr_ServerPasswordSet2(self):
+ c = self.get_netlogon_connection()
+ (authenticator, subsequent) = self.get_authenticator(c)
+ PWD_LEN = 32
+ DATA_LEN = 512
+ newpass = samba.generate_random_password(PWD_LEN, PWD_LEN)
+ filler = [ord(x) for x in os.urandom(DATA_LEN-PWD_LEN)]
+ pwd = netlogon.netr_CryptPassword()
+ pwd.length = PWD_LEN
+ pwd.data = filler + [ord(x) for x in newpass]
+ self.machine_creds.encrypt_netr_crypt_password(pwd)
+ c.netr_ServerPasswordSet2(self.server,
+ self.machine_creds.get_workstation(),
+ SEC_CHAN_WKSTA,
+ self.machine_name,
+ authenticator,
+ pwd)
+
+ self.machine_pass = newpass
+ self.machine_creds.set_password(newpass)
+
+ # Perform a DsrEnumerateDomainTrusts, this provides confirmation that
+ # a netlogon connection has been correctly established
+ def do_DsrEnumerateDomainTrusts(self):
+ c = self.get_netlogon_connection()
+ trusts = c.netr_DsrEnumerateDomainTrusts(
+ self.server,
+ netlogon.NETR_TRUST_FLAG_IN_FOREST |
+ netlogon.NETR_TRUST_FLAG_OUTBOUND |
+ netlogon.NETR_TRUST_FLAG_INBOUND)
+
+ # Establish sealed schannel netlogon connection over TCP/IP
#
- # Establish aealed schannel netlogon connection over TCP/IP
def get_netlogon_connection(self):
return netlogon.netlogon("ncacn_ip_tcp:%s[schannel,seal]" % self.server,
self.lp,
self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
self.machine_creds.set_password(self.machine_pass)
self.machine_creds.set_username(self.machine_name + "$")
+ self.machine_creds.set_workstation(self.machine_name)
#
# Create a test user account
self.user_creds.guess(self.get_loadparm())
self.user_creds.set_password(self.user_pass)
self.user_creds.set_username(self.user_name)
+ self.user_creds.set_workstation(self.machine_name)
pass
#