-/*
+/*
Unix SMB/CIFS implementation.
Samba internal messaging functions
Copyright (C) Andrew Tridgell 2004
-
+
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
-
+
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
-
+
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
#include "lib/events/events.h"
+#include "lib/util/server_id.h"
#include "system/filesys.h"
#include "messaging/messaging.h"
+#include "messaging/messaging_internal.h"
#include "../lib/util/dlinklist.h"
#include "lib/socket/socket.h"
#include "librpc/gen_ndr/ndr_irpc.h"
#include "lib/messaging/irpc.h"
-#include "lib/tdb_wrap/tdb_wrap.h"
#include "../lib/util/unix_privs.h"
#include "librpc/rpc/dcerpc.h"
-#include "../lib/tdb_compat/tdb_compat.h"
-#include "../lib/util/util_tdb.h"
#include "cluster/cluster.h"
#include "../lib/util/tevent_ntstatus.h"
#include "lib/param/param.h"
+#include "lib/util/server_id_db.h"
+#include "lib/util/talloc_report.h"
+#include "../source3/lib/messages_dgm.h"
+#include "../source3/lib/messages_dgm_ref.h"
+#include "../source3/lib/messages_util.h"
+#include <tdb.h>
/* change the message version with any incompatible changes in the protocol */
#define IMESSAGING_VERSION 1
} incoming;
};
-struct imessaging_context {
- struct server_id server_id;
- struct socket_context *sock;
- const char *base_path;
- const char *path;
- struct loadparm_context *lp_ctx;
- struct dispatch_fn **dispatch;
- uint32_t num_types;
- struct idr_context *dispatch_tree;
- struct imessaging_rec *pending;
- struct imessaging_rec *retry_queue;
- struct irpc_list *irpc;
- struct idr_context *idr;
- const char **names;
- struct timeval start_time;
- struct tevent_timer *retry_te;
- struct {
- struct tevent_context *ev;
- struct tevent_fd *fde;
- } event;
-};
-
/* we have a linked list of dispatch handlers for each msg_type that
this messaging server can deal with */
struct dispatch_fn {
};
/* an individual message */
-struct imessaging_rec {
- struct imessaging_rec *next, *prev;
- struct imessaging_context *msg;
- const char *path;
-
- struct imessaging_header {
- uint32_t version;
- uint32_t msg_type;
- struct server_id from;
- struct server_id to;
- uint32_t length;
- } *header;
-
- DATA_BLOB packet;
- uint32_t retries;
-};
-
static void irpc_handler(struct imessaging_context *, void *,
uint32_t, struct server_id, DATA_BLOB *);
static void ping_message(struct imessaging_context *msg, void *private_data,
uint32_t msg_type, struct server_id src, DATA_BLOB *data)
{
- char *task_id = server_id_str(NULL, &src);
+ struct server_id_buf idbuf;
DEBUG(1,("INFO: Received PING message from server %s [%.*s]\n",
- task_id, (int)data->length,
+ server_id_str_buf(src, &idbuf), (int)data->length,
data->data?(const char *)data->data:""));
- talloc_free(task_id);
imessaging_send(msg, src, MSG_PONG, data);
}
-/*
- return uptime of messaging server via irpc
-*/
-static NTSTATUS irpc_uptime(struct irpc_message *msg,
- struct irpc_uptime *r)
-{
- struct imessaging_context *ctx = talloc_get_type(msg->private_data, struct imessaging_context);
- *r->out.start_time = timeval_to_nttime(&ctx->start_time);
- return NT_STATUS_OK;
-}
-
-/*
- return the path to a messaging socket
-*/
-static char *imessaging_path(struct imessaging_context *msg, struct server_id server_id)
-{
- TALLOC_CTX *tmp_ctx = talloc_new(msg);
- const char *id = server_id_str(tmp_ctx, &server_id);
- char *s;
- if (id == NULL) {
- return NULL;
- }
- s = talloc_asprintf(msg, "%s/msg.%s", msg->base_path, id);
- talloc_steal(s, tmp_ctx);
- return s;
-}
-
-/*
- dispatch a fully received message
-
- note that this deliberately can match more than one message handler
- per message. That allows a single messasging context to register
- (for example) a debug handler for more than one piece of code
-*/
-static void imessaging_dispatch(struct imessaging_context *msg, struct imessaging_rec *rec)
+static void pool_message(struct imessaging_context *msg, void *private_data,
+ uint32_t msg_type, struct server_id src,
+ DATA_BLOB *data)
{
- struct dispatch_fn *d, *next;
+ char *report;
- /* temporary IDs use an idtree, the rest use a array of pointers */
- if (rec->header->msg_type >= MSG_TMP_BASE) {
- d = (struct dispatch_fn *)idr_find(msg->dispatch_tree,
- rec->header->msg_type);
- } else if (rec->header->msg_type < msg->num_types) {
- d = msg->dispatch[rec->header->msg_type];
- } else {
- d = NULL;
- }
+ report = talloc_report_str(msg, NULL);
- for (; d; d = next) {
- DATA_BLOB data;
- next = d->next;
- data.data = rec->packet.data + sizeof(*rec->header);
- data.length = rec->header->length;
- d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data);
+ if (report != NULL) {
+ DATA_BLOB blob = { .data = (uint8_t *)report,
+ .length = talloc_get_size(report) - 1};
+ imessaging_send(msg, src, MSG_POOL_USAGE, &blob);
}
- rec->header->length = 0;
+ talloc_free(report);
}
-/*
- handler for messages that arrive from other nodes in the cluster
-*/
-static void cluster_message_handler(struct imessaging_context *msg, DATA_BLOB packet)
-{
- struct imessaging_rec *rec;
-
- rec = talloc(msg, struct imessaging_rec);
- if (rec == NULL) {
- smb_panic("Unable to allocate imessaging_rec");
- }
-
- rec->msg = msg;
- rec->path = msg->path;
- rec->header = (struct imessaging_header *)packet.data;
- rec->packet = packet;
- rec->retries = 0;
-
- if (packet.length != sizeof(*rec->header) + rec->header->length) {
- DEBUG(0,("messaging: bad message header size %d should be %d\n",
- rec->header->length, (int)(packet.length - sizeof(*rec->header))));
- talloc_free(rec);
- return;
- }
-
- imessaging_dispatch(msg, rec);
- talloc_free(rec);
-}
-
-
-
-/*
- try to send the message
-*/
-static NTSTATUS try_send(struct imessaging_rec *rec)
+static void ringbuf_log_msg(struct imessaging_context *msg,
+ void *private_data,
+ uint32_t msg_type,
+ struct server_id src,
+ DATA_BLOB *data)
{
- struct imessaging_context *msg = rec->msg;
- size_t nsent;
- void *priv;
- NTSTATUS status;
- struct socket_address *path;
+ char *log = debug_get_ringbuf();
+ size_t logsize = debug_get_ringbuf_size();
+ DATA_BLOB blob;
- /* rec->path is the path of the *other* socket, where we want
- * this to end up */
- path = socket_address_from_strings(msg, msg->sock->backend_name,
- rec->path, 0);
- if (!path) {
- return NT_STATUS_NO_MEMORY;
+ if (log == NULL) {
+ log = discard_const_p(char, "*disabled*\n");
+ logsize = strlen(log) + 1;
}
- /* we send with privileges so messages work from any context */
- priv = root_privileges();
- status = socket_sendto(msg->sock, &rec->packet, &nsent, path);
- talloc_free(path);
- talloc_free(priv);
+ blob.data = (uint8_t *)log;
+ blob.length = logsize;
- return status;
+ imessaging_send(msg, src, MSG_RINGBUF_LOG, &blob);
}
/*
- retry backed off messages
-*/
-static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te,
- struct timeval t, void *private_data)
-{
- struct imessaging_context *msg = talloc_get_type(private_data,
- struct imessaging_context);
- msg->retry_te = NULL;
-
- /* put the messages back on the main queue */
- while (msg->retry_queue) {
- struct imessaging_rec *rec = msg->retry_queue;
- DLIST_REMOVE(msg->retry_queue, rec);
- DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
- }
-
- TEVENT_FD_WRITEABLE(msg->event.fde);
-}
-
-/*
- handle a socket write event
-*/
-static void imessaging_send_handler(struct imessaging_context *msg)
-{
- while (msg->pending) {
- struct imessaging_rec *rec = msg->pending;
- NTSTATUS status;
- status = try_send(rec);
- if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
- rec->retries++;
- if (rec->retries > 3) {
- /* we're getting continuous write errors -
- backoff this record */
- DLIST_REMOVE(msg->pending, rec);
- DLIST_ADD_END(msg->retry_queue, rec,
- struct imessaging_rec *);
- if (msg->retry_te == NULL) {
- msg->retry_te =
- tevent_add_timer(msg->event.ev, msg,
- timeval_current_ofs(1, 0),
- msg_retry_timer, msg);
- }
- }
- break;
- }
- rec->retries = 0;
- if (!NT_STATUS_IS_OK(status)) {
- TALLOC_CTX *tmp_ctx = talloc_new(msg);
- DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n",
- server_id_str(tmp_ctx, &rec->header->from),
- server_id_str(tmp_ctx, &rec->header->to),
- rec->header->msg_type,
- nt_errstr(status)));
- talloc_free(tmp_ctx);
- }
- DLIST_REMOVE(msg->pending, rec);
- talloc_free(rec);
- }
- if (msg->pending == NULL) {
- TEVENT_FD_NOT_WRITEABLE(msg->event.fde);
- }
-}
-
-/*
- handle a new incoming packet
+ return uptime of messaging server via irpc
*/
-static void imessaging_recv_handler(struct imessaging_context *msg)
+static NTSTATUS irpc_uptime(struct irpc_message *msg,
+ struct irpc_uptime *r)
{
- struct imessaging_rec *rec;
- NTSTATUS status;
- DATA_BLOB packet;
- size_t msize;
-
- /* see how many bytes are in the next packet */
- status = socket_pending(msg->sock, &msize);
- if (!NT_STATUS_IS_OK(status)) {
- DEBUG(0,("socket_pending failed in messaging - %s\n",
- nt_errstr(status)));
- return;
- }
-
- packet = data_blob_talloc(msg, NULL, msize);
- if (packet.data == NULL) {
- /* assume this is temporary and retry */
- return;
- }
-
- status = socket_recv(msg->sock, packet.data, msize, &msize);
- if (!NT_STATUS_IS_OK(status)) {
- data_blob_free(&packet);
- return;
- }
-
- if (msize < sizeof(*rec->header)) {
- DEBUG(0,("messaging: bad message of size %d\n", (int)msize));
- data_blob_free(&packet);
- return;
- }
-
- rec = talloc(msg, struct imessaging_rec);
- if (rec == NULL) {
- smb_panic("Unable to allocate imessaging_rec");
- }
-
- talloc_steal(rec, packet.data);
- rec->msg = msg;
- rec->path = msg->path;
- rec->header = (struct imessaging_header *)packet.data;
- rec->packet = packet;
- rec->retries = 0;
-
- if (msize != sizeof(*rec->header) + rec->header->length) {
- DEBUG(0,("messaging: bad message header size %d should be %d\n",
- rec->header->length, (int)(msize - sizeof(*rec->header))));
- talloc_free(rec);
- return;
- }
-
- imessaging_dispatch(msg, rec);
- talloc_free(rec);
+ struct imessaging_context *ctx = talloc_get_type(msg->private_data, struct imessaging_context);
+ *r->out.start_time = timeval_to_nttime(&ctx->start_time);
+ return NT_STATUS_OK;
}
-
-/*
- handle a socket event
-*/
-static void imessaging_handler(struct tevent_context *ev, struct tevent_fd *fde,
- uint16_t flags, void *private_data)
+static struct dispatch_fn *imessaging_find_dispatch(
+ struct imessaging_context *msg, uint32_t msg_type)
{
- struct imessaging_context *msg = talloc_get_type(private_data,
- struct imessaging_context);
- if (flags & TEVENT_FD_WRITE) {
- imessaging_send_handler(msg);
+ /* temporary IDs use an idtree, the rest use a array of pointers */
+ if (msg_type >= MSG_TMP_BASE) {
+ return (struct dispatch_fn *)idr_find(msg->dispatch_tree,
+ msg_type);
}
- if (flags & TEVENT_FD_READ) {
- imessaging_recv_handler(msg);
+ if (msg_type < msg->num_types) {
+ return msg->dispatch[msg_type];
}
+ return NULL;
}
-
/*
Register a dispatch function for a particular message type.
*/
/* possibly expand dispatch array */
if (msg_type >= msg->num_types) {
struct dispatch_fn **dp;
- int i;
+ uint32_t i;
dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1);
NT_STATUS_HAVE_NO_MEMORY(dp);
msg->dispatch = dp;
struct dispatch_fn *d, *next;
if (msg_type >= msg->num_types) {
- d = (struct dispatch_fn *)idr_find(msg->dispatch_tree,
+ d = (struct dispatch_fn *)idr_find(msg->dispatch_tree,
msg_type);
if (!d) return;
idr_remove(msg->dispatch_tree, msg_type);
}
/*
- Send a message to a particular server
*/
-NTSTATUS imessaging_send(struct imessaging_context *msg, struct server_id server,
- uint32_t msg_type, const DATA_BLOB *data)
+int imessaging_cleanup(struct imessaging_context *msg)
{
- struct imessaging_rec *rec;
- NTSTATUS status;
- size_t dlength = data?data->length:0;
-
- rec = talloc(msg, struct imessaging_rec);
- if (rec == NULL) {
- return NT_STATUS_NO_MEMORY;
+ if (!msg) {
+ return 0;
}
+ return 0;
+}
+
+static void imessaging_dgm_recv(struct tevent_context *ev,
+ const uint8_t *buf, size_t buf_len,
+ int *fds, size_t num_fds,
+ void *private_data);
+
+/* Keep a list of imessaging contexts */
+static struct imessaging_context *msg_ctxs;
- rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength);
- if (rec->packet.data == NULL) {
- talloc_free(rec);
+/*
+ * A process has terminated, clean-up any names it has registered.
+ */
+NTSTATUS imessaging_process_cleanup(
+ struct imessaging_context *msg_ctx,
+ pid_t pid)
+{
+ struct irpc_name_records *names = NULL;
+ int i = 0;
+ int j = 0;
+ TALLOC_CTX *mem_ctx = talloc_new(NULL);
+
+ if (mem_ctx == NULL) {
+ DBG_ERR("OOM unable to clean up messaging for process (%d)\n",
+ pid);
return NT_STATUS_NO_MEMORY;
}
- rec->retries = 0;
- rec->msg = msg;
- rec->header = (struct imessaging_header *)rec->packet.data;
- /* zero padding */
- ZERO_STRUCTP(rec->header);
- rec->header->version = IMESSAGING_VERSION;
- rec->header->msg_type = msg_type;
- rec->header->from = msg->server_id;
- rec->header->to = server;
- rec->header->length = dlength;
- if (dlength != 0) {
- memcpy(rec->packet.data + sizeof(*rec->header),
- data->data, dlength);
- }
-
- if (!cluster_node_equal(&msg->server_id, &server)) {
- /* the destination is on another node - dispatch via
- the cluster layer */
- status = cluster_message_send(server, &rec->packet);
- talloc_free(rec);
- return status;
+ names = irpc_all_servers(msg_ctx, mem_ctx);
+ if (names == NULL) {
+ TALLOC_FREE(mem_ctx);
+ return NT_STATUS_OK;
+ }
+ for (i = 0; i < names->num_records; i++) {
+ for (j = 0; j < names->names[i]->count; j++) {
+ if (names->names[i]->ids[j].pid == pid) {
+ int ret = server_id_db_prune_name(
+ msg_ctx->names,
+ names->names[i]->name,
+ names->names[i]->ids[j]);
+ if (ret != 0 && ret != ENOENT) {
+ TALLOC_FREE(mem_ctx);
+ return map_nt_error_from_unix_common(
+ ret);
+ }
+ }
+ }
}
+ TALLOC_FREE(mem_ctx);
+ return NT_STATUS_OK;
+}
- rec->path = imessaging_path(msg, server);
- talloc_steal(rec, rec->path);
+static int imessaging_context_destructor(struct imessaging_context *msg)
+{
+ DLIST_REMOVE(msg_ctxs, msg);
+ TALLOC_FREE(msg->msg_dgm_ref);
+ return 0;
+}
- if (msg->pending != NULL) {
- status = STATUS_MORE_ENTRIES;
- } else {
- status = try_send(rec);
- }
+/*
+ * Cleanup messaging dgm contexts on a specific event context.
+ *
+ * We must make sure to unref all messaging_dgm_ref's *before* the
+ * tevent context goes away. Only when the last ref is freed, the
+ * refcounted messaging dgm context will be freed.
+ */
+void imessaging_dgm_unref_ev(struct tevent_context *ev)
+{
+ struct imessaging_context *msg = NULL;
- if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
- if (msg->pending == NULL) {
- TEVENT_FD_WRITEABLE(msg->event.fde);
+ for (msg = msg_ctxs; msg != NULL; msg = msg->next) {
+ if (msg->ev == ev) {
+ TALLOC_FREE(msg->msg_dgm_ref);
}
- DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
- return NT_STATUS_OK;
}
-
- talloc_free(rec);
-
- return status;
}
-/*
- Send a message to a particular server, with the message containing a single pointer
-*/
-NTSTATUS imessaging_send_ptr(struct imessaging_context *msg, struct server_id server,
- uint32_t msg_type, void *ptr)
+static NTSTATUS imessaging_reinit(struct imessaging_context *msg)
{
- DATA_BLOB blob;
+ int ret = -1;
- blob.data = (uint8_t *)&ptr;
- blob.length = sizeof(void *);
+ TALLOC_FREE(msg->msg_dgm_ref);
- return imessaging_send(msg, server, msg_type, &blob);
-}
+ msg->server_id.pid = getpid();
+ msg->msg_dgm_ref = messaging_dgm_ref(msg,
+ msg->ev,
+ &msg->server_id.unique_id,
+ msg->sock_dir,
+ msg->lock_dir,
+ imessaging_dgm_recv,
+ msg,
+ &ret);
+
+ if (msg->msg_dgm_ref == NULL) {
+ DEBUG(2, ("messaging_dgm_ref failed: %s\n",
+ strerror(ret)));
+ return map_nt_error_from_unix_common(ret);
+ }
+
+ server_id_db_reinit(msg->names, msg->server_id);
+ return NT_STATUS_OK;
+}
/*
- remove our messaging socket and database entry
-*/
-int imessaging_cleanup(struct imessaging_context *msg)
+ * Must be called after a fork.
+ */
+NTSTATUS imessaging_reinit_all(void)
{
- if (!msg) {
- return 0;
- }
+ struct imessaging_context *msg = NULL;
- DEBUG(5,("imessaging: cleaning up %s\n", msg->path));
- unlink(msg->path);
- while (msg->names && msg->names[0]) {
- irpc_remove_name(msg, msg->names[0]);
+ for (msg = msg_ctxs; msg != NULL; msg = msg->next) {
+ NTSTATUS status = imessaging_reinit(msg);
+ if (!NT_STATUS_IS_OK(status)) {
+ return status;
+ }
}
- return 0;
+ return NT_STATUS_OK;
}
/*
create the listening socket and setup the dispatcher
-
- use temporary=true when you want a destructor to remove the
- associated messaging socket and database entry on talloc free. Don't
- use this in processes that may fork and a child may talloc free this
- memory
*/
-struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
+static struct imessaging_context *imessaging_init_internal(TALLOC_CTX *mem_ctx,
struct loadparm_context *lp_ctx,
struct server_id server_id,
- struct tevent_context *ev,
- bool auto_remove)
+ struct tevent_context *ev)
{
- struct imessaging_context *msg;
NTSTATUS status;
- struct socket_address *path;
+ struct imessaging_context *msg;
+ bool ok;
+ int ret;
+ const char *lock_dir = NULL;
+ int tdb_flags = TDB_INCOMPATIBLE_HASH | TDB_CLEAR_IF_FIRST;
if (ev == NULL) {
return NULL;
if (msg == NULL) {
return NULL;
}
+ msg->ev = ev;
- /* setup a handler for messages from other cluster nodes, if appropriate */
- status = cluster_message_init(msg, server_id, cluster_message_handler);
- if (!NT_STATUS_IS_OK(status)) {
- talloc_free(msg);
- return NULL;
- }
+ talloc_set_destructor(msg, imessaging_context_destructor);
/* create the messaging directory if needed */
- msg->lp_ctx = talloc_reference(msg, lp_ctx);
- if (!msg->lp_ctx) {
- talloc_free(msg);
- return NULL;
+ lock_dir = lpcfg_lock_directory(lp_ctx);
+ if (lock_dir == NULL) {
+ goto fail;
+ }
+
+ msg->sock_dir = lpcfg_private_path(msg, lp_ctx, "msg.sock");
+ if (msg->sock_dir == NULL) {
+ goto fail;
+ }
+ ok = directory_create_or_exist_strict(msg->sock_dir, geteuid(), 0700);
+ if (!ok) {
+ goto fail;
}
- msg->base_path = lpcfg_imessaging_path(msg, lp_ctx);
+ msg->lock_dir = lpcfg_lock_path(msg, lp_ctx, "msg.lock");
+ if (msg->lock_dir == NULL) {
+ goto fail;
+ }
+ ok = directory_create_or_exist_strict(msg->lock_dir, geteuid(), 0755);
+ if (!ok) {
+ goto fail;
+ }
- mkdir(msg->base_path, 0700);
+ msg->msg_dgm_ref = messaging_dgm_ref(
+ msg, ev, &server_id.unique_id, msg->sock_dir, msg->lock_dir,
+ imessaging_dgm_recv, msg, &ret);
+
+ if (msg->msg_dgm_ref == NULL) {
+ goto fail;
+ }
- msg->path = imessaging_path(msg, server_id);
msg->server_id = server_id;
msg->idr = idr_init(msg);
+ if (msg->idr == NULL) {
+ goto fail;
+ }
+
msg->dispatch_tree = idr_init(msg);
+ if (msg->dispatch_tree == NULL) {
+ goto fail;
+ }
+
msg->start_time = timeval_current();
- status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
+ tdb_flags |= lpcfg_tdb_flags(lp_ctx, 0);
+
+ /*
+ * This context holds a destructor that cleans up any names
+ * registered on this context on talloc_free()
+ */
+ msg->names = server_id_db_init(msg, server_id, lock_dir, 0, tdb_flags);
+ if (msg->names == NULL) {
+ goto fail;
+ }
+
+ status = imessaging_register(msg, NULL, MSG_PING, ping_message);
if (!NT_STATUS_IS_OK(status)) {
- talloc_free(msg);
- return NULL;
+ goto fail;
+ }
+ status = imessaging_register(msg, NULL, MSG_REQ_POOL_USAGE,
+ pool_message);
+ if (!NT_STATUS_IS_OK(status)) {
+ goto fail;
+ }
+ status = imessaging_register(msg, NULL, MSG_IRPC, irpc_handler);
+ if (!NT_STATUS_IS_OK(status)) {
+ goto fail;
+ }
+ status = imessaging_register(msg, NULL, MSG_REQ_RINGBUF_LOG,
+ ringbuf_log_msg);
+ if (!NT_STATUS_IS_OK(status)) {
+ goto fail;
+ }
+ status = IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg);
+ if (!NT_STATUS_IS_OK(status)) {
+ goto fail;
}
- /* by stealing here we ensure that the socket is cleaned up (and even
- deleted) on exit */
- talloc_steal(msg, msg->sock);
+ DLIST_ADD(msg_ctxs, msg);
- path = socket_address_from_strings(msg, msg->sock->backend_name,
- msg->path, 0);
- if (!path) {
- talloc_free(msg);
- return NULL;
+ return msg;
+fail:
+ talloc_free(msg);
+ return NULL;
+}
+
+struct imessaging_post_state {
+ struct imessaging_context *msg_ctx;
+ struct imessaging_post_state **busy_ref;
+ size_t buf_len;
+ uint8_t buf[];
+};
+
+static int imessaging_post_state_destructor(struct imessaging_post_state *state)
+{
+ if (state->busy_ref != NULL) {
+ *state->busy_ref = NULL;
+ state->busy_ref = NULL;
}
+ return 0;
+}
- status = socket_listen(msg->sock, path, 50, 0);
- if (!NT_STATUS_IS_OK(status)) {
- DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status)));
- talloc_free(msg);
- return NULL;
+static void imessaging_post_handler(struct tevent_context *ev,
+ struct tevent_immediate *ti,
+ void *private_data)
+{
+ struct imessaging_post_state *state = talloc_get_type_abort(
+ private_data, struct imessaging_post_state);
+
+ /*
+ * In usecases like using messaging_client_init() with irpc processing
+ * we may free the imessaging_context during the messaging handler.
+ * imessaging_post_state is a child of imessaging_context and
+ * might be implicitly free'ed before the explicit TALLOC_FREE(state).
+ *
+ * The busy_ref pointer makes sure the destructor clears
+ * the local 'state' variable.
+ */
+
+ SMB_ASSERT(state->busy_ref == NULL);
+ state->busy_ref = &state;
+
+ imessaging_dgm_recv(ev, state->buf, state->buf_len, NULL, 0,
+ state->msg_ctx);
+
+ if (state == NULL) {
+ return;
}
- /* it needs to be non blocking for sends */
- set_blocking(socket_get_fd(msg->sock), false);
+ state->busy_ref = NULL;
+ TALLOC_FREE(state);
+}
+
+static int imessaging_post_self(struct imessaging_context *msg,
+ const uint8_t *buf, size_t buf_len)
+{
+ struct tevent_immediate *ti;
+ struct imessaging_post_state *state;
+
+ state = talloc_size(
+ msg, offsetof(struct imessaging_post_state, buf) + buf_len);
+ if (state == NULL) {
+ return ENOMEM;
+ }
+ talloc_set_name_const(state, "struct imessaging_post_state");
- msg->event.ev = ev;
- msg->event.fde = tevent_add_fd(ev, msg, socket_get_fd(msg->sock),
- TEVENT_FD_READ, imessaging_handler, msg);
- tevent_fd_set_auto_close(msg->event.fde);
+ talloc_set_destructor(state, imessaging_post_state_destructor);
- if (auto_remove) {
- talloc_set_destructor(msg, imessaging_cleanup);
+ ti = tevent_create_immediate(state);
+ if (ti == NULL) {
+ TALLOC_FREE(state);
+ return ENOMEM;
}
-
- imessaging_register(msg, NULL, MSG_PING, ping_message);
- imessaging_register(msg, NULL, MSG_IRPC, irpc_handler);
- IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg);
- return msg;
+ state->msg_ctx = msg;
+ state->busy_ref = NULL;
+ state->buf_len = buf_len;
+ memcpy(state->buf, buf, buf_len);
+
+ tevent_schedule_immediate(ti, msg->ev, imessaging_post_handler,
+ state);
+
+ return 0;
+}
+
+static void imessaging_dgm_recv(struct tevent_context *ev,
+ const uint8_t *buf, size_t buf_len,
+ int *fds, size_t num_fds,
+ void *private_data)
+{
+ struct imessaging_context *msg = talloc_get_type_abort(
+ private_data, struct imessaging_context);
+ uint32_t msg_type;
+ struct server_id src, dst;
+ struct server_id_buf srcbuf, dstbuf;
+ DATA_BLOB data;
+
+ if (buf_len < MESSAGE_HDR_LENGTH) {
+ /* Invalid message, ignore */
+ return;
+ }
+
+ if (num_fds != 0) {
+ /*
+ * Source4 based messaging does not expect fd's yet
+ */
+ return;
+ }
+
+ if (ev != msg->ev) {
+ int ret;
+ ret = imessaging_post_self(msg, buf, buf_len);
+ if (ret != 0) {
+ DBG_WARNING("imessaging_post_self failed: %s\n",
+ strerror(ret));
+ }
+ return;
+ }
+
+ message_hdr_get(&msg_type, &src, &dst, buf);
+
+ data.data = discard_const_p(uint8_t, buf + MESSAGE_HDR_LENGTH);
+ data.length = buf_len - MESSAGE_HDR_LENGTH;
+
+ if ((cluster_id_equal(&dst, &msg->server_id)) ||
+ ((dst.task_id == 0) && (msg->server_id.pid == 0))) {
+ struct dispatch_fn *d, *next;
+
+ DEBUG(10, ("%s: dst %s matches my id: %s, type=0x%x\n",
+ __func__,
+ server_id_str_buf(dst, &dstbuf),
+ server_id_str_buf(msg->server_id, &srcbuf),
+ (unsigned)msg_type));
+
+ d = imessaging_find_dispatch(msg, msg_type);
+
+ for (; d; d = next) {
+ next = d->next;
+ d->fn(msg, d->private_data, d->msg_type, src, &data);
+ }
+ } else {
+ DEBUG(10, ("%s: Ignoring type=0x%x dst %s, I am %s, \n",
+ __func__, (unsigned)msg_type,
+ server_id_str_buf(dst, &dstbuf),
+ server_id_str_buf(msg->server_id, &srcbuf)));
+ }
+}
+
+struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
+ struct loadparm_context *lp_ctx,
+ struct server_id server_id,
+ struct tevent_context *ev)
+{
+ if (ev == NULL) {
+ return NULL;
+ }
+
+ if (tevent_context_is_wrapper(ev)) {
+ /*
+ * This is really a programmer error!
+ *
+ * The main/raw tevent context should
+ * have been registered first!
+ */
+ DBG_ERR("Should not be used with a wrapper tevent context\n");
+ errno = EINVAL;
+ return NULL;
+ }
+
+ return imessaging_init_internal(mem_ctx, lp_ctx, server_id, ev);
}
-/*
- A hack, for the short term until we get 'client only' messaging in place
+/*
+ A hack, for the short term until we get 'client only' messaging in place
*/
struct imessaging_context *imessaging_client_init(TALLOC_CTX *mem_ctx,
struct loadparm_context *lp_ctx,
/* This is because we are not in the s3 serverid database */
id.unique_id = SERVERID_UNIQUE_ID_NOT_TO_VERIFY;
- return imessaging_init(mem_ctx, lp_ctx, id, ev, true);
+ return imessaging_init_internal(mem_ctx, lp_ctx, id, ev);
}
/*
a list of registered irpc server functions
register a irpc server function
*/
NTSTATUS irpc_register(struct imessaging_context *msg_ctx,
- const struct ndr_interface_table *table,
+ const struct ndr_interface_table *table,
int callnum, irpc_function_t fn, void *private_data)
{
struct irpc_list *irpc;
m->msg_ctx = msg_ctx;
m->irpc = i;
m->data = r;
- m->ev = msg_ctx->event.ev;
m->header.status = i->fn(m, r);
return 0;
}
-/*
- open the naming database
-*/
-static struct tdb_wrap *irpc_namedb_open(struct imessaging_context *msg_ctx)
-{
- struct tdb_wrap *t;
- char *path = talloc_asprintf(msg_ctx, "%s/names.tdb", msg_ctx->base_path);
- if (path == NULL) {
- return NULL;
- }
- t = tdb_wrap_open(msg_ctx, path, 0, 0, O_RDWR|O_CREAT, 0660, msg_ctx->lp_ctx);
- talloc_free(path);
- return t;
-}
-
-
/*
add a string name that this irpc server can be called on
-*/
-NTSTATUS irpc_add_name(struct imessaging_context *msg_ctx, const char *name)
-{
- struct tdb_wrap *t;
- TDB_DATA rec;
- int count;
- NTSTATUS status = NT_STATUS_OK;
-
- t = irpc_namedb_open(msg_ctx);
- NT_STATUS_HAVE_NO_MEMORY(t);
-
- if (tdb_lock_bystring(t->tdb, name) != 0) {
- talloc_free(t);
- return NT_STATUS_LOCK_NOT_GRANTED;
- }
- rec = tdb_fetch_bystring(t->tdb, name);
- count = rec.dsize / sizeof(struct server_id);
- rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1);
- rec.dsize += sizeof(struct server_id);
- if (rec.dptr == NULL) {
- tdb_unlock_bystring(t->tdb, name);
- talloc_free(t);
- return NT_STATUS_NO_MEMORY;
- }
- ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id;
- if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) {
- status = NT_STATUS_INTERNAL_ERROR;
- }
- free(rec.dptr);
- tdb_unlock_bystring(t->tdb, name);
- talloc_free(t);
- msg_ctx->names = str_list_add(msg_ctx->names, name);
- talloc_steal(msg_ctx, msg_ctx->names);
-
- return status;
-}
-
-/*
- return a list of server ids for a server name
+ It will be removed from the DB either via irpc_remove_name or on
+ talloc_free(msg_ctx->names).
*/
-struct server_id *irpc_servers_byname(struct imessaging_context *msg_ctx,
- TALLOC_CTX *mem_ctx,
- const char *name)
+NTSTATUS irpc_add_name(struct imessaging_context *msg_ctx, const char *name)
{
- struct tdb_wrap *t;
- TDB_DATA rec;
- int count, i;
- struct server_id *ret;
-
- t = irpc_namedb_open(msg_ctx);
- if (t == NULL) {
- return NULL;
- }
+ int ret;
- if (tdb_lock_bystring(t->tdb, name) != 0) {
- talloc_free(t);
- return NULL;
- }
- rec = tdb_fetch_bystring(t->tdb, name);
- if (rec.dptr == NULL) {
- tdb_unlock_bystring(t->tdb, name);
- talloc_free(t);
- return NULL;
- }
- count = rec.dsize / sizeof(struct server_id);
- ret = talloc_array(mem_ctx, struct server_id, count+1);
- if (ret == NULL) {
- tdb_unlock_bystring(t->tdb, name);
- talloc_free(t);
- return NULL;
+ ret = server_id_db_add(msg_ctx->names, name);
+ if (ret != 0) {
+ return map_nt_error_from_unix_common(ret);
}
- for (i=0;i<count;i++) {
- ret[i] = ((struct server_id *)rec.dptr)[i];
- }
- ret[i] = cluster_id(0, 0);
- free(rec.dptr);
- tdb_unlock_bystring(t->tdb, name);
- talloc_free(t);
-
- return ret;
+ return NT_STATUS_OK;
}
-static int all_servers_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
+static int all_servers_func(const char *name, unsigned num_servers,
+ const struct server_id *servers,
+ void *private_data)
{
- struct irpc_name_records *name_records = talloc_get_type(state, struct irpc_name_records);
+ struct irpc_name_records *name_records = talloc_get_type(
+ private_data, struct irpc_name_records);
struct irpc_name_record *name_record;
- int i;
+ uint32_t i;
name_records->names
= talloc_realloc(name_records, name_records->names,
name_records->num_records++;
- name_record->name
- = talloc_strndup(name_record,
- (const char *)key.dptr, key.dsize);
+ name_record->name = talloc_strdup(name_record, name);
if (!name_record->name) {
return -1;
}
- name_record->count = data.dsize / sizeof(struct server_id);
- name_record->ids = talloc_array(name_record,
- struct server_id,
- name_record->count);
+ name_record->count = num_servers;
+ name_record->ids = talloc_array(name_record, struct server_id,
+ num_servers);
if (name_record->ids == NULL) {
return -1;
}
for (i=0;i<name_record->count;i++) {
- name_record->ids[i] = ((struct server_id *)data.dptr)[i];
+ name_record->ids[i] = servers[i];
}
return 0;
}
struct irpc_name_records *irpc_all_servers(struct imessaging_context *msg_ctx,
TALLOC_CTX *mem_ctx)
{
- struct tdb_wrap *t;
int ret;
struct irpc_name_records *name_records = talloc_zero(mem_ctx, struct irpc_name_records);
if (name_records == NULL) {
return NULL;
}
- t = irpc_namedb_open(msg_ctx);
- if (t == NULL) {
- return NULL;
- }
-
- ret = tdb_traverse_read(t->tdb, all_servers_func, name_records);
+ ret = server_id_db_traverse_read(msg_ctx->names, all_servers_func,
+ name_records);
if (ret == -1) {
- talloc_free(t);
+ TALLOC_FREE(name_records);
return NULL;
}
- talloc_free(t);
-
return name_records;
}
*/
void irpc_remove_name(struct imessaging_context *msg_ctx, const char *name)
{
- struct tdb_wrap *t;
- TDB_DATA rec;
- int count, i;
- struct server_id *ids;
-
- str_list_remove(msg_ctx->names, name);
-
- t = irpc_namedb_open(msg_ctx);
- if (t == NULL) {
- return;
- }
-
- if (tdb_lock_bystring(t->tdb, name) != 0) {
- talloc_free(t);
- return;
- }
- rec = tdb_fetch_bystring(t->tdb, name);
- if (rec.dptr == NULL) {
- tdb_unlock_bystring(t->tdb, name);
- talloc_free(t);
- return;
- }
- count = rec.dsize / sizeof(struct server_id);
- if (count == 0) {
- free(rec.dptr);
- tdb_unlock_bystring(t->tdb, name);
- talloc_free(t);
- return;
- }
- ids = (struct server_id *)rec.dptr;
- for (i=0;i<count;i++) {
- if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) {
- if (i < count-1) {
- memmove(ids+i, ids+i+1,
- sizeof(struct server_id) * (count-(i+1)));
- }
- rec.dsize -= sizeof(struct server_id);
- break;
- }
- }
- tdb_store_bystring(t->tdb, name, rec, 0);
- free(rec.dptr);
- tdb_unlock_bystring(t->tdb, name);
- talloc_free(t);
+ server_id_db_remove(msg_ctx->names, name);
}
struct server_id imessaging_get_server_id(struct imessaging_context *msg_ctx)
/* initialise a irpc binding handle */
struct dcerpc_binding_handle *irpc_binding_handle(TALLOC_CTX *mem_ctx,
- struct imessaging_context *msg_ctx,
- struct server_id server_id,
- const struct ndr_interface_table *table)
+ struct imessaging_context *msg_ctx,
+ struct server_id server_id,
+ const struct ndr_interface_table *table)
{
struct dcerpc_binding_handle *h;
struct irpc_bh_state *hs;
hs->table = table;
hs->timeout = IRPC_CALL_TIMEOUT;
- dcerpc_binding_handle_set_sync_ev(h, msg_ctx->event.ev);
-
return h;
}
struct dcerpc_binding_handle *irpc_binding_handle_by_name(TALLOC_CTX *mem_ctx,
- struct imessaging_context *msg_ctx,
- const char *dest_task,
- const struct ndr_interface_table *table)
+ struct imessaging_context *msg_ctx,
+ const char *dest_task,
+ const struct ndr_interface_table *table)
{
struct dcerpc_binding_handle *h;
+ unsigned num_sids;
struct server_id *sids;
struct server_id sid;
+ NTSTATUS status;
/* find the server task */
- sids = irpc_servers_byname(msg_ctx, mem_ctx, dest_task);
- if (sids == NULL) {
- errno = EADDRNOTAVAIL;
- return NULL;
- }
- if (sids[0].pid == 0) {
- talloc_free(sids);
+
+ status = irpc_servers_byname(msg_ctx, mem_ctx, dest_task,
+ &num_sids, &sids);
+ if (!NT_STATUS_IS_OK(status)) {
errno = EADDRNOTAVAIL;
return NULL;
}