notifyd: Broadcast to all connected nodes
[bbaumbach/samba-autobuild/.git] / source3 / smbd / notifyd / notifyd.c
1 /*
2  * Unix SMB/CIFS implementation.
3  *
4  * Copyright (C) Volker Lendecke 2014
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include <tevent.h>
22 #include "lib/util/server_id.h"
23 #include "lib/util/data_blob.h"
24 #include "librpc/gen_ndr/notify.h"
25 #include "librpc/gen_ndr/messaging.h"
26 #include "librpc/gen_ndr/server_id.h"
27 #include "lib/dbwrap/dbwrap.h"
28 #include "lib/dbwrap/dbwrap_rbt.h"
29 #include "messages.h"
30 #include "tdb.h"
31 #include "util_tdb.h"
32 #include "notifyd.h"
33 #include "lib/util/server_id_db.h"
34 #include "lib/util/tevent_unix.h"
35 #include "lib/util/tevent_ntstatus.h"
36 #include "ctdbd_conn.h"
37 #include "ctdb_srvids.h"
38 #include "server_id_db_util.h"
39 #include "lib/util/iov_buf.h"
40 #include "messages_util.h"
41
42 #ifdef CLUSTER_SUPPORT
43 #include "ctdb_protocol.h"
44 #endif
45
46 struct notifyd_peer;
47
48 /*
49  * All of notifyd's state
50  */
51
52 struct notifyd_state {
53         struct tevent_context *ev;
54         struct messaging_context *msg_ctx;
55         struct ctdbd_connection *ctdbd_conn;
56
57         /*
58          * Database of everything clients show interest in. Indexed by
59          * absolute path. The database keys are not 0-terminated
60          * to allow the criticial operation, notifyd_trigger, to walk
61          * the structure from the top without adding intermediate 0s.
62          * The database records contain an array of
63          *
64          * struct notifyd_instance
65          *
66          * to be maintained and parsed by notifyd_entry_parse()
67          */
68         struct db_context *entries;
69
70         /*
71          * In the cluster case, this is the place where we store a log
72          * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
73          * forward them to our peer notifyd's in the cluster once a
74          * second or when the log grows too large.
75          */
76
77         struct messaging_reclog *log;
78
79         /*
80          * Array of companion notifyd's in a cluster. Every notifyd
81          * broadcasts its messaging_reclog to every other notifyd in
82          * the cluster. This is done by making ctdb send a message to
83          * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
84          * number CTDB_BROADCAST_CONNECTED. Everybody in the cluster who
85          * had called register_with_ctdbd this srvid will receive the
86          * broadcasts.
87          *
88          * Database replication happens via these broadcasts. Also,
89          * they serve as liveness indication. If a notifyd receives a
90          * broadcast from an unknown peer, it will create one for this
91          * srvid. Also when we don't hear anything from a peer for a
92          * while, we will discard it.
93          */
94
95         struct notifyd_peer **peers;
96         size_t num_peers;
97
98         sys_notify_watch_fn sys_notify_watch;
99         struct sys_notify_context *sys_notify_ctx;
100 };
101
102 /*
103  * notifyd's representation of a notify instance
104  */
105 struct notifyd_instance {
106         struct server_id client;
107         struct notify_instance instance;
108
109         void *sys_watch; /* inotify/fam/etc handle */
110
111         /*
112          * Filters after sys_watch took responsibility of some bits
113          */
114         uint32_t internal_filter;
115         uint32_t internal_subdir_filter;
116 };
117
118 struct notifyd_peer {
119         struct notifyd_state *state;
120         struct server_id pid;
121         uint64_t rec_index;
122         struct db_context *db;
123         time_t last_broadcast;
124 };
125
126 static void notifyd_rec_change(struct messaging_context *msg_ctx,
127                                void *private_data, uint32_t msg_type,
128                                struct server_id src, DATA_BLOB *data);
129 static void notifyd_trigger(struct messaging_context *msg_ctx,
130                             void *private_data, uint32_t msg_type,
131                             struct server_id src, DATA_BLOB *data);
132 static void notifyd_get_db(struct messaging_context *msg_ctx,
133                            void *private_data, uint32_t msg_type,
134                            struct server_id src, DATA_BLOB *data);
135
136 #ifdef CLUSTER_SUPPORT
137 static void notifyd_got_db(struct messaging_context *msg_ctx,
138                            void *private_data, uint32_t msg_type,
139                            struct server_id src, DATA_BLOB *data);
140 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
141                                      struct server_id src,
142                                      struct messaging_reclog *log);
143 #endif
144 static void notifyd_sys_callback(struct sys_notify_context *ctx,
145                                  void *private_data, struct notify_event *ev,
146                                  uint32_t filter);
147
148 #ifdef CLUSTER_SUPPORT
149 static struct tevent_req *notifyd_broadcast_reclog_send(
150         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
151         struct ctdbd_connection *ctdbd_conn, struct server_id src,
152         struct messaging_reclog *log);
153 static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
154
155 static struct tevent_req *notifyd_clean_peers_send(
156         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
157         struct notifyd_state *notifyd);
158 static int notifyd_clean_peers_recv(struct tevent_req *req);
159 #endif
160
161 static int sys_notify_watch_dummy(
162         TALLOC_CTX *mem_ctx,
163         struct sys_notify_context *ctx,
164         const char *path,
165         uint32_t *filter,
166         uint32_t *subdir_filter,
167         void (*callback)(struct sys_notify_context *ctx,
168                          void *private_data,
169                          struct notify_event *ev,
170                          uint32_t filter),
171         void *private_data,
172         void *handle_p)
173 {
174         void **handle = handle_p;
175         *handle = NULL;
176         return 0;
177 }
178
179 #ifdef CLUSTER_SUPPORT
180 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
181 static void notifyd_clean_peers_finished(struct tevent_req *subreq);
182 static int notifyd_snoop_broadcast(struct tevent_context *ev,
183                                    uint32_t src_vnn, uint32_t dst_vnn,
184                                    uint64_t dst_srvid,
185                                    const uint8_t *msg, size_t msglen,
186                                    void *private_data);
187 #endif
188
189 struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
190                                 struct messaging_context *msg_ctx,
191                                 struct ctdbd_connection *ctdbd_conn,
192                                 sys_notify_watch_fn sys_notify_watch,
193                                 struct sys_notify_context *sys_notify_ctx)
194 {
195         struct tevent_req *req;
196 #ifdef CLUSTER_SUPPORT
197         struct tevent_req *subreq;
198 #endif
199         struct notifyd_state *state;
200         struct server_id_db *names_db;
201         NTSTATUS status;
202         int ret;
203
204         req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
205         if (req == NULL) {
206                 return NULL;
207         }
208         state->ev = ev;
209         state->msg_ctx = msg_ctx;
210         state->ctdbd_conn = ctdbd_conn;
211
212         if (sys_notify_watch == NULL) {
213                 sys_notify_watch = sys_notify_watch_dummy;
214         }
215
216         state->sys_notify_watch = sys_notify_watch;
217         state->sys_notify_ctx = sys_notify_ctx;
218
219         state->entries = db_open_rbt(state);
220         if (tevent_req_nomem(state->entries, req)) {
221                 return tevent_req_post(req, ev);
222         }
223
224         status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE,
225                                     notifyd_rec_change);
226         if (tevent_req_nterror(req, status)) {
227                 return tevent_req_post(req, ev);
228         }
229
230         status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER,
231                                     notifyd_trigger);
232         if (tevent_req_nterror(req, status)) {
233                 goto deregister_rec_change;
234         }
235
236         status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_GET_DB,
237                                     notifyd_get_db);
238         if (tevent_req_nterror(req, status)) {
239                 goto deregister_trigger;
240         }
241
242         names_db = messaging_names_db(msg_ctx);
243
244         ret = server_id_db_set_exclusive(names_db, "notify-daemon");
245         if (ret != 0) {
246                 DEBUG(10, ("%s: server_id_db_add failed: %s\n",
247                            __func__, strerror(ret)));
248                 tevent_req_error(req, ret);
249                 goto deregister_get_db;
250         }
251
252         if (ctdbd_conn == NULL) {
253                 /*
254                  * No cluster around, skip the database replication
255                  * engine
256                  */
257                 return req;
258         }
259
260 #ifdef CLUSTER_SUPPORT
261         status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_DB,
262                                     notifyd_got_db);
263         if (tevent_req_nterror(req, status)) {
264                 goto deregister_get_db;
265         }
266
267         state->log = talloc_zero(state, struct messaging_reclog);
268         if (tevent_req_nomem(state->log, req)) {
269                 goto deregister_db;
270         }
271
272         subreq = notifyd_broadcast_reclog_send(
273                 state->log, ev, ctdbd_conn,
274                 messaging_server_id(msg_ctx),
275                 state->log);
276         if (tevent_req_nomem(subreq, req)) {
277                 goto deregister_db;
278         }
279         tevent_req_set_callback(subreq,
280                                 notifyd_broadcast_reclog_finished,
281                                 req);
282
283         subreq = notifyd_clean_peers_send(state, ev, state);
284         if (tevent_req_nomem(subreq, req)) {
285                 goto deregister_db;
286         }
287         tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
288                                 req);
289
290         ret = register_with_ctdbd(ctdbd_conn,
291                                   CTDB_SRVID_SAMBA_NOTIFY_PROXY,
292                                   notifyd_snoop_broadcast, state);
293         if (ret != 0) {
294                 tevent_req_error(req, ret);
295                 goto deregister_db;
296         }
297 #endif
298
299         return req;
300
301 #ifdef CLUSTER_SUPPORT
302 deregister_db:
303         messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_DB, state);
304 #endif
305 deregister_get_db:
306         messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_GET_DB, state);
307 deregister_trigger:
308         messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state);
309 deregister_rec_change:
310         messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state);
311         return tevent_req_post(req, ev);
312 }
313
314 #ifdef CLUSTER_SUPPORT
315
316 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
317 {
318         struct tevent_req *req = tevent_req_callback_data(
319                 subreq, struct tevent_req);
320         int ret;
321
322         ret = notifyd_broadcast_reclog_recv(subreq);
323         TALLOC_FREE(subreq);
324         tevent_req_error(req, ret);
325 }
326
327 static void notifyd_clean_peers_finished(struct tevent_req *subreq)
328 {
329         struct tevent_req *req = tevent_req_callback_data(
330                 subreq, struct tevent_req);
331         int ret;
332
333         ret = notifyd_clean_peers_recv(subreq);
334         TALLOC_FREE(subreq);
335         tevent_req_error(req, ret);
336 }
337
338 #endif
339
340 int notifyd_recv(struct tevent_req *req)
341 {
342         return tevent_req_simple_recv_unix(req);
343 }
344
345 /*
346  * Parse an entry in the notifyd_context->entries database
347  */
348
349 static bool notifyd_parse_entry(uint8_t *buf, size_t buflen,
350                                 struct notifyd_instance **instances,
351                                 size_t *num_instances)
352 {
353         if ((buflen % sizeof(struct notifyd_instance)) != 0) {
354                 DEBUG(1, ("%s: invalid buffer size: %u\n",
355                           __func__, (unsigned)buflen));
356                 return false;
357         }
358
359         if (instances != NULL) {
360                 *instances = (struct notifyd_instance *)buf;
361         }
362         if (num_instances != NULL) {
363                 *num_instances = buflen / sizeof(struct notifyd_instance);
364         }
365         return true;
366 }
367
368 static bool notifyd_apply_rec_change(
369         const struct server_id *client,
370         const char *path, size_t pathlen,
371         const struct notify_instance *chg,
372         struct db_context *entries,
373         sys_notify_watch_fn sys_notify_watch,
374         struct sys_notify_context *sys_notify_ctx,
375         struct messaging_context *msg_ctx)
376 {
377         struct db_record *rec;
378         struct notifyd_instance *instances;
379         size_t num_instances;
380         size_t i;
381         struct notifyd_instance *instance;
382         TDB_DATA value;
383         NTSTATUS status;
384         bool ok = false;
385
386         if (pathlen == 0) {
387                 DEBUG(1, ("%s: pathlen==0\n", __func__));
388                 return false;
389         }
390         if (path[pathlen-1] != '\0') {
391                 DEBUG(1, ("%s: path not 0-terminated\n", __func__));
392                 return false;
393         }
394
395         DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, "
396                    "private_data=%p\n", __func__, path,
397                    (unsigned)chg->filter, (unsigned)chg->subdir_filter,
398                    chg->private_data));
399
400         rec = dbwrap_fetch_locked(
401                 entries, entries,
402                 make_tdb_data((const uint8_t *)path, pathlen-1));
403
404         if (rec == NULL) {
405                 DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__));
406                 goto fail;
407         }
408
409         num_instances = 0;
410         value = dbwrap_record_get_value(rec);
411
412         if (value.dsize != 0) {
413                 if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
414                                          &num_instances)) {
415                         goto fail;
416                 }
417         }
418
419         /*
420          * Overallocate by one instance to avoid a realloc when adding
421          */
422         instances = talloc_array(rec, struct notifyd_instance,
423                                  num_instances + 1);
424         if (instances == NULL) {
425                 DEBUG(1, ("%s: talloc failed\n", __func__));
426                 goto fail;
427         }
428
429         if (value.dsize != 0) {
430                 memcpy(instances, value.dptr, value.dsize);
431         }
432
433         for (i=0; i<num_instances; i++) {
434                 instance = &instances[i];
435
436                 if (server_id_equal(&instance->client, client) &&
437                     (instance->instance.private_data == chg->private_data)) {
438                         break;
439                 }
440         }
441
442         if (i < num_instances) {
443                 instance->instance = *chg;
444         } else {
445                 /*
446                  * We've overallocated for one instance
447                  */
448                 instance = &instances[num_instances];
449
450                 *instance = (struct notifyd_instance) {
451                         .client = *client,
452                         .instance = *chg,
453                         .internal_filter = chg->filter,
454                         .internal_subdir_filter = chg->subdir_filter
455                 };
456
457                 num_instances += 1;
458         }
459
460         if ((instance->instance.filter != 0) ||
461             (instance->instance.subdir_filter != 0)) {
462                 int ret;
463
464                 TALLOC_FREE(instance->sys_watch);
465
466                 ret = sys_notify_watch(entries, sys_notify_ctx, path,
467                                        &instance->internal_filter,
468                                        &instance->internal_subdir_filter,
469                                        notifyd_sys_callback, msg_ctx,
470                                        &instance->sys_watch);
471                 if (ret != 0) {
472                         DEBUG(1, ("%s: inotify_watch returned %s\n",
473                                   __func__, strerror(errno)));
474                 }
475         }
476
477         if ((instance->instance.filter == 0) &&
478             (instance->instance.subdir_filter == 0)) {
479                 /* This is a delete request */
480                 TALLOC_FREE(instance->sys_watch);
481                 *instance = instances[num_instances-1];
482                 num_instances -= 1;
483         }
484
485         DEBUG(10, ("%s: %s has %u instances\n", __func__,
486                    path, (unsigned)num_instances));
487
488         if (num_instances == 0) {
489                 status = dbwrap_record_delete(rec);
490                 if (!NT_STATUS_IS_OK(status)) {
491                         DEBUG(1, ("%s: dbwrap_record_delete returned %s\n",
492                                   __func__, nt_errstr(status)));
493                         goto fail;
494                 }
495         } else {
496                 value = make_tdb_data(
497                         (uint8_t *)instances,
498                         sizeof(struct notifyd_instance) * num_instances);
499
500                 status = dbwrap_record_store(rec, value, 0);
501                 if (!NT_STATUS_IS_OK(status)) {
502                         DEBUG(1, ("%s: dbwrap_record_store returned %s\n",
503                                   __func__, nt_errstr(status)));
504                         goto fail;
505                 }
506         }
507
508         ok = true;
509 fail:
510         TALLOC_FREE(rec);
511         return ok;
512 }
513
514 static void notifyd_sys_callback(struct sys_notify_context *ctx,
515                                  void *private_data, struct notify_event *ev,
516                                  uint32_t filter)
517 {
518         struct messaging_context *msg_ctx = talloc_get_type_abort(
519                 private_data, struct messaging_context);
520         struct notify_trigger_msg msg;
521         struct iovec iov[4];
522         char slash = '/';
523
524         msg = (struct notify_trigger_msg) {
525                 .when = timespec_current(),
526                 .action = ev->action,
527                 .filter = filter,
528         };
529
530         iov[0].iov_base = &msg;
531         iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
532         iov[1].iov_base = discard_const_p(char, ev->dir);
533         iov[1].iov_len = strlen(ev->dir);
534         iov[2].iov_base = &slash;
535         iov[2].iov_len = 1;
536         iov[3].iov_base = discard_const_p(char, ev->path);
537         iov[3].iov_len = strlen(ev->path)+1;
538
539         messaging_send_iov(
540                 msg_ctx, messaging_server_id(msg_ctx),
541                 MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
542 }
543
544 static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
545                                      struct notify_rec_change_msg **pmsg,
546                                      size_t *pathlen)
547 {
548         struct notify_rec_change_msg *msg;
549
550         if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
551                 DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__,
552                           (unsigned)bufsize));
553                 return false;
554         }
555
556         *pmsg = msg = (struct notify_rec_change_msg *)buf;
557         *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
558
559         DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, "
560                    "private_data=%p, path=%.*s\n",
561                    __func__, (unsigned)msg->instance.filter,
562                    (unsigned)msg->instance.subdir_filter,
563                    msg->instance.private_data, (int)(*pathlen), msg->path));
564
565         return true;
566 }
567
568 static void notifyd_rec_change(struct messaging_context *msg_ctx,
569                                void *private_data, uint32_t msg_type,
570                                struct server_id src, DATA_BLOB *data)
571 {
572         struct notifyd_state *state = talloc_get_type_abort(
573                 private_data, struct notifyd_state);
574         struct server_id_buf idbuf;
575         struct notify_rec_change_msg *msg;
576         size_t pathlen;
577         bool ok;
578
579         DBG_DEBUG("Got %zu bytes from %s\n", data->length,
580                   server_id_str_buf(src, &idbuf));
581
582         ok = notifyd_parse_rec_change(data->data, data->length,
583                                       &msg, &pathlen);
584         if (!ok) {
585                 return;
586         }
587
588         ok = notifyd_apply_rec_change(
589                 &src, msg->path, pathlen, &msg->instance,
590                 state->entries, state->sys_notify_watch, state->sys_notify_ctx,
591                 state->msg_ctx);
592         if (!ok) {
593                 DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
594                           __func__));
595                 return;
596         }
597
598         if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
599                 return;
600         }
601
602 #ifdef CLUSTER_SUPPORT
603         {
604
605         struct messaging_rec **tmp;
606         struct messaging_reclog *log;
607         struct iovec iov = { .iov_base = data->data, .iov_len = data->length };
608
609         log = state->log;
610
611         tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
612                              log->num_recs+1);
613         if (tmp == NULL) {
614                 DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
615                 return;
616         }
617         log->recs = tmp;
618
619         log->recs[log->num_recs] = messaging_rec_create(
620                 log->recs, src, messaging_server_id(msg_ctx),
621                 msg_type, &iov, 1, NULL, 0);
622
623         if (log->recs[log->num_recs] == NULL) {
624                 DBG_WARNING("messaging_rec_create failed, ignoring\n");
625                 return;
626         }
627
628         log->num_recs += 1;
629
630         if (log->num_recs >= 100) {
631                 /*
632                  * Don't let the log grow too large
633                  */
634                 notifyd_broadcast_reclog(state->ctdbd_conn,
635                                          messaging_server_id(msg_ctx), log);
636         }
637
638         }
639 #endif
640 }
641
642 struct notifyd_trigger_state {
643         struct messaging_context *msg_ctx;
644         struct notify_trigger_msg *msg;
645         bool recursive;
646         bool covered_by_sys_notify;
647 };
648
649 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
650                                    void *private_data);
651
652 static void notifyd_trigger(struct messaging_context *msg_ctx,
653                             void *private_data, uint32_t msg_type,
654                             struct server_id src, DATA_BLOB *data)
655 {
656         struct notifyd_state *state = talloc_get_type_abort(
657                 private_data, struct notifyd_state);
658         struct server_id my_id = messaging_server_id(msg_ctx);
659         struct notifyd_trigger_state tstate;
660         const char *path;
661         const char *p, *next_p;
662
663         if (data->length < offsetof(struct notify_trigger_msg, path) + 1) {
664                 DBG_WARNING("message too short, ignoring: %zu\n",
665                             data->length);
666                 return;
667         }
668         if (data->data[data->length-1] != 0) {
669                 DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
670                 return;
671         }
672
673         tstate.msg_ctx = msg_ctx;
674
675         tstate.covered_by_sys_notify = (src.vnn == my_id.vnn);
676         tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id);
677
678         tstate.msg = (struct notify_trigger_msg *)data->data;
679         path = tstate.msg->path;
680
681         DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
682                    __func__, (unsigned)tstate.msg->action,
683                    (unsigned)tstate.msg->filter, path));
684
685         if (path[0] != '/') {
686                 DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
687                           __func__, path));
688                 return;
689         }
690
691         for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
692                 ptrdiff_t path_len = p - path;
693                 TDB_DATA key;
694                 uint32_t i;
695
696                 next_p = strchr(p+1, '/');
697                 tstate.recursive = (next_p != NULL);
698
699                 DEBUG(10, ("%s: Trying path %.*s\n", __func__,
700                            (int)path_len, path));
701
702                 key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
703                                    .dsize = path_len };
704
705                 dbwrap_parse_record(state->entries, key,
706                                     notifyd_trigger_parser, &tstate);
707
708                 if (state->peers == NULL) {
709                         continue;
710                 }
711
712                 if (src.vnn != my_id.vnn) {
713                         continue;
714                 }
715
716                 for (i=0; i<state->num_peers; i++) {
717                         if (state->peers[i]->db == NULL) {
718                                 /*
719                                  * Inactive peer, did not get a db yet
720                                  */
721                                 continue;
722                         }
723                         dbwrap_parse_record(state->peers[i]->db, key,
724                                             notifyd_trigger_parser, &tstate);
725                 }
726         }
727 }
728
729 static void notifyd_send_delete(struct messaging_context *msg_ctx,
730                                 TDB_DATA key,
731                                 struct notifyd_instance *instance);
732
733 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
734                                    void *private_data)
735
736 {
737         struct notifyd_trigger_state *tstate = private_data;
738         struct notify_event_msg msg = { .action = tstate->msg->action,
739                                         .when = tstate->msg->when };
740         struct iovec iov[2];
741         size_t path_len = key.dsize;
742         struct notifyd_instance *instances = NULL;
743         size_t num_instances = 0;
744         size_t i;
745
746         if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
747                                  &num_instances)) {
748                 DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__));
749                 return;
750         }
751
752         DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__,
753                    (unsigned)num_instances, (int)key.dsize,
754                    (char *)key.dptr));
755
756         iov[0].iov_base = &msg;
757         iov[0].iov_len = offsetof(struct notify_event_msg, path);
758         iov[1].iov_base = tstate->msg->path + path_len + 1;
759         iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
760
761         for (i=0; i<num_instances; i++) {
762                 struct notifyd_instance *instance = &instances[i];
763                 struct server_id_buf idbuf;
764                 uint32_t i_filter;
765                 NTSTATUS status;
766
767                 if (tstate->covered_by_sys_notify) {
768                         if (tstate->recursive) {
769                                 i_filter = instance->internal_subdir_filter;
770                         } else {
771                                 i_filter = instance->internal_filter;
772                         }
773                 } else {
774                         if (tstate->recursive) {
775                                 i_filter = instance->instance.subdir_filter;
776                         } else {
777                                 i_filter = instance->instance.filter;
778                         }
779                 }
780
781                 if ((i_filter & tstate->msg->filter) == 0) {
782                         continue;
783                 }
784
785                 msg.private_data = instance->instance.private_data;
786
787                 status = messaging_send_iov(
788                         tstate->msg_ctx, instance->client,
789                         MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
790
791                 DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n",
792                            __func__,
793                            server_id_str_buf(instance->client, &idbuf),
794                            nt_errstr(status)));
795
796                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
797                     procid_is_local(&instance->client)) {
798                         /*
799                          * That process has died
800                          */
801                         notifyd_send_delete(tstate->msg_ctx, key, instance);
802                         continue;
803                 }
804
805                 if (!NT_STATUS_IS_OK(status)) {
806                         DEBUG(1, ("%s: messaging_send_iov returned %s\n",
807                                   __func__, nt_errstr(status)));
808                 }
809         }
810 }
811
812 /*
813  * Send a delete request to ourselves to properly discard a notify
814  * record for an smbd that has died.
815  */
816
817 static void notifyd_send_delete(struct messaging_context *msg_ctx,
818                                 TDB_DATA key,
819                                 struct notifyd_instance *instance)
820 {
821         struct notify_rec_change_msg msg = {
822                 .instance.private_data = instance->instance.private_data
823         };
824         uint8_t nul = 0;
825         struct iovec iov[3];
826         int ret;
827
828         /*
829          * Send a rec_change to ourselves to delete a dead entry
830          */
831
832         iov[0] = (struct iovec) {
833                 .iov_base = &msg,
834                 .iov_len = offsetof(struct notify_rec_change_msg, path) };
835         iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
836         iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
837
838         ret = messaging_send_iov_from(
839                 msg_ctx, instance->client, messaging_server_id(msg_ctx),
840                 MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
841
842         if (ret != 0) {
843                 DEBUG(10, ("%s: messaging_send_iov_from returned %s\n",
844                            __func__, strerror(ret)));
845         }
846 }
847
848 static void notifyd_get_db(struct messaging_context *msg_ctx,
849                            void *private_data, uint32_t msg_type,
850                            struct server_id src, DATA_BLOB *data)
851 {
852         struct notifyd_state *state = talloc_get_type_abort(
853                 private_data, struct notifyd_state);
854         struct server_id_buf id1, id2;
855         NTSTATUS status;
856         uint64_t rec_index = UINT64_MAX;
857         uint8_t index_buf[sizeof(uint64_t)];
858         size_t dbsize;
859         uint8_t *buf;
860         struct iovec iov[2];
861
862         dbsize = dbwrap_marshall(state->entries, NULL, 0);
863
864         buf = talloc_array(talloc_tos(), uint8_t, dbsize);
865         if (buf == NULL) {
866                 DEBUG(1, ("%s: talloc_array(%ju) failed\n",
867                           __func__, (uintmax_t)dbsize));
868                 return;
869         }
870
871         dbsize = dbwrap_marshall(state->entries, buf, dbsize);
872
873         if (dbsize != talloc_get_size(buf)) {
874                 DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__,
875                           (uintmax_t)talloc_get_size(buf),
876                           (uintmax_t)dbsize));
877                 TALLOC_FREE(buf);
878                 return;
879         }
880
881         if (state->log != NULL) {
882                 rec_index = state->log->rec_index;
883         }
884         SBVAL(index_buf, 0, rec_index);
885
886         iov[0] = (struct iovec) { .iov_base = index_buf,
887                                   .iov_len = sizeof(index_buf) };
888         iov[1] = (struct iovec) { .iov_base = buf,
889                                   .iov_len = dbsize };
890
891         DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
892                    (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
893                    server_id_str_buf(messaging_server_id(msg_ctx), &id1),
894                    server_id_str_buf(src, &id2)));
895
896         status = messaging_send_iov(msg_ctx, src, MSG_SMB_NOTIFY_DB,
897                                     iov, ARRAY_SIZE(iov), NULL, 0);
898         TALLOC_FREE(buf);
899         if (!NT_STATUS_IS_OK(status)) {
900                 DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
901                           __func__, nt_errstr(status)));
902         }
903 }
904
905 #ifdef CLUSTER_SUPPORT
906
907 static int notifyd_add_proxy_syswatches(struct db_record *rec,
908                                         void *private_data);
909
910 static void notifyd_got_db(struct messaging_context *msg_ctx,
911                            void *private_data, uint32_t msg_type,
912                            struct server_id src, DATA_BLOB *data)
913 {
914         struct notifyd_state *state = talloc_get_type_abort(
915                 private_data, struct notifyd_state);
916         struct notifyd_peer *p = NULL;
917         struct server_id_buf idbuf;
918         NTSTATUS status;
919         int count;
920         size_t i;
921
922         for (i=0; i<state->num_peers; i++) {
923                 if (server_id_equal(&src, &state->peers[i]->pid)) {
924                         p = state->peers[i];
925                         break;
926                 }
927         }
928
929         if (p == NULL) {
930                 DBG_DEBUG("Did not find peer for db from %s\n",
931                           server_id_str_buf(src, &idbuf));
932                 return;
933         }
934
935         if (data->length < 8) {
936                 DBG_DEBUG("Got short db length %zu from %s\n", data->length,
937                            server_id_str_buf(src, &idbuf));
938                 TALLOC_FREE(p);
939                 return;
940         }
941
942         p->rec_index = BVAL(data->data, 0);
943
944         p->db = db_open_rbt(p);
945         if (p->db == NULL) {
946                 DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
947                 TALLOC_FREE(p);
948                 return;
949         }
950
951         status = dbwrap_unmarshall(p->db, data->data + 8,
952                                    data->length - 8);
953         if (!NT_STATUS_IS_OK(status)) {
954                 DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
955                            __func__, nt_errstr(status),
956                            server_id_str_buf(src, &idbuf)));
957                 TALLOC_FREE(p);
958                 return;
959         }
960
961         dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
962                              &count);
963
964         DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
965                    server_id_str_buf(src, &idbuf), count));
966 }
967
968 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
969                                      struct server_id src,
970                                      struct messaging_reclog *log)
971 {
972         enum ndr_err_code ndr_err;
973         uint8_t msghdr[MESSAGE_HDR_LENGTH];
974         DATA_BLOB blob;
975         struct iovec iov[2];
976         int ret;
977
978         if (log == NULL) {
979                 return;
980         }
981
982         DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__,
983                    (uintmax_t)log->rec_index, (unsigned)log->num_recs));
984
985         message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
986                         (struct server_id) {0 });
987         iov[0] = (struct iovec) { .iov_base = msghdr,
988                                   .iov_len = sizeof(msghdr) };
989
990         ndr_err = ndr_push_struct_blob(
991                 &blob, log, log,
992                 (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
993         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
994                 DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n",
995                           __func__, ndr_errstr(ndr_err)));
996                 goto done;
997         }
998         iov[1] = (struct iovec) { .iov_base = blob.data,
999                                   .iov_len = blob.length };
1000
1001         ret = ctdbd_messaging_send_iov(
1002                 ctdbd_conn, CTDB_BROADCAST_CONNECTED,
1003                 CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
1004         TALLOC_FREE(blob.data);
1005         if (ret != 0) {
1006                 DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n",
1007                           __func__, strerror(ret)));
1008                 goto done;
1009         }
1010
1011         log->rec_index += 1;
1012
1013 done:
1014         log->num_recs = 0;
1015         TALLOC_FREE(log->recs);
1016 }
1017
1018 struct notifyd_broadcast_reclog_state {
1019         struct tevent_context *ev;
1020         struct ctdbd_connection *ctdbd_conn;
1021         struct server_id src;
1022         struct messaging_reclog *log;
1023 };
1024
1025 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
1026
1027 static struct tevent_req *notifyd_broadcast_reclog_send(
1028         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1029         struct ctdbd_connection *ctdbd_conn, struct server_id src,
1030         struct messaging_reclog *log)
1031 {
1032         struct tevent_req *req, *subreq;
1033         struct notifyd_broadcast_reclog_state *state;
1034
1035         req = tevent_req_create(mem_ctx, &state,
1036                                 struct notifyd_broadcast_reclog_state);
1037         if (req == NULL) {
1038                 return NULL;
1039         }
1040         state->ev = ev;
1041         state->ctdbd_conn = ctdbd_conn;
1042         state->src = src;
1043         state->log = log;
1044
1045         subreq = tevent_wakeup_send(state, state->ev,
1046                                     timeval_current_ofs_msec(1000));
1047         if (tevent_req_nomem(subreq, req)) {
1048                 return tevent_req_post(req, ev);
1049         }
1050         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1051         return req;
1052 }
1053
1054 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1055 {
1056         struct tevent_req *req = tevent_req_callback_data(
1057                 subreq, struct tevent_req);
1058         struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1059                 req, struct notifyd_broadcast_reclog_state);
1060         bool ok;
1061
1062         ok = tevent_wakeup_recv(subreq);
1063         TALLOC_FREE(subreq);
1064         if (!ok) {
1065                 tevent_req_oom(req);
1066                 return;
1067         }
1068
1069         notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1070
1071         subreq = tevent_wakeup_send(state, state->ev,
1072                                     timeval_current_ofs_msec(1000));
1073         if (tevent_req_nomem(subreq, req)) {
1074                 return;
1075         }
1076         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1077 }
1078
1079 static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
1080 {
1081         return tevent_req_simple_recv_unix(req);
1082 }
1083
1084 struct notifyd_clean_peers_state {
1085         struct tevent_context *ev;
1086         struct notifyd_state *notifyd;
1087 };
1088
1089 static void notifyd_clean_peers_next(struct tevent_req *subreq);
1090
1091 static struct tevent_req *notifyd_clean_peers_send(
1092         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1093         struct notifyd_state *notifyd)
1094 {
1095         struct tevent_req *req, *subreq;
1096         struct notifyd_clean_peers_state *state;
1097
1098         req = tevent_req_create(mem_ctx, &state,
1099                                 struct notifyd_clean_peers_state);
1100         if (req == NULL) {
1101                 return NULL;
1102         }
1103         state->ev = ev;
1104         state->notifyd = notifyd;
1105
1106         subreq = tevent_wakeup_send(state, state->ev,
1107                                     timeval_current_ofs_msec(30000));
1108         if (tevent_req_nomem(subreq, req)) {
1109                 return tevent_req_post(req, ev);
1110         }
1111         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1112         return req;
1113 }
1114
1115 static void notifyd_clean_peers_next(struct tevent_req *subreq)
1116 {
1117         struct tevent_req *req = tevent_req_callback_data(
1118                 subreq, struct tevent_req);
1119         struct notifyd_clean_peers_state *state = tevent_req_data(
1120                 req, struct notifyd_clean_peers_state);
1121         struct notifyd_state *notifyd = state->notifyd;
1122         size_t i;
1123         bool ok;
1124         time_t now = time(NULL);
1125
1126         ok = tevent_wakeup_recv(subreq);
1127         TALLOC_FREE(subreq);
1128         if (!ok) {
1129                 tevent_req_oom(req);
1130                 return;
1131         }
1132
1133         i = 0;
1134         while (i < notifyd->num_peers) {
1135                 struct notifyd_peer *p = notifyd->peers[i];
1136
1137                 if ((now - p->last_broadcast) > 60) {
1138                         struct server_id_buf idbuf;
1139
1140                         /*
1141                          * Haven't heard for more than 60 seconds. Call this
1142                          * peer dead
1143                          */
1144
1145                         DEBUG(10, ("%s: peer %s died\n", __func__,
1146                                    server_id_str_buf(p->pid, &idbuf)));
1147                         /*
1148                          * This implicitly decrements notifyd->num_peers
1149                          */
1150                         TALLOC_FREE(p);
1151                 } else {
1152                         i += 1;
1153                 }
1154         }
1155
1156         subreq = tevent_wakeup_send(state, state->ev,
1157                                     timeval_current_ofs_msec(30000));
1158         if (tevent_req_nomem(subreq, req)) {
1159                 return;
1160         }
1161         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1162 }
1163
1164 static int notifyd_clean_peers_recv(struct tevent_req *req)
1165 {
1166         return tevent_req_simple_recv_unix(req);
1167 }
1168
1169 static int notifyd_add_proxy_syswatches(struct db_record *rec,
1170                                         void *private_data)
1171 {
1172         struct notifyd_state *state = talloc_get_type_abort(
1173                 private_data, struct notifyd_state);
1174         struct db_context *db = dbwrap_record_get_db(rec);
1175         TDB_DATA key = dbwrap_record_get_key(rec);
1176         TDB_DATA value = dbwrap_record_get_value(rec);
1177         struct notifyd_instance *instances = NULL;
1178         size_t num_instances = 0;
1179         size_t i;
1180         char path[key.dsize+1];
1181         bool ok;
1182
1183         memcpy(path, key.dptr, key.dsize);
1184         path[key.dsize] = '\0';
1185
1186         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1187                                  &num_instances);
1188         if (!ok) {
1189                 DEBUG(1, ("%s: Could not parse notifyd entry for %s\n",
1190                           __func__, path));
1191                 return 0;
1192         }
1193
1194         for (i=0; i<num_instances; i++) {
1195                 struct notifyd_instance *instance = &instances[i];
1196                 uint32_t filter = instance->instance.filter;
1197                 uint32_t subdir_filter = instance->instance.subdir_filter;
1198                 int ret;
1199
1200                 /*
1201                  * This is a remote database. Pointers that we were
1202                  * given don't make sense locally. Initialize to NULL
1203                  * in case sys_notify_watch fails.
1204                  */
1205                 instances[i].sys_watch = NULL;
1206
1207                 ret = state->sys_notify_watch(
1208                         db, state->sys_notify_ctx, path,
1209                         &filter, &subdir_filter,
1210                         notifyd_sys_callback, state->msg_ctx,
1211                         &instance->sys_watch);
1212                 if (ret != 0) {
1213                         DEBUG(1, ("%s: inotify_watch returned %s\n",
1214                                   __func__, strerror(errno)));
1215                 }
1216         }
1217
1218         return 0;
1219 }
1220
1221 static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1222 {
1223         TDB_DATA key = dbwrap_record_get_key(rec);
1224         TDB_DATA value = dbwrap_record_get_value(rec);
1225         struct notifyd_instance *instances = NULL;
1226         size_t num_instances = 0;
1227         size_t i;
1228         bool ok;
1229
1230         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1231                                  &num_instances);
1232         if (!ok) {
1233                 DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n",
1234                           __func__, (int)key.dsize, (char *)key.dptr));
1235                 return 0;
1236         }
1237         for (i=0; i<num_instances; i++) {
1238                 TALLOC_FREE(instances[i].sys_watch);
1239         }
1240         return 0;
1241 }
1242
1243 static int notifyd_peer_destructor(struct notifyd_peer *p)
1244 {
1245         struct notifyd_state *state = p->state;
1246         size_t i;
1247
1248         if (p->db != NULL) {
1249                 dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
1250                                      NULL, NULL);
1251         }
1252
1253         for (i = 0; i<state->num_peers; i++) {
1254                 if (p == state->peers[i]) {
1255                         state->peers[i] = state->peers[state->num_peers-1];
1256                         state->num_peers -= 1;
1257                         break;
1258                 }
1259         }
1260         return 0;
1261 }
1262
1263 static struct notifyd_peer *notifyd_peer_new(
1264         struct notifyd_state *state, struct server_id pid)
1265 {
1266         struct notifyd_peer *p, **tmp;
1267
1268         tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1269                              state->num_peers+1);
1270         if (tmp == NULL) {
1271                 return NULL;
1272         }
1273         state->peers = tmp;
1274
1275         p = talloc_zero(state->peers, struct notifyd_peer);
1276         if (p == NULL) {
1277                 return NULL;
1278         }
1279         p->state = state;
1280         p->pid = pid;
1281
1282         state->peers[state->num_peers] = p;
1283         state->num_peers += 1;
1284
1285         talloc_set_destructor(p, notifyd_peer_destructor);
1286
1287         return p;
1288 }
1289
1290 static void notifyd_apply_reclog(struct notifyd_peer *peer,
1291                                  const uint8_t *msg, size_t msglen)
1292 {
1293         struct notifyd_state *state = peer->state;
1294         DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1295                            .length = msglen };
1296         struct server_id_buf idbuf;
1297         struct messaging_reclog *log;
1298         enum ndr_err_code ndr_err;
1299         uint32_t i;
1300
1301         if (peer->db == NULL) {
1302                 /*
1303                  * No db yet
1304                  */
1305                 return;
1306         }
1307
1308         log = talloc(peer, struct messaging_reclog);
1309         if (log == NULL) {
1310                 DEBUG(10, ("%s: talloc failed\n", __func__));
1311                 return;
1312         }
1313
1314         ndr_err = ndr_pull_struct_blob_all(
1315                 &blob, log, log,
1316                 (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1317         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1318                 DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n",
1319                            __func__, ndr_errstr(ndr_err)));
1320                 goto fail;
1321         }
1322
1323         DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__,
1324                    (unsigned)log->num_recs, (uintmax_t)log->rec_index,
1325                    server_id_str_buf(peer->pid, &idbuf)));
1326
1327         if (log->rec_index != peer->rec_index) {
1328                 DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n",
1329                           __func__, (uintmax_t)log->rec_index,
1330                           server_id_str_buf(peer->pid, &idbuf),
1331                           (uintmax_t)peer->rec_index));
1332                 goto fail;
1333         }
1334
1335         for (i=0; i<log->num_recs; i++) {
1336                 struct messaging_rec *r = log->recs[i];
1337                 struct notify_rec_change_msg *chg;
1338                 size_t pathlen;
1339                 bool ok;
1340
1341                 ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1342                                               &chg, &pathlen);
1343                 if (!ok) {
1344                         DEBUG(3, ("%s: notifyd_parse_rec_change failed\n",
1345                                   __func__));
1346                         goto fail;
1347                 }
1348
1349                 ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1350                                               &chg->instance, peer->db,
1351                                               state->sys_notify_watch,
1352                                               state->sys_notify_ctx,
1353                                               state->msg_ctx);
1354                 if (!ok) {
1355                         DEBUG(3, ("%s: notifyd_apply_rec_change failed\n",
1356                                   __func__));
1357                         goto fail;
1358                 }
1359         }
1360
1361         peer->rec_index += 1;
1362         peer->last_broadcast = time(NULL);
1363
1364         TALLOC_FREE(log);
1365         return;
1366
1367 fail:
1368         DEBUG(10, ("%s: Dropping peer %s\n", __func__,
1369                    server_id_str_buf(peer->pid, &idbuf)));
1370         TALLOC_FREE(peer);
1371 }
1372
1373 /*
1374  * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1375  * messages) broadcasts by other notifyds. Several cases:
1376  *
1377  * We don't know the source. This creates a new peer. Creating a peer
1378  * involves asking the peer for its full database. We assume ordered
1379  * messages, so the new database will arrive before the next broadcast
1380  * will.
1381  *
1382  * We know the source and the log index matches. We will apply the log
1383  * locally to our peer's db as if we had received it from a local
1384  * client.
1385  *
1386  * We know the source but the log index does not match. This means we
1387  * lost a message. We just drop the whole peer and wait for the next
1388  * broadcast, which will then trigger a fresh database pull.
1389  */
1390
1391 static int notifyd_snoop_broadcast(struct tevent_context *ev,
1392                                    uint32_t src_vnn, uint32_t dst_vnn,
1393                                    uint64_t dst_srvid,
1394                                    const uint8_t *msg, size_t msglen,
1395                                    void *private_data)
1396 {
1397         struct notifyd_state *state = talloc_get_type_abort(
1398                 private_data, struct notifyd_state);
1399         struct server_id my_id = messaging_server_id(state->msg_ctx);
1400         struct notifyd_peer *p;
1401         uint32_t i;
1402         uint32_t msg_type;
1403         struct server_id src, dst;
1404         struct server_id_buf idbuf;
1405         NTSTATUS status;
1406
1407         if (msglen < MESSAGE_HDR_LENGTH) {
1408                 DEBUG(10, ("%s: Got short broadcast\n", __func__));
1409                 return 0;
1410         }
1411         message_hdr_get(&msg_type, &src, &dst, msg);
1412
1413         if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1414                 DEBUG(10, ("%s Got message %u, ignoring\n", __func__,
1415                            (unsigned)msg_type));
1416                 return 0;
1417         }
1418         if (server_id_equal(&src, &my_id)) {
1419                 DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__));
1420                 return 0;
1421         }
1422
1423         DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1424                    __func__, server_id_str_buf(src, &idbuf)));
1425
1426         for (i=0; i<state->num_peers; i++) {
1427                 if (server_id_equal(&state->peers[i]->pid, &src)) {
1428
1429                         DEBUG(10, ("%s: Applying changes to peer %u\n",
1430                                    __func__, (unsigned)i));
1431
1432                         notifyd_apply_reclog(state->peers[i],
1433                                              msg + MESSAGE_HDR_LENGTH,
1434                                              msglen - MESSAGE_HDR_LENGTH);
1435                         return 0;
1436                 }
1437         }
1438
1439         DEBUG(10, ("%s: Creating new peer for %s\n", __func__,
1440                    server_id_str_buf(src, &idbuf)));
1441
1442         p = notifyd_peer_new(state, src);
1443         if (p == NULL) {
1444                 DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__));
1445                 return 0;
1446         }
1447
1448         status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1449                                     NULL, 0);
1450         if (!NT_STATUS_IS_OK(status)) {
1451                 DEBUG(10, ("%s: messaging_send_buf failed: %s\n",
1452                            __func__, nt_errstr(status)));
1453                 TALLOC_FREE(p);
1454                 return 0;
1455         }
1456
1457         return 0;
1458 }
1459 #endif
1460
1461 struct notifyd_parse_db_state {
1462         bool (*fn)(const char *path,
1463                    struct server_id server,
1464                    const struct notify_instance *instance,
1465                    void *private_data);
1466         void *private_data;
1467 };
1468
1469 static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value,
1470                                     void *private_data)
1471 {
1472         struct notifyd_parse_db_state *state = private_data;
1473         char path[key.dsize+1];
1474         struct notifyd_instance *instances = NULL;
1475         size_t num_instances = 0;
1476         size_t i;
1477         bool ok;
1478
1479         memcpy(path, key.dptr, key.dsize);
1480         path[key.dsize] = 0;
1481
1482         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1483                                  &num_instances);
1484         if (!ok) {
1485                 DEBUG(10, ("%s: Could not parse entry for path %s\n",
1486                            __func__, path));
1487                 return true;
1488         }
1489
1490         for (i=0; i<num_instances; i++) {
1491                 ok = state->fn(path, instances[i].client,
1492                                &instances[i].instance,
1493                                state->private_data);
1494                 if (!ok) {
1495                         return false;
1496                 }
1497         }
1498
1499         return true;
1500 }
1501
1502 int notifyd_parse_db(const uint8_t *buf, size_t buflen,
1503                      uint64_t *log_index,
1504                      bool (*fn)(const char *path,
1505                                 struct server_id server,
1506                                 const struct notify_instance *instance,
1507                                 void *private_data),
1508                      void *private_data)
1509 {
1510         struct notifyd_parse_db_state state = {
1511                 .fn = fn, .private_data = private_data
1512         };
1513         NTSTATUS status;
1514
1515         if (buflen < 8) {
1516                 return EINVAL;
1517         }
1518         *log_index = BVAL(buf, 0);
1519
1520         buf += 8;
1521         buflen -= 8;
1522
1523         status = dbwrap_parse_marshall_buf(
1524                 buf, buflen, notifyd_parse_db_parser, &state);
1525         if (!NT_STATUS_IS_OK(status)) {
1526                 return map_errno_from_nt_status(status);
1527         }
1528
1529         return 0;
1530 }