* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "includes.h"
+#include "replace.h"
+#include <tevent.h>
+#include "lib/util/server_id.h"
+#include "lib/util/data_blob.h"
#include "librpc/gen_ndr/notify.h"
#include "librpc/gen_ndr/messaging.h"
#include "librpc/gen_ndr/server_id.h"
#include "lib/dbwrap/dbwrap.h"
#include "lib/dbwrap/dbwrap_rbt.h"
#include "messages.h"
-#include "proto.h"
#include "tdb.h"
#include "util_tdb.h"
#include "notifyd.h"
#include "lib/util/server_id_db.h"
#include "lib/util/tevent_unix.h"
+#include "lib/util/tevent_ntstatus.h"
#include "ctdbd_conn.h"
#include "ctdb_srvids.h"
-#include "source3/smbd/proto.h"
#include "server_id_db_util.h"
#include "lib/util/iov_buf.h"
#include "messages_util.h"
/*
* Database of everything clients show interest in. Indexed by
* absolute path. The database keys are not 0-terminated
- * because the criticial operation, notifyd_trigger, can walk
+ * to allow the criticial operation, notifyd_trigger, to walk
* the structure from the top without adding intermediate 0s.
* The database records contain an array of
*
* struct notifyd_instance
*
- * to be maintained by parsed by notifyd_entry_parse()
+ * to be maintained and parsed by notifyd_entry_parse()
*/
struct db_context *entries;
* broadcasts its messaging_reclog to every other notifyd in
* the cluster. This is done by making ctdb send a message to
* srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
- * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who
+ * number CTDB_BROADCAST_CONNECTED. Everybody in the cluster who
* had called register_with_ctdbd this srvid will receive the
* broadcasts.
*
time_t last_broadcast;
};
-static bool notifyd_rec_change(struct messaging_context *msg_ctx,
- struct messaging_rec **prec,
- void *private_data);
-static bool notifyd_trigger(struct messaging_context *msg_ctx,
- struct messaging_rec **prec,
- void *private_data);
-static bool notifyd_get_db(struct messaging_context *msg_ctx,
- struct messaging_rec **prec,
- void *private_data);
-static bool notifyd_got_db(struct messaging_context *msg_ctx,
- struct messaging_rec **prec,
- void *private_data);
+static void notifyd_rec_change(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data);
+static void notifyd_trigger(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data);
+static void notifyd_get_db(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data);
#ifdef CLUSTER_SUPPORT
+static void notifyd_got_db(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data);
static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
struct server_id src,
struct messaging_reclog *log);
return 0;
}
-static void notifyd_handler_done(struct tevent_req *subreq);
-
#ifdef CLUSTER_SUPPORT
static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
static void notifyd_clean_peers_finished(struct tevent_req *subreq);
-static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
+static int notifyd_snoop_broadcast(struct tevent_context *ev,
+ uint32_t src_vnn, uint32_t dst_vnn,
uint64_t dst_srvid,
const uint8_t *msg, size_t msglen,
void *private_data);
sys_notify_watch_fn sys_notify_watch,
struct sys_notify_context *sys_notify_ctx)
{
- struct tevent_req *req, *subreq;
+ struct tevent_req *req;
+#ifdef CLUSTER_SUPPORT
+ struct tevent_req *subreq;
+#endif
struct notifyd_state *state;
struct server_id_db *names_db;
+ NTSTATUS status;
int ret;
req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
return tevent_req_post(req, ev);
}
- subreq = messaging_handler_send(state, ev, msg_ctx,
- MSG_SMB_NOTIFY_REC_CHANGE,
- notifyd_rec_change, state);
- if (tevent_req_nomem(subreq, req)) {
+ status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE,
+ notifyd_rec_change);
+ if (tevent_req_nterror(req, status)) {
return tevent_req_post(req, ev);
}
- tevent_req_set_callback(subreq, notifyd_handler_done, req);
- subreq = messaging_handler_send(state, ev, msg_ctx,
- MSG_SMB_NOTIFY_TRIGGER,
- notifyd_trigger, state);
- if (tevent_req_nomem(subreq, req)) {
- return tevent_req_post(req, ev);
+ status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER,
+ notifyd_trigger);
+ if (tevent_req_nterror(req, status)) {
+ goto deregister_rec_change;
}
- tevent_req_set_callback(subreq, notifyd_handler_done, req);
- subreq = messaging_handler_send(state, ev, msg_ctx,
- MSG_SMB_NOTIFY_GET_DB,
- notifyd_get_db, state);
- if (tevent_req_nomem(subreq, req)) {
- return tevent_req_post(req, ev);
+ status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_GET_DB,
+ notifyd_get_db);
+ if (tevent_req_nterror(req, status)) {
+ goto deregister_trigger;
}
- tevent_req_set_callback(subreq, notifyd_handler_done, req);
-
- subreq = messaging_handler_send(state, ev, msg_ctx,
- MSG_SMB_NOTIFY_DB,
- notifyd_got_db, state);
- if (tevent_req_nomem(subreq, req)) {
- return tevent_req_post(req, ev);
- }
- tevent_req_set_callback(subreq, notifyd_handler_done, req);
names_db = messaging_names_db(msg_ctx);
DEBUG(10, ("%s: server_id_db_add failed: %s\n",
__func__, strerror(ret)));
tevent_req_error(req, ret);
- return tevent_req_post(req, ev);
+ goto deregister_get_db;
}
- /* Block those signals that we are not handling */
- BlockSignals(True, SIGHUP);
- BlockSignals(True, SIGUSR1);
-
if (ctdbd_conn == NULL) {
/*
* No cluster around, skip the database replication
}
#ifdef CLUSTER_SUPPORT
+ status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_DB,
+ notifyd_got_db);
+ if (tevent_req_nterror(req, status)) {
+ goto deregister_get_db;
+ }
+
state->log = talloc_zero(state, struct messaging_reclog);
if (tevent_req_nomem(state->log, req)) {
- return tevent_req_post(req, ev);
+ goto deregister_db;
}
subreq = notifyd_broadcast_reclog_send(
- state->log, ev, ctdbd_conn, messaging_server_id(msg_ctx),
+ state->log, ev, ctdbd_conn,
+ messaging_server_id(msg_ctx),
state->log);
if (tevent_req_nomem(subreq, req)) {
- return tevent_req_post(req, ev);
+ goto deregister_db;
}
- tevent_req_set_callback(subreq, notifyd_broadcast_reclog_finished,
+ tevent_req_set_callback(subreq,
+ notifyd_broadcast_reclog_finished,
req);
subreq = notifyd_clean_peers_send(state, ev, state);
if (tevent_req_nomem(subreq, req)) {
- return tevent_req_post(req, ev);
+ goto deregister_db;
}
tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
req);
- ret = register_with_ctdbd(ctdbd_conn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
+ ret = register_with_ctdbd(ctdbd_conn,
+ CTDB_SRVID_SAMBA_NOTIFY_PROXY,
notifyd_snoop_broadcast, state);
if (ret != 0) {
tevent_req_error(req, ret);
- return tevent_req_post(req, ev);
+ goto deregister_db;
}
#endif
return req;
-}
-static void notifyd_handler_done(struct tevent_req *subreq)
-{
- struct tevent_req *req = tevent_req_callback_data(
- subreq, struct tevent_req);
- int ret;
-
- ret = messaging_handler_recv(subreq);
- TALLOC_FREE(subreq);
- tevent_req_error(req, ret);
+#ifdef CLUSTER_SUPPORT
+deregister_db:
+ messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_DB, state);
+#endif
+deregister_get_db:
+ messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_GET_DB, state);
+deregister_trigger:
+ messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state);
+deregister_rec_change:
+ messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state);
+ return tevent_req_post(req, ev);
}
#ifdef CLUSTER_SUPPORT
notifyd_sys_callback, msg_ctx,
&instance->sys_watch);
if (ret != 0) {
- DEBUG(1, ("%s: inotify_watch returned %s\n",
- __func__, strerror(errno)));
+ DBG_WARNING("sys_notify_watch for [%s] returned %s\n",
+ path, strerror(errno));
}
}
return true;
}
-static bool notifyd_rec_change(struct messaging_context *msg_ctx,
- struct messaging_rec **prec,
- void *private_data)
+static void notifyd_rec_change(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data)
{
struct notifyd_state *state = talloc_get_type_abort(
private_data, struct notifyd_state);
struct server_id_buf idbuf;
- struct messaging_rec *rec = *prec;
struct notify_rec_change_msg *msg;
size_t pathlen;
bool ok;
- DEBUG(10, ("%s: Got %d bytes from %s\n", __func__,
- (unsigned)rec->buf.length,
- server_id_str_buf(rec->src, &idbuf)));
+ DBG_DEBUG("Got %zu bytes from %s\n", data->length,
+ server_id_str_buf(src, &idbuf));
- ok = notifyd_parse_rec_change(rec->buf.data, rec->buf.length,
+ ok = notifyd_parse_rec_change(data->data, data->length,
&msg, &pathlen);
if (!ok) {
- return true;
+ return;
}
ok = notifyd_apply_rec_change(
- &rec->src, msg->path, pathlen, &msg->instance,
+ &src, msg->path, pathlen, &msg->instance,
state->entries, state->sys_notify_watch, state->sys_notify_ctx,
state->msg_ctx);
if (!ok) {
DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
__func__));
- return true;
+ return;
}
if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
- return true;
+ return;
}
#ifdef CLUSTER_SUPPORT
struct messaging_rec **tmp;
struct messaging_reclog *log;
+ struct iovec iov = { .iov_base = data->data, .iov_len = data->length };
log = state->log;
log->num_recs+1);
if (tmp == NULL) {
DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
- return true;
+ return;
}
log->recs = tmp;
- log->recs[log->num_recs] = talloc_move(log->recs, prec);
+ log->recs[log->num_recs] = messaging_rec_create(
+ log->recs, src, messaging_server_id(msg_ctx),
+ msg_type, &iov, 1, NULL, 0);
+
+ if (log->recs[log->num_recs] == NULL) {
+ DBG_WARNING("messaging_rec_create failed, ignoring\n");
+ return;
+ }
+
log->num_recs += 1;
if (log->num_recs >= 100) {
}
#endif
-
- return true;
}
struct notifyd_trigger_state {
static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
void *private_data);
-static bool notifyd_trigger(struct messaging_context *msg_ctx,
- struct messaging_rec **prec,
- void *private_data)
+static void notifyd_trigger(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data)
{
struct notifyd_state *state = talloc_get_type_abort(
private_data, struct notifyd_state);
struct server_id my_id = messaging_server_id(msg_ctx);
- struct messaging_rec *rec = *prec;
struct notifyd_trigger_state tstate;
const char *path;
const char *p, *next_p;
- if (rec->buf.length < offsetof(struct notify_trigger_msg, path) + 1) {
- DEBUG(1, ("message too short, ignoring: %u\n",
- (unsigned)rec->buf.length));
- return true;
+ if (data->length < offsetof(struct notify_trigger_msg, path) + 1) {
+ DBG_WARNING("message too short, ignoring: %zu\n",
+ data->length);
+ return;
}
- if (rec->buf.data[rec->buf.length-1] != 0) {
+ if (data->data[data->length-1] != 0) {
DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
- return true;
+ return;
}
tstate.msg_ctx = msg_ctx;
- tstate.covered_by_sys_notify = (rec->src.vnn == my_id.vnn);
- tstate.covered_by_sys_notify &= !server_id_equal(&rec->src, &my_id);
+ tstate.covered_by_sys_notify = (src.vnn == my_id.vnn);
+ tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id);
- tstate.msg = (struct notify_trigger_msg *)rec->buf.data;
+ tstate.msg = (struct notify_trigger_msg *)data->data;
path = tstate.msg->path;
DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
if (path[0] != '/') {
DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
__func__, path));
- return true;
+ return;
}
for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
continue;
}
- if (rec->src.vnn != my_id.vnn) {
+ if (src.vnn != my_id.vnn) {
continue;
}
notifyd_trigger_parser, &tstate);
}
}
-
- return true;
}
static void notifyd_send_delete(struct messaging_context *msg_ctx,
{
struct notifyd_trigger_state *tstate = private_data;
- struct notify_event_msg msg = { .action = tstate->msg->action };
+ struct notify_event_msg msg = { .action = tstate->msg->action,
+ .when = tstate->msg->when };
struct iovec iov[2];
size_t path_len = key.dsize;
struct notifyd_instance *instances = NULL;
}
}
-static bool notifyd_get_db(struct messaging_context *msg_ctx,
- struct messaging_rec **prec,
- void *private_data)
+static void notifyd_get_db(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data)
{
struct notifyd_state *state = talloc_get_type_abort(
private_data, struct notifyd_state);
- struct messaging_rec *rec = *prec;
struct server_id_buf id1, id2;
NTSTATUS status;
uint64_t rec_index = UINT64_MAX;
dbsize = dbwrap_marshall(state->entries, NULL, 0);
- buf = talloc_array(rec, uint8_t, dbsize);
+ buf = talloc_array(talloc_tos(), uint8_t, dbsize);
if (buf == NULL) {
DEBUG(1, ("%s: talloc_array(%ju) failed\n",
__func__, (uintmax_t)dbsize));
- return true;
+ return;
}
dbsize = dbwrap_marshall(state->entries, buf, dbsize);
(uintmax_t)talloc_get_size(buf),
(uintmax_t)dbsize));
TALLOC_FREE(buf);
- return true;
+ return;
}
if (state->log != NULL) {
DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
(uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
server_id_str_buf(messaging_server_id(msg_ctx), &id1),
- server_id_str_buf(rec->src, &id2)));
+ server_id_str_buf(src, &id2)));
- status = messaging_send_iov(msg_ctx, rec->src, MSG_SMB_NOTIFY_DB,
+ status = messaging_send_iov(msg_ctx, src, MSG_SMB_NOTIFY_DB,
iov, ARRAY_SIZE(iov), NULL, 0);
TALLOC_FREE(buf);
if (!NT_STATUS_IS_OK(status)) {
DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
__func__, nt_errstr(status)));
}
-
- return true;
}
+#ifdef CLUSTER_SUPPORT
+
static int notifyd_add_proxy_syswatches(struct db_record *rec,
void *private_data);
-static bool notifyd_got_db(struct messaging_context *msg_ctx,
- struct messaging_rec **prec,
- void *private_data)
+static void notifyd_got_db(struct messaging_context *msg_ctx,
+ void *private_data, uint32_t msg_type,
+ struct server_id src, DATA_BLOB *data)
{
struct notifyd_state *state = talloc_get_type_abort(
private_data, struct notifyd_state);
- struct messaging_rec *rec = *prec;
struct notifyd_peer *p = NULL;
struct server_id_buf idbuf;
NTSTATUS status;
size_t i;
for (i=0; i<state->num_peers; i++) {
- if (server_id_equal(&rec->src, &state->peers[i]->pid)) {
+ if (server_id_equal(&src, &state->peers[i]->pid)) {
p = state->peers[i];
break;
}
}
if (p == NULL) {
- DEBUG(10, ("%s: Did not find peer for db from %s\n",
- __func__, server_id_str_buf(rec->src, &idbuf)));
- return true;
+ DBG_DEBUG("Did not find peer for db from %s\n",
+ server_id_str_buf(src, &idbuf));
+ return;
}
- if (rec->buf.length < 8) {
- DEBUG(10, ("%s: Got short db length %u from %s\n", __func__,
- (unsigned)rec->buf.length,
- server_id_str_buf(rec->src, &idbuf)));
+ if (data->length < 8) {
+ DBG_DEBUG("Got short db length %zu from %s\n", data->length,
+ server_id_str_buf(src, &idbuf));
TALLOC_FREE(p);
- return true;
+ return;
}
- p->rec_index = BVAL(rec->buf.data, 0);
+ p->rec_index = BVAL(data->data, 0);
p->db = db_open_rbt(p);
if (p->db == NULL) {
DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
TALLOC_FREE(p);
- return true;
+ return;
}
- status = dbwrap_unmarshall(p->db, rec->buf.data + 8,
- rec->buf.length - 8);
+ status = dbwrap_unmarshall(p->db, data->data + 8,
+ data->length - 8);
if (!NT_STATUS_IS_OK(status)) {
DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
__func__, nt_errstr(status),
- server_id_str_buf(rec->src, &idbuf)));
+ server_id_str_buf(src, &idbuf)));
TALLOC_FREE(p);
- return true;
+ return;
}
dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
&count);
DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
- server_id_str_buf(rec->src, &idbuf), count));
-
- return true;
+ server_id_str_buf(src, &idbuf), count));
}
-#ifdef CLUSTER_SUPPORT
-
static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
struct server_id src,
struct messaging_reclog *log)
.iov_len = blob.length };
ret = ctdbd_messaging_send_iov(
- ctdbd_conn, CTDB_BROADCAST_VNNMAP,
+ ctdbd_conn, CTDB_BROADCAST_CONNECTED,
CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
TALLOC_FREE(blob.data);
if (ret != 0) {
return tevent_req_simple_recv_unix(req);
}
-#endif
-
static int notifyd_add_proxy_syswatches(struct db_record *rec,
void *private_data)
{
uint32_t subdir_filter = instance->instance.subdir_filter;
int ret;
+ /*
+ * This is a remote database. Pointers that we were
+ * given don't make sense locally. Initialize to NULL
+ * in case sys_notify_watch fails.
+ */
+ instances[i].sys_watch = NULL;
+
ret = state->sys_notify_watch(
db, state->sys_notify_ctx, path,
&filter, &subdir_filter,
return 0;
}
-#ifdef CLUSTER_SUPPORT
-
static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
{
TDB_DATA key = dbwrap_record_get_key(rec);
* broadcast, which will then trigger a fresh database pull.
*/
-static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
+static int notifyd_snoop_broadcast(struct tevent_context *ev,
+ uint32_t src_vnn, uint32_t dst_vnn,
uint64_t dst_srvid,
const uint8_t *msg, size_t msglen,
void *private_data)