/*
 * Unix SMB/CIFS implementation.
 *
 * Copyright (C) Volker Lendecke 2014
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
22 #include "lib/util/server_id.h"
23 #include "lib/util/data_blob.h"
24 #include "librpc/gen_ndr/notify.h"
25 #include "librpc/gen_ndr/messaging.h"
26 #include "librpc/gen_ndr/server_id.h"
27 #include "lib/dbwrap/dbwrap.h"
28 #include "lib/dbwrap/dbwrap_rbt.h"
33 #include "lib/util/server_id_db.h"
34 #include "lib/util/tevent_unix.h"
35 #include "ctdbd_conn.h"
36 #include "ctdb_srvids.h"
37 #include "server_id_db_util.h"
38 #include "lib/util/iov_buf.h"
39 #include "messages_util.h"
#ifdef CLUSTER_SUPPORT
#include "ctdb_protocol.h"
#endif
48 * All of notifyd's state
51 struct notifyd_state {
52 struct tevent_context *ev;
53 struct messaging_context *msg_ctx;
54 struct ctdbd_connection *ctdbd_conn;
57 * Database of everything clients show interest in. Indexed by
58 * absolute path. The database keys are not 0-terminated
59 * because the criticial operation, notifyd_trigger, can walk
60 * the structure from the top without adding intermediate 0s.
61 * The database records contain an array of
63 * struct notifyd_instance
65 * to be maintained and parsed by notifyd_entry_parse()
67 struct db_context *entries;
70 * In the cluster case, this is the place where we store a log
71 * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
72 * forward them to our peer notifyd's in the cluster once a
73 * second or when the log grows too large.
76 struct messaging_reclog *log;
79 * Array of companion notifyd's in a cluster. Every notifyd
80 * broadcasts its messaging_reclog to every other notifyd in
81 * the cluster. This is done by making ctdb send a message to
82 * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
83 * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who
84 * had called register_with_ctdbd this srvid will receive the
87 * Database replication happens via these broadcasts. Also,
88 * they serve as liveness indication. If a notifyd receives a
89 * broadcast from an unknown peer, it will create one for this
90 * srvid. Also when we don't hear anything from a peer for a
91 * while, we will discard it.
94 struct notifyd_peer **peers;
97 sys_notify_watch_fn sys_notify_watch;
98 struct sys_notify_context *sys_notify_ctx;
102 * notifyd's representation of a notify instance
104 struct notifyd_instance {
105 struct server_id client;
106 struct notify_instance instance;
108 void *sys_watch; /* inotify/fam/etc handle */
111 * Filters after sys_watch took responsibility of some bits
113 uint32_t internal_filter;
114 uint32_t internal_subdir_filter;
117 struct notifyd_peer {
118 struct notifyd_state *state;
119 struct server_id pid;
121 struct db_context *db;
122 time_t last_broadcast;
125 static bool notifyd_rec_change(struct messaging_context *msg_ctx,
126 struct messaging_rec **prec,
128 static bool notifyd_trigger(struct messaging_context *msg_ctx,
129 struct messaging_rec **prec,
131 static bool notifyd_get_db(struct messaging_context *msg_ctx,
132 struct messaging_rec **prec,
135 #ifdef CLUSTER_SUPPORT
136 static bool notifyd_got_db(struct messaging_context *msg_ctx,
137 struct messaging_rec **prec,
139 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
140 struct server_id src,
141 struct messaging_reclog *log);
143 static void notifyd_sys_callback(struct sys_notify_context *ctx,
144 void *private_data, struct notify_event *ev,
147 #ifdef CLUSTER_SUPPORT
148 static struct tevent_req *notifyd_broadcast_reclog_send(
149 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
150 struct ctdbd_connection *ctdbd_conn, struct server_id src,
151 struct messaging_reclog *log);
152 static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
154 static struct tevent_req *notifyd_clean_peers_send(
155 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
156 struct notifyd_state *notifyd);
157 static int notifyd_clean_peers_recv(struct tevent_req *req);
160 static int sys_notify_watch_dummy(
162 struct sys_notify_context *ctx,
165 uint32_t *subdir_filter,
166 void (*callback)(struct sys_notify_context *ctx,
168 struct notify_event *ev,
173 void **handle = handle_p;
static void notifyd_handler_done(struct tevent_req *subreq);

#ifdef CLUSTER_SUPPORT
static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
static void notifyd_clean_peers_finished(struct tevent_req *subreq);
static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
				   uint64_t dst_srvid,
				   const uint8_t *msg, size_t msglen,
				   void *private_data);
#endif
189 struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
190 struct messaging_context *msg_ctx,
191 struct ctdbd_connection *ctdbd_conn,
192 sys_notify_watch_fn sys_notify_watch,
193 struct sys_notify_context *sys_notify_ctx)
195 struct tevent_req *req, *subreq;
196 struct notifyd_state *state;
197 struct server_id_db *names_db;
200 req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
205 state->msg_ctx = msg_ctx;
206 state->ctdbd_conn = ctdbd_conn;
208 if (sys_notify_watch == NULL) {
209 sys_notify_watch = sys_notify_watch_dummy;
212 state->sys_notify_watch = sys_notify_watch;
213 state->sys_notify_ctx = sys_notify_ctx;
215 state->entries = db_open_rbt(state);
216 if (tevent_req_nomem(state->entries, req)) {
217 return tevent_req_post(req, ev);
220 subreq = messaging_handler_send(state, ev, msg_ctx,
221 MSG_SMB_NOTIFY_REC_CHANGE,
222 notifyd_rec_change, state);
223 if (tevent_req_nomem(subreq, req)) {
224 return tevent_req_post(req, ev);
226 tevent_req_set_callback(subreq, notifyd_handler_done, req);
228 subreq = messaging_handler_send(state, ev, msg_ctx,
229 MSG_SMB_NOTIFY_TRIGGER,
230 notifyd_trigger, state);
231 if (tevent_req_nomem(subreq, req)) {
232 return tevent_req_post(req, ev);
234 tevent_req_set_callback(subreq, notifyd_handler_done, req);
236 subreq = messaging_handler_send(state, ev, msg_ctx,
237 MSG_SMB_NOTIFY_GET_DB,
238 notifyd_get_db, state);
239 if (tevent_req_nomem(subreq, req)) {
240 return tevent_req_post(req, ev);
242 tevent_req_set_callback(subreq, notifyd_handler_done, req);
244 names_db = messaging_names_db(msg_ctx);
246 ret = server_id_db_set_exclusive(names_db, "notify-daemon");
248 DEBUG(10, ("%s: server_id_db_add failed: %s\n",
249 __func__, strerror(ret)));
250 tevent_req_error(req, ret);
251 return tevent_req_post(req, ev);
254 if (ctdbd_conn == NULL) {
256 * No cluster around, skip the database replication
262 #ifdef CLUSTER_SUPPORT
263 subreq = messaging_handler_send(state, ev, msg_ctx,
265 notifyd_got_db, state);
266 if (tevent_req_nomem(subreq, req)) {
267 return tevent_req_post(req, ev);
269 tevent_req_set_callback(subreq, notifyd_handler_done, req);
271 state->log = talloc_zero(state, struct messaging_reclog);
272 if (tevent_req_nomem(state->log, req)) {
273 return tevent_req_post(req, ev);
276 subreq = notifyd_broadcast_reclog_send(
277 state->log, ev, ctdbd_conn,
278 messaging_server_id(msg_ctx),
280 if (tevent_req_nomem(subreq, req)) {
281 return tevent_req_post(req, ev);
283 tevent_req_set_callback(subreq,
284 notifyd_broadcast_reclog_finished,
287 subreq = notifyd_clean_peers_send(state, ev, state);
288 if (tevent_req_nomem(subreq, req)) {
289 return tevent_req_post(req, ev);
291 tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
294 ret = register_with_ctdbd(ctdbd_conn,
295 CTDB_SRVID_SAMBA_NOTIFY_PROXY,
296 notifyd_snoop_broadcast, state);
298 tevent_req_error(req, ret);
299 return tevent_req_post(req, ev);
306 static void notifyd_handler_done(struct tevent_req *subreq)
308 struct tevent_req *req = tevent_req_callback_data(
309 subreq, struct tevent_req);
312 ret = messaging_handler_recv(subreq);
314 tevent_req_error(req, ret);
#ifdef CLUSTER_SUPPORT

/* Broadcast loop ended - only happens on error, pass it up. */
static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	int ret;

	ret = notifyd_broadcast_reclog_recv(subreq);
	TALLOC_FREE(subreq);
	tevent_req_error(req, ret);
}

/* Peer cleanup loop ended - only happens on error, pass it up. */
static void notifyd_clean_peers_finished(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	int ret;

	ret = notifyd_clean_peers_recv(subreq);
	TALLOC_FREE(subreq);
	tevent_req_error(req, ret);
}

#endif
/*
 * Collect the result of notifyd_send(): 0 or a Unix errno.
 */
int notifyd_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
349 * Parse an entry in the notifyd_context->entries database
352 static bool notifyd_parse_entry(uint8_t *buf, size_t buflen,
353 struct notifyd_instance **instances,
354 size_t *num_instances)
356 if ((buflen % sizeof(struct notifyd_instance)) != 0) {
357 DEBUG(1, ("%s: invalid buffer size: %u\n",
358 __func__, (unsigned)buflen));
362 if (instances != NULL) {
363 *instances = (struct notifyd_instance *)buf;
365 if (num_instances != NULL) {
366 *num_instances = buflen / sizeof(struct notifyd_instance);
371 static bool notifyd_apply_rec_change(
372 const struct server_id *client,
373 const char *path, size_t pathlen,
374 const struct notify_instance *chg,
375 struct db_context *entries,
376 sys_notify_watch_fn sys_notify_watch,
377 struct sys_notify_context *sys_notify_ctx,
378 struct messaging_context *msg_ctx)
380 struct db_record *rec;
381 struct notifyd_instance *instances;
382 size_t num_instances;
384 struct notifyd_instance *instance;
390 DEBUG(1, ("%s: pathlen==0\n", __func__));
393 if (path[pathlen-1] != '\0') {
394 DEBUG(1, ("%s: path not 0-terminated\n", __func__));
398 DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, "
399 "private_data=%p\n", __func__, path,
400 (unsigned)chg->filter, (unsigned)chg->subdir_filter,
403 rec = dbwrap_fetch_locked(
405 make_tdb_data((const uint8_t *)path, pathlen-1));
408 DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__));
413 value = dbwrap_record_get_value(rec);
415 if (value.dsize != 0) {
416 if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
423 * Overallocate by one instance to avoid a realloc when adding
425 instances = talloc_array(rec, struct notifyd_instance,
427 if (instances == NULL) {
428 DEBUG(1, ("%s: talloc failed\n", __func__));
432 if (value.dsize != 0) {
433 memcpy(instances, value.dptr, value.dsize);
436 for (i=0; i<num_instances; i++) {
437 instance = &instances[i];
439 if (server_id_equal(&instance->client, client) &&
440 (instance->instance.private_data == chg->private_data)) {
445 if (i < num_instances) {
446 instance->instance = *chg;
449 * We've overallocated for one instance
451 instance = &instances[num_instances];
453 *instance = (struct notifyd_instance) {
456 .internal_filter = chg->filter,
457 .internal_subdir_filter = chg->subdir_filter
463 if ((instance->instance.filter != 0) ||
464 (instance->instance.subdir_filter != 0)) {
467 TALLOC_FREE(instance->sys_watch);
469 ret = sys_notify_watch(entries, sys_notify_ctx, path,
470 &instance->internal_filter,
471 &instance->internal_subdir_filter,
472 notifyd_sys_callback, msg_ctx,
473 &instance->sys_watch);
475 DEBUG(1, ("%s: inotify_watch returned %s\n",
476 __func__, strerror(errno)));
480 if ((instance->instance.filter == 0) &&
481 (instance->instance.subdir_filter == 0)) {
482 /* This is a delete request */
483 TALLOC_FREE(instance->sys_watch);
484 *instance = instances[num_instances-1];
488 DEBUG(10, ("%s: %s has %u instances\n", __func__,
489 path, (unsigned)num_instances));
491 if (num_instances == 0) {
492 status = dbwrap_record_delete(rec);
493 if (!NT_STATUS_IS_OK(status)) {
494 DEBUG(1, ("%s: dbwrap_record_delete returned %s\n",
495 __func__, nt_errstr(status)));
499 value = make_tdb_data(
500 (uint8_t *)instances,
501 sizeof(struct notifyd_instance) * num_instances);
503 status = dbwrap_record_store(rec, value, 0);
504 if (!NT_STATUS_IS_OK(status)) {
505 DEBUG(1, ("%s: dbwrap_record_store returned %s\n",
506 __func__, nt_errstr(status)));
517 static void notifyd_sys_callback(struct sys_notify_context *ctx,
518 void *private_data, struct notify_event *ev,
521 struct messaging_context *msg_ctx = talloc_get_type_abort(
522 private_data, struct messaging_context);
523 struct notify_trigger_msg msg;
527 msg = (struct notify_trigger_msg) {
528 .when = timespec_current(),
529 .action = ev->action,
533 iov[0].iov_base = &msg;
534 iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
535 iov[1].iov_base = discard_const_p(char, ev->dir);
536 iov[1].iov_len = strlen(ev->dir);
537 iov[2].iov_base = &slash;
539 iov[3].iov_base = discard_const_p(char, ev->path);
540 iov[3].iov_len = strlen(ev->path)+1;
543 msg_ctx, messaging_server_id(msg_ctx),
544 MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
547 static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
548 struct notify_rec_change_msg **pmsg,
551 struct notify_rec_change_msg *msg;
553 if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
554 DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__,
559 *pmsg = msg = (struct notify_rec_change_msg *)buf;
560 *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
562 DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, "
563 "private_data=%p, path=%.*s\n",
564 __func__, (unsigned)msg->instance.filter,
565 (unsigned)msg->instance.subdir_filter,
566 msg->instance.private_data, (int)(*pathlen), msg->path));
571 static bool notifyd_rec_change(struct messaging_context *msg_ctx,
572 struct messaging_rec **prec,
575 struct notifyd_state *state = talloc_get_type_abort(
576 private_data, struct notifyd_state);
577 struct server_id_buf idbuf;
578 struct messaging_rec *rec = *prec;
579 struct notify_rec_change_msg *msg;
583 DEBUG(10, ("%s: Got %d bytes from %s\n", __func__,
584 (unsigned)rec->buf.length,
585 server_id_str_buf(rec->src, &idbuf)));
587 ok = notifyd_parse_rec_change(rec->buf.data, rec->buf.length,
593 ok = notifyd_apply_rec_change(
594 &rec->src, msg->path, pathlen, &msg->instance,
595 state->entries, state->sys_notify_watch, state->sys_notify_ctx,
598 DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
603 if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
607 #ifdef CLUSTER_SUPPORT
610 struct messaging_rec **tmp;
611 struct messaging_reclog *log;
615 tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
618 DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
623 log->recs[log->num_recs] = talloc_move(log->recs, prec);
626 if (log->num_recs >= 100) {
628 * Don't let the log grow too large
630 notifyd_broadcast_reclog(state->ctdbd_conn,
631 messaging_server_id(msg_ctx), log);
640 struct notifyd_trigger_state {
641 struct messaging_context *msg_ctx;
642 struct notify_trigger_msg *msg;
644 bool covered_by_sys_notify;
647 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
650 static bool notifyd_trigger(struct messaging_context *msg_ctx,
651 struct messaging_rec **prec,
654 struct notifyd_state *state = talloc_get_type_abort(
655 private_data, struct notifyd_state);
656 struct server_id my_id = messaging_server_id(msg_ctx);
657 struct messaging_rec *rec = *prec;
658 struct notifyd_trigger_state tstate;
660 const char *p, *next_p;
662 if (rec->buf.length < offsetof(struct notify_trigger_msg, path) + 1) {
663 DEBUG(1, ("message too short, ignoring: %u\n",
664 (unsigned)rec->buf.length));
667 if (rec->buf.data[rec->buf.length-1] != 0) {
668 DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
672 tstate.msg_ctx = msg_ctx;
674 tstate.covered_by_sys_notify = (rec->src.vnn == my_id.vnn);
675 tstate.covered_by_sys_notify &= !server_id_equal(&rec->src, &my_id);
677 tstate.msg = (struct notify_trigger_msg *)rec->buf.data;
678 path = tstate.msg->path;
680 DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
681 __func__, (unsigned)tstate.msg->action,
682 (unsigned)tstate.msg->filter, path));
684 if (path[0] != '/') {
685 DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
690 for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
691 ptrdiff_t path_len = p - path;
695 next_p = strchr(p+1, '/');
696 tstate.recursive = (next_p != NULL);
698 DEBUG(10, ("%s: Trying path %.*s\n", __func__,
699 (int)path_len, path));
701 key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
704 dbwrap_parse_record(state->entries, key,
705 notifyd_trigger_parser, &tstate);
707 if (state->peers == NULL) {
711 if (rec->src.vnn != my_id.vnn) {
715 for (i=0; i<state->num_peers; i++) {
716 if (state->peers[i]->db == NULL) {
718 * Inactive peer, did not get a db yet
722 dbwrap_parse_record(state->peers[i]->db, key,
723 notifyd_trigger_parser, &tstate);
730 static void notifyd_send_delete(struct messaging_context *msg_ctx,
732 struct notifyd_instance *instance);
734 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
738 struct notifyd_trigger_state *tstate = private_data;
739 struct notify_event_msg msg = { .action = tstate->msg->action,
740 .when = tstate->msg->when };
742 size_t path_len = key.dsize;
743 struct notifyd_instance *instances = NULL;
744 size_t num_instances = 0;
747 if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
749 DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__));
753 DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__,
754 (unsigned)num_instances, (int)key.dsize,
757 iov[0].iov_base = &msg;
758 iov[0].iov_len = offsetof(struct notify_event_msg, path);
759 iov[1].iov_base = tstate->msg->path + path_len + 1;
760 iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
762 for (i=0; i<num_instances; i++) {
763 struct notifyd_instance *instance = &instances[i];
764 struct server_id_buf idbuf;
768 if (tstate->covered_by_sys_notify) {
769 if (tstate->recursive) {
770 i_filter = instance->internal_subdir_filter;
772 i_filter = instance->internal_filter;
775 if (tstate->recursive) {
776 i_filter = instance->instance.subdir_filter;
778 i_filter = instance->instance.filter;
782 if ((i_filter & tstate->msg->filter) == 0) {
786 msg.private_data = instance->instance.private_data;
788 status = messaging_send_iov(
789 tstate->msg_ctx, instance->client,
790 MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
792 DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n",
794 server_id_str_buf(instance->client, &idbuf),
797 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
798 procid_is_local(&instance->client)) {
800 * That process has died
802 notifyd_send_delete(tstate->msg_ctx, key, instance);
806 if (!NT_STATUS_IS_OK(status)) {
807 DEBUG(1, ("%s: messaging_send_iov returned %s\n",
808 __func__, nt_errstr(status)));
814 * Send a delete request to ourselves to properly discard a notify
815 * record for an smbd that has died.
818 static void notifyd_send_delete(struct messaging_context *msg_ctx,
820 struct notifyd_instance *instance)
822 struct notify_rec_change_msg msg = {
823 .instance.private_data = instance->instance.private_data
830 * Send a rec_change to ourselves to delete a dead entry
833 iov[0] = (struct iovec) {
835 .iov_len = offsetof(struct notify_rec_change_msg, path) };
836 iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
837 iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
839 ret = messaging_send_iov_from(
840 msg_ctx, instance->client, messaging_server_id(msg_ctx),
841 MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
844 DEBUG(10, ("%s: messaging_send_iov_from returned %s\n",
845 __func__, strerror(ret)));
849 static bool notifyd_get_db(struct messaging_context *msg_ctx,
850 struct messaging_rec **prec,
853 struct notifyd_state *state = talloc_get_type_abort(
854 private_data, struct notifyd_state);
855 struct messaging_rec *rec = *prec;
856 struct server_id_buf id1, id2;
858 uint64_t rec_index = UINT64_MAX;
859 uint8_t index_buf[sizeof(uint64_t)];
864 dbsize = dbwrap_marshall(state->entries, NULL, 0);
866 buf = talloc_array(rec, uint8_t, dbsize);
868 DEBUG(1, ("%s: talloc_array(%ju) failed\n",
869 __func__, (uintmax_t)dbsize));
873 dbsize = dbwrap_marshall(state->entries, buf, dbsize);
875 if (dbsize != talloc_get_size(buf)) {
876 DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__,
877 (uintmax_t)talloc_get_size(buf),
883 if (state->log != NULL) {
884 rec_index = state->log->rec_index;
886 SBVAL(index_buf, 0, rec_index);
888 iov[0] = (struct iovec) { .iov_base = index_buf,
889 .iov_len = sizeof(index_buf) };
890 iov[1] = (struct iovec) { .iov_base = buf,
893 DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
894 (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
895 server_id_str_buf(messaging_server_id(msg_ctx), &id1),
896 server_id_str_buf(rec->src, &id2)));
898 status = messaging_send_iov(msg_ctx, rec->src, MSG_SMB_NOTIFY_DB,
899 iov, ARRAY_SIZE(iov), NULL, 0);
901 if (!NT_STATUS_IS_OK(status)) {
902 DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
903 __func__, nt_errstr(status)));
909 #ifdef CLUSTER_SUPPORT
911 static int notifyd_add_proxy_syswatches(struct db_record *rec,
914 static bool notifyd_got_db(struct messaging_context *msg_ctx,
915 struct messaging_rec **prec,
918 struct notifyd_state *state = talloc_get_type_abort(
919 private_data, struct notifyd_state);
920 struct messaging_rec *rec = *prec;
921 struct notifyd_peer *p = NULL;
922 struct server_id_buf idbuf;
927 for (i=0; i<state->num_peers; i++) {
928 if (server_id_equal(&rec->src, &state->peers[i]->pid)) {
935 DEBUG(10, ("%s: Did not find peer for db from %s\n",
936 __func__, server_id_str_buf(rec->src, &idbuf)));
940 if (rec->buf.length < 8) {
941 DEBUG(10, ("%s: Got short db length %u from %s\n", __func__,
942 (unsigned)rec->buf.length,
943 server_id_str_buf(rec->src, &idbuf)));
948 p->rec_index = BVAL(rec->buf.data, 0);
950 p->db = db_open_rbt(p);
952 DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
957 status = dbwrap_unmarshall(p->db, rec->buf.data + 8,
958 rec->buf.length - 8);
959 if (!NT_STATUS_IS_OK(status)) {
960 DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
961 __func__, nt_errstr(status),
962 server_id_str_buf(rec->src, &idbuf)));
967 dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
970 DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
971 server_id_str_buf(rec->src, &idbuf), count));
976 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
977 struct server_id src,
978 struct messaging_reclog *log)
980 enum ndr_err_code ndr_err;
981 uint8_t msghdr[MESSAGE_HDR_LENGTH];
990 DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__,
991 (uintmax_t)log->rec_index, (unsigned)log->num_recs));
993 message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
994 (struct server_id) {0 });
995 iov[0] = (struct iovec) { .iov_base = msghdr,
996 .iov_len = sizeof(msghdr) };
998 ndr_err = ndr_push_struct_blob(
1000 (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
1001 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1002 DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n",
1003 __func__, ndr_errstr(ndr_err)));
1006 iov[1] = (struct iovec) { .iov_base = blob.data,
1007 .iov_len = blob.length };
1009 ret = ctdbd_messaging_send_iov(
1010 ctdbd_conn, CTDB_BROADCAST_VNNMAP,
1011 CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
1012 TALLOC_FREE(blob.data);
1014 DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n",
1015 __func__, strerror(ret)));
1019 log->rec_index += 1;
1023 TALLOC_FREE(log->recs);
1026 struct notifyd_broadcast_reclog_state {
1027 struct tevent_context *ev;
1028 struct ctdbd_connection *ctdbd_conn;
1029 struct server_id src;
1030 struct messaging_reclog *log;
1033 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
1035 static struct tevent_req *notifyd_broadcast_reclog_send(
1036 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1037 struct ctdbd_connection *ctdbd_conn, struct server_id src,
1038 struct messaging_reclog *log)
1040 struct tevent_req *req, *subreq;
1041 struct notifyd_broadcast_reclog_state *state;
1043 req = tevent_req_create(mem_ctx, &state,
1044 struct notifyd_broadcast_reclog_state);
1049 state->ctdbd_conn = ctdbd_conn;
1053 subreq = tevent_wakeup_send(state, state->ev,
1054 timeval_current_ofs_msec(1000));
1055 if (tevent_req_nomem(subreq, req)) {
1056 return tevent_req_post(req, ev);
1058 tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1062 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1064 struct tevent_req *req = tevent_req_callback_data(
1065 subreq, struct tevent_req);
1066 struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1067 req, struct notifyd_broadcast_reclog_state);
1070 ok = tevent_wakeup_recv(subreq);
1071 TALLOC_FREE(subreq);
1073 tevent_req_oom(req);
1077 notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1079 subreq = tevent_wakeup_send(state, state->ev,
1080 timeval_current_ofs_msec(1000));
1081 if (tevent_req_nomem(subreq, req)) {
1084 tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
struct notifyd_clean_peers_state {
	struct tevent_context *ev;
	struct notifyd_state *notifyd;
};

static void notifyd_clean_peers_next(struct tevent_req *subreq);
1099 static struct tevent_req *notifyd_clean_peers_send(
1100 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1101 struct notifyd_state *notifyd)
1103 struct tevent_req *req, *subreq;
1104 struct notifyd_clean_peers_state *state;
1106 req = tevent_req_create(mem_ctx, &state,
1107 struct notifyd_clean_peers_state);
1112 state->notifyd = notifyd;
1114 subreq = tevent_wakeup_send(state, state->ev,
1115 timeval_current_ofs_msec(30000));
1116 if (tevent_req_nomem(subreq, req)) {
1117 return tevent_req_post(req, ev);
1119 tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1123 static void notifyd_clean_peers_next(struct tevent_req *subreq)
1125 struct tevent_req *req = tevent_req_callback_data(
1126 subreq, struct tevent_req);
1127 struct notifyd_clean_peers_state *state = tevent_req_data(
1128 req, struct notifyd_clean_peers_state);
1129 struct notifyd_state *notifyd = state->notifyd;
1132 time_t now = time(NULL);
1134 ok = tevent_wakeup_recv(subreq);
1135 TALLOC_FREE(subreq);
1137 tevent_req_oom(req);
1142 while (i < notifyd->num_peers) {
1143 struct notifyd_peer *p = notifyd->peers[i];
1145 if ((now - p->last_broadcast) > 60) {
1146 struct server_id_buf idbuf;
1149 * Haven't heard for more than 60 seconds. Call this
1153 DEBUG(10, ("%s: peer %s died\n", __func__,
1154 server_id_str_buf(p->pid, &idbuf)));
1156 * This implicitly decrements notifyd->num_peers
1164 subreq = tevent_wakeup_send(state, state->ev,
1165 timeval_current_ofs_msec(30000));
1166 if (tevent_req_nomem(subreq, req)) {
1169 tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
static int notifyd_clean_peers_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
1177 static int notifyd_add_proxy_syswatches(struct db_record *rec,
1180 struct notifyd_state *state = talloc_get_type_abort(
1181 private_data, struct notifyd_state);
1182 struct db_context *db = dbwrap_record_get_db(rec);
1183 TDB_DATA key = dbwrap_record_get_key(rec);
1184 TDB_DATA value = dbwrap_record_get_value(rec);
1185 struct notifyd_instance *instances = NULL;
1186 size_t num_instances = 0;
1188 char path[key.dsize+1];
1191 memcpy(path, key.dptr, key.dsize);
1192 path[key.dsize] = '\0';
1194 ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1197 DEBUG(1, ("%s: Could not parse notifyd entry for %s\n",
1202 for (i=0; i<num_instances; i++) {
1203 struct notifyd_instance *instance = &instances[i];
1204 uint32_t filter = instance->instance.filter;
1205 uint32_t subdir_filter = instance->instance.subdir_filter;
1209 * This is a remote database. Pointers that we were
1210 * given don't make sense locally. Initialize to NULL
1211 * in case sys_notify_watch fails.
1213 instances[i].sys_watch = NULL;
1215 ret = state->sys_notify_watch(
1216 db, state->sys_notify_ctx, path,
1217 &filter, &subdir_filter,
1218 notifyd_sys_callback, state->msg_ctx,
1219 &instance->sys_watch);
1221 DEBUG(1, ("%s: inotify_watch returned %s\n",
1222 __func__, strerror(errno)));
1229 static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1231 TDB_DATA key = dbwrap_record_get_key(rec);
1232 TDB_DATA value = dbwrap_record_get_value(rec);
1233 struct notifyd_instance *instances = NULL;
1234 size_t num_instances = 0;
1238 ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1241 DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n",
1242 __func__, (int)key.dsize, (char *)key.dptr));
1245 for (i=0; i<num_instances; i++) {
1246 TALLOC_FREE(instances[i].sys_watch);
1251 static int notifyd_peer_destructor(struct notifyd_peer *p)
1253 struct notifyd_state *state = p->state;
1256 if (p->db != NULL) {
1257 dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
1261 for (i = 0; i<state->num_peers; i++) {
1262 if (p == state->peers[i]) {
1263 state->peers[i] = state->peers[state->num_peers-1];
1264 state->num_peers -= 1;
1271 static struct notifyd_peer *notifyd_peer_new(
1272 struct notifyd_state *state, struct server_id pid)
1274 struct notifyd_peer *p, **tmp;
1276 tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1277 state->num_peers+1);
1283 p = talloc_zero(state->peers, struct notifyd_peer);
1290 state->peers[state->num_peers] = p;
1291 state->num_peers += 1;
1293 talloc_set_destructor(p, notifyd_peer_destructor);
1298 static void notifyd_apply_reclog(struct notifyd_peer *peer,
1299 const uint8_t *msg, size_t msglen)
1301 struct notifyd_state *state = peer->state;
1302 DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1304 struct server_id_buf idbuf;
1305 struct messaging_reclog *log;
1306 enum ndr_err_code ndr_err;
1309 if (peer->db == NULL) {
1316 log = talloc(peer, struct messaging_reclog);
1318 DEBUG(10, ("%s: talloc failed\n", __func__));
1322 ndr_err = ndr_pull_struct_blob_all(
1324 (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1325 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1326 DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n",
1327 __func__, ndr_errstr(ndr_err)));
1331 DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__,
1332 (unsigned)log->num_recs, (uintmax_t)log->rec_index,
1333 server_id_str_buf(peer->pid, &idbuf)));
1335 if (log->rec_index != peer->rec_index) {
1336 DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n",
1337 __func__, (uintmax_t)log->rec_index,
1338 server_id_str_buf(peer->pid, &idbuf),
1339 (uintmax_t)peer->rec_index));
1343 for (i=0; i<log->num_recs; i++) {
1344 struct messaging_rec *r = log->recs[i];
1345 struct notify_rec_change_msg *chg;
1349 ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1352 DEBUG(3, ("%s: notifyd_parse_rec_change failed\n",
1357 ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1358 &chg->instance, peer->db,
1359 state->sys_notify_watch,
1360 state->sys_notify_ctx,
1363 DEBUG(3, ("%s: notifyd_apply_rec_change failed\n",
1369 peer->rec_index += 1;
1370 peer->last_broadcast = time(NULL);
1376 DEBUG(10, ("%s: Dropping peer %s\n", __func__,
1377 server_id_str_buf(peer->pid, &idbuf)));
1382 * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1383 * messages) broadcasts by other notifyds. Several cases:
1385 * We don't know the source. This creates a new peer. Creating a peer
1386 * involves asking the peer for its full database. We assume ordered
1387 * messages, so the new database will arrive before the next broadcast
1390 * We know the source and the log index matches. We will apply the log
1391 * locally to our peer's db as if we had received it from a local
1394 * We know the source but the log index does not match. This means we
1395 * lost a message. We just drop the whole peer and wait for the next
1396 * broadcast, which will then trigger a fresh database pull.
1399 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
1401 const uint8_t *msg, size_t msglen,
1404 struct notifyd_state *state = talloc_get_type_abort(
1405 private_data, struct notifyd_state);
1406 struct server_id my_id = messaging_server_id(state->msg_ctx);
1407 struct notifyd_peer *p;
1410 struct server_id src, dst;
1411 struct server_id_buf idbuf;
1414 if (msglen < MESSAGE_HDR_LENGTH) {
1415 DEBUG(10, ("%s: Got short broadcast\n", __func__));
1418 message_hdr_get(&msg_type, &src, &dst, msg);
1420 if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1421 DEBUG(10, ("%s Got message %u, ignoring\n", __func__,
1422 (unsigned)msg_type));
1425 if (server_id_equal(&src, &my_id)) {
1426 DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__));
1430 DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1431 __func__, server_id_str_buf(src, &idbuf)));
1433 for (i=0; i<state->num_peers; i++) {
1434 if (server_id_equal(&state->peers[i]->pid, &src)) {
1436 DEBUG(10, ("%s: Applying changes to peer %u\n",
1437 __func__, (unsigned)i));
1439 notifyd_apply_reclog(state->peers[i],
1440 msg + MESSAGE_HDR_LENGTH,
1441 msglen - MESSAGE_HDR_LENGTH);
1446 DEBUG(10, ("%s: Creating new peer for %s\n", __func__,
1447 server_id_str_buf(src, &idbuf)));
1449 p = notifyd_peer_new(state, src);
1451 DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__));
1455 status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1457 if (!NT_STATUS_IS_OK(status)) {
1458 DEBUG(10, ("%s: messaging_send_buf failed: %s\n",
1459 __func__, nt_errstr(status)));
1468 struct notifyd_parse_db_state {
1469 bool (*fn)(const char *path,
1470 struct server_id server,
1471 const struct notify_instance *instance,
1472 void *private_data);
1476 static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value,
1479 struct notifyd_parse_db_state *state = private_data;
1480 char path[key.dsize+1];
1481 struct notifyd_instance *instances = NULL;
1482 size_t num_instances = 0;
1486 memcpy(path, key.dptr, key.dsize);
1487 path[key.dsize] = 0;
1489 ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1492 DEBUG(10, ("%s: Could not parse entry for path %s\n",
1497 for (i=0; i<num_instances; i++) {
1498 ok = state->fn(path, instances[i].client,
1499 &instances[i].instance,
1500 state->private_data);
1509 int notifyd_parse_db(const uint8_t *buf, size_t buflen,
1510 uint64_t *log_index,
1511 bool (*fn)(const char *path,
1512 struct server_id server,
1513 const struct notify_instance *instance,
1514 void *private_data),
1517 struct notifyd_parse_db_state state = {
1518 .fn = fn, .private_data = private_data
1525 *log_index = BVAL(buf, 0);
1530 status = dbwrap_parse_marshall_buf(
1531 buf, buflen, notifyd_parse_db_parser, &state);
1532 if (!NT_STATUS_IS_OK(status)) {
1533 return map_errno_from_nt_status(status);