void *private_data;
};
+struct messaging_context {
+ struct server_id id;
+ struct tevent_context *event_ctx;
+ struct messaging_callback *callbacks;
+
+ struct tevent_req **new_waiters;
+ unsigned num_new_waiters;
+
+ struct tevent_req **waiters;
+ unsigned num_waiters;
+
+ struct messaging_backend *local;
+ struct messaging_backend *remote;
+
+ bool *have_context;
+};
+
+static int messaging_context_destructor(struct messaging_context *msg_ctx);
+
/****************************************************************************
A useful function for testing the message system.
****************************************************************************/
struct server_id src,
DATA_BLOB *data)
{
- const char *msg = "none";
- char *free_me = NULL;
+ struct server_id_buf idbuf;
+
+ DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
+ server_id_str_buf(src, &idbuf), (int)data->length,
+ data->data ? (char *)data->data : ""));
- if (data->data != NULL) {
- free_me = talloc_strndup(talloc_tos(), (char *)data->data,
- data->length);
- msg = free_me;
- }
- DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
- procid_str_static(&src), msg));
- TALLOC_FREE(free_me);
messaging_send(msg_ctx, src, MSG_PONG, data);
}
(const uint8_t *)msg_all->buf, msg_all->len);
if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
+ struct server_id_buf idbuf;
/*
* If the pid was not found delete the entry from
* serverid.tdb
*/
- DEBUG(2, ("pid %s doesn't exist\n", procid_str_static(id)));
+ DEBUG(2, ("pid %s doesn't exist\n",
+ server_id_str_buf(*id, &idbuf)));
dbwrap_record_delete(rec);
}
return true;
}
+static void messaging_recv_cb(int msg_type,
+ struct server_id src, struct server_id dst,
+ const uint8_t *msg, size_t msg_len,
+ void *private_data)
+{
+ struct messaging_context *msg_ctx = talloc_get_type_abort(
+ private_data, struct messaging_context);
+ struct messaging_rec rec;
+
+ rec = (struct messaging_rec) {
+ .msg_version = MESSAGE_VERSION,
+ .msg_type = msg_type,
+ .src = src,
+ .dest = dst,
+ .buf.data = discard_const_p(uint8, msg),
+ .buf.length = msg_len
+ };
+
+ messaging_dispatch_rec(msg_ctx, &rec);
+}
+
struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
struct tevent_context *ev)
{
struct messaging_context *ctx;
NTSTATUS status;
+ int ret;
+ static bool have_context = false;
+
+ if (have_context) {
+ DEBUG(0, ("No two messaging contexts per process\n"));
+ return NULL;
+ }
+
if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
return NULL;
ctx->id = procid_self();
ctx->event_ctx = ev;
+ ctx->have_context = &have_context;
- status = messaging_dgm_init(ctx, ctx, &ctx->local);
+ ret = messaging_dgm_init(ctx, ctx->event_ctx, ctx->id,
+ &ctx->local, messaging_recv_cb, ctx);
- if (!NT_STATUS_IS_OK(status)) {
- DEBUG(2, ("messaging_dgm_init failed: %s\n",
- nt_errstr(status)));
+ if (ret != 0) {
+ DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
TALLOC_FREE(ctx);
return NULL;
}
register_dmalloc_msgs(ctx);
debug_register_msgs(ctx);
+ have_context = true;
+ talloc_set_destructor(ctx, messaging_context_destructor);
+
return ctx;
}
+static int messaging_context_destructor(struct messaging_context *msg_ctx)
+{
+ SMB_ASSERT(*msg_ctx->have_context);
+ *msg_ctx->have_context = false;
+ return 0;
+}
+
struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
{
return msg_ctx->id;
NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
{
NTSTATUS status;
+ int ret;
TALLOC_FREE(msg_ctx->local);
msg_ctx->id = procid_self();
- status = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
- if (!NT_STATUS_IS_OK(status)) {
- DEBUG(0, ("messaging_dgm_init failed: %s\n",
- nt_errstr(status)));
- return status;
+ ret = messaging_dgm_init(msg_ctx, msg_ctx->event_ctx,
+ msg_ctx->id, &msg_ctx->local,
+ messaging_recv_cb, msg_ctx);
+ if (ret != 0) {
+ DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
+ return map_nt_error_from_unix(ret);
}
TALLOC_FREE(msg_ctx->remote);
}
}
+static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
+ const struct server_id *dst)
+{
+ return ((msg_ctx->id.vnn == dst->vnn) &&
+ (msg_ctx->id.pid == dst->pid));
+}
+
/*
Send a message to a particular server
*/
struct server_id server, uint32_t msg_type,
const DATA_BLOB *data)
{
- if (server_id_is_disconnected(&server)) {
- return NT_STATUS_INVALID_PARAMETER_MIX;
- }
+ struct iovec iov;
- if (!procid_is_local(&server)) {
- return msg_ctx->remote->send_fn(msg_ctx, server,
- msg_type, data,
- msg_ctx->remote);
- }
+ iov.iov_base = data->data;
+ iov.iov_len = data->length;
- if (server_id_equal(&msg_ctx->id, &server)) {
- struct messaging_rec rec;
- rec.msg_version = MESSAGE_VERSION;
- rec.msg_type = msg_type & MSG_TYPE_MASK;
- rec.dest = server;
- rec.src = msg_ctx->id;
- rec.buf = *data;
- messaging_dispatch_rec(msg_ctx, &rec);
- return NT_STATUS_OK;
- }
-
- return msg_ctx->local->send_fn(msg_ctx, server, msg_type, data,
- msg_ctx->local);
+ return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
}
NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
struct server_id server, uint32_t msg_type,
const struct iovec *iov, int iovlen)
{
- uint8_t *buf;
- NTSTATUS status;
+ int ret;
- buf = iov_buf(talloc_tos(), iov, iovlen);
- if (buf == NULL) {
- return NT_STATUS_NO_MEMORY;
+ if (server_id_is_disconnected(&server)) {
+ return NT_STATUS_INVALID_PARAMETER_MIX;
}
- status = messaging_send_buf(msg_ctx, server, msg_type,
- buf, talloc_get_size(buf));
+ if (!procid_is_local(&server)) {
+ ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
+ msg_type, iov, iovlen,
+ msg_ctx->remote);
+ if (ret != 0) {
+ return map_nt_error_from_unix(ret);
+ }
+ return NT_STATUS_OK;
+ }
+
+ if (messaging_is_self_send(msg_ctx, &server)) {
+ struct messaging_rec rec;
+ uint8_t *buf;
+
+ buf = iov_buf(talloc_tos(), iov, iovlen);
+ if (buf == NULL) {
+ return NT_STATUS_NO_MEMORY;
+ }
+
+ rec.msg_version = MESSAGE_VERSION;
+ rec.msg_type = msg_type & MSG_TYPE_MASK;
+ rec.dest = server;
+ rec.src = msg_ctx->id;
+ rec.buf = data_blob_const(buf, talloc_get_size(buf));
+ messaging_dispatch_rec(msg_ctx, &rec);
+ TALLOC_FREE(buf);
+ return NT_STATUS_OK;
+ }
- TALLOC_FREE(buf);
- return status;
+ ret = msg_ctx->local->send_fn(msg_ctx->id, server, msg_type,
+ iov, iovlen, msg_ctx->local);
+ if (ret != 0) {
+ return map_nt_error_from_unix(ret);
+ }
+ return NT_STATUS_OK;
}
static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
continue;
}
- if (server_id_equal(&msg_ctx->id, &rec->dest)) {
+ if (messaging_is_self_send(msg_ctx, &rec->dest)) {
/*
* This is a self-send. We are called here from
* messaging_send(), and we don't want to directly
* to keep the order of waiters, as
* other code may depend on this.
*/
- if (i < msg_ctx->num_waiters - 1) {
+ if (i < msg_ctx->num_waiters - 1) {
memmove(&msg_ctx->waiters[i],
&msg_ctx->waiters[i+1],
sizeof(struct tevent_req *) *
i += 1;
}
- return;
}
static int mess_parent_dgm_cleanup(void *private_data);
{
struct messaging_context *msg_ctx = talloc_get_type_abort(
private_data, struct messaging_context);
- NTSTATUS status;
+ int ret;
- status = messaging_dgm_wipe(msg_ctx);
- DEBUG(10, ("messaging_dgm_wipe returned %s\n", nt_errstr(status)));
+ ret = messaging_dgm_wipe(msg_ctx);
+ DEBUG(10, ("messaging_dgm_wipe returned %s\n",
+ ret ? strerror(ret) : "ok"));
return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
60*15);
}
tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
}
+struct messaging_backend *messaging_local_backend(
+ struct messaging_context *msg_ctx)
+{
+ return msg_ctx->local;
+}
+
+struct tevent_context *messaging_tevent_context(
+ struct messaging_context *msg_ctx)
+{
+ return msg_ctx->event_ctx;
+}
+
/** @} **/