notifyd: Use messaging_register for MSG_SMB_NOTIFY_DB
[sfrench/samba-autobuild/.git] / source3 / smbd / notifyd / notifyd.c
1 /*
2  * Unix SMB/CIFS implementation.
3  *
4  * Copyright (C) Volker Lendecke 2014
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include <tevent.h>
22 #include "lib/util/server_id.h"
23 #include "lib/util/data_blob.h"
24 #include "librpc/gen_ndr/notify.h"
25 #include "librpc/gen_ndr/messaging.h"
26 #include "librpc/gen_ndr/server_id.h"
27 #include "lib/dbwrap/dbwrap.h"
28 #include "lib/dbwrap/dbwrap_rbt.h"
29 #include "messages.h"
30 #include "tdb.h"
31 #include "util_tdb.h"
32 #include "notifyd.h"
33 #include "lib/util/server_id_db.h"
34 #include "lib/util/tevent_unix.h"
35 #include "lib/util/tevent_ntstatus.h"
36 #include "ctdbd_conn.h"
37 #include "ctdb_srvids.h"
38 #include "server_id_db_util.h"
39 #include "lib/util/iov_buf.h"
40 #include "messages_util.h"
41
42 #ifdef CLUSTER_SUPPORT
43 #include "ctdb_protocol.h"
44 #endif
45
46 struct notifyd_peer;
47
48 /*
49  * All of notifyd's state
50  */
51
52 struct notifyd_state {
53         struct tevent_context *ev;
54         struct messaging_context *msg_ctx;
55         struct ctdbd_connection *ctdbd_conn;
56
57         /*
58          * Database of everything clients show interest in. Indexed by
59          * absolute path. The database keys are not 0-terminated
60          * because the criticial operation, notifyd_trigger, can walk
61          * the structure from the top without adding intermediate 0s.
62          * The database records contain an array of
63          *
64          * struct notifyd_instance
65          *
66          * to be maintained and parsed by notifyd_entry_parse()
67          */
68         struct db_context *entries;
69
70         /*
71          * In the cluster case, this is the place where we store a log
72          * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
73          * forward them to our peer notifyd's in the cluster once a
74          * second or when the log grows too large.
75          */
76
77         struct messaging_reclog *log;
78
79         /*
80          * Array of companion notifyd's in a cluster. Every notifyd
81          * broadcasts its messaging_reclog to every other notifyd in
82          * the cluster. This is done by making ctdb send a message to
83          * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
84          * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who
85          * had called register_with_ctdbd this srvid will receive the
86          * broadcasts.
87          *
88          * Database replication happens via these broadcasts. Also,
89          * they serve as liveness indication. If a notifyd receives a
90          * broadcast from an unknown peer, it will create one for this
91          * srvid. Also when we don't hear anything from a peer for a
92          * while, we will discard it.
93          */
94
95         struct notifyd_peer **peers;
96         size_t num_peers;
97
98         sys_notify_watch_fn sys_notify_watch;
99         struct sys_notify_context *sys_notify_ctx;
100 };
101
102 /*
103  * notifyd's representation of a notify instance
104  */
105 struct notifyd_instance {
106         struct server_id client;
107         struct notify_instance instance;
108
109         void *sys_watch; /* inotify/fam/etc handle */
110
111         /*
112          * Filters after sys_watch took responsibility of some bits
113          */
114         uint32_t internal_filter;
115         uint32_t internal_subdir_filter;
116 };
117
118 struct notifyd_peer {
119         struct notifyd_state *state;
120         struct server_id pid;
121         uint64_t rec_index;
122         struct db_context *db;
123         time_t last_broadcast;
124 };
125
126 static void notifyd_rec_change(struct messaging_context *msg_ctx,
127                                void *private_data, uint32_t msg_type,
128                                struct server_id src, DATA_BLOB *data);
129 static void notifyd_trigger(struct messaging_context *msg_ctx,
130                             void *private_data, uint32_t msg_type,
131                             struct server_id src, DATA_BLOB *data);
132 static void notifyd_get_db(struct messaging_context *msg_ctx,
133                            void *private_data, uint32_t msg_type,
134                            struct server_id src, DATA_BLOB *data);
135
136 #ifdef CLUSTER_SUPPORT
137 static void notifyd_got_db(struct messaging_context *msg_ctx,
138                            void *private_data, uint32_t msg_type,
139                            struct server_id src, DATA_BLOB *data);
140 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
141                                      struct server_id src,
142                                      struct messaging_reclog *log);
143 #endif
144 static void notifyd_sys_callback(struct sys_notify_context *ctx,
145                                  void *private_data, struct notify_event *ev,
146                                  uint32_t filter);
147
148 #ifdef CLUSTER_SUPPORT
149 static struct tevent_req *notifyd_broadcast_reclog_send(
150         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
151         struct ctdbd_connection *ctdbd_conn, struct server_id src,
152         struct messaging_reclog *log);
153 static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
154
155 static struct tevent_req *notifyd_clean_peers_send(
156         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
157         struct notifyd_state *notifyd);
158 static int notifyd_clean_peers_recv(struct tevent_req *req);
159 #endif
160
161 static int sys_notify_watch_dummy(
162         TALLOC_CTX *mem_ctx,
163         struct sys_notify_context *ctx,
164         const char *path,
165         uint32_t *filter,
166         uint32_t *subdir_filter,
167         void (*callback)(struct sys_notify_context *ctx,
168                          void *private_data,
169                          struct notify_event *ev,
170                          uint32_t filter),
171         void *private_data,
172         void *handle_p)
173 {
174         void **handle = handle_p;
175         *handle = NULL;
176         return 0;
177 }
178
179 static void notifyd_handler_done(struct tevent_req *subreq);
180
181 #ifdef CLUSTER_SUPPORT
182 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
183 static void notifyd_clean_peers_finished(struct tevent_req *subreq);
184 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
185                                    uint64_t dst_srvid,
186                                    const uint8_t *msg, size_t msglen,
187                                    void *private_data);
188 #endif
189
190 struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
191                                 struct messaging_context *msg_ctx,
192                                 struct ctdbd_connection *ctdbd_conn,
193                                 sys_notify_watch_fn sys_notify_watch,
194                                 struct sys_notify_context *sys_notify_ctx)
195 {
196         struct tevent_req *req;
197 #ifdef CLUSTER_SUPPORT
198         struct tevent_req *subreq;
199 #endif
200         struct notifyd_state *state;
201         struct server_id_db *names_db;
202         NTSTATUS status;
203         int ret;
204
205         req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
206         if (req == NULL) {
207                 return NULL;
208         }
209         state->ev = ev;
210         state->msg_ctx = msg_ctx;
211         state->ctdbd_conn = ctdbd_conn;
212
213         if (sys_notify_watch == NULL) {
214                 sys_notify_watch = sys_notify_watch_dummy;
215         }
216
217         state->sys_notify_watch = sys_notify_watch;
218         state->sys_notify_ctx = sys_notify_ctx;
219
220         state->entries = db_open_rbt(state);
221         if (tevent_req_nomem(state->entries, req)) {
222                 return tevent_req_post(req, ev);
223         }
224
225         status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE,
226                                     notifyd_rec_change);
227         if (tevent_req_nterror(req, status)) {
228                 return tevent_req_post(req, ev);
229         }
230
231         status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER,
232                                     notifyd_trigger);
233         if (tevent_req_nterror(req, status)) {
234                 goto deregister_rec_change;
235         }
236
237         status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_GET_DB,
238                                     notifyd_get_db);
239         if (tevent_req_nterror(req, status)) {
240                 goto deregister_trigger;
241         }
242
243         names_db = messaging_names_db(msg_ctx);
244
245         ret = server_id_db_set_exclusive(names_db, "notify-daemon");
246         if (ret != 0) {
247                 DEBUG(10, ("%s: server_id_db_add failed: %s\n",
248                            __func__, strerror(ret)));
249                 tevent_req_error(req, ret);
250                 goto deregister_get_db;
251         }
252
253         if (ctdbd_conn == NULL) {
254                 /*
255                  * No cluster around, skip the database replication
256                  * engine
257                  */
258                 return req;
259         }
260
261 #ifdef CLUSTER_SUPPORT
262         status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_DB,
263                                     notifyd_got_db);
264         if (tevent_req_nterror(req, status)) {
265                 goto deregister_get_db;
266         }
267
268         state->log = talloc_zero(state, struct messaging_reclog);
269         if (tevent_req_nomem(state->log, req)) {
270                 goto deregister_db;
271         }
272
273         subreq = notifyd_broadcast_reclog_send(
274                 state->log, ev, ctdbd_conn,
275                 messaging_server_id(msg_ctx),
276                 state->log);
277         if (tevent_req_nomem(subreq, req)) {
278                 goto deregister_db;
279         }
280         tevent_req_set_callback(subreq,
281                                 notifyd_broadcast_reclog_finished,
282                                 req);
283
284         subreq = notifyd_clean_peers_send(state, ev, state);
285         if (tevent_req_nomem(subreq, req)) {
286                 goto deregister_db;
287         }
288         tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
289                                 req);
290
291         ret = register_with_ctdbd(ctdbd_conn,
292                                   CTDB_SRVID_SAMBA_NOTIFY_PROXY,
293                                   notifyd_snoop_broadcast, state);
294         if (ret != 0) {
295                 tevent_req_error(req, ret);
296                 goto deregister_db;
297         }
298 #endif
299
300         return req;
301
302 #ifdef CLUSTER_SUPPORT
303 deregister_db:
304         messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_DB, state);
305 #endif
306 deregister_get_db:
307         messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_GET_DB, state);
308 deregister_trigger:
309         messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state);
310 deregister_rec_change:
311         messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state);
312         return tevent_req_post(req, ev);
313 }
314
315 static void notifyd_handler_done(struct tevent_req *subreq)
316 {
317         struct tevent_req *req = tevent_req_callback_data(
318                 subreq, struct tevent_req);
319         int ret;
320
321         ret = messaging_handler_recv(subreq);
322         TALLOC_FREE(subreq);
323         tevent_req_error(req, ret);
324 }
325
326 #ifdef CLUSTER_SUPPORT
327
328 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
329 {
330         struct tevent_req *req = tevent_req_callback_data(
331                 subreq, struct tevent_req);
332         int ret;
333
334         ret = notifyd_broadcast_reclog_recv(subreq);
335         TALLOC_FREE(subreq);
336         tevent_req_error(req, ret);
337 }
338
339 static void notifyd_clean_peers_finished(struct tevent_req *subreq)
340 {
341         struct tevent_req *req = tevent_req_callback_data(
342                 subreq, struct tevent_req);
343         int ret;
344
345         ret = notifyd_clean_peers_recv(subreq);
346         TALLOC_FREE(subreq);
347         tevent_req_error(req, ret);
348 }
349
350 #endif
351
352 int notifyd_recv(struct tevent_req *req)
353 {
354         return tevent_req_simple_recv_unix(req);
355 }
356
357 /*
358  * Parse an entry in the notifyd_context->entries database
359  */
360
361 static bool notifyd_parse_entry(uint8_t *buf, size_t buflen,
362                                 struct notifyd_instance **instances,
363                                 size_t *num_instances)
364 {
365         if ((buflen % sizeof(struct notifyd_instance)) != 0) {
366                 DEBUG(1, ("%s: invalid buffer size: %u\n",
367                           __func__, (unsigned)buflen));
368                 return false;
369         }
370
371         if (instances != NULL) {
372                 *instances = (struct notifyd_instance *)buf;
373         }
374         if (num_instances != NULL) {
375                 *num_instances = buflen / sizeof(struct notifyd_instance);
376         }
377         return true;
378 }
379
380 static bool notifyd_apply_rec_change(
381         const struct server_id *client,
382         const char *path, size_t pathlen,
383         const struct notify_instance *chg,
384         struct db_context *entries,
385         sys_notify_watch_fn sys_notify_watch,
386         struct sys_notify_context *sys_notify_ctx,
387         struct messaging_context *msg_ctx)
388 {
389         struct db_record *rec;
390         struct notifyd_instance *instances;
391         size_t num_instances;
392         size_t i;
393         struct notifyd_instance *instance;
394         TDB_DATA value;
395         NTSTATUS status;
396         bool ok = false;
397
398         if (pathlen == 0) {
399                 DEBUG(1, ("%s: pathlen==0\n", __func__));
400                 return false;
401         }
402         if (path[pathlen-1] != '\0') {
403                 DEBUG(1, ("%s: path not 0-terminated\n", __func__));
404                 return false;
405         }
406
407         DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, "
408                    "private_data=%p\n", __func__, path,
409                    (unsigned)chg->filter, (unsigned)chg->subdir_filter,
410                    chg->private_data));
411
412         rec = dbwrap_fetch_locked(
413                 entries, entries,
414                 make_tdb_data((const uint8_t *)path, pathlen-1));
415
416         if (rec == NULL) {
417                 DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__));
418                 goto fail;
419         }
420
421         num_instances = 0;
422         value = dbwrap_record_get_value(rec);
423
424         if (value.dsize != 0) {
425                 if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
426                                          &num_instances)) {
427                         goto fail;
428                 }
429         }
430
431         /*
432          * Overallocate by one instance to avoid a realloc when adding
433          */
434         instances = talloc_array(rec, struct notifyd_instance,
435                                  num_instances + 1);
436         if (instances == NULL) {
437                 DEBUG(1, ("%s: talloc failed\n", __func__));
438                 goto fail;
439         }
440
441         if (value.dsize != 0) {
442                 memcpy(instances, value.dptr, value.dsize);
443         }
444
445         for (i=0; i<num_instances; i++) {
446                 instance = &instances[i];
447
448                 if (server_id_equal(&instance->client, client) &&
449                     (instance->instance.private_data == chg->private_data)) {
450                         break;
451                 }
452         }
453
454         if (i < num_instances) {
455                 instance->instance = *chg;
456         } else {
457                 /*
458                  * We've overallocated for one instance
459                  */
460                 instance = &instances[num_instances];
461
462                 *instance = (struct notifyd_instance) {
463                         .client = *client,
464                         .instance = *chg,
465                         .internal_filter = chg->filter,
466                         .internal_subdir_filter = chg->subdir_filter
467                 };
468
469                 num_instances += 1;
470         }
471
472         if ((instance->instance.filter != 0) ||
473             (instance->instance.subdir_filter != 0)) {
474                 int ret;
475
476                 TALLOC_FREE(instance->sys_watch);
477
478                 ret = sys_notify_watch(entries, sys_notify_ctx, path,
479                                        &instance->internal_filter,
480                                        &instance->internal_subdir_filter,
481                                        notifyd_sys_callback, msg_ctx,
482                                        &instance->sys_watch);
483                 if (ret != 0) {
484                         DEBUG(1, ("%s: inotify_watch returned %s\n",
485                                   __func__, strerror(errno)));
486                 }
487         }
488
489         if ((instance->instance.filter == 0) &&
490             (instance->instance.subdir_filter == 0)) {
491                 /* This is a delete request */
492                 TALLOC_FREE(instance->sys_watch);
493                 *instance = instances[num_instances-1];
494                 num_instances -= 1;
495         }
496
497         DEBUG(10, ("%s: %s has %u instances\n", __func__,
498                    path, (unsigned)num_instances));
499
500         if (num_instances == 0) {
501                 status = dbwrap_record_delete(rec);
502                 if (!NT_STATUS_IS_OK(status)) {
503                         DEBUG(1, ("%s: dbwrap_record_delete returned %s\n",
504                                   __func__, nt_errstr(status)));
505                         goto fail;
506                 }
507         } else {
508                 value = make_tdb_data(
509                         (uint8_t *)instances,
510                         sizeof(struct notifyd_instance) * num_instances);
511
512                 status = dbwrap_record_store(rec, value, 0);
513                 if (!NT_STATUS_IS_OK(status)) {
514                         DEBUG(1, ("%s: dbwrap_record_store returned %s\n",
515                                   __func__, nt_errstr(status)));
516                         goto fail;
517                 }
518         }
519
520         ok = true;
521 fail:
522         TALLOC_FREE(rec);
523         return ok;
524 }
525
526 static void notifyd_sys_callback(struct sys_notify_context *ctx,
527                                  void *private_data, struct notify_event *ev,
528                                  uint32_t filter)
529 {
530         struct messaging_context *msg_ctx = talloc_get_type_abort(
531                 private_data, struct messaging_context);
532         struct notify_trigger_msg msg;
533         struct iovec iov[4];
534         char slash = '/';
535
536         msg = (struct notify_trigger_msg) {
537                 .when = timespec_current(),
538                 .action = ev->action,
539                 .filter = filter,
540         };
541
542         iov[0].iov_base = &msg;
543         iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
544         iov[1].iov_base = discard_const_p(char, ev->dir);
545         iov[1].iov_len = strlen(ev->dir);
546         iov[2].iov_base = &slash;
547         iov[2].iov_len = 1;
548         iov[3].iov_base = discard_const_p(char, ev->path);
549         iov[3].iov_len = strlen(ev->path)+1;
550
551         messaging_send_iov(
552                 msg_ctx, messaging_server_id(msg_ctx),
553                 MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
554 }
555
556 static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
557                                      struct notify_rec_change_msg **pmsg,
558                                      size_t *pathlen)
559 {
560         struct notify_rec_change_msg *msg;
561
562         if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
563                 DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__,
564                           (unsigned)bufsize));
565                 return false;
566         }
567
568         *pmsg = msg = (struct notify_rec_change_msg *)buf;
569         *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
570
571         DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, "
572                    "private_data=%p, path=%.*s\n",
573                    __func__, (unsigned)msg->instance.filter,
574                    (unsigned)msg->instance.subdir_filter,
575                    msg->instance.private_data, (int)(*pathlen), msg->path));
576
577         return true;
578 }
579
580 static void notifyd_rec_change(struct messaging_context *msg_ctx,
581                                void *private_data, uint32_t msg_type,
582                                struct server_id src, DATA_BLOB *data)
583 {
584         struct notifyd_state *state = talloc_get_type_abort(
585                 private_data, struct notifyd_state);
586         struct server_id_buf idbuf;
587         struct notify_rec_change_msg *msg;
588         size_t pathlen;
589         bool ok;
590
591         DBG_DEBUG("Got %zu bytes from %s\n", data->length,
592                   server_id_str_buf(src, &idbuf));
593
594         ok = notifyd_parse_rec_change(data->data, data->length,
595                                       &msg, &pathlen);
596         if (!ok) {
597                 return;
598         }
599
600         ok = notifyd_apply_rec_change(
601                 &src, msg->path, pathlen, &msg->instance,
602                 state->entries, state->sys_notify_watch, state->sys_notify_ctx,
603                 state->msg_ctx);
604         if (!ok) {
605                 DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
606                           __func__));
607                 return;
608         }
609
610         if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
611                 return;
612         }
613
614 #ifdef CLUSTER_SUPPORT
615         {
616
617         struct messaging_rec **tmp;
618         struct messaging_reclog *log;
619         struct iovec iov = { .iov_base = data->data, .iov_len = data->length };
620
621         log = state->log;
622
623         tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
624                              log->num_recs+1);
625         if (tmp == NULL) {
626                 DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
627                 return;
628         }
629         log->recs = tmp;
630
631         log->recs[log->num_recs] = messaging_rec_create(
632                 log->recs, src, messaging_server_id(msg_ctx),
633                 msg_type, &iov, 1, NULL, 0);
634
635         if (log->recs[log->num_recs] == NULL) {
636                 DBG_WARNING("messaging_rec_create failed, ignoring\n");
637                 return;
638         }
639
640         log->num_recs += 1;
641
642         if (log->num_recs >= 100) {
643                 /*
644                  * Don't let the log grow too large
645                  */
646                 notifyd_broadcast_reclog(state->ctdbd_conn,
647                                          messaging_server_id(msg_ctx), log);
648         }
649
650         }
651 #endif
652 }
653
654 struct notifyd_trigger_state {
655         struct messaging_context *msg_ctx;
656         struct notify_trigger_msg *msg;
657         bool recursive;
658         bool covered_by_sys_notify;
659 };
660
661 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
662                                    void *private_data);
663
664 static void notifyd_trigger(struct messaging_context *msg_ctx,
665                             void *private_data, uint32_t msg_type,
666                             struct server_id src, DATA_BLOB *data)
667 {
668         struct notifyd_state *state = talloc_get_type_abort(
669                 private_data, struct notifyd_state);
670         struct server_id my_id = messaging_server_id(msg_ctx);
671         struct notifyd_trigger_state tstate;
672         const char *path;
673         const char *p, *next_p;
674
675         if (data->length < offsetof(struct notify_trigger_msg, path) + 1) {
676                 DBG_WARNING("message too short, ignoring: %zu\n",
677                             data->length);
678                 return;
679         }
680         if (data->data[data->length-1] != 0) {
681                 DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
682                 return;
683         }
684
685         tstate.msg_ctx = msg_ctx;
686
687         tstate.covered_by_sys_notify = (src.vnn == my_id.vnn);
688         tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id);
689
690         tstate.msg = (struct notify_trigger_msg *)data->data;
691         path = tstate.msg->path;
692
693         DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
694                    __func__, (unsigned)tstate.msg->action,
695                    (unsigned)tstate.msg->filter, path));
696
697         if (path[0] != '/') {
698                 DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
699                           __func__, path));
700                 return;
701         }
702
703         for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
704                 ptrdiff_t path_len = p - path;
705                 TDB_DATA key;
706                 uint32_t i;
707
708                 next_p = strchr(p+1, '/');
709                 tstate.recursive = (next_p != NULL);
710
711                 DEBUG(10, ("%s: Trying path %.*s\n", __func__,
712                            (int)path_len, path));
713
714                 key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
715                                    .dsize = path_len };
716
717                 dbwrap_parse_record(state->entries, key,
718                                     notifyd_trigger_parser, &tstate);
719
720                 if (state->peers == NULL) {
721                         continue;
722                 }
723
724                 if (src.vnn != my_id.vnn) {
725                         continue;
726                 }
727
728                 for (i=0; i<state->num_peers; i++) {
729                         if (state->peers[i]->db == NULL) {
730                                 /*
731                                  * Inactive peer, did not get a db yet
732                                  */
733                                 continue;
734                         }
735                         dbwrap_parse_record(state->peers[i]->db, key,
736                                             notifyd_trigger_parser, &tstate);
737                 }
738         }
739 }
740
741 static void notifyd_send_delete(struct messaging_context *msg_ctx,
742                                 TDB_DATA key,
743                                 struct notifyd_instance *instance);
744
745 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
746                                    void *private_data)
747
748 {
749         struct notifyd_trigger_state *tstate = private_data;
750         struct notify_event_msg msg = { .action = tstate->msg->action,
751                                         .when = tstate->msg->when };
752         struct iovec iov[2];
753         size_t path_len = key.dsize;
754         struct notifyd_instance *instances = NULL;
755         size_t num_instances = 0;
756         size_t i;
757
758         if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
759                                  &num_instances)) {
760                 DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__));
761                 return;
762         }
763
764         DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__,
765                    (unsigned)num_instances, (int)key.dsize,
766                    (char *)key.dptr));
767
768         iov[0].iov_base = &msg;
769         iov[0].iov_len = offsetof(struct notify_event_msg, path);
770         iov[1].iov_base = tstate->msg->path + path_len + 1;
771         iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
772
773         for (i=0; i<num_instances; i++) {
774                 struct notifyd_instance *instance = &instances[i];
775                 struct server_id_buf idbuf;
776                 uint32_t i_filter;
777                 NTSTATUS status;
778
779                 if (tstate->covered_by_sys_notify) {
780                         if (tstate->recursive) {
781                                 i_filter = instance->internal_subdir_filter;
782                         } else {
783                                 i_filter = instance->internal_filter;
784                         }
785                 } else {
786                         if (tstate->recursive) {
787                                 i_filter = instance->instance.subdir_filter;
788                         } else {
789                                 i_filter = instance->instance.filter;
790                         }
791                 }
792
793                 if ((i_filter & tstate->msg->filter) == 0) {
794                         continue;
795                 }
796
797                 msg.private_data = instance->instance.private_data;
798
799                 status = messaging_send_iov(
800                         tstate->msg_ctx, instance->client,
801                         MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
802
803                 DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n",
804                            __func__,
805                            server_id_str_buf(instance->client, &idbuf),
806                            nt_errstr(status)));
807
808                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
809                     procid_is_local(&instance->client)) {
810                         /*
811                          * That process has died
812                          */
813                         notifyd_send_delete(tstate->msg_ctx, key, instance);
814                         continue;
815                 }
816
817                 if (!NT_STATUS_IS_OK(status)) {
818                         DEBUG(1, ("%s: messaging_send_iov returned %s\n",
819                                   __func__, nt_errstr(status)));
820                 }
821         }
822 }
823
824 /*
825  * Send a delete request to ourselves to properly discard a notify
826  * record for an smbd that has died.
827  */
828
829 static void notifyd_send_delete(struct messaging_context *msg_ctx,
830                                 TDB_DATA key,
831                                 struct notifyd_instance *instance)
832 {
833         struct notify_rec_change_msg msg = {
834                 .instance.private_data = instance->instance.private_data
835         };
836         uint8_t nul = 0;
837         struct iovec iov[3];
838         int ret;
839
840         /*
841          * Send a rec_change to ourselves to delete a dead entry
842          */
843
844         iov[0] = (struct iovec) {
845                 .iov_base = &msg,
846                 .iov_len = offsetof(struct notify_rec_change_msg, path) };
847         iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
848         iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
849
850         ret = messaging_send_iov_from(
851                 msg_ctx, instance->client, messaging_server_id(msg_ctx),
852                 MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
853
854         if (ret != 0) {
855                 DEBUG(10, ("%s: messaging_send_iov_from returned %s\n",
856                            __func__, strerror(ret)));
857         }
858 }
859
860 static void notifyd_get_db(struct messaging_context *msg_ctx,
861                            void *private_data, uint32_t msg_type,
862                            struct server_id src, DATA_BLOB *data)
863 {
864         struct notifyd_state *state = talloc_get_type_abort(
865                 private_data, struct notifyd_state);
866         struct server_id_buf id1, id2;
867         NTSTATUS status;
868         uint64_t rec_index = UINT64_MAX;
869         uint8_t index_buf[sizeof(uint64_t)];
870         size_t dbsize;
871         uint8_t *buf;
872         struct iovec iov[2];
873
874         dbsize = dbwrap_marshall(state->entries, NULL, 0);
875
876         buf = talloc_array(talloc_tos(), uint8_t, dbsize);
877         if (buf == NULL) {
878                 DEBUG(1, ("%s: talloc_array(%ju) failed\n",
879                           __func__, (uintmax_t)dbsize));
880                 return;
881         }
882
883         dbsize = dbwrap_marshall(state->entries, buf, dbsize);
884
885         if (dbsize != talloc_get_size(buf)) {
886                 DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__,
887                           (uintmax_t)talloc_get_size(buf),
888                           (uintmax_t)dbsize));
889                 TALLOC_FREE(buf);
890                 return;
891         }
892
893         if (state->log != NULL) {
894                 rec_index = state->log->rec_index;
895         }
896         SBVAL(index_buf, 0, rec_index);
897
898         iov[0] = (struct iovec) { .iov_base = index_buf,
899                                   .iov_len = sizeof(index_buf) };
900         iov[1] = (struct iovec) { .iov_base = buf,
901                                   .iov_len = dbsize };
902
903         DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
904                    (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
905                    server_id_str_buf(messaging_server_id(msg_ctx), &id1),
906                    server_id_str_buf(src, &id2)));
907
908         status = messaging_send_iov(msg_ctx, src, MSG_SMB_NOTIFY_DB,
909                                     iov, ARRAY_SIZE(iov), NULL, 0);
910         TALLOC_FREE(buf);
911         if (!NT_STATUS_IS_OK(status)) {
912                 DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
913                           __func__, nt_errstr(status)));
914         }
915 }
916
917 #ifdef CLUSTER_SUPPORT
918
919 static int notifyd_add_proxy_syswatches(struct db_record *rec,
920                                         void *private_data);
921
922 static void notifyd_got_db(struct messaging_context *msg_ctx,
923                            void *private_data, uint32_t msg_type,
924                            struct server_id src, DATA_BLOB *data)
925 {
926         struct notifyd_state *state = talloc_get_type_abort(
927                 private_data, struct notifyd_state);
928         struct notifyd_peer *p = NULL;
929         struct server_id_buf idbuf;
930         NTSTATUS status;
931         int count;
932         size_t i;
933
934         for (i=0; i<state->num_peers; i++) {
935                 if (server_id_equal(&src, &state->peers[i]->pid)) {
936                         p = state->peers[i];
937                         break;
938                 }
939         }
940
941         if (p == NULL) {
942                 DBG_DEBUG("Did not find peer for db from %s\n",
943                           server_id_str_buf(src, &idbuf));
944                 return;
945         }
946
947         if (data->length < 8) {
948                 DBG_DEBUG("Got short db length %zu from %s\n", data->length,
949                            server_id_str_buf(src, &idbuf));
950                 TALLOC_FREE(p);
951                 return;
952         }
953
954         p->rec_index = BVAL(data->data, 0);
955
956         p->db = db_open_rbt(p);
957         if (p->db == NULL) {
958                 DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
959                 TALLOC_FREE(p);
960                 return;
961         }
962
963         status = dbwrap_unmarshall(p->db, data->data + 8,
964                                    data->length - 8);
965         if (!NT_STATUS_IS_OK(status)) {
966                 DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
967                            __func__, nt_errstr(status),
968                            server_id_str_buf(src, &idbuf)));
969                 TALLOC_FREE(p);
970                 return;
971         }
972
973         dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
974                              &count);
975
976         DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
977                    server_id_str_buf(src, &idbuf), count));
978 }
979
980 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
981                                      struct server_id src,
982                                      struct messaging_reclog *log)
983 {
984         enum ndr_err_code ndr_err;
985         uint8_t msghdr[MESSAGE_HDR_LENGTH];
986         DATA_BLOB blob;
987         struct iovec iov[2];
988         int ret;
989
990         if (log == NULL) {
991                 return;
992         }
993
994         DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__,
995                    (uintmax_t)log->rec_index, (unsigned)log->num_recs));
996
997         message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
998                         (struct server_id) {0 });
999         iov[0] = (struct iovec) { .iov_base = msghdr,
1000                                   .iov_len = sizeof(msghdr) };
1001
1002         ndr_err = ndr_push_struct_blob(
1003                 &blob, log, log,
1004                 (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
1005         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1006                 DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n",
1007                           __func__, ndr_errstr(ndr_err)));
1008                 goto done;
1009         }
1010         iov[1] = (struct iovec) { .iov_base = blob.data,
1011                                   .iov_len = blob.length };
1012
1013         ret = ctdbd_messaging_send_iov(
1014                 ctdbd_conn, CTDB_BROADCAST_VNNMAP,
1015                 CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
1016         TALLOC_FREE(blob.data);
1017         if (ret != 0) {
1018                 DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n",
1019                           __func__, strerror(ret)));
1020                 goto done;
1021         }
1022
1023         log->rec_index += 1;
1024
1025 done:
1026         log->num_recs = 0;
1027         TALLOC_FREE(log->recs);
1028 }
1029
1030 struct notifyd_broadcast_reclog_state {
1031         struct tevent_context *ev;
1032         struct ctdbd_connection *ctdbd_conn;
1033         struct server_id src;
1034         struct messaging_reclog *log;
1035 };
1036
1037 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
1038
1039 static struct tevent_req *notifyd_broadcast_reclog_send(
1040         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1041         struct ctdbd_connection *ctdbd_conn, struct server_id src,
1042         struct messaging_reclog *log)
1043 {
1044         struct tevent_req *req, *subreq;
1045         struct notifyd_broadcast_reclog_state *state;
1046
1047         req = tevent_req_create(mem_ctx, &state,
1048                                 struct notifyd_broadcast_reclog_state);
1049         if (req == NULL) {
1050                 return NULL;
1051         }
1052         state->ev = ev;
1053         state->ctdbd_conn = ctdbd_conn;
1054         state->src = src;
1055         state->log = log;
1056
1057         subreq = tevent_wakeup_send(state, state->ev,
1058                                     timeval_current_ofs_msec(1000));
1059         if (tevent_req_nomem(subreq, req)) {
1060                 return tevent_req_post(req, ev);
1061         }
1062         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1063         return req;
1064 }
1065
1066 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1067 {
1068         struct tevent_req *req = tevent_req_callback_data(
1069                 subreq, struct tevent_req);
1070         struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1071                 req, struct notifyd_broadcast_reclog_state);
1072         bool ok;
1073
1074         ok = tevent_wakeup_recv(subreq);
1075         TALLOC_FREE(subreq);
1076         if (!ok) {
1077                 tevent_req_oom(req);
1078                 return;
1079         }
1080
1081         notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1082
1083         subreq = tevent_wakeup_send(state, state->ev,
1084                                     timeval_current_ofs_msec(1000));
1085         if (tevent_req_nomem(subreq, req)) {
1086                 return;
1087         }
1088         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1089 }
1090
1091 static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
1092 {
1093         return tevent_req_simple_recv_unix(req);
1094 }
1095
1096 struct notifyd_clean_peers_state {
1097         struct tevent_context *ev;
1098         struct notifyd_state *notifyd;
1099 };
1100
1101 static void notifyd_clean_peers_next(struct tevent_req *subreq);
1102
1103 static struct tevent_req *notifyd_clean_peers_send(
1104         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1105         struct notifyd_state *notifyd)
1106 {
1107         struct tevent_req *req, *subreq;
1108         struct notifyd_clean_peers_state *state;
1109
1110         req = tevent_req_create(mem_ctx, &state,
1111                                 struct notifyd_clean_peers_state);
1112         if (req == NULL) {
1113                 return NULL;
1114         }
1115         state->ev = ev;
1116         state->notifyd = notifyd;
1117
1118         subreq = tevent_wakeup_send(state, state->ev,
1119                                     timeval_current_ofs_msec(30000));
1120         if (tevent_req_nomem(subreq, req)) {
1121                 return tevent_req_post(req, ev);
1122         }
1123         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1124         return req;
1125 }
1126
1127 static void notifyd_clean_peers_next(struct tevent_req *subreq)
1128 {
1129         struct tevent_req *req = tevent_req_callback_data(
1130                 subreq, struct tevent_req);
1131         struct notifyd_clean_peers_state *state = tevent_req_data(
1132                 req, struct notifyd_clean_peers_state);
1133         struct notifyd_state *notifyd = state->notifyd;
1134         size_t i;
1135         bool ok;
1136         time_t now = time(NULL);
1137
1138         ok = tevent_wakeup_recv(subreq);
1139         TALLOC_FREE(subreq);
1140         if (!ok) {
1141                 tevent_req_oom(req);
1142                 return;
1143         }
1144
1145         i = 0;
1146         while (i < notifyd->num_peers) {
1147                 struct notifyd_peer *p = notifyd->peers[i];
1148
1149                 if ((now - p->last_broadcast) > 60) {
1150                         struct server_id_buf idbuf;
1151
1152                         /*
1153                          * Haven't heard for more than 60 seconds. Call this
1154                          * peer dead
1155                          */
1156
1157                         DEBUG(10, ("%s: peer %s died\n", __func__,
1158                                    server_id_str_buf(p->pid, &idbuf)));
1159                         /*
1160                          * This implicitly decrements notifyd->num_peers
1161                          */
1162                         TALLOC_FREE(p);
1163                 } else {
1164                         i += 1;
1165                 }
1166         }
1167
1168         subreq = tevent_wakeup_send(state, state->ev,
1169                                     timeval_current_ofs_msec(30000));
1170         if (tevent_req_nomem(subreq, req)) {
1171                 return;
1172         }
1173         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1174 }
1175
1176 static int notifyd_clean_peers_recv(struct tevent_req *req)
1177 {
1178         return tevent_req_simple_recv_unix(req);
1179 }
1180
1181 static int notifyd_add_proxy_syswatches(struct db_record *rec,
1182                                         void *private_data)
1183 {
1184         struct notifyd_state *state = talloc_get_type_abort(
1185                 private_data, struct notifyd_state);
1186         struct db_context *db = dbwrap_record_get_db(rec);
1187         TDB_DATA key = dbwrap_record_get_key(rec);
1188         TDB_DATA value = dbwrap_record_get_value(rec);
1189         struct notifyd_instance *instances = NULL;
1190         size_t num_instances = 0;
1191         size_t i;
1192         char path[key.dsize+1];
1193         bool ok;
1194
1195         memcpy(path, key.dptr, key.dsize);
1196         path[key.dsize] = '\0';
1197
1198         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1199                                  &num_instances);
1200         if (!ok) {
1201                 DEBUG(1, ("%s: Could not parse notifyd entry for %s\n",
1202                           __func__, path));
1203                 return 0;
1204         }
1205
1206         for (i=0; i<num_instances; i++) {
1207                 struct notifyd_instance *instance = &instances[i];
1208                 uint32_t filter = instance->instance.filter;
1209                 uint32_t subdir_filter = instance->instance.subdir_filter;
1210                 int ret;
1211
1212                 /*
1213                  * This is a remote database. Pointers that we were
1214                  * given don't make sense locally. Initialize to NULL
1215                  * in case sys_notify_watch fails.
1216                  */
1217                 instances[i].sys_watch = NULL;
1218
1219                 ret = state->sys_notify_watch(
1220                         db, state->sys_notify_ctx, path,
1221                         &filter, &subdir_filter,
1222                         notifyd_sys_callback, state->msg_ctx,
1223                         &instance->sys_watch);
1224                 if (ret != 0) {
1225                         DEBUG(1, ("%s: inotify_watch returned %s\n",
1226                                   __func__, strerror(errno)));
1227                 }
1228         }
1229
1230         return 0;
1231 }
1232
1233 static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1234 {
1235         TDB_DATA key = dbwrap_record_get_key(rec);
1236         TDB_DATA value = dbwrap_record_get_value(rec);
1237         struct notifyd_instance *instances = NULL;
1238         size_t num_instances = 0;
1239         size_t i;
1240         bool ok;
1241
1242         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1243                                  &num_instances);
1244         if (!ok) {
1245                 DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n",
1246                           __func__, (int)key.dsize, (char *)key.dptr));
1247                 return 0;
1248         }
1249         for (i=0; i<num_instances; i++) {
1250                 TALLOC_FREE(instances[i].sys_watch);
1251         }
1252         return 0;
1253 }
1254
1255 static int notifyd_peer_destructor(struct notifyd_peer *p)
1256 {
1257         struct notifyd_state *state = p->state;
1258         size_t i;
1259
1260         if (p->db != NULL) {
1261                 dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
1262                                      NULL, NULL);
1263         }
1264
1265         for (i = 0; i<state->num_peers; i++) {
1266                 if (p == state->peers[i]) {
1267                         state->peers[i] = state->peers[state->num_peers-1];
1268                         state->num_peers -= 1;
1269                         break;
1270                 }
1271         }
1272         return 0;
1273 }
1274
1275 static struct notifyd_peer *notifyd_peer_new(
1276         struct notifyd_state *state, struct server_id pid)
1277 {
1278         struct notifyd_peer *p, **tmp;
1279
1280         tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1281                              state->num_peers+1);
1282         if (tmp == NULL) {
1283                 return NULL;
1284         }
1285         state->peers = tmp;
1286
1287         p = talloc_zero(state->peers, struct notifyd_peer);
1288         if (p == NULL) {
1289                 return NULL;
1290         }
1291         p->state = state;
1292         p->pid = pid;
1293
1294         state->peers[state->num_peers] = p;
1295         state->num_peers += 1;
1296
1297         talloc_set_destructor(p, notifyd_peer_destructor);
1298
1299         return p;
1300 }
1301
1302 static void notifyd_apply_reclog(struct notifyd_peer *peer,
1303                                  const uint8_t *msg, size_t msglen)
1304 {
1305         struct notifyd_state *state = peer->state;
1306         DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1307                            .length = msglen };
1308         struct server_id_buf idbuf;
1309         struct messaging_reclog *log;
1310         enum ndr_err_code ndr_err;
1311         uint32_t i;
1312
1313         if (peer->db == NULL) {
1314                 /*
1315                  * No db yet
1316                  */
1317                 return;
1318         }
1319
1320         log = talloc(peer, struct messaging_reclog);
1321         if (log == NULL) {
1322                 DEBUG(10, ("%s: talloc failed\n", __func__));
1323                 return;
1324         }
1325
1326         ndr_err = ndr_pull_struct_blob_all(
1327                 &blob, log, log,
1328                 (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1329         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1330                 DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n",
1331                            __func__, ndr_errstr(ndr_err)));
1332                 goto fail;
1333         }
1334
1335         DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__,
1336                    (unsigned)log->num_recs, (uintmax_t)log->rec_index,
1337                    server_id_str_buf(peer->pid, &idbuf)));
1338
1339         if (log->rec_index != peer->rec_index) {
1340                 DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n",
1341                           __func__, (uintmax_t)log->rec_index,
1342                           server_id_str_buf(peer->pid, &idbuf),
1343                           (uintmax_t)peer->rec_index));
1344                 goto fail;
1345         }
1346
1347         for (i=0; i<log->num_recs; i++) {
1348                 struct messaging_rec *r = log->recs[i];
1349                 struct notify_rec_change_msg *chg;
1350                 size_t pathlen;
1351                 bool ok;
1352
1353                 ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1354                                               &chg, &pathlen);
1355                 if (!ok) {
1356                         DEBUG(3, ("%s: notifyd_parse_rec_change failed\n",
1357                                   __func__));
1358                         goto fail;
1359                 }
1360
1361                 ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1362                                               &chg->instance, peer->db,
1363                                               state->sys_notify_watch,
1364                                               state->sys_notify_ctx,
1365                                               state->msg_ctx);
1366                 if (!ok) {
1367                         DEBUG(3, ("%s: notifyd_apply_rec_change failed\n",
1368                                   __func__));
1369                         goto fail;
1370                 }
1371         }
1372
1373         peer->rec_index += 1;
1374         peer->last_broadcast = time(NULL);
1375
1376         TALLOC_FREE(log);
1377         return;
1378
1379 fail:
1380         DEBUG(10, ("%s: Dropping peer %s\n", __func__,
1381                    server_id_str_buf(peer->pid, &idbuf)));
1382         TALLOC_FREE(peer);
1383 }
1384
1385 /*
1386  * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1387  * messages) broadcasts by other notifyds. Several cases:
1388  *
1389  * We don't know the source. This creates a new peer. Creating a peer
1390  * involves asking the peer for its full database. We assume ordered
1391  * messages, so the new database will arrive before the next broadcast
1392  * will.
1393  *
1394  * We know the source and the log index matches. We will apply the log
1395  * locally to our peer's db as if we had received it from a local
1396  * client.
1397  *
1398  * We know the source but the log index does not match. This means we
1399  * lost a message. We just drop the whole peer and wait for the next
1400  * broadcast, which will then trigger a fresh database pull.
1401  */
1402
1403 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
1404                                    uint64_t dst_srvid,
1405                                    const uint8_t *msg, size_t msglen,
1406                                    void *private_data)
1407 {
1408         struct notifyd_state *state = talloc_get_type_abort(
1409                 private_data, struct notifyd_state);
1410         struct server_id my_id = messaging_server_id(state->msg_ctx);
1411         struct notifyd_peer *p;
1412         uint32_t i;
1413         uint32_t msg_type;
1414         struct server_id src, dst;
1415         struct server_id_buf idbuf;
1416         NTSTATUS status;
1417
1418         if (msglen < MESSAGE_HDR_LENGTH) {
1419                 DEBUG(10, ("%s: Got short broadcast\n", __func__));
1420                 return 0;
1421         }
1422         message_hdr_get(&msg_type, &src, &dst, msg);
1423
1424         if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1425                 DEBUG(10, ("%s Got message %u, ignoring\n", __func__,
1426                            (unsigned)msg_type));
1427                 return 0;
1428         }
1429         if (server_id_equal(&src, &my_id)) {
1430                 DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__));
1431                 return 0;
1432         }
1433
1434         DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1435                    __func__, server_id_str_buf(src, &idbuf)));
1436
1437         for (i=0; i<state->num_peers; i++) {
1438                 if (server_id_equal(&state->peers[i]->pid, &src)) {
1439
1440                         DEBUG(10, ("%s: Applying changes to peer %u\n",
1441                                    __func__, (unsigned)i));
1442
1443                         notifyd_apply_reclog(state->peers[i],
1444                                              msg + MESSAGE_HDR_LENGTH,
1445                                              msglen - MESSAGE_HDR_LENGTH);
1446                         return 0;
1447                 }
1448         }
1449
1450         DEBUG(10, ("%s: Creating new peer for %s\n", __func__,
1451                    server_id_str_buf(src, &idbuf)));
1452
1453         p = notifyd_peer_new(state, src);
1454         if (p == NULL) {
1455                 DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__));
1456                 return 0;
1457         }
1458
1459         status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1460                                     NULL, 0);
1461         if (!NT_STATUS_IS_OK(status)) {
1462                 DEBUG(10, ("%s: messaging_send_buf failed: %s\n",
1463                            __func__, nt_errstr(status)));
1464                 TALLOC_FREE(p);
1465                 return 0;
1466         }
1467
1468         return 0;
1469 }
1470 #endif
1471
1472 struct notifyd_parse_db_state {
1473         bool (*fn)(const char *path,
1474                    struct server_id server,
1475                    const struct notify_instance *instance,
1476                    void *private_data);
1477         void *private_data;
1478 };
1479
1480 static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value,
1481                                     void *private_data)
1482 {
1483         struct notifyd_parse_db_state *state = private_data;
1484         char path[key.dsize+1];
1485         struct notifyd_instance *instances = NULL;
1486         size_t num_instances = 0;
1487         size_t i;
1488         bool ok;
1489
1490         memcpy(path, key.dptr, key.dsize);
1491         path[key.dsize] = 0;
1492
1493         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1494                                  &num_instances);
1495         if (!ok) {
1496                 DEBUG(10, ("%s: Could not parse entry for path %s\n",
1497                            __func__, path));
1498                 return true;
1499         }
1500
1501         for (i=0; i<num_instances; i++) {
1502                 ok = state->fn(path, instances[i].client,
1503                                &instances[i].instance,
1504                                state->private_data);
1505                 if (!ok) {
1506                         return false;
1507                 }
1508         }
1509
1510         return true;
1511 }
1512
1513 int notifyd_parse_db(const uint8_t *buf, size_t buflen,
1514                      uint64_t *log_index,
1515                      bool (*fn)(const char *path,
1516                                 struct server_id server,
1517                                 const struct notify_instance *instance,
1518                                 void *private_data),
1519                      void *private_data)
1520 {
1521         struct notifyd_parse_db_state state = {
1522                 .fn = fn, .private_data = private_data
1523         };
1524         NTSTATUS status;
1525
1526         if (buflen < 8) {
1527                 return EINVAL;
1528         }
1529         *log_index = BVAL(buf, 0);
1530
1531         buf += 8;
1532         buflen -= 8;
1533
1534         status = dbwrap_parse_marshall_buf(
1535                 buf, buflen, notifyd_parse_db_parser, &state);
1536         if (!NT_STATUS_IS_OK(status)) {
1537                 return map_errno_from_nt_status(status);
1538         }
1539
1540         return 0;
1541 }