smbd: Add the notify daemon
author Volker Lendecke <vl@samba.org>
Fri, 21 Nov 2014 15:52:47 +0000 (16:52 +0100)
committer Jeremy Allison <jra@samba.org>
Tue, 7 Jul 2015 21:51:24 +0000 (23:51 +0200)
This adds the notify daemon listening on MSG_SMB_NOTIFY_REC_CHANGE
and MSG_SMB_NOTIFY_TRIGGER messages. It relies on ctdbd to distribute
the notify database and events in a cluster.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
librpc/idl/messaging.idl
source3/smbd/notifyd/notifyd.c [new file with mode: 0644]
source3/smbd/notifyd/notifyd.h [new file with mode: 0644]
source3/smbd/notifyd/tests.c [new file with mode: 0644]
source3/smbd/notifyd/wscript_build [new file with mode: 0644]
source3/wscript_build

index 2b902ec0a1dcd9ade21a1b8ca91866d026c127ba..ca99f414a1ee3c956b6e50b30490be2b1a45b91a 100644 (file)
@@ -99,6 +99,13 @@ interface messaging
                /* Cancel a notify, directory got deleted */
                MSG_SMB_NOTIFY_CANCEL_DELETED   = 0x0319,
 
+               /* notifyd messages */
+               MSG_SMB_NOTIFY_REC_CHANGE       = 0x031A,
+               MSG_SMB_NOTIFY_TRIGGER          = 0x031B,
+               MSG_SMB_NOTIFY_GET_DB           = 0x031C,
+               MSG_SMB_NOTIFY_DB               = 0x031D,
+               MSG_SMB_NOTIFY_REC_CHANGES      = 0x031E,
+
                /* winbind messages */
                MSG_WINBIND_FINISHED            = 0x0401,
                MSG_WINBIND_FORGET_STATE        = 0x0402,
@@ -152,4 +159,10 @@ interface messaging
                uint8 num_fds;
                dlong fds[num_fds];
        } messaging_rec;
+
+       /*
+        * A counted array of messaging_rec pointers plus an index
+        * value. NOTE(review): rec_index looks like a sequence number
+        * for the batch -- confirm against the users of this struct.
+        */
+       typedef [public] struct {
+               hyper rec_index;
+               uint32 num_recs;
+               messaging_rec *recs[num_recs];
+       } messaging_reclog;
 }
