#include "lib/util/tevent_unix.h"
#include "lib/background.h"
#include "lib/messages_dgm.h"
-#include "lib/messages_ctdbd.h"
#include "lib/util/iov_buf.h"
#include "lib/util/server_id_db.h"
#include "lib/messages_dgm_ref.h"
+#include "lib/messages_ctdb.h"
+#include "lib/messages_ctdb_ref.h"
#include "lib/messages_util.h"
+#include "cluster_support.h"
+#include "ctdbd_conn.h"
+#include "ctdb_srvids.h"
+
+#ifdef CLUSTER_SUPPORT
+#include "ctdb_protocol.h"
+#endif
struct messaging_callback {
struct messaging_callback *prev, *next;
size_t num_waiters;
void *msg_dgm_ref;
- struct messaging_backend *remote;
+ void *msg_ctdb_ref;
struct server_id_db *names_db;
};
for (i=0; i<num_event_contexts; i++) {
struct messaging_registered_ev *reg = &ctx->event_contexts[i];
- if (reg->ev == ev) {
- reg->refcount += 1;
- return true;
- }
if (reg->refcount == 0) {
if (reg->ev != NULL) {
abort();
}
free_reg = reg;
+ /*
+ * We continue here and may find another
+ * free_req, but the important thing is
+ * that we continue to search for an
+ * existing registration in the loop.
+ */
+ continue;
+ }
+
+ if (reg->ev == ev) {
+ reg->refcount += 1;
+ return true;
}
}
if (free_reg == NULL) {
+ struct tevent_immediate *im = NULL;
+
+ im = tevent_create_immediate(ctx);
+ if (im == NULL) {
+ return false;
+ }
+
tmp = talloc_realloc(ctx, ctx->event_contexts,
struct messaging_registered_ev,
num_event_contexts+1);
ctx->event_contexts = tmp;
free_reg = &ctx->event_contexts[num_event_contexts];
+ free_reg->im = talloc_move(ctx->event_contexts, &im);
}
- *free_reg = (struct messaging_registered_ev) { .ev = ev, .refcount = 1 };
+ /*
+ * free_reg->im might be cached
+ */
+ free_reg->ev = ev;
+ free_reg->refcount = 1;
return true;
}
for (i=0; i<num_event_contexts; i++) {
struct messaging_registered_ev *reg = &ctx->event_contexts[i];
+ if (reg->refcount == 0) {
+ continue;
+ }
+
if (reg->ev == ev) {
- if (reg->refcount == 0) {
- return false;
- }
reg->refcount -= 1;
if (reg->refcount == 0) {
+ /*
+ * The primary event context
+ * is never unregistered using
+ * messaging_deregister_event_context()
+ * it's only registered using
+ * messaging_register_event_context().
+ */
+ SMB_ASSERT(ev != ctx->event_ctx);
+ SMB_ASSERT(reg->ev != ctx->event_ctx);
+
/*
* Not strictly necessary, just
* paranoia
/*
* Do not talloc_free(reg->im),
* recycle immediates events.
+ *
+ * We just invalidate it using
+ * the primary event context,
+ * which is never unregistered.
*/
+ tevent_schedule_immediate(reg->im,
+ ctx->event_ctx,
+ NULL, NULL);
}
return true;
}
continue;
}
- if (reg->im == NULL) {
- reg->im = tevent_create_immediate(
- ctx->event_contexts);
- }
- if (reg->im == NULL) {
- DBG_WARNING("Could not create immediate\n");
- continue;
- }
-
/*
* We depend on schedule_immediate to work
* multiple times. Might be a bit inefficient,
* alternatively would be to track whether the
* immediate has already been scheduled. For
* now, avoid that complexity here.
+ *
+ * reg->ev and ctx->event_ctx can't
+ * be wrapper tevent_context pointers
+ * so we don't need to use
+ * tevent_context_same_loop().
*/
if (reg->ev == ctx->event_ctx) {
(unsigned)rec.msg_type, rec.buf.length, num_fds,
server_id_str_buf(rec.src, &idbuf));
+ if (server_id_same_process(&rec.src, &msg_ctx->id)) {
+ DBG_DEBUG("Ignoring self-send\n");
+ goto close_fail;
+ }
+
messaging_dispatch_rec(msg_ctx, ev, &rec);
return;
const char *priv_path;
bool ok;
- lck_path = lock_path("msg.lock");
+ /*
+ * sec_init() *must* be called before any other
+ * functions that use sec_XXX(). e.g. sec_initial_uid().
+ */
+
+ sec_init();
+
+ if (tevent_context_is_wrapper(ev)) {
+ /* This is really a programmer error! */
+ DBG_ERR("Should not be used with a wrapper tevent context\n");
+ return NT_STATUS_INVALID_PARAMETER;
+ }
+
+ lck_path = lock_path(talloc_tos(), "msg.lock");
if (lck_path == NULL) {
return NT_STATUS_NO_MEMORY;
}
goto done;
}
- sec_init();
-
ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
ctx->event_ctx,
&ctx->id.unique_id,
}
talloc_set_destructor(ctx, messaging_context_destructor);
+#ifdef CLUSTER_SUPPORT
if (lp_clustering()) {
- ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
-
- if (ret != 0) {
- DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
- strerror(ret)));
+ ctx->msg_ctdb_ref = messaging_ctdb_ref(
+ ctx, ctx->event_ctx,
+ lp_ctdbd_socket(), lp_ctdb_timeout(),
+ ctx->id.unique_id, messaging_recv_cb, ctx, &ret);
+ if (ctx->msg_ctdb_ref == NULL) {
+ DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
+ strerror(ret));
status = map_nt_error_from_unix(ret);
goto done;
}
}
+#endif
+
ctx->id.vnn = get_my_vnn();
ctx->names_db = server_id_db_init(ctx,
return ctx;
}
-NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- struct messaging_context **pmsg_ctx)
-{
- return messaging_init_internal(mem_ctx,
- ev,
- pmsg_ctx);
-}
-
struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
{
return msg_ctx->id;
char *lck_path;
TALLOC_FREE(msg_ctx->msg_dgm_ref);
+ TALLOC_FREE(msg_ctx->msg_ctdb_ref);
msg_ctx->id = (struct server_id) {
.pid = getpid(), .vnn = msg_ctx->id.vnn
};
- lck_path = lock_path("msg.lock");
+ lck_path = lock_path(talloc_tos(), "msg.lock");
if (lck_path == NULL) {
return NT_STATUS_NO_MEMORY;
}
}
if (lp_clustering()) {
- ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
- msg_ctx->remote);
-
- if (ret != 0) {
- DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
- strerror(ret)));
+ msg_ctx->msg_ctdb_ref = messaging_ctdb_ref(
+ msg_ctx, msg_ctx->event_ctx,
+ lp_ctdbd_socket(), lp_ctdb_timeout(),
+ msg_ctx->id.unique_id, messaging_recv_cb, msg_ctx,
+ &ret);
+ if (msg_ctx->msg_ctdb_ref == NULL) {
+ DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
+ strerror(ret));
return map_nt_error_from_unix(ret);
}
}
return EINVAL;
}
- if (dst.vnn != msg_ctx->id.vnn) {
- if (num_fds > 0) {
- return ENOSYS;
- }
-
- ret = msg_ctx->remote->send_fn(src, dst,
- msg_type, iov, iovlen,
- NULL, 0,
- msg_ctx->remote);
- return ret;
- }
-
if (server_id_equal(&dst, &msg_ctx->id)) {
ret = messaging_post_self(msg_ctx, src, dst, msg_type,
iov, iovlen, fds, num_fds);
iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
+ if (dst.vnn != msg_ctx->id.vnn) {
+ if (num_fds > 0) {
+ return ENOSYS;
+ }
+
+ ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
+ return ret;
+ }
+
ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
if (ret == EACCES) {
return NT_STATUS_OK;
}
+struct send_all_state {
+ struct messaging_context *msg_ctx;
+ int msg_type;
+ const void *buf;
+ size_t len;
+};
+
+static int send_all_fn(pid_t pid, void *private_data)
+{
+ struct send_all_state *state = private_data;
+ NTSTATUS status;
+
+ if (pid == getpid()) {
+ DBG_DEBUG("Skip ourselves in messaging_send_all\n");
+ return 0;
+ }
+
+ status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
+ state->msg_type, state->buf, state->len);
+ if (!NT_STATUS_IS_OK(status)) {
+ DBG_WARNING("messaging_send_buf to %ju failed: %s\n",
+ (uintmax_t)pid, nt_errstr(status));
+ }
+
+ return 0;
+}
+
+void messaging_send_all(struct messaging_context *msg_ctx,
+ int msg_type, const void *buf, size_t len)
+{
+ struct send_all_state state = {
+ .msg_ctx = msg_ctx, .msg_type = msg_type,
+ .buf = buf, .len = len
+ };
+ int ret;
+
+#ifdef CLUSTER_SUPPORT
+ if (lp_clustering()) {
+ struct ctdbd_connection *conn = messaging_ctdb_connection();
+ uint8_t msghdr[MESSAGE_HDR_LENGTH];
+ struct iovec iov[] = {
+ { .iov_base = msghdr,
+ .iov_len = sizeof(msghdr) },
+ { .iov_base = discard_const_p(void, buf),
+ .iov_len = len }
+ };
+
+ message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
+ (struct server_id) {0});
+
+ ret = ctdbd_messaging_send_iov(
+ conn, CTDB_BROADCAST_CONNECTED,
+ CTDB_SRVID_SAMBA_PROCESS,
+ iov, ARRAY_SIZE(iov));
+ if (ret != 0) {
+ DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
+ strerror(ret));
+ }
+
+ return;
+ }
+#endif
+
+ ret = messaging_dgm_forall(send_all_fn, &state);
+ if (ret != 0) {
+ DBG_WARNING("messaging_dgm_forall failed: %s\n",
+ strerror(ret));
+ }
+}
+
static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
struct messaging_rec *rec)
{
struct tevent_context *ev;
struct messaging_context *msg_ctx;
struct messaging_dgm_fde *fde;
+ struct messaging_ctdb_fde *cluster_fde;
bool (*filter)(struct messaging_rec *rec, void *private_data);
void *private_data;
state->filter = filter;
state->private_data = private_data;
+ if (tevent_context_is_wrapper(ev)) {
+ /* This is really a programmer error! */
+ DBG_ERR("Wrapper tevent context doesn't use main context.\n");
+ tevent_req_error(req, EINVAL);
+ return tevent_req_post(req, ev);
+ }
+
/*
* We have to defer the callback here, as we might be called from
* within a different tevent_context than state->ev
return tevent_req_post(req, ev);
}
+ if (lp_clustering()) {
+ state->cluster_fde =
+ messaging_ctdb_register_tevent_context(state, ev);
+ if (tevent_req_nomem(state->cluster_fde, req)) {
+ return tevent_req_post(req, ev);
+ }
+ }
+
/*
* We add ourselves to the "new_waiters" array, not the "waiters"
* array. If we are called from within messaging_read_done,
tevent_req_set_cleanup_fn(req, NULL);
TALLOC_FREE(state->fde);
+ TALLOC_FREE(state->cluster_fde);
ok = messaging_deregister_event_context(msg_ctx, state->ev);
if (!ok) {
bool consumed;
size_t i;
+ /*
+ * ev and msg_ctx->event_ctx can't be wrapper tevent_context pointers
+ * so we don't need to use tevent_context_same_loop().
+ */
+
if (ev == msg_ctx->event_ctx) {
consumed = messaging_dispatch_classic(msg_ctx, rec);
if (consumed) {
60*15),
mess_parent_dgm_cleanup, msg);
if (req == NULL) {
+ DBG_WARNING("background_job_send failed\n");
return false;
}
tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);