diff --git a/source3/smbd/notifyd/notifyd.c b/source3/smbd/notifyd/notifyd.c
new file mode 100644 (file)
index 0000000..0f31a61
--- /dev/null
@@ -0,0 +1,1419 @@
+/*
+ * Unix SMB/CIFS implementation.
+ *
+ * Copyright (C) Volker Lendecke 2014
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "includes.h"
+#include "librpc/gen_ndr/notify.h"
+#include "librpc/gen_ndr/messaging.h"
+#include "librpc/gen_ndr/server_id.h"
+#include "lib/dbwrap/dbwrap.h"
+#include "lib/dbwrap/dbwrap_rbt.h"
+#include "messages.h"
+#include "proto.h"
+#include "tdb.h"
+#include "util_tdb.h"
+#include "notifyd.h"
+#include "lib/util/server_id_db.h"
+#include "lib/util/tevent_unix.h"
+#include "ctdbd_conn.h"
+#include "ctdb_srvids.h"
+#include "source3/smbd/proto.h"
+#include "ctdb/include/ctdb_protocol.h"
+#include "server_id_db_util.h"
+#include "lib/util/iov_buf.h"
+#include "messages_util.h"
+
+struct notifyd_peer;
+
+/*
+ * All of notifyd's state
+ */
+
+struct notifyd_state {
+       /*
+        * Event context, messaging context and ctdbd connection as
+        * passed to notifyd_send(). ctdbd_conn is NULL when there is
+        * no cluster.
+        */
+       struct tevent_context *ev;
+       struct messaging_context *msg_ctx;
+       struct ctdbd_connection *ctdbd_conn;
+
+       /*
+        * Database of everything clients show interest in. Indexed by
+        * absolute path. The database keys are not 0-terminated
+        * because the critical operation, notifyd_trigger, can walk
+        * the structure from the top without adding intermediate 0s.
+        * The database records contain an array of
+        *
+        * struct notifyd_instance
+        *
+        * to be parsed by notifyd_parse_entry()
+        */
+       struct db_context *entries;
+
+       /*
+        * In the cluster case, this is the place where we store a log
+        * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
+        * forward them to our peer notifyd's in the cluster once a
+        * second or when the log grows too large.
+        *
+        * Stays NULL when ctdbd_conn is NULL.
+        */
+
+       struct messaging_reclog *log;
+
+       /*
+        * Array of companion notifyd's in a cluster. Every notifyd
+        * broadcasts its messaging_reclog to every other notifyd in
+        * the cluster. This is done by making ctdb send a message to
+        * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
+        * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who
+        * had called register_with_ctdbd this srvid will receive the
+        * broadcasts.
+        *
+        * Database replication happens via these broadcasts. Also,
+        * they serve as liveness indication. If a notifyd receives a
+        * broadcast from an unknown peer, it will create one for this
+        * srvid. Also when we don't hear anything from a peer for a
+        * while, we will discard it.
+        */
+
+       struct notifyd_peer **peers;
+       size_t num_peers;
+
+       /*
+        * Function used to install file system watches and the
+        * context it is called with. notifyd_send() substitutes
+        * sys_notify_watch_dummy when the caller passes NULL, so
+        * sys_notify_watch is always callable.
+        */
+       sys_notify_watch_fn sys_notify_watch;
+       struct sys_notify_context *sys_notify_ctx;
+};
+
+/*
+ * notifyd's representation of a notify instance
+ *
+ * Records in notifyd_state->entries are plain arrays of this struct.
+ */
+struct notifyd_instance {
+       /*
+        * The process that registered the instance; notify events
+        * matching the instance are sent there.
+        */
+       struct server_id client;
+
+       /*
+        * filter, subdir_filter and private_data as requested by the
+        * client. Both filters being 0 denotes a delete request.
+        */
+       struct notify_instance instance;
+
+       void *sys_watch; /* inotify/fam/etc handle */
+
+       /*
+        * Filters after sys_watch took responsibility of some bits
+        */
+       uint32_t internal_filter;
+       uint32_t internal_subdir_filter;
+};
+
+/*
+ * One companion notifyd in the cluster, see the description of
+ * notifyd_state->peers.
+ */
+struct notifyd_peer {
+       struct notifyd_state *state;
+       struct server_id pid;
+       /* NOTE(review): presumably the last reclog index seen from this peer */
+       uint64_t rec_index;
+       /* NULL as long as the peer is inactive, i.e. we did not get a db yet */
+       struct db_context *db;
+       /* NOTE(review): presumably when we last heard a broadcast -- confirm */
+       time_t last_broadcast;
+};
+
+/*
+ * Handlers for the messages notifyd_send() registers for
+ */
+static bool notifyd_rec_change(struct messaging_context *msg_ctx,
+                              struct messaging_rec **prec,
+                              void *private_data);
+static bool notifyd_trigger(struct messaging_context *msg_ctx,
+                           struct messaging_rec **prec,
+                           void *private_data);
+static bool notifyd_get_db(struct messaging_context *msg_ctx,
+                          struct messaging_rec **prec,
+                          void *private_data);
+static bool notifyd_got_db(struct messaging_context *msg_ctx,
+                          struct messaging_rec **prec,
+                          void *private_data);
+
+/*
+ * Immediate broadcast of the reclog, used when the log grows too large
+ */
+static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
+                                    struct server_id src,
+                                    struct messaging_reclog *log);
+
+/*
+ * Callback handed to sys_notify_watch
+ */
+static void notifyd_sys_callback(struct sys_notify_context *ctx,
+                                void *private_data, struct notify_event *ev);
+
+/*
+ * Async requests started by notifyd_send() in the cluster case
+ */
+static struct tevent_req *notifyd_broadcast_reclog_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       struct ctdbd_connection *ctdbd_conn, struct server_id src,
+       struct messaging_reclog *log);
+static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
+
+static struct tevent_req *notifyd_clean_peers_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       struct notifyd_state *notifyd);
+static int notifyd_clean_peers_recv(struct tevent_req *req);
+
+/*
+ * Stand-in for a real sys_notify_watch function: it installs no
+ * watch, leaves both filters untouched, stores a NULL handle in
+ * *handle_p and reports success.
+ */
+static int sys_notify_watch_dummy(
+       TALLOC_CTX *mem_ctx,
+       struct sys_notify_context *ctx,
+       const char *path,
+       uint32_t *filter,
+       uint32_t *subdir_filter,
+       void (*callback)(struct sys_notify_context *ctx,
+                        void *private_data,
+                        struct notify_event *ev),
+       void *private_data,
+       void *handle_p)
+{
+       /* handle_p really is the address of the caller's handle pointer */
+       void **phandle = handle_p;
+
+       *phandle = NULL;
+
+       return 0;
+}
+
+/*
+ * Completion callbacks for the subrequests of notifyd_send() and the
+ * callback registered with ctdbd for CTDB_SRVID_SAMBA_NOTIFY_PROXY
+ */
+static void notifyd_handler_done(struct tevent_req *subreq);
+static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
+static void notifyd_clean_peers_finished(struct tevent_req *subreq);
+static void notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
+                                   uint64_t dst_srvid,
+                                   const uint8_t *msg, size_t msglen,
+                                   void *private_data);
+
+/*
+ * Start the notify daemon as a tevent request.
+ *
+ * Registers the handlers for MSG_SMB_NOTIFY_REC_CHANGE,
+ * MSG_SMB_NOTIFY_TRIGGER, MSG_SMB_NOTIFY_GET_DB and MSG_SMB_NOTIFY_DB
+ * and claims the name "notify-daemon" exclusively in the messaging
+ * names db. If ctdbd_conn is not NULL, the replication engine is set
+ * up as well: the reclog, the reclog broadcast request, the peer
+ * cleanup request and the CTDB_SRVID_SAMBA_NOTIFY_PROXY registration.
+ *
+ * sys_notify_watch may be NULL, sys_notify_watch_dummy is used then.
+ *
+ * Returns NULL if the request itself could not be allocated. All
+ * other setup failures are posted on the returned request and show
+ * up in notifyd_recv().
+ */
+struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+                               struct messaging_context *msg_ctx,
+                               struct ctdbd_connection *ctdbd_conn,
+                               sys_notify_watch_fn sys_notify_watch,
+                               struct sys_notify_context *sys_notify_ctx)
+{
+       /*
+        * The messages we serve. Registration order is the same as
+        * the order in this table.
+        */
+       static const struct {
+               uint32_t msg_type;
+               bool (*fn)(struct messaging_context *msg_ctx,
+                          struct messaging_rec **prec,
+                          void *private_data);
+       } handlers[] = {
+               { MSG_SMB_NOTIFY_REC_CHANGE, notifyd_rec_change },
+               { MSG_SMB_NOTIFY_TRIGGER, notifyd_trigger },
+               { MSG_SMB_NOTIFY_GET_DB, notifyd_get_db },
+               { MSG_SMB_NOTIFY_DB, notifyd_got_db },
+       };
+       struct tevent_req *req, *subreq;
+       struct notifyd_state *state;
+       struct server_id_db *names_db;
+       NTSTATUS status;
+       size_t i;
+       int ret;
+
+       req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->ev = ev;
+       state->msg_ctx = msg_ctx;
+       state->ctdbd_conn = ctdbd_conn;
+
+       if (sys_notify_watch == NULL) {
+               sys_notify_watch = sys_notify_watch_dummy;
+       }
+
+       state->sys_notify_watch = sys_notify_watch;
+       state->sys_notify_ctx = sys_notify_ctx;
+
+       state->entries = db_open_rbt(state);
+       if (tevent_req_nomem(state->entries, req)) {
+               return tevent_req_post(req, ev);
+       }
+
+       for (i=0; i<ARRAY_SIZE(handlers); i++) {
+               subreq = messaging_handler_send(state, ev, msg_ctx,
+                                               handlers[i].msg_type,
+                                               handlers[i].fn, state);
+               if (tevent_req_nomem(subreq, req)) {
+                       return tevent_req_post(req, ev);
+               }
+               tevent_req_set_callback(subreq, notifyd_handler_done, req);
+       }
+
+       names_db = messaging_names_db(msg_ctx);
+
+       ret = server_id_db_set_exclusive(names_db, "notify-daemon");
+       if (ret != 0) {
+               /* Name the call that actually failed */
+               DEBUG(10, ("%s: server_id_db_set_exclusive failed: %s\n",
+                          __func__, strerror(ret)));
+               tevent_req_error(req, ret);
+               return tevent_req_post(req, ev);
+       }
+
+       if (ctdbd_conn == NULL) {
+               /*
+                * No cluster around, skip the database replication
+                * engine
+                */
+               return req;
+       }
+
+       state->log = talloc_zero(state, struct messaging_reclog);
+       if (tevent_req_nomem(state->log, req)) {
+               return tevent_req_post(req, ev);
+       }
+
+       /* The broadcast request hangs off the log it broadcasts */
+       subreq = notifyd_broadcast_reclog_send(
+               state->log, ev, ctdbd_conn, messaging_server_id(msg_ctx),
+               state->log);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, notifyd_broadcast_reclog_finished,
+                               req);
+
+       subreq = notifyd_clean_peers_send(state, ev, state);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
+                               req);
+
+       status = register_with_ctdbd(ctdbd_conn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
+                                    notifyd_snoop_broadcast, state);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_error(req, map_errno_from_nt_status(status));
+               return tevent_req_post(req, ev);
+       }
+
+       return req;
+}
+
+/*
+ * A message handler subrequest has ended: fetch its result, free it
+ * and hand the result to tevent_req_error() on the notifyd request.
+ */
+static void notifyd_handler_done(struct tevent_req *subreq)
+{
+       struct tevent_req *notifyd_req;
+       int err;
+
+       notifyd_req = tevent_req_callback_data(subreq, struct tevent_req);
+
+       err = messaging_handler_recv(subreq);
+       TALLOC_FREE(subreq);
+
+       tevent_req_error(notifyd_req, err);
+}
+
+/*
+ * The reclog broadcast subrequest has ended: fetch its result, free
+ * it and hand the result to tevent_req_error() on the notifyd request.
+ */
+static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
+{
+       struct tevent_req *notifyd_req;
+       int err;
+
+       notifyd_req = tevent_req_callback_data(subreq, struct tevent_req);
+
+       err = notifyd_broadcast_reclog_recv(subreq);
+       TALLOC_FREE(subreq);
+
+       tevent_req_error(notifyd_req, err);
+}
+
+/*
+ * The peer cleanup subrequest has ended: fetch its result, free it
+ * and hand the result to tevent_req_error() on the notifyd request.
+ */
+static void notifyd_clean_peers_finished(struct tevent_req *subreq)
+{
+       struct tevent_req *notifyd_req;
+       int err;
+
+       notifyd_req = tevent_req_callback_data(subreq, struct tevent_req);
+
+       err = notifyd_clean_peers_recv(subreq);
+       TALLOC_FREE(subreq);
+
+       tevent_req_error(notifyd_req, err);
+}
+
+/*
+ * Collect the result of a request started with notifyd_send(); the
+ * value comes straight from tevent_req_simple_recv_unix().
+ */
+int notifyd_recv(struct tevent_req *req)
+{
+       int result = tevent_req_simple_recv_unix(req);
+
+       return result;
+}
+
+/*
+ * Interpret a record value from the notifyd_state->entries database
+ * as an array of struct notifyd_instance.
+ *
+ * Nothing is copied: *instances points into buf. Both output
+ * pointers are optional. Returns false if buflen is not a multiple
+ * of the instance size.
+ */
+
+static bool notifyd_parse_entry(uint8_t *buf, size_t buflen,
+                               struct notifyd_instance **instances,
+                               size_t *num_instances)
+{
+       const size_t elem_size = sizeof(struct notifyd_instance);
+       const size_t leftover = buflen % elem_size;
+
+       if (leftover != 0) {
+               DEBUG(1, ("%s: invalid buffer size: %u\n",
+                         __func__, (unsigned)buflen));
+               return false;
+       }
+
+       if (num_instances != NULL) {
+               *num_instances = buflen / elem_size;
+       }
+       if (instances != NULL) {
+               *instances = (struct notifyd_instance *)buf;
+       }
+
+       return true;
+}
+
+/*
+ * Apply one rec_change request to a notify database.
+ *
+ * client/chg->private_data identify the instance. If the record for
+ * path already holds that instance, its notify_instance is replaced
+ * by *chg, otherwise a new instance is appended. A change with both
+ * filter and subdir_filter being 0 is a delete request: the instance
+ * is dropped again, and the record is deleted once no instance is
+ * left.
+ *
+ * path must be 0-terminated and pathlen must include that 0. The
+ * database key is the path without the terminator.
+ *
+ * A failing sys_notify_watch is only logged, the instance is stored
+ * nevertheless.
+ *
+ * Returns false on invalid input and on database or memory errors.
+ */
+static bool notifyd_apply_rec_change(
+       const struct server_id *client,
+       const char *path, size_t pathlen,
+       const struct notify_instance *chg,
+       struct db_context *entries,
+       sys_notify_watch_fn sys_notify_watch,
+       struct sys_notify_context *sys_notify_ctx,
+       struct messaging_context *msg_ctx)
+{
+       struct db_record *rec;
+       struct notifyd_instance *instances;
+       size_t num_instances;
+       size_t i;
+       struct notifyd_instance *instance;
+       TDB_DATA value;
+       NTSTATUS status;
+       bool ok = false;
+
+       if (pathlen == 0) {
+               DEBUG(1, ("%s: pathlen==0\n", __func__));
+               return false;
+       }
+       if (path[pathlen-1] != '\0') {
+               DEBUG(1, ("%s: path not 0-terminated\n", __func__));
+               return false;
+       }
+
+       DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, "
+                  "private_data=%p\n", __func__, path,
+                  (unsigned)chg->filter, (unsigned)chg->subdir_filter,
+                  chg->private_data));
+
+       /* Keys in "entries" carry no trailing 0 */
+       rec = dbwrap_fetch_locked(
+               entries, entries,
+               make_tdb_data((const uint8_t *)path, pathlen-1));
+
+       if (rec == NULL) {
+               DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__));
+               goto fail;
+       }
+
+       num_instances = 0;
+       value = dbwrap_record_get_value(rec);
+
+       if (value.dsize != 0) {
+               if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
+                                        &num_instances)) {
+                       goto fail;
+               }
+       }
+
+       /*
+        * Work on a copy hanging off rec, it goes away together with
+        * rec. Overallocate by one instance to avoid a realloc when
+        * adding.
+        */
+       instances = talloc_array(rec, struct notifyd_instance,
+                                num_instances + 1);
+       if (instances == NULL) {
+               DEBUG(1, ("%s: talloc failed\n", __func__));
+               goto fail;
+       }
+
+       if (value.dsize != 0) {
+               memcpy(instances, value.dptr, value.dsize);
+       }
+
+       for (i=0; i<num_instances; i++) {
+               instance = &instances[i];
+
+               if (server_id_equal(&instance->client, client) &&
+                   (instance->instance.private_data == chg->private_data)) {
+                       break;
+               }
+       }
+
+       if (i < num_instances) {
+               instance->instance = *chg;
+       } else {
+               /*
+                * We've overallocated for one instance
+                */
+               instance = &instances[num_instances];
+
+               *instance = (struct notifyd_instance) {
+                       .client = *client,
+                       .instance = *chg,
+                       .internal_filter = chg->filter,
+                       .internal_subdir_filter = chg->subdir_filter
+               };
+
+               num_instances += 1;
+       }
+
+       if ((instance->instance.filter != 0) ||
+           (instance->instance.subdir_filter != 0)) {
+               int ret;
+
+               /* Replace a watch possibly left from an earlier request */
+               TALLOC_FREE(instance->sys_watch);
+
+               ret = sys_notify_watch(entries, sys_notify_ctx, path,
+                                      &instance->internal_filter,
+                                      &instance->internal_subdir_filter,
+                                      notifyd_sys_callback, msg_ctx,
+                                      &instance->sys_watch);
+               if (ret != 0) {
+                       /*
+                        * ret is what signalled the failure, so
+                        * report ret. Nothing here guarantees that
+                        * errno still belongs to this call.
+                        */
+                       DEBUG(1, ("%s: inotify_watch returned %s\n",
+                                 __func__, strerror(ret)));
+               }
+       } else {
+               /*
+                * This is a delete request: Close the gap with the
+                * last array element.
+                */
+               TALLOC_FREE(instance->sys_watch);
+               *instance = instances[num_instances-1];
+               num_instances -= 1;
+       }
+
+       DEBUG(10, ("%s: %s has %u instances\n", __func__,
+                  path, (unsigned)num_instances));
+
+       if (num_instances == 0) {
+               status = dbwrap_record_delete(rec);
+               if (!NT_STATUS_IS_OK(status)) {
+                       DEBUG(1, ("%s: dbwrap_record_delete returned %s\n",
+                                 __func__, nt_errstr(status)));
+                       goto fail;
+               }
+       } else {
+               value = make_tdb_data(
+                       (uint8_t *)instances,
+                       sizeof(struct notifyd_instance) * num_instances);
+
+               status = dbwrap_record_store(rec, value, 0);
+               if (!NT_STATUS_IS_OK(status)) {
+                       DEBUG(1, ("%s: dbwrap_record_store returned %s\n",
+                                 __func__, nt_errstr(status)));
+                       goto fail;
+               }
+       }
+
+       ok = true;
+fail:
+       TALLOC_FREE(rec);
+       return ok;
+}
+
+/*
+ * Callback handed to sys_notify_watch. Turns a file system event
+ * into a MSG_SMB_NOTIFY_TRIGGER message for "dir/path" with a filter
+ * of UINT32_MAX and sends it to our own server id. private_data is
+ * the messaging context. The send result is not looked at.
+ */
+static void notifyd_sys_callback(struct sys_notify_context *ctx,
+                                void *private_data, struct notify_event *ev)
+{
+       struct messaging_context *msg_ctx = talloc_get_type_abort(
+               private_data, struct messaging_context);
+       struct notify_trigger_msg trigger = {
+               .when = timespec_current(),
+               .action = ev->action,
+               .filter = UINT32_MAX
+       };
+       char separator = '/';
+       struct iovec iov[4];
+
+       /* Fixed part of the message, everything before the path */
+       iov[0] = (struct iovec) {
+               .iov_base = &trigger,
+               .iov_len = offsetof(struct notify_trigger_msg, path) };
+       /* The path goes out as dir + '/' + name, including the final 0 */
+       iov[1] = (struct iovec) {
+               .iov_base = discard_const_p(char, ev->dir),
+               .iov_len = strlen(ev->dir) };
+       iov[2] = (struct iovec) {
+               .iov_base = &separator,
+               .iov_len = 1 };
+       iov[3] = (struct iovec) {
+               .iov_base = discard_const_p(char, ev->path),
+               .iov_len = strlen(ev->path)+1 };
+
+       messaging_send_iov(
+               msg_ctx, messaging_server_id(msg_ctx),
+               MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
+}
+
+/*
+ * Overlay a struct notify_rec_change_msg on a received buffer.
+ *
+ * On success *pmsg points into buf and *pathlen is the number of
+ * bytes following the fixed header. At least one path byte is
+ * required. 0-termination of the path is not checked here.
+ */
+static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
+                                    struct notify_rec_change_msg **pmsg,
+                                    size_t *pathlen)
+{
+       const size_t hdr_len = offsetof(struct notify_rec_change_msg, path);
+       struct notify_rec_change_msg *parsed;
+
+       if (bufsize < hdr_len + 1) {
+               DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__,
+                         (unsigned)bufsize));
+               return false;
+       }
+
+       parsed = (struct notify_rec_change_msg *)buf;
+
+       *pmsg = parsed;
+       *pathlen = bufsize - hdr_len;
+
+       DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, "
+                  "private_data=%p, path=%.*s\n",
+                  __func__, (unsigned)parsed->instance.filter,
+                  (unsigned)parsed->instance.subdir_filter,
+                  parsed->instance.private_data, (int)(*pathlen),
+                  parsed->path));
+
+       return true;
+}
+
+/*
+ * Handler for MSG_SMB_NOTIFY_REC_CHANGE.
+ *
+ * Applies the change to state->entries. In the cluster case the
+ * message is also appended to the reclog; ownership of *prec moves to
+ * the log then. Once the log holds 100 records it is broadcast right
+ * away.
+ *
+ * Malformed messages and failures are logged and otherwise ignored,
+ * the handler always returns true.
+ */
+static bool notifyd_rec_change(struct messaging_context *msg_ctx,
+                              struct messaging_rec **prec,
+                              void *private_data)
+{
+       struct notifyd_state *state = talloc_get_type_abort(
+               private_data, struct notifyd_state);
+       struct server_id_buf idbuf;
+       struct messaging_rec *rec = *prec;
+       struct messaging_rec **tmp;
+       struct messaging_reclog *log;
+       struct notify_rec_change_msg *msg;
+       size_t pathlen;
+       bool ok;
+
+       /* The length is passed as unsigned, so print it with %u */
+       DEBUG(10, ("%s: Got %u bytes from %s\n", __func__,
+                  (unsigned)rec->buf.length,
+                  server_id_str_buf(rec->src, &idbuf)));
+
+       ok = notifyd_parse_rec_change(rec->buf.data, rec->buf.length,
+                                     &msg, &pathlen);
+       if (!ok) {
+               return true;
+       }
+
+       ok = notifyd_apply_rec_change(
+               &rec->src, msg->path, pathlen, &msg->instance,
+               state->entries, state->sys_notify_watch, state->sys_notify_ctx,
+               state->msg_ctx);
+       if (!ok) {
+               DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
+                         __func__));
+               return true;
+       }
+
+       if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
+               /* Not clustered, nobody to replicate to */
+               return true;
+       }
+       log = state->log;
+
+       /* Keep log->recs intact if the realloc fails */
+       tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
+                            log->num_recs+1);
+       if (tmp == NULL) {
+               DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
+               return true;
+       }
+       log->recs = tmp;
+
+       /* The log takes over the message, *prec is NULL afterwards */
+       log->recs[log->num_recs] = talloc_move(log->recs, prec);
+       log->num_recs += 1;
+
+       if (log->num_recs >= 100) {
+               /*
+                * Don't let the log grow too large
+                */
+               notifyd_broadcast_reclog(state->ctdbd_conn,
+                                        messaging_server_id(msg_ctx), log);
+       }
+
+       return true;
+}
+
+/*
+ * Context handed from notifyd_trigger() to notifyd_trigger_parser()
+ */
+struct notifyd_trigger_state {
+       /* Used to send the notify events to the clients */
+       struct messaging_context *msg_ctx;
+       /* The trigger message being processed, points into the received buffer */
+       struct notify_trigger_msg *msg;
+       /*
+        * True while further '/' follow the path prefix being looked
+        * up; selects the subdir filters instead of the plain ones.
+        */
+       bool recursive;
+       /*
+        * True if the trigger came from our vnn but not from notifyd
+        * itself; selects the internal filters of an instance.
+        */
+       bool covered_by_sys_notify;
+};
+
+static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
+                                  void *private_data);
+
+/*
+ * Handler for MSG_SMB_NOTIFY_TRIGGER.
+ *
+ * The message carries an absolute, 0-terminated path. For every
+ * prefix of that path ending right before a '/' (the first character
+ * excluded) the prefix is looked up in state->entries and, for
+ * triggers from our own vnn, in the databases of all active peers.
+ * notifyd_trigger_parser() does the actual work on what is found.
+ *
+ * Malformed messages are logged and ignored, the handler always
+ * returns true.
+ */
+static bool notifyd_trigger(struct messaging_context *msg_ctx,
+                           struct messaging_rec **prec,
+                           void *private_data)
+{
+       struct notifyd_state *state = talloc_get_type_abort(
+               private_data, struct notifyd_state);
+       struct server_id my_id = messaging_server_id(msg_ctx);
+       struct messaging_rec *rec = *prec;
+       struct notifyd_trigger_state tstate;
+       const char *path;
+       const char *p;
+       bool same_node;
+       bool from_self;
+
+       if (rec->buf.length < offsetof(struct notify_trigger_msg, path) + 1) {
+               DEBUG(1, ("message too short, ignoring: %u\n",
+                         (unsigned)rec->buf.length));
+               return true;
+       }
+       if (rec->buf.data[rec->buf.length-1] != 0) {
+               DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
+               return true;
+       }
+
+       same_node = (rec->src.vnn == my_id.vnn);
+       from_self = server_id_equal(&rec->src, &my_id);
+
+       /* "recursive" is set per prefix in the loop below */
+       tstate = (struct notifyd_trigger_state) {
+               .msg_ctx = msg_ctx,
+               .msg = (struct notify_trigger_msg *)rec->buf.data,
+               .covered_by_sys_notify = (same_node && !from_self)
+       };
+       path = tstate.msg->path;
+
+       DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
+                  __func__, (unsigned)tstate.msg->action,
+                  (unsigned)tstate.msg->filter, path));
+
+       if (path[0] != '/') {
+               DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
+                         __func__, path));
+               return true;
+       }
+
+       p = strchr(path+1, '/');
+
+       while (p != NULL) {
+               const char *next_p = strchr(p+1, '/');
+               ptrdiff_t path_len = p - path;
+               TDB_DATA key = {
+                       .dptr = discard_const_p(uint8_t, path),
+                       .dsize = path_len
+               };
+
+               tstate.recursive = (next_p != NULL);
+
+               DEBUG(10, ("%s: Trying path %.*s\n", __func__,
+                          (int)path_len, path));
+
+               dbwrap_parse_record(state->entries, key,
+                                   notifyd_trigger_parser, &tstate);
+
+               /* Peer databases only matter for triggers from our node */
+               if ((state->peers != NULL) && same_node) {
+                       uint32_t i;
+
+                       for (i=0; i<state->num_peers; i++) {
+                               struct notifyd_peer *peer = state->peers[i];
+
+                               if (peer->db == NULL) {
+                                       /*
+                                        * Inactive peer, did not get
+                                        * a db yet
+                                        */
+                                       continue;
+                               }
+                               dbwrap_parse_record(peer->db, key,
+                                                   notifyd_trigger_parser,
+                                                   &tstate);
+                       }
+               }
+
+               p = next_p;
+       }
+
+       return true;
+}
+
+static void notifyd_send_delete(struct messaging_context *msg_ctx,
+                               TDB_DATA key,
+                               struct notifyd_instance *instance);
+
+/*
+ * dbwrap_parse_record callback for notifyd_trigger().
+ *
+ * key is the directory prefix that matched, data the array of
+ * instances registered for it. Every instance whose applicable
+ * filter overlaps the filter of the trigger message gets a
+ * MSG_PVFS_NOTIFY carrying the rest of the path below the prefix.
+ * If sending to a local client fails with
+ * NT_STATUS_OBJECT_NAME_NOT_FOUND, a delete request for that
+ * instance is sent.
+ */
+static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
+                                  void *private_data)
+
+{
+       struct notifyd_trigger_state *tstate = private_data;
+       struct notify_event_msg event = { .action = tstate->msg->action };
+       struct notifyd_instance *instances = NULL;
+       size_t num_instances = 0;
+       struct iovec iov[2];
+       size_t idx;
+       bool parsed;
+
+       parsed = notifyd_parse_entry(data.dptr, data.dsize, &instances,
+                                    &num_instances);
+       if (!parsed) {
+               DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__));
+               return;
+       }
+
+       DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__,
+                  (unsigned)num_instances, (int)key.dsize,
+                  (char *)key.dptr));
+
+       /* Fixed part of the event message, everything before the path */
+       iov[0].iov_base = &event;
+       iov[0].iov_len = offsetof(struct notify_event_msg, path);
+       /* Path relative to the matched prefix: skip the prefix and the '/' */
+       iov[1].iov_base = tstate->msg->path + key.dsize + 1;
+       iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
+
+       for (idx=0; idx<num_instances; idx++) {
+               struct notifyd_instance *inst = &instances[idx];
+               struct server_id_buf idbuf;
+               uint32_t wanted;
+               NTSTATUS status;
+
+               if (tstate->covered_by_sys_notify) {
+                       wanted = tstate->recursive ?
+                               inst->internal_subdir_filter :
+                               inst->internal_filter;
+               } else {
+                       wanted = tstate->recursive ?
+                               inst->instance.subdir_filter :
+                               inst->instance.filter;
+               }
+
+               if ((wanted & tstate->msg->filter) == 0) {
+                       continue;
+               }
+
+               event.private_data = inst->instance.private_data;
+
+               status = messaging_send_iov(
+                       tstate->msg_ctx, inst->client,
+                       MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
+
+               DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n",
+                          __func__,
+                          server_id_str_buf(inst->client, &idbuf),
+                          nt_errstr(status)));
+
+               if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
+                   procid_is_local(&inst->client)) {
+                       /*
+                        * That process has died
+                        */
+                       notifyd_send_delete(tstate->msg_ctx, key, inst);
+               } else if (!NT_STATUS_IS_OK(status)) {
+                       DEBUG(1, ("%s: messaging_send_iov returned %s\n",
+                                 __func__, nt_errstr(status)));
+               }
+       }
+}
+
+/*
+ * Get rid of the notify record of an smbd that has died: send
+ * ourselves a MSG_SMB_NOTIFY_REC_CHANGE with both filters 0, using
+ * the dead client's server id as the message source.
+ *
+ * key is the path without trailing 0 as stored in the database, the
+ * terminator is added here.
+ */
+
+static void notifyd_send_delete(struct messaging_context *msg_ctx,
+                               TDB_DATA key,
+                               struct notifyd_instance *instance)
+{
+       struct notify_rec_change_msg delete_msg = {
+               .instance.private_data = instance->instance.private_data
+       };
+       uint8_t terminator = 0;
+       struct iovec iov[3];
+       NTSTATUS status;
+
+       /*
+        * Header, path and the terminating 0 the path key lacks
+        */
+
+       iov[0].iov_base = &delete_msg;
+       iov[0].iov_len = offsetof(struct notify_rec_change_msg, path);
+
+       iov[1].iov_base = key.dptr;
+       iov[1].iov_len = key.dsize;
+
+       iov[2].iov_base = &terminator;
+       iov[2].iov_len = sizeof(terminator);
+
+       status = messaging_send_iov_from(
+               msg_ctx, instance->client, messaging_server_id(msg_ctx),
+               MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
+
+       if (NT_STATUS_IS_OK(status)) {
+               return;
+       }
+
+       DEBUG(10, ("%s: messaging_send_iov_from returned %s\n",
+                  __func__, nt_errstr(status)));
+}
+
+/*
+ * Handler for MSG_SMB_NOTIFY_GET_DB.
+ *
+ * Replies to the sender with a MSG_SMB_NOTIFY_DB message made of an
+ * 8-byte index, written with SBVAL, followed by the marshalled
+ * state->entries database. The index is state->log->rec_index, or
+ * UINT64_MAX if there is no log.
+ *
+ * Failures are logged and ignored, the handler always returns true.
+ */
+static bool notifyd_get_db(struct messaging_context *msg_ctx,
+                          struct messaging_rec **prec,
+                          void *private_data)
+{
+       struct notifyd_state *state = talloc_get_type_abort(
+               private_data, struct notifyd_state);
+       struct messaging_rec *rec = *prec;
+       struct server_id_buf id1, id2;
+       uint8_t index_buf[sizeof(uint64_t)];
+       struct iovec iov[2];
+       uint64_t rec_index;
+       size_t needed, written;
+       uint8_t *buf;
+       NTSTATUS status;
+
+       /* First pass without a buffer tells us how much room we need */
+       needed = dbwrap_marshall(state->entries, NULL, 0);
+
+       buf = talloc_array(rec, uint8_t, needed);
+       if (buf == NULL) {
+               DEBUG(1, ("%s: talloc_array(%ju) failed\n",
+                         __func__, (uintmax_t)needed));
+               return true;
+       }
+
+       written = dbwrap_marshall(state->entries, buf, needed);
+
+       if (written != talloc_get_size(buf)) {
+               DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__,
+                         (uintmax_t)talloc_get_size(buf),
+                         (uintmax_t)written));
+               TALLOC_FREE(buf);
+               return true;
+       }
+
+       rec_index = (state->log != NULL) ?
+               state->log->rec_index : UINT64_MAX;
+       SBVAL(index_buf, 0, rec_index);
+
+       iov[0].iov_base = index_buf;
+       iov[0].iov_len = sizeof(index_buf);
+       iov[1].iov_base = buf;
+       iov[1].iov_len = written;
+
+       DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
+                  (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
+                  server_id_str_buf(messaging_server_id(msg_ctx), &id1),
+                  server_id_str_buf(rec->src, &id2)));
+
+       status = messaging_send_iov(msg_ctx, rec->src, MSG_SMB_NOTIFY_DB,
+                                   iov, ARRAY_SIZE(iov), NULL, 0);
+       TALLOC_FREE(buf);
+
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
+                         __func__, nt_errstr(status)));
+       }
+
+       return true;
+}
+
+static int notifyd_add_proxy_syswatches(struct db_record *rec,
+                                       void *private_data);
+
+/*
+ * Message handler for a database dump received from a peer notifyd.
+ *
+ * The payload is 8 bytes of record log index followed by a marshalled
+ * database. The dump is only accepted from a server id we already have
+ * a peer object for. On a malformed message the peer is dropped.
+ *
+ * Always returns true.
+ */
+static bool notifyd_got_db(struct messaging_context *msg_ctx,
+                          struct messaging_rec **prec,
+                          void *private_data)
+{
+       struct notifyd_state *state = talloc_get_type_abort(
+               private_data, struct notifyd_state);
+       struct messaging_rec *rec = *prec;
+       struct notifyd_peer *p = NULL;
+       struct server_id_buf idbuf;
+       NTSTATUS status;
+       /*
+        * Initialized so that the debug message below never prints an
+        * indeterminate value if the traverse fails.
+        */
+       int count = 0;
+       size_t i;
+
+       for (i=0; i<state->num_peers; i++) {
+               if (server_id_equal(&rec->src, &state->peers[i]->pid)) {
+                       p = state->peers[i];
+                       break;
+               }
+       }
+
+       if (p == NULL) {
+               DEBUG(10, ("%s: Did not find peer for db from %s\n",
+                          __func__, server_id_str_buf(rec->src, &idbuf)));
+               return true;
+       }
+
+       /* The leading 8 bytes carry the peer's record log index */
+       if (rec->buf.length < 8) {
+               DEBUG(10, ("%s: Got short db length %u from %s\n", __func__,
+                          (unsigned)rec->buf.length,
+                          server_id_str_buf(rec->src, &idbuf)));
+               TALLOC_FREE(p);
+               return true;
+       }
+
+       p->rec_index = BVAL(rec->buf.data, 0);
+
+       p->db = db_open_rbt(p);
+       if (p->db == NULL) {
+               DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
+               TALLOC_FREE(p);
+               return true;
+       }
+
+       status = dbwrap_unmarshall(p->db, rec->buf.data + 8,
+                                  rec->buf.length - 8);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
+                          __func__, nt_errstr(status),
+                          server_id_str_buf(rec->src, &idbuf)));
+               TALLOC_FREE(p);
+               return true;
+       }
+
+       /*
+        * Install a system watch for every instance in the peer's db.
+        * A traverse failure is logged but the peer is kept, as before.
+        */
+       status = dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches,
+                                     state, &count);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(10, ("%s: dbwrap_traverse_read returned %s for db %s\n",
+                          __func__, nt_errstr(status),
+                          server_id_str_buf(rec->src, &idbuf)));
+       }
+
+       DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
+                  server_id_str_buf(rec->src, &idbuf), count));
+
+       return true;
+}
+
+/*
+ * Broadcast the accumulated record log via ctdb and empty it.
+ *
+ * The message is a messaging header (type MSG_SMB_NOTIFY_REC_CHANGES,
+ * sender "src", zeroed destination) followed by the NDR encoded log.
+ * log->rec_index is advanced only after a successful send; the queued
+ * records are discarded in every case. A NULL log is a no-op.
+ *
+ * NOTE(review): when encoding or sending fails the records are dropped
+ * while rec_index stays unchanged -- it looks like receivers cannot
+ * detect that gap from the index alone; confirm whether that is
+ * intended.
+ */
+static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
+                                    struct server_id src,
+                                    struct messaging_reclog *log)
+{
+       uint8_t hdr[MESSAGE_HDR_LENGTH];
+       struct iovec parts[2];
+       enum ndr_err_code ndr_err;
+       DATA_BLOB encoded;
+       NTSTATUS status;
+
+       if (log == NULL) {
+               return;
+       }
+
+       DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__,
+                  (uintmax_t)log->rec_index, (unsigned)log->num_recs));
+
+       message_hdr_put(hdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
+                       (struct server_id) {0 });
+       parts[0] = (struct iovec) { .iov_base = hdr,
+                                   .iov_len = sizeof(hdr) };
+
+       ndr_err = ndr_push_struct_blob(
+               &encoded, log, log,
+               (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
+
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n",
+                         __func__, ndr_errstr(ndr_err)));
+       } else {
+               parts[1] = (struct iovec) { .iov_base = encoded.data,
+                                           .iov_len = encoded.length };
+
+               status = ctdbd_messaging_send_iov(
+                       ctdbd_conn, CTDB_BROADCAST_VNNMAP,
+                       CTDB_SRVID_SAMBA_NOTIFY_PROXY,
+                       parts, ARRAY_SIZE(parts));
+               TALLOC_FREE(encoded.data);
+
+               if (NT_STATUS_IS_OK(status)) {
+                       log->rec_index += 1;
+               } else {
+                       DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n",
+                                 __func__, nt_errstr(status)));
+               }
+       }
+
+       /* Start the next interval with an empty log */
+       log->num_recs = 0;
+       TALLOC_FREE(log->recs);
+}
+
+/*
+ * State of the periodic record log broadcast request; the fields are
+ * copies of the arguments given to notifyd_broadcast_reclog_send().
+ */
+struct notifyd_broadcast_reclog_state {
+       /* Event context the wakeup timers are created on */
+       struct tevent_context *ev;
+       /* ctdb connection the log is broadcast through */
+       struct ctdbd_connection *ctdbd_conn;
+       /* Server id put into the broadcast header as sender */
+       struct server_id src;
+       /* The record log handed to notifyd_broadcast_reclog() on each wakeup */
+       struct messaging_reclog *log;
+};
+
+static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
+
+/*
+ * Start the periodic broadcast of "log": store the arguments in the
+ * request state and arm the first wakeup 1000 msec from now.
+ *
+ * Returns NULL if the request itself could not be created.
+ */
+static struct tevent_req *notifyd_broadcast_reclog_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       struct ctdbd_connection *ctdbd_conn, struct server_id src,
+       struct messaging_reclog *log)
+{
+       struct notifyd_broadcast_reclog_state *state = NULL;
+       struct tevent_req *req = NULL;
+       struct tevent_req *timer = NULL;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct notifyd_broadcast_reclog_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       *state = (struct notifyd_broadcast_reclog_state) {
+               .ev = ev,
+               .ctdbd_conn = ctdbd_conn,
+               .src = src,
+               .log = log,
+       };
+
+       timer = tevent_wakeup_send(state, state->ev,
+                                  timeval_current_ofs_msec(1000));
+       if (tevent_req_nomem(timer, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(timer, notifyd_broadcast_reclog_next, req);
+
+       return req;
+}
+
+/*
+ * Wakeup callback: broadcast the record log once and re-arm the timer
+ * for another 1000 msec. A failed wakeup finishes the request via
+ * tevent_req_oom().
+ */
+static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct notifyd_broadcast_reclog_state *state = tevent_req_data(
+               req, struct notifyd_broadcast_reclog_state);
+       struct tevent_req *timer = NULL;
+       bool woke_up = tevent_wakeup_recv(subreq);
+
+       TALLOC_FREE(subreq);
+
+       if (!woke_up) {
+               tevent_req_oom(req);
+               return;
+       }
+
+       notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
+
+       timer = tevent_wakeup_send(state, state->ev,
+                                  timeval_current_ofs_msec(1000));
+       if (tevent_req_nomem(timer, req)) {
+               return;
+       }
+       tevent_req_set_callback(timer, notifyd_broadcast_reclog_next, req);
+}
+
+/*
+ * Collect the result of a finished broadcast request. The value is
+ * whatever tevent_req_simple_recv_unix() reports for "req".
+ */
+static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
+{
+       return tevent_req_simple_recv_unix(req);
+}
+
+/*
+ * State of the periodic peer cleanup request; the fields are copies of
+ * the arguments given to notifyd_clean_peers_send().
+ */
+struct notifyd_clean_peers_state {
+       /* Event context the wakeup timers are created on */
+       struct tevent_context *ev;
+       /* The daemon state whose peers[] array is scanned */
+       struct notifyd_state *notifyd;
+};
+
+static void notifyd_clean_peers_next(struct tevent_req *subreq);
+
+/*
+ * Start the periodic cleanup of silent peers: store the arguments in
+ * the request state and arm the first wakeup 30000 msec from now.
+ *
+ * Returns NULL if the request itself could not be created.
+ */
+static struct tevent_req *notifyd_clean_peers_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       struct notifyd_state *notifyd)
+{
+       struct notifyd_clean_peers_state *state = NULL;
+       struct tevent_req *req = NULL;
+       struct tevent_req *timer = NULL;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct notifyd_clean_peers_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       *state = (struct notifyd_clean_peers_state) {
+               .ev = ev,
+               .notifyd = notifyd,
+       };
+
+       timer = tevent_wakeup_send(state, state->ev,
+                                  timeval_current_ofs_msec(30000));
+       if (tevent_req_nomem(timer, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(timer, notifyd_clean_peers_next, req);
+
+       return req;
+}
+
+/*
+ * Wakeup callback: free every peer whose last_broadcast is more than
+ * 60 seconds in the past, then re-arm the timer for another 30000
+ * msec. A failed wakeup finishes the request via tevent_req_oom().
+ */
+static void notifyd_clean_peers_next(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct notifyd_clean_peers_state *state = tevent_req_data(
+               req, struct notifyd_clean_peers_state);
+       struct notifyd_state *notifyd = state->notifyd;
+       struct tevent_req *timer = NULL;
+       time_t now = time(NULL);
+       size_t idx;
+       bool woke_up;
+
+       woke_up = tevent_wakeup_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (!woke_up) {
+               tevent_req_oom(req);
+               return;
+       }
+
+       /*
+        * No increment in the loop header: freeing a peer runs its
+        * destructor, which moves the last array element into the freed
+        * slot and shrinks num_peers, so the same index has to be
+        * looked at again.
+        */
+       for (idx = 0; idx < notifyd->num_peers; ) {
+               struct notifyd_peer *peer = notifyd->peers[idx];
+               struct server_id_buf idbuf;
+
+               if ((now - peer->last_broadcast) <= 60) {
+                       /* Heard from this one recently enough */
+                       idx += 1;
+                       continue;
+               }
+
+               /* Silent for more than 60 seconds, call this peer dead */
+               DEBUG(10, ("%s: peer %s died\n", __func__,
+                          server_id_str_buf(peer->pid, &idbuf)));
+               TALLOC_FREE(peer);
+       }
+
+       timer = tevent_wakeup_send(state, state->ev,
+                                  timeval_current_ofs_msec(30000));
+       if (tevent_req_nomem(timer, req)) {
+               return;
+       }
+       tevent_req_set_callback(timer, notifyd_clean_peers_next, req);
+}
+
+/*
+ * Collect the result of a finished peer cleanup request. The value is
+ * whatever tevent_req_simple_recv_unix() reports for "req".
+ */
+static int notifyd_clean_peers_recv(struct tevent_req *req)
+{
+       return tevent_req_simple_recv_unix(req);
+}
+
+/*
+ * dbwrap traverse callback: install a system watch for every instance
+ * stored in one record. The record key is used as the path to watch,
+ * private_data is the notifyd_state.
+ *
+ * Always returns 0; unparseable records and failing watches are only
+ * logged.
+ */
+static int notifyd_add_proxy_syswatches(struct db_record *rec,
+                                       void *private_data)
+{
+       struct notifyd_state *state = talloc_get_type_abort(
+               private_data, struct notifyd_state);
+       struct db_context *owner_db = dbwrap_record_get_db(rec);
+       TDB_DATA key = dbwrap_record_get_key(rec);
+       TDB_DATA value = dbwrap_record_get_value(rec);
+       struct notifyd_instance *entries = NULL;
+       size_t num_entries = 0;
+       size_t idx;
+       /* NUL-terminated copy of the key; sized by the key length */
+       char dirpath[key.dsize+1];
+
+       memcpy(dirpath, key.dptr, key.dsize);
+       dirpath[key.dsize] = '\0';
+
+       if (!notifyd_parse_entry(value.dptr, value.dsize, &entries,
+                                &num_entries)) {
+               DEBUG(1, ("%s: Could not parse notifyd entry for %s\n",
+                         __func__, dirpath));
+               return 0;
+       }
+
+       for (idx = 0; idx < num_entries; idx++) {
+               struct notifyd_instance *inst = &entries[idx];
+               /* Local copies, the watch function gets them by pointer */
+               uint32_t filter = inst->instance.filter;
+               uint32_t subdir_filter = inst->instance.subdir_filter;
+               int ret;
+
+               /* The record's db is handed in as the memory context */
+               ret = state->sys_notify_watch(
+                       owner_db, state->sys_notify_ctx, dirpath,
+                       &filter, &subdir_filter,
+                       notifyd_sys_callback, state->msg_ctx,
+                       &inst->sys_watch);
+               if (ret == 0) {
+                       continue;
+               }
+               DEBUG(1, ("%s: inotify_watch returned %s\n",
+                         __func__, strerror(errno)));
+       }
+
+       return 0;
+}
+
+/*
+ * dbwrap traverse callback: free the sys_watch of every instance
+ * stored in one record. private_data is not used.
+ *
+ * Always returns 0; an unparseable record is only logged.
+ */
+static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
+{
+       TDB_DATA key = dbwrap_record_get_key(rec);
+       TDB_DATA value = dbwrap_record_get_value(rec);
+       struct notifyd_instance *entries = NULL;
+       size_t num_entries = 0;
+       size_t idx = 0;
+
+       if (!notifyd_parse_entry(value.dptr, value.dsize, &entries,
+                                &num_entries)) {
+               /* The key is printed with an explicit length */
+               DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n",
+                         __func__, (int)key.dsize, (char *)key.dptr));
+               return 0;
+       }
+
+       while (idx < num_entries) {
+               TALLOC_FREE(entries[idx].sys_watch);
+               idx += 1;
+       }
+
+       return 0;
+}
+
+/*
+ * talloc destructor for a peer: free the system watches held in the
+ * peer's database and take the peer out of state->peers[].
+ *
+ * Always returns 0.
+ */
+static int notifyd_peer_destructor(struct notifyd_peer *p)
+{
+       struct notifyd_state *state = p->state;
+       size_t i;
+
+       /*
+        * A peer is created zeroed and only gets its db once the
+        * peer's database message has been processed. It can be freed
+        * before that, so there may be no db to traverse.
+        */
+       if (p->db != NULL) {
+               dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
+                                    NULL, NULL);
+       }
+
+       for (i = 0; i<state->num_peers; i++) {
+               if (p == state->peers[i]) {
+                       /*
+                        * Order of peers[] does not matter: fill the
+                        * hole with the last element and shrink.
+                        */
+                       state->peers[i] = state->peers[state->num_peers-1];
+                       state->num_peers -= 1;
+                       break;
+               }
+       }
+       return 0;
+}
+
+/*
+ * Create a peer object for "pid" and append it to state->peers[].
+ *
+ * The peer starts out zeroed apart from state and pid. It is a talloc
+ * child of the peers array and carries notifyd_peer_destructor.
+ *
+ * Returns NULL on allocation failure.
+ */
+static struct notifyd_peer *notifyd_peer_new(
+       struct notifyd_state *state, struct server_id pid)
+{
+       struct notifyd_peer **grown = NULL;
+       struct notifyd_peer *peer = NULL;
+
+       /* Make room for one more pointer before allocating the peer */
+       grown = talloc_realloc(state, state->peers, struct notifyd_peer *,
+                              state->num_peers+1);
+       if (grown == NULL) {
+               return NULL;
+       }
+       state->peers = grown;
+
+       peer = talloc_zero(state->peers, struct notifyd_peer);
+       if (peer == NULL) {
+               return NULL;
+       }
+       peer->pid = pid;
+       peer->state = state;
+
+       state->peers[state->num_peers] = peer;
+       state->num_peers += 1;
+
+       talloc_set_destructor(peer, notifyd_peer_destructor);
+
+       return peer;
+}
+
+/*
+ * Apply a record log broadcast by "peer" to our copy of its database.
+ *
+ * msg/msglen is the NDR encoded messaging_reclog without the messaging
+ * header. Nothing happens as long as the peer has no db yet. If the
+ * log cannot be decoded, its index is not the one we expect, or one of
+ * its records cannot be applied, the peer is freed.
+ *
+ * NOTE: "peer" may have been freed when this function returns.
+ */
+static void notifyd_apply_reclog(struct notifyd_peer *peer,
+                                const uint8_t *msg, size_t msglen)
+{
+       struct notifyd_state *state = peer->state;
+       DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
+                          .length = msglen };
+       struct server_id_buf idbuf;
+       struct messaging_reclog *log;
+       enum ndr_err_code ndr_err;
+       uint32_t i;
+
+       if (peer->db == NULL) {
+               /*
+                * No db yet
+                */
+               return;
+       }
+
+       /* Child of peer, so it goes away with the peer on the fail path */
+       log = talloc(peer, struct messaging_reclog);
+       if (log == NULL) {
+               DEBUG(10, ("%s: talloc failed\n", __func__));
+               return;
+       }
+
+       ndr_err = ndr_pull_struct_blob_all(
+               &blob, log, log,
+               (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n",
+                          __func__, ndr_errstr(ndr_err)));
+               goto fail;
+       }
+
+       DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__,
+                  (unsigned)log->num_recs, (uintmax_t)log->rec_index,
+                  server_id_str_buf(peer->pid, &idbuf)));
+
+       /* A different index means we missed or repeated a broadcast */
+       if (log->rec_index != peer->rec_index) {
+               DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n",
+                         __func__, (uintmax_t)log->rec_index,
+                         server_id_str_buf(peer->pid, &idbuf),
+                         (uintmax_t)peer->rec_index));
+               goto fail;
+       }
+
+       for (i=0; i<log->num_recs; i++) {
+               struct messaging_rec *r = log->recs[i];
+               struct notify_rec_change_msg *chg;
+               size_t pathlen;
+               bool ok;
+
+               /*
+                * The records are pointers decoded from the wire, do
+                * not dereference one that came in as NULL.
+                */
+               if (r == NULL) {
+                       DEBUG(3, ("%s: Got NULL record %u\n",
+                                 __func__, (unsigned)i));
+                       goto fail;
+               }
+
+               ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
+                                             &chg, &pathlen);
+               if (!ok) {
+                       DEBUG(3, ("%s: notifyd_parse_rec_change failed\n",
+                                 __func__));
+                       goto fail;
+               }
+
+               ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
+                                             &chg->instance, peer->db,
+                                             state->sys_notify_watch,
+                                             state->sys_notify_ctx,
+                                             state->msg_ctx);
+               if (!ok) {
+                       DEBUG(3, ("%s: notifyd_apply_rec_change failed\n",
+                                 __func__));
+                       goto fail;
+               }
+       }
+
+       /* Next expected index, and proof of life for the peer cleanup */
+       peer->rec_index += 1;
+       peer->last_broadcast = time(NULL);
+
+       TALLOC_FREE(log);
+       return;
+
+fail:
+       DEBUG(10, ("%s: Dropping peer %s\n", __func__,
+                  server_id_str_buf(peer->pid, &idbuf)));
+       TALLOC_FREE(peer);
+}
+
+/*
+ * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
+ * messages) broadcasts by other notifyds. Several cases:
+ *
+ * We don't know the source. This creates a new peer. Creating a peer
+ * involves asking the peer for its full database. We assume ordered
+ * messages, so the new database will arrive before the next broadcast
+ * will.
+ *
+ * We know the source and the log index matches. We will apply the log
+ * locally to our peer's db as if we had received it from a local
+ * client.
+ *
+ * We know the source but the log index does not match. This means we
+ * lost a message. We just drop the whole peer and wait for the next
+ * broadcast, which will then trigger a fresh database pull.
+ */
+
+static void notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
+                                   uint64_t dst_srvid,
+                                   const uint8_t *msg, size_t msglen,
+                                   void *private_data)
+{
+       struct notifyd_state *state = talloc_get_type_abort(
+               private_data, struct notifyd_state);
+       struct server_id my_id = messaging_server_id(state->msg_ctx);
+       struct notifyd_peer *peer = NULL;
+       struct server_id_buf idbuf;
+       struct server_id src, dst;
+       uint32_t msg_type;
+       uint32_t idx;
+       NTSTATUS status;
+
+       /* Every broadcast starts with a messaging header */
+       if (msglen < MESSAGE_HDR_LENGTH) {
+               DEBUG(10, ("%s: Got short broadcast\n", __func__));
+               return;
+       }
+       message_hdr_get(&msg_type, &src, &dst, msg);
+
+       if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
+               DEBUG(10, ("%s Got message %u, ignoring\n", __func__,
+                          (unsigned)msg_type));
+               return;
+       }
+       if (server_id_equal(&src, &my_id)) {
+               DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__));
+               return;
+       }
+
+       DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
+                  __func__, server_id_str_buf(src, &idbuf)));
+
+       /* Do we know the sender already? */
+       for (idx = 0; idx < state->num_peers; idx++) {
+               if (server_id_equal(&state->peers[idx]->pid, &src)) {
+                       peer = state->peers[idx];
+                       break;
+               }
+       }
+
+       if (peer != NULL) {
+               /* Known peer: apply the log that follows the header */
+               DEBUG(10, ("%s: Applying changes to peer %u\n",
+                          __func__, (unsigned)idx));
+
+               notifyd_apply_reclog(peer,
+                                    msg + MESSAGE_HDR_LENGTH,
+                                    msglen - MESSAGE_HDR_LENGTH);
+               return;
+       }
+
+       /* Unknown sender: create a peer and ask it for its database */
+       DEBUG(10, ("%s: Creating new peer for %s\n", __func__,
+                  server_id_str_buf(src, &idbuf)));
+
+       peer = notifyd_peer_new(state, src);
+       if (peer == NULL) {
+               DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__));
+               return;
+       }
+
+       status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
+                                   NULL, 0);
+       if (NT_STATUS_IS_OK(status)) {
+               return;
+       }
+
+       /* Without the request there will be no db, give up on the peer */
+       DEBUG(10, ("%s: messaging_send_buf failed: %s\n",
+                  __func__, nt_errstr(status)));
+       TALLOC_FREE(peer);
+}
diff --git a/source3/smbd/notifyd/notifyd.h b/source3/smbd/notifyd/notifyd.h
new file mode 100644 (file)
index 0000000..dc0a4e8
--- /dev/null
@@ -0,0 +1,143 @@
+/*
+ * Unix SMB/CIFS implementation.
+ *
+ * Copyright (C) Volker Lendecke 2014
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __NOTIFYD_NOTIFYD_H__
+#define __NOTIFYD_NOTIFYD_H__
+
+#include "includes.h"
+#include "librpc/gen_ndr/notify.h"
+#include "librpc/gen_ndr/messaging.h"
+#include "lib/dbwrap/dbwrap.h"
+#include "lib/dbwrap/dbwrap_rbt.h"
+#include "messages.h"
+#include "tdb.h"
+#include "util_tdb.h"
+
+/*
+ * Filechangenotify based on asynchronous messages
+ *
+ * smbds talk to local notify daemons to inform them about paths they are
+ * interested in. They also tell local notify daemons about changes they have
+ * done to the file system. There are two message types from smbd to
+ * notifyd. The first is used to inform notifyd about changes in notify
+ * interest. These are only sent from smbd to notifyd if the SMB client issues
+ * FileChangeNotify requests.
+ */
+
+/*
+ * The notifyd implementation is designed to cope with multiple daemons taking
+ * care of just a subset of smbds. The goal is to minimize the traffic between
+ * the notify daemons. The idea behind this is a samba/ctdb cluster, but it
+ * could also be used to spread the load of notifyd instances on a single
+ * node, should this become a bottleneck. The following diagram illustrates
+ * the setup. The numbers in the boxes are node:process ids.
+ *
+ *         +-----------+                  +-----------+
+ *         |notifyd 0:5|------------------|notifyd 1:6|
+ *         +-----------+                  +-----------+
+ *            / |  \                         /    \
+ *           /  |   \                       /      \
+ *   +--------+ | +--------+        +--------+   +--------+
+ *   |smbd 0:1| | |smbd 0:4|        |smbd 1:7|   |smbd 1:2|
+ *   +--------+ | +--------+        +--------+   +--------+
+ *              |
+ *         +---------+
+ *         |smbd 0:20|
+ *         +---------+
+ *
+ * Suppose 0:1 and 0:4 are interested in changes for /foo and 0:20 creates the
+ * file /foo/bar. If everything were fully connected, 0:20 would have to send
+ * two local messages, one to 0:1 and one to 0:4. With the notifyd design, 0:20
+ * only has to send one message; it lets notifyd 0:5 do the hard work to
+ * multicast the change to 0:1 and 0:4.
+ *
+ * Now let's assume 1:7 on the other node creates /foo/baz. It tells its
+ * notifyd 1:6 about this change. All 1:6 will know about is that its peer
+ * notifyd 0:5 is interested in the change. Thus it forwards the event to 0:5,
+ * which sees it as if it came from just another local event creator. 0:5 will
+ * multicast the change to 0:1 and 0:4. To prevent notify loops, the message
+ * from 1:6 to 0:5 will carry a "proxied" flag, so that 0:5 will only forward
+ * the event to local clients.
+ */
+
+/*
+ * Data that notifyd maintains per smbd notify instance
+ */
+struct notify_instance {
+       /* Timestamp of the instance; presumably set by the smbd -- TODO confirm */
+       struct timespec creation_time;
+       /*
+        * Event masks. notifyd hands both to the sys_notify_watch
+        * function when it installs a watch for the instance.
+        */
+       uint32_t filter;
+       uint32_t subdir_filter;
+       /*
+        * Opaque to notifyd: the pointer value is only carried around,
+        * e.g. copied into the delete request for a dead instance.
+        */
+       void *private_data;
+};
+
+/* MSG_SMB_NOTIFY_REC_CHANGE payload */
+struct notify_rec_change_msg {
+       /* The notify instance this change is about */
+       struct notify_instance instance;
+       /*
+        * Path the instance refers to, starting directly after the
+        * fixed part. Senders in this commit append a trailing NUL.
+        */
+       char path[];
+};
+
+/*
+ * The second message from smbd to notifyd is sent whenever an smbd makes a
+ * file system change. It tells notifyd to inform all interested parties about
+ * that change. This is the message that needs to be really fast in smbd
+ * because it is called a lot.
+ */
+
+/* MSG_SMB_NOTIFY_TRIGGER payload */
+struct notify_trigger_msg {
+       /* Timestamp of the change -- presumably taken by the sending smbd */
+       struct timespec when;
+       /* What happened; presumably a NOTIFY_ACTION_* value -- TODO confirm */
+       uint32_t action;
+       /* Event mask of the change, to be matched against instance filters */
+       uint32_t filter;
+       /* Path that changed, starting directly after the fixed part */
+       char path[];
+};
+
+/*
+ * In response to a MSG_SMB_NOTIFY_TRIGGER message notifyd walks its database
+ * and sends out the following message to all interested clients
+ */
+
+/* MSG_PVFS_NOTIFY payload */
+struct notify_event_msg {
+       /* Timestamp of the change -- presumably copied from the trigger */
+       struct timespec when;
+       /*
+        * Presumably the private_data of the notify_instance the event
+        * is delivered for -- TODO confirm against the sender.
+        */
+       void *private_data;
+       /* What happened; presumably the action of the trigger message */
+       uint32_t action;
+       /* Path that changed, starting directly after the fixed part */
+       char path[];
+};
+
+struct sys_notify_context;
+
+/*
+ * Function notifyd calls to install a system level watch on "path".
+ *
+ * filter and subdir_filter are passed by pointer; presumably the
+ * implementation may modify them -- TODO confirm. "callback" is called
+ * with "private_data" for events on the watch. handle_p receives the
+ * watch handle; notifyd passes the address of a pointer that it later
+ * frees with TALLOC_FREE.
+ *
+ * notifyd treats a return value of 0 as success.
+ */
+typedef int (*sys_notify_watch_fn)(TALLOC_CTX *mem_ctx,
+                                  struct sys_notify_context *ctx,
+                                  const char *path,
+                                  uint32_t *filter,
+                                  uint32_t *subdir_filter,
+                                  void (*callback)(struct sys_notify_context *ctx,
+                                                   void *private_data,
+                                                   struct notify_event *ev),
+                                  void *private_data,
+                                  void *handle_p);
+
+/*
+ * Asynchronous entry point of the notify daemon, in the usual tevent
+ * _send/_recv form. notifyd_send() takes the messaging context the
+ * daemon listens on, the ctdbd connection (presumably NULL when not
+ * clustered -- TODO confirm) and the function/context pair used to
+ * install system watches. notifyd_recv() collects the result once the
+ * request has finished.
+ */
+struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+                               struct messaging_context *msg_ctx,
+                               struct ctdbd_connection *ctdbd_conn,
+                               sys_notify_watch_fn sys_notify_watch,
+                               struct sys_notify_context *sys_notify_ctx);
+int notifyd_recv(struct tevent_req *req);
+
+#endif
diff --git a/source3/smbd/notifyd/tests.c b/source3/smbd/notifyd/tests.c
new file mode 100644 (file)
index 0000000..6bcce6a
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Unix SMB/CIFS implementation.
+ *
+ * Copyright (C) Volker Lendecke 2014
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "notifyd.h"
+#include "messages.h"
+#include "lib/util/server_id_db.h"
+
+/*
+ * Stress test for a running notify daemon.
+ *
+ * Looks up the "notify-daemon" server id, sends it 50000 pairs of
+ * MSG_SMB_NOTIFY_REC_CHANGE messages (first with all filter bits set,
+ * then with both filters zero) for the paths /tmp0 ... /tmp49999, and
+ * finally sends MSG_PING and waits for the MSG_PONG.
+ *
+ * Usage: notifyd-tests <smb.conf-file>
+ * Exits with 1 on any failure, returns 0 otherwise.
+ */
+int main(int argc, const char *argv[])
+{
+       TALLOC_CTX *frame = talloc_stackframe();
+       struct tevent_context *ev;
+       struct messaging_context *msg_ctx;
+       struct server_id_db *names;
+       struct server_id notifyd;
+       struct tevent_req *req;
+       NTSTATUS ping_status;
+       unsigned i;
+       bool ok;
+
+       if (argc != 2) {
+               fprintf(stderr, "Usage: %s <smb.conf-file>\n", argv[0]);
+               exit(1);
+       }
+
+       setup_logging(argv[0], DEBUG_STDOUT);
+       lp_load_global(argv[1]);
+
+       ev = tevent_context_init(NULL);
+       if (ev == NULL) {
+               fprintf(stderr, "tevent_context_init failed\n");
+               exit(1);
+       }
+
+       msg_ctx = messaging_init(ev, ev);
+       if (msg_ctx == NULL) {
+               fprintf(stderr, "messaging_init failed\n");
+               exit(1);
+       }
+
+       names = messaging_names_db(msg_ctx);
+
+       ok = server_id_db_lookup_one(names, "notify-daemon", &notifyd);
+       if (!ok) {
+               fprintf(stderr, "no notifyd\n");
+               exit(1);
+       }
+
+       for (i=0; i<50000; i++) {
+               struct notify_rec_change_msg msg = {
+                       .instance.filter = UINT32_MAX,
+                       .instance.subdir_filter = UINT32_MAX
+               };
+               char path[64];
+               size_t len;
+               struct iovec iov[2];
+               NTSTATUS status;
+
+               len = snprintf(path, sizeof(path), "/tmp%u", i);
+
+               /* Fixed part of the message, then the path including NUL */
+               iov[0].iov_base = &msg;
+               iov[0].iov_len = offsetof(struct notify_rec_change_msg, path);
+               iov[1].iov_base = path;
+               iov[1].iov_len = len+1;
+
+               status = messaging_send_iov(
+                       msg_ctx, notifyd, MSG_SMB_NOTIFY_REC_CHANGE,
+                       iov, ARRAY_SIZE(iov), NULL, 0);
+               if (!NT_STATUS_IS_OK(status)) {
+                       fprintf(stderr, "messaging_send_iov returned %s\n",
+                               nt_errstr(status));
+                       exit(1);
+               }
+
+               /* Same path again, this time with both filters cleared */
+               msg.instance.filter = 0;
+               msg.instance.subdir_filter = 0;
+
+               status = messaging_send_iov(
+                       msg_ctx, notifyd, MSG_SMB_NOTIFY_REC_CHANGE,
+                       iov, ARRAY_SIZE(iov), NULL, 0);
+               if (!NT_STATUS_IS_OK(status)) {
+                       fprintf(stderr, "messaging_send_iov returned %s\n",
+                               nt_errstr(status));
+                       exit(1);
+               }
+       }
+
+       /* Start listening for the PONG before the PING goes out */
+       req = messaging_read_send(ev, ev, msg_ctx, MSG_PONG);
+       if (req == NULL) {
+               fprintf(stderr, "messaging_read_send failed\n");
+               exit(1);
+       }
+
+       /*
+        * If the PING cannot be sent no PONG will ever arrive, so fail
+        * right away instead of blocking in tevent_req_poll().
+        */
+       ping_status = messaging_send_buf(msg_ctx, notifyd, MSG_PING, NULL, 0);
+       if (!NT_STATUS_IS_OK(ping_status)) {
+               fprintf(stderr, "messaging_send_buf returned %s\n",
+                       nt_errstr(ping_status));
+               exit(1);
+       }
+
+       ok = tevent_req_poll(req, ev);
+       if (!ok) {
+               fprintf(stderr, "tevent_req_poll failed\n");
+               exit(1);
+       }
+
+       TALLOC_FREE(frame);
+       return 0;
+}
diff --git a/source3/smbd/notifyd/wscript_build b/source3/smbd/notifyd/wscript_build
new file mode 100644 (file)
index 0000000..90a9505
--- /dev/null
@@ -0,0 +1,12 @@
+#!/usr/bin/env python
+
+# The notify daemon itself, built from notifyd.c as a subsystem that
+# other targets can list in their deps.
+bld.SAMBA3_SUBSYSTEM('notifyd',
+                    source='notifyd.c',
+                     deps='util_tdb TDB_LIB messages_util')
+
+# Stand-alone test client built from tests.c; it is not installed.
+bld.SAMBA3_BINARY('notifyd-tests',
+                  source='tests.c',
+                  install=False,
+                  deps='''
+                    param
+                  ''')
index 9863b485457288960a5041b8ce8316dea61e5818..e9f276cc1fe05d4f86ed82b4accf069872db7f78 100755 (executable)
@@ -1536,6 +1536,7 @@ bld.RECURSE('../examples/pdb')
 bld.RECURSE('../examples/VFS')
 bld.RECURSE('lib/netapi/tests')
 bld.RECURSE('lib/netapi/examples')
+bld.RECURSE('smbd/notifyd')
 
 bld.ENFORCE_GROUP_ORDERING()
 bld.CHECK_PROJECT_RULES()