notifyd: Avoid an if-expression
[bbaumbach/samba-autobuild/.git] / source3 / smbd / notifyd / notifyd.c
1 /*
2  * Unix SMB/CIFS implementation.
3  *
4  * Copyright (C) Volker Lendecke 2014
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include <tevent.h>
22 #include "lib/util/server_id.h"
23 #include "lib/util/data_blob.h"
24 #include "librpc/gen_ndr/notify.h"
25 #include "librpc/gen_ndr/messaging.h"
26 #include "librpc/gen_ndr/server_id.h"
27 #include "lib/dbwrap/dbwrap.h"
28 #include "lib/dbwrap/dbwrap_rbt.h"
29 #include "messages.h"
30 #include "tdb.h"
31 #include "util_tdb.h"
32 #include "notifyd.h"
33 #include "lib/util/server_id_db.h"
34 #include "lib/util/tevent_unix.h"
35 #include "ctdbd_conn.h"
36 #include "ctdb_srvids.h"
37 #include "server_id_db_util.h"
38 #include "lib/util/iov_buf.h"
39 #include "messages_util.h"
40
41 #ifdef CLUSTER_SUPPORT
42 #include "ctdb_protocol.h"
43 #endif
44
45 struct notifyd_peer;
46
47 /*
48  * All of notifyd's state
49  */
50
51 struct notifyd_state {
52         struct tevent_context *ev;
53         struct messaging_context *msg_ctx;
54         struct ctdbd_connection *ctdbd_conn;
55
56         /*
57          * Database of everything clients show interest in. Indexed by
58          * absolute path. The database keys are not 0-terminated
59          * because the criticial operation, notifyd_trigger, can walk
60          * the structure from the top without adding intermediate 0s.
61          * The database records contain an array of
62          *
63          * struct notifyd_instance
64          *
65          * to be maintained and parsed by notifyd_entry_parse()
66          */
67         struct db_context *entries;
68
69         /*
70          * In the cluster case, this is the place where we store a log
71          * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
72          * forward them to our peer notifyd's in the cluster once a
73          * second or when the log grows too large.
74          */
75
76         struct messaging_reclog *log;
77
78         /*
79          * Array of companion notifyd's in a cluster. Every notifyd
80          * broadcasts its messaging_reclog to every other notifyd in
81          * the cluster. This is done by making ctdb send a message to
82          * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
83          * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who
84          * had called register_with_ctdbd this srvid will receive the
85          * broadcasts.
86          *
87          * Database replication happens via these broadcasts. Also,
88          * they serve as liveness indication. If a notifyd receives a
89          * broadcast from an unknown peer, it will create one for this
90          * srvid. Also when we don't hear anything from a peer for a
91          * while, we will discard it.
92          */
93
94         struct notifyd_peer **peers;
95         size_t num_peers;
96
97         sys_notify_watch_fn sys_notify_watch;
98         struct sys_notify_context *sys_notify_ctx;
99 };
100
101 /*
102  * notifyd's representation of a notify instance
103  */
104 struct notifyd_instance {
105         struct server_id client;
106         struct notify_instance instance;
107
108         void *sys_watch; /* inotify/fam/etc handle */
109
110         /*
111          * Filters after sys_watch took responsibility of some bits
112          */
113         uint32_t internal_filter;
114         uint32_t internal_subdir_filter;
115 };
116
117 struct notifyd_peer {
118         struct notifyd_state *state;
119         struct server_id pid;
120         uint64_t rec_index;
121         struct db_context *db;
122         time_t last_broadcast;
123 };
124
125 static bool notifyd_rec_change(struct messaging_context *msg_ctx,
126                                struct messaging_rec **prec,
127                                void *private_data);
128 static bool notifyd_trigger(struct messaging_context *msg_ctx,
129                             struct messaging_rec **prec,
130                             void *private_data);
131 static bool notifyd_get_db(struct messaging_context *msg_ctx,
132                            struct messaging_rec **prec,
133                            void *private_data);
134
135 #ifdef CLUSTER_SUPPORT
136 static bool notifyd_got_db(struct messaging_context *msg_ctx,
137                            struct messaging_rec **prec,
138                            void *private_data);
139 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
140                                      struct server_id src,
141                                      struct messaging_reclog *log);
142 #endif
143 static void notifyd_sys_callback(struct sys_notify_context *ctx,
144                                  void *private_data, struct notify_event *ev,
145                                  uint32_t filter);
146
147 #ifdef CLUSTER_SUPPORT
148 static struct tevent_req *notifyd_broadcast_reclog_send(
149         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
150         struct ctdbd_connection *ctdbd_conn, struct server_id src,
151         struct messaging_reclog *log);
152 static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
153
154 static struct tevent_req *notifyd_clean_peers_send(
155         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
156         struct notifyd_state *notifyd);
157 static int notifyd_clean_peers_recv(struct tevent_req *req);
158 #endif
159
160 static int sys_notify_watch_dummy(
161         TALLOC_CTX *mem_ctx,
162         struct sys_notify_context *ctx,
163         const char *path,
164         uint32_t *filter,
165         uint32_t *subdir_filter,
166         void (*callback)(struct sys_notify_context *ctx,
167                          void *private_data,
168                          struct notify_event *ev,
169                          uint32_t filter),
170         void *private_data,
171         void *handle_p)
172 {
173         void **handle = handle_p;
174         *handle = NULL;
175         return 0;
176 }
177
178 static void notifyd_handler_done(struct tevent_req *subreq);
179
180 #ifdef CLUSTER_SUPPORT
181 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
182 static void notifyd_clean_peers_finished(struct tevent_req *subreq);
183 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
184                                    uint64_t dst_srvid,
185                                    const uint8_t *msg, size_t msglen,
186                                    void *private_data);
187 #endif
188
189 struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
190                                 struct messaging_context *msg_ctx,
191                                 struct ctdbd_connection *ctdbd_conn,
192                                 sys_notify_watch_fn sys_notify_watch,
193                                 struct sys_notify_context *sys_notify_ctx)
194 {
195         struct tevent_req *req, *subreq;
196         struct notifyd_state *state;
197         struct server_id_db *names_db;
198         int ret;
199
200         req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
201         if (req == NULL) {
202                 return NULL;
203         }
204         state->ev = ev;
205         state->msg_ctx = msg_ctx;
206         state->ctdbd_conn = ctdbd_conn;
207
208         if (sys_notify_watch == NULL) {
209                 sys_notify_watch = sys_notify_watch_dummy;
210         }
211
212         state->sys_notify_watch = sys_notify_watch;
213         state->sys_notify_ctx = sys_notify_ctx;
214
215         state->entries = db_open_rbt(state);
216         if (tevent_req_nomem(state->entries, req)) {
217                 return tevent_req_post(req, ev);
218         }
219
220         subreq = messaging_handler_send(state, ev, msg_ctx,
221                                         MSG_SMB_NOTIFY_REC_CHANGE,
222                                         notifyd_rec_change, state);
223         if (tevent_req_nomem(subreq, req)) {
224                 return tevent_req_post(req, ev);
225         }
226         tevent_req_set_callback(subreq, notifyd_handler_done, req);
227
228         subreq = messaging_handler_send(state, ev, msg_ctx,
229                                         MSG_SMB_NOTIFY_TRIGGER,
230                                         notifyd_trigger, state);
231         if (tevent_req_nomem(subreq, req)) {
232                 return tevent_req_post(req, ev);
233         }
234         tevent_req_set_callback(subreq, notifyd_handler_done, req);
235
236         subreq = messaging_handler_send(state, ev, msg_ctx,
237                                         MSG_SMB_NOTIFY_GET_DB,
238                                         notifyd_get_db, state);
239         if (tevent_req_nomem(subreq, req)) {
240                 return tevent_req_post(req, ev);
241         }
242         tevent_req_set_callback(subreq, notifyd_handler_done, req);
243
244         names_db = messaging_names_db(msg_ctx);
245
246         ret = server_id_db_set_exclusive(names_db, "notify-daemon");
247         if (ret != 0) {
248                 DEBUG(10, ("%s: server_id_db_add failed: %s\n",
249                            __func__, strerror(ret)));
250                 tevent_req_error(req, ret);
251                 return tevent_req_post(req, ev);
252         }
253
254         if (ctdbd_conn == NULL) {
255                 /*
256                  * No cluster around, skip the database replication
257                  * engine
258                  */
259                 return req;
260         }
261
262 #ifdef CLUSTER_SUPPORT
263         subreq = messaging_handler_send(state, ev, msg_ctx,
264                                         MSG_SMB_NOTIFY_DB,
265                                         notifyd_got_db, state);
266         if (tevent_req_nomem(subreq, req)) {
267                 return tevent_req_post(req, ev);
268         }
269         tevent_req_set_callback(subreq, notifyd_handler_done, req);
270
271         state->log = talloc_zero(state, struct messaging_reclog);
272         if (tevent_req_nomem(state->log, req)) {
273                 return tevent_req_post(req, ev);
274         }
275
276         subreq = notifyd_broadcast_reclog_send(
277                 state->log, ev, ctdbd_conn,
278                 messaging_server_id(msg_ctx),
279                 state->log);
280         if (tevent_req_nomem(subreq, req)) {
281                 return tevent_req_post(req, ev);
282         }
283         tevent_req_set_callback(subreq,
284                                 notifyd_broadcast_reclog_finished,
285                                 req);
286
287         subreq = notifyd_clean_peers_send(state, ev, state);
288         if (tevent_req_nomem(subreq, req)) {
289                 return tevent_req_post(req, ev);
290         }
291         tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
292                                 req);
293
294         ret = register_with_ctdbd(ctdbd_conn,
295                                   CTDB_SRVID_SAMBA_NOTIFY_PROXY,
296                                   notifyd_snoop_broadcast, state);
297         if (ret != 0) {
298                 tevent_req_error(req, ret);
299                 return tevent_req_post(req, ev);
300         }
301 #endif
302
303         return req;
304 }
305
306 static void notifyd_handler_done(struct tevent_req *subreq)
307 {
308         struct tevent_req *req = tevent_req_callback_data(
309                 subreq, struct tevent_req);
310         int ret;
311
312         ret = messaging_handler_recv(subreq);
313         TALLOC_FREE(subreq);
314         tevent_req_error(req, ret);
315 }
316
317 #ifdef CLUSTER_SUPPORT
318
319 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
320 {
321         struct tevent_req *req = tevent_req_callback_data(
322                 subreq, struct tevent_req);
323         int ret;
324
325         ret = notifyd_broadcast_reclog_recv(subreq);
326         TALLOC_FREE(subreq);
327         tevent_req_error(req, ret);
328 }
329
330 static void notifyd_clean_peers_finished(struct tevent_req *subreq)
331 {
332         struct tevent_req *req = tevent_req_callback_data(
333                 subreq, struct tevent_req);
334         int ret;
335
336         ret = notifyd_clean_peers_recv(subreq);
337         TALLOC_FREE(subreq);
338         tevent_req_error(req, ret);
339 }
340
341 #endif
342
343 int notifyd_recv(struct tevent_req *req)
344 {
345         return tevent_req_simple_recv_unix(req);
346 }
347
348 /*
349  * Parse an entry in the notifyd_context->entries database
350  */
351
352 static bool notifyd_parse_entry(uint8_t *buf, size_t buflen,
353                                 struct notifyd_instance **instances,
354                                 size_t *num_instances)
355 {
356         if ((buflen % sizeof(struct notifyd_instance)) != 0) {
357                 DEBUG(1, ("%s: invalid buffer size: %u\n",
358                           __func__, (unsigned)buflen));
359                 return false;
360         }
361
362         if (instances != NULL) {
363                 *instances = (struct notifyd_instance *)buf;
364         }
365         if (num_instances != NULL) {
366                 *num_instances = buflen / sizeof(struct notifyd_instance);
367         }
368         return true;
369 }
370
371 static bool notifyd_apply_rec_change(
372         const struct server_id *client,
373         const char *path, size_t pathlen,
374         const struct notify_instance *chg,
375         struct db_context *entries,
376         sys_notify_watch_fn sys_notify_watch,
377         struct sys_notify_context *sys_notify_ctx,
378         struct messaging_context *msg_ctx)
379 {
380         struct db_record *rec;
381         struct notifyd_instance *instances;
382         size_t num_instances;
383         size_t i;
384         struct notifyd_instance *instance;
385         TDB_DATA value;
386         NTSTATUS status;
387         bool ok = false;
388
389         if (pathlen == 0) {
390                 DEBUG(1, ("%s: pathlen==0\n", __func__));
391                 return false;
392         }
393         if (path[pathlen-1] != '\0') {
394                 DEBUG(1, ("%s: path not 0-terminated\n", __func__));
395                 return false;
396         }
397
398         DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, "
399                    "private_data=%p\n", __func__, path,
400                    (unsigned)chg->filter, (unsigned)chg->subdir_filter,
401                    chg->private_data));
402
403         rec = dbwrap_fetch_locked(
404                 entries, entries,
405                 make_tdb_data((const uint8_t *)path, pathlen-1));
406
407         if (rec == NULL) {
408                 DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__));
409                 goto fail;
410         }
411
412         num_instances = 0;
413         value = dbwrap_record_get_value(rec);
414
415         if (value.dsize != 0) {
416                 if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
417                                          &num_instances)) {
418                         goto fail;
419                 }
420         }
421
422         /*
423          * Overallocate by one instance to avoid a realloc when adding
424          */
425         instances = talloc_array(rec, struct notifyd_instance,
426                                  num_instances + 1);
427         if (instances == NULL) {
428                 DEBUG(1, ("%s: talloc failed\n", __func__));
429                 goto fail;
430         }
431
432         if (value.dsize != 0) {
433                 memcpy(instances, value.dptr, value.dsize);
434         }
435
436         for (i=0; i<num_instances; i++) {
437                 instance = &instances[i];
438
439                 if (server_id_equal(&instance->client, client) &&
440                     (instance->instance.private_data == chg->private_data)) {
441                         break;
442                 }
443         }
444
445         if (i < num_instances) {
446                 instance->instance = *chg;
447         } else {
448                 /*
449                  * We've overallocated for one instance
450                  */
451                 instance = &instances[num_instances];
452
453                 *instance = (struct notifyd_instance) {
454                         .client = *client,
455                         .instance = *chg,
456                         .internal_filter = chg->filter,
457                         .internal_subdir_filter = chg->subdir_filter
458                 };
459
460                 num_instances += 1;
461         }
462
463         if ((instance->instance.filter != 0) ||
464             (instance->instance.subdir_filter != 0)) {
465                 int ret;
466
467                 TALLOC_FREE(instance->sys_watch);
468
469                 ret = sys_notify_watch(entries, sys_notify_ctx, path,
470                                        &instance->internal_filter,
471                                        &instance->internal_subdir_filter,
472                                        notifyd_sys_callback, msg_ctx,
473                                        &instance->sys_watch);
474                 if (ret != 0) {
475                         DEBUG(1, ("%s: inotify_watch returned %s\n",
476                                   __func__, strerror(errno)));
477                 }
478         }
479
480         if ((instance->instance.filter == 0) &&
481             (instance->instance.subdir_filter == 0)) {
482                 /* This is a delete request */
483                 TALLOC_FREE(instance->sys_watch);
484                 *instance = instances[num_instances-1];
485                 num_instances -= 1;
486         }
487
488         DEBUG(10, ("%s: %s has %u instances\n", __func__,
489                    path, (unsigned)num_instances));
490
491         if (num_instances == 0) {
492                 status = dbwrap_record_delete(rec);
493                 if (!NT_STATUS_IS_OK(status)) {
494                         DEBUG(1, ("%s: dbwrap_record_delete returned %s\n",
495                                   __func__, nt_errstr(status)));
496                         goto fail;
497                 }
498         } else {
499                 value = make_tdb_data(
500                         (uint8_t *)instances,
501                         sizeof(struct notifyd_instance) * num_instances);
502
503                 status = dbwrap_record_store(rec, value, 0);
504                 if (!NT_STATUS_IS_OK(status)) {
505                         DEBUG(1, ("%s: dbwrap_record_store returned %s\n",
506                                   __func__, nt_errstr(status)));
507                         goto fail;
508                 }
509         }
510
511         ok = true;
512 fail:
513         TALLOC_FREE(rec);
514         return ok;
515 }
516
517 static void notifyd_sys_callback(struct sys_notify_context *ctx,
518                                  void *private_data, struct notify_event *ev,
519                                  uint32_t filter)
520 {
521         struct messaging_context *msg_ctx = talloc_get_type_abort(
522                 private_data, struct messaging_context);
523         struct notify_trigger_msg msg;
524         struct iovec iov[4];
525         char slash = '/';
526
527         msg = (struct notify_trigger_msg) {
528                 .when = timespec_current(),
529                 .action = ev->action,
530                 .filter = filter,
531         };
532
533         iov[0].iov_base = &msg;
534         iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
535         iov[1].iov_base = discard_const_p(char, ev->dir);
536         iov[1].iov_len = strlen(ev->dir);
537         iov[2].iov_base = &slash;
538         iov[2].iov_len = 1;
539         iov[3].iov_base = discard_const_p(char, ev->path);
540         iov[3].iov_len = strlen(ev->path)+1;
541
542         messaging_send_iov(
543                 msg_ctx, messaging_server_id(msg_ctx),
544                 MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
545 }
546
547 static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
548                                      struct notify_rec_change_msg **pmsg,
549                                      size_t *pathlen)
550 {
551         struct notify_rec_change_msg *msg;
552
553         if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
554                 DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__,
555                           (unsigned)bufsize));
556                 return false;
557         }
558
559         *pmsg = msg = (struct notify_rec_change_msg *)buf;
560         *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
561
562         DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, "
563                    "private_data=%p, path=%.*s\n",
564                    __func__, (unsigned)msg->instance.filter,
565                    (unsigned)msg->instance.subdir_filter,
566                    msg->instance.private_data, (int)(*pathlen), msg->path));
567
568         return true;
569 }
570
571 static bool notifyd_rec_change(struct messaging_context *msg_ctx,
572                                struct messaging_rec **prec,
573                                void *private_data)
574 {
575         struct notifyd_state *state = talloc_get_type_abort(
576                 private_data, struct notifyd_state);
577         struct server_id_buf idbuf;
578         struct messaging_rec *rec = *prec;
579         struct notify_rec_change_msg *msg;
580         size_t pathlen;
581         bool ok;
582
583         DEBUG(10, ("%s: Got %d bytes from %s\n", __func__,
584                    (unsigned)rec->buf.length,
585                    server_id_str_buf(rec->src, &idbuf)));
586
587         ok = notifyd_parse_rec_change(rec->buf.data, rec->buf.length,
588                                       &msg, &pathlen);
589         if (!ok) {
590                 return true;
591         }
592
593         ok = notifyd_apply_rec_change(
594                 &rec->src, msg->path, pathlen, &msg->instance,
595                 state->entries, state->sys_notify_watch, state->sys_notify_ctx,
596                 state->msg_ctx);
597         if (!ok) {
598                 DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
599                           __func__));
600                 return true;
601         }
602
603         if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
604                 return true;
605         }
606
607 #ifdef CLUSTER_SUPPORT
608         {
609
610         struct messaging_rec **tmp;
611         struct messaging_reclog *log;
612
613         log = state->log;
614
615         tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
616                              log->num_recs+1);
617         if (tmp == NULL) {
618                 DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
619                 return true;
620         }
621         log->recs = tmp;
622
623         log->recs[log->num_recs] = talloc_move(log->recs, prec);
624         log->num_recs += 1;
625
626         if (log->num_recs >= 100) {
627                 /*
628                  * Don't let the log grow too large
629                  */
630                 notifyd_broadcast_reclog(state->ctdbd_conn,
631                                          messaging_server_id(msg_ctx), log);
632         }
633
634         }
635 #endif
636
637         return true;
638 }
639
640 struct notifyd_trigger_state {
641         struct messaging_context *msg_ctx;
642         struct notify_trigger_msg *msg;
643         bool recursive;
644         bool covered_by_sys_notify;
645 };
646
647 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
648                                    void *private_data);
649
650 static bool notifyd_trigger(struct messaging_context *msg_ctx,
651                             struct messaging_rec **prec,
652                             void *private_data)
653 {
654         struct notifyd_state *state = talloc_get_type_abort(
655                 private_data, struct notifyd_state);
656         struct server_id my_id = messaging_server_id(msg_ctx);
657         struct messaging_rec *rec = *prec;
658         struct notifyd_trigger_state tstate;
659         const char *path;
660         const char *p, *next_p;
661
662         if (rec->buf.length < offsetof(struct notify_trigger_msg, path) + 1) {
663                 DEBUG(1, ("message too short, ignoring: %u\n",
664                           (unsigned)rec->buf.length));
665                 return true;
666         }
667         if (rec->buf.data[rec->buf.length-1] != 0) {
668                 DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
669                 return true;
670         }
671
672         tstate.msg_ctx = msg_ctx;
673
674         tstate.covered_by_sys_notify = (rec->src.vnn == my_id.vnn);
675         tstate.covered_by_sys_notify &= !server_id_equal(&rec->src, &my_id);
676
677         tstate.msg = (struct notify_trigger_msg *)rec->buf.data;
678         path = tstate.msg->path;
679
680         DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
681                    __func__, (unsigned)tstate.msg->action,
682                    (unsigned)tstate.msg->filter, path));
683
684         if (path[0] != '/') {
685                 DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
686                           __func__, path));
687                 return true;
688         }
689
690         for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
691                 ptrdiff_t path_len = p - path;
692                 TDB_DATA key;
693                 uint32_t i;
694
695                 next_p = strchr(p+1, '/');
696                 tstate.recursive = (next_p != NULL);
697
698                 DEBUG(10, ("%s: Trying path %.*s\n", __func__,
699                            (int)path_len, path));
700
701                 key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
702                                    .dsize = path_len };
703
704                 dbwrap_parse_record(state->entries, key,
705                                     notifyd_trigger_parser, &tstate);
706
707                 if (state->peers == NULL) {
708                         continue;
709                 }
710
711                 if (rec->src.vnn != my_id.vnn) {
712                         continue;
713                 }
714
715                 for (i=0; i<state->num_peers; i++) {
716                         if (state->peers[i]->db == NULL) {
717                                 /*
718                                  * Inactive peer, did not get a db yet
719                                  */
720                                 continue;
721                         }
722                         dbwrap_parse_record(state->peers[i]->db, key,
723                                             notifyd_trigger_parser, &tstate);
724                 }
725         }
726
727         return true;
728 }
729
730 static void notifyd_send_delete(struct messaging_context *msg_ctx,
731                                 TDB_DATA key,
732                                 struct notifyd_instance *instance);
733
734 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
735                                    void *private_data)
736
737 {
738         struct notifyd_trigger_state *tstate = private_data;
739         struct notify_event_msg msg = { .action = tstate->msg->action,
740                                         .when = tstate->msg->when };
741         struct iovec iov[2];
742         size_t path_len = key.dsize;
743         struct notifyd_instance *instances = NULL;
744         size_t num_instances = 0;
745         size_t i;
746
747         if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
748                                  &num_instances)) {
749                 DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__));
750                 return;
751         }
752
753         DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__,
754                    (unsigned)num_instances, (int)key.dsize,
755                    (char *)key.dptr));
756
757         iov[0].iov_base = &msg;
758         iov[0].iov_len = offsetof(struct notify_event_msg, path);
759         iov[1].iov_base = tstate->msg->path + path_len + 1;
760         iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
761
762         for (i=0; i<num_instances; i++) {
763                 struct notifyd_instance *instance = &instances[i];
764                 struct server_id_buf idbuf;
765                 uint32_t i_filter;
766                 NTSTATUS status;
767
768                 if (tstate->covered_by_sys_notify) {
769                         if (tstate->recursive) {
770                                 i_filter = instance->internal_subdir_filter;
771                         } else {
772                                 i_filter = instance->internal_filter;
773                         }
774                 } else {
775                         if (tstate->recursive) {
776                                 i_filter = instance->instance.subdir_filter;
777                         } else {
778                                 i_filter = instance->instance.filter;
779                         }
780                 }
781
782                 if ((i_filter & tstate->msg->filter) == 0) {
783                         continue;
784                 }
785
786                 msg.private_data = instance->instance.private_data;
787
788                 status = messaging_send_iov(
789                         tstate->msg_ctx, instance->client,
790                         MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
791
792                 DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n",
793                            __func__,
794                            server_id_str_buf(instance->client, &idbuf),
795                            nt_errstr(status)));
796
797                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
798                     procid_is_local(&instance->client)) {
799                         /*
800                          * That process has died
801                          */
802                         notifyd_send_delete(tstate->msg_ctx, key, instance);
803                         continue;
804                 }
805
806                 if (!NT_STATUS_IS_OK(status)) {
807                         DEBUG(1, ("%s: messaging_send_iov returned %s\n",
808                                   __func__, nt_errstr(status)));
809                 }
810         }
811 }
812
813 /*
814  * Send a delete request to ourselves to properly discard a notify
815  * record for an smbd that has died.
816  */
817
818 static void notifyd_send_delete(struct messaging_context *msg_ctx,
819                                 TDB_DATA key,
820                                 struct notifyd_instance *instance)
821 {
822         struct notify_rec_change_msg msg = {
823                 .instance.private_data = instance->instance.private_data
824         };
825         uint8_t nul = 0;
826         struct iovec iov[3];
827         int ret;
828
829         /*
830          * Send a rec_change to ourselves to delete a dead entry
831          */
832
833         iov[0] = (struct iovec) {
834                 .iov_base = &msg,
835                 .iov_len = offsetof(struct notify_rec_change_msg, path) };
836         iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
837         iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
838
839         ret = messaging_send_iov_from(
840                 msg_ctx, instance->client, messaging_server_id(msg_ctx),
841                 MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
842
843         if (ret != 0) {
844                 DEBUG(10, ("%s: messaging_send_iov_from returned %s\n",
845                            __func__, strerror(ret)));
846         }
847 }
848
849 static bool notifyd_get_db(struct messaging_context *msg_ctx,
850                            struct messaging_rec **prec,
851                            void *private_data)
852 {
853         struct notifyd_state *state = talloc_get_type_abort(
854                 private_data, struct notifyd_state);
855         struct messaging_rec *rec = *prec;
856         struct server_id_buf id1, id2;
857         NTSTATUS status;
858         uint64_t rec_index = UINT64_MAX;
859         uint8_t index_buf[sizeof(uint64_t)];
860         size_t dbsize;
861         uint8_t *buf;
862         struct iovec iov[2];
863
864         dbsize = dbwrap_marshall(state->entries, NULL, 0);
865
866         buf = talloc_array(rec, uint8_t, dbsize);
867         if (buf == NULL) {
868                 DEBUG(1, ("%s: talloc_array(%ju) failed\n",
869                           __func__, (uintmax_t)dbsize));
870                 return true;
871         }
872
873         dbsize = dbwrap_marshall(state->entries, buf, dbsize);
874
875         if (dbsize != talloc_get_size(buf)) {
876                 DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__,
877                           (uintmax_t)talloc_get_size(buf),
878                           (uintmax_t)dbsize));
879                 TALLOC_FREE(buf);
880                 return true;
881         }
882
883         if (state->log != NULL) {
884                 rec_index = state->log->rec_index;
885         }
886         SBVAL(index_buf, 0, rec_index);
887
888         iov[0] = (struct iovec) { .iov_base = index_buf,
889                                   .iov_len = sizeof(index_buf) };
890         iov[1] = (struct iovec) { .iov_base = buf,
891                                   .iov_len = dbsize };
892
893         DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
894                    (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
895                    server_id_str_buf(messaging_server_id(msg_ctx), &id1),
896                    server_id_str_buf(rec->src, &id2)));
897
898         status = messaging_send_iov(msg_ctx, rec->src, MSG_SMB_NOTIFY_DB,
899                                     iov, ARRAY_SIZE(iov), NULL, 0);
900         TALLOC_FREE(buf);
901         if (!NT_STATUS_IS_OK(status)) {
902                 DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
903                           __func__, nt_errstr(status)));
904         }
905
906         return true;
907 }
908
909 #ifdef CLUSTER_SUPPORT
910
911 static int notifyd_add_proxy_syswatches(struct db_record *rec,
912                                         void *private_data);
913
914 static bool notifyd_got_db(struct messaging_context *msg_ctx,
915                            struct messaging_rec **prec,
916                            void *private_data)
917 {
918         struct notifyd_state *state = talloc_get_type_abort(
919                 private_data, struct notifyd_state);
920         struct messaging_rec *rec = *prec;
921         struct notifyd_peer *p = NULL;
922         struct server_id_buf idbuf;
923         NTSTATUS status;
924         int count;
925         size_t i;
926
927         for (i=0; i<state->num_peers; i++) {
928                 if (server_id_equal(&rec->src, &state->peers[i]->pid)) {
929                         p = state->peers[i];
930                         break;
931                 }
932         }
933
934         if (p == NULL) {
935                 DEBUG(10, ("%s: Did not find peer for db from %s\n",
936                            __func__, server_id_str_buf(rec->src, &idbuf)));
937                 return true;
938         }
939
940         if (rec->buf.length < 8) {
941                 DEBUG(10, ("%s: Got short db length %u from %s\n", __func__,
942                            (unsigned)rec->buf.length,
943                            server_id_str_buf(rec->src, &idbuf)));
944                 TALLOC_FREE(p);
945                 return true;
946         }
947
948         p->rec_index = BVAL(rec->buf.data, 0);
949
950         p->db = db_open_rbt(p);
951         if (p->db == NULL) {
952                 DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
953                 TALLOC_FREE(p);
954                 return true;
955         }
956
957         status = dbwrap_unmarshall(p->db, rec->buf.data + 8,
958                                    rec->buf.length - 8);
959         if (!NT_STATUS_IS_OK(status)) {
960                 DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
961                            __func__, nt_errstr(status),
962                            server_id_str_buf(rec->src, &idbuf)));
963                 TALLOC_FREE(p);
964                 return true;
965         }
966
967         dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
968                              &count);
969
970         DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
971                    server_id_str_buf(rec->src, &idbuf), count));
972
973         return true;
974 }
975
976 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
977                                      struct server_id src,
978                                      struct messaging_reclog *log)
979 {
980         enum ndr_err_code ndr_err;
981         uint8_t msghdr[MESSAGE_HDR_LENGTH];
982         DATA_BLOB blob;
983         struct iovec iov[2];
984         int ret;
985
986         if (log == NULL) {
987                 return;
988         }
989
990         DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__,
991                    (uintmax_t)log->rec_index, (unsigned)log->num_recs));
992
993         message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
994                         (struct server_id) {0 });
995         iov[0] = (struct iovec) { .iov_base = msghdr,
996                                   .iov_len = sizeof(msghdr) };
997
998         ndr_err = ndr_push_struct_blob(
999                 &blob, log, log,
1000                 (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
1001         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1002                 DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n",
1003                           __func__, ndr_errstr(ndr_err)));
1004                 goto done;
1005         }
1006         iov[1] = (struct iovec) { .iov_base = blob.data,
1007                                   .iov_len = blob.length };
1008
1009         ret = ctdbd_messaging_send_iov(
1010                 ctdbd_conn, CTDB_BROADCAST_VNNMAP,
1011                 CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
1012         TALLOC_FREE(blob.data);
1013         if (ret != 0) {
1014                 DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n",
1015                           __func__, strerror(ret)));
1016                 goto done;
1017         }
1018
1019         log->rec_index += 1;
1020
1021 done:
1022         log->num_recs = 0;
1023         TALLOC_FREE(log->recs);
1024 }
1025
1026 struct notifyd_broadcast_reclog_state {
1027         struct tevent_context *ev;
1028         struct ctdbd_connection *ctdbd_conn;
1029         struct server_id src;
1030         struct messaging_reclog *log;
1031 };
1032
1033 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
1034
1035 static struct tevent_req *notifyd_broadcast_reclog_send(
1036         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1037         struct ctdbd_connection *ctdbd_conn, struct server_id src,
1038         struct messaging_reclog *log)
1039 {
1040         struct tevent_req *req, *subreq;
1041         struct notifyd_broadcast_reclog_state *state;
1042
1043         req = tevent_req_create(mem_ctx, &state,
1044                                 struct notifyd_broadcast_reclog_state);
1045         if (req == NULL) {
1046                 return NULL;
1047         }
1048         state->ev = ev;
1049         state->ctdbd_conn = ctdbd_conn;
1050         state->src = src;
1051         state->log = log;
1052
1053         subreq = tevent_wakeup_send(state, state->ev,
1054                                     timeval_current_ofs_msec(1000));
1055         if (tevent_req_nomem(subreq, req)) {
1056                 return tevent_req_post(req, ev);
1057         }
1058         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1059         return req;
1060 }
1061
1062 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1063 {
1064         struct tevent_req *req = tevent_req_callback_data(
1065                 subreq, struct tevent_req);
1066         struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1067                 req, struct notifyd_broadcast_reclog_state);
1068         bool ok;
1069
1070         ok = tevent_wakeup_recv(subreq);
1071         TALLOC_FREE(subreq);
1072         if (!ok) {
1073                 tevent_req_oom(req);
1074                 return;
1075         }
1076
1077         notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1078
1079         subreq = tevent_wakeup_send(state, state->ev,
1080                                     timeval_current_ofs_msec(1000));
1081         if (tevent_req_nomem(subreq, req)) {
1082                 return;
1083         }
1084         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1085 }
1086
1087 static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
1088 {
1089         return tevent_req_simple_recv_unix(req);
1090 }
1091
1092 struct notifyd_clean_peers_state {
1093         struct tevent_context *ev;
1094         struct notifyd_state *notifyd;
1095 };
1096
1097 static void notifyd_clean_peers_next(struct tevent_req *subreq);
1098
1099 static struct tevent_req *notifyd_clean_peers_send(
1100         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1101         struct notifyd_state *notifyd)
1102 {
1103         struct tevent_req *req, *subreq;
1104         struct notifyd_clean_peers_state *state;
1105
1106         req = tevent_req_create(mem_ctx, &state,
1107                                 struct notifyd_clean_peers_state);
1108         if (req == NULL) {
1109                 return NULL;
1110         }
1111         state->ev = ev;
1112         state->notifyd = notifyd;
1113
1114         subreq = tevent_wakeup_send(state, state->ev,
1115                                     timeval_current_ofs_msec(30000));
1116         if (tevent_req_nomem(subreq, req)) {
1117                 return tevent_req_post(req, ev);
1118         }
1119         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1120         return req;
1121 }
1122
1123 static void notifyd_clean_peers_next(struct tevent_req *subreq)
1124 {
1125         struct tevent_req *req = tevent_req_callback_data(
1126                 subreq, struct tevent_req);
1127         struct notifyd_clean_peers_state *state = tevent_req_data(
1128                 req, struct notifyd_clean_peers_state);
1129         struct notifyd_state *notifyd = state->notifyd;
1130         size_t i;
1131         bool ok;
1132         time_t now = time(NULL);
1133
1134         ok = tevent_wakeup_recv(subreq);
1135         TALLOC_FREE(subreq);
1136         if (!ok) {
1137                 tevent_req_oom(req);
1138                 return;
1139         }
1140
1141         i = 0;
1142         while (i < notifyd->num_peers) {
1143                 struct notifyd_peer *p = notifyd->peers[i];
1144
1145                 if ((now - p->last_broadcast) > 60) {
1146                         struct server_id_buf idbuf;
1147
1148                         /*
1149                          * Haven't heard for more than 60 seconds. Call this
1150                          * peer dead
1151                          */
1152
1153                         DEBUG(10, ("%s: peer %s died\n", __func__,
1154                                    server_id_str_buf(p->pid, &idbuf)));
1155                         /*
1156                          * This implicitly decrements notifyd->num_peers
1157                          */
1158                         TALLOC_FREE(p);
1159                 } else {
1160                         i += 1;
1161                 }
1162         }
1163
1164         subreq = tevent_wakeup_send(state, state->ev,
1165                                     timeval_current_ofs_msec(30000));
1166         if (tevent_req_nomem(subreq, req)) {
1167                 return;
1168         }
1169         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1170 }
1171
1172 static int notifyd_clean_peers_recv(struct tevent_req *req)
1173 {
1174         return tevent_req_simple_recv_unix(req);
1175 }
1176
1177 static int notifyd_add_proxy_syswatches(struct db_record *rec,
1178                                         void *private_data)
1179 {
1180         struct notifyd_state *state = talloc_get_type_abort(
1181                 private_data, struct notifyd_state);
1182         struct db_context *db = dbwrap_record_get_db(rec);
1183         TDB_DATA key = dbwrap_record_get_key(rec);
1184         TDB_DATA value = dbwrap_record_get_value(rec);
1185         struct notifyd_instance *instances = NULL;
1186         size_t num_instances = 0;
1187         size_t i;
1188         char path[key.dsize+1];
1189         bool ok;
1190
1191         memcpy(path, key.dptr, key.dsize);
1192         path[key.dsize] = '\0';
1193
1194         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1195                                  &num_instances);
1196         if (!ok) {
1197                 DEBUG(1, ("%s: Could not parse notifyd entry for %s\n",
1198                           __func__, path));
1199                 return 0;
1200         }
1201
1202         for (i=0; i<num_instances; i++) {
1203                 struct notifyd_instance *instance = &instances[i];
1204                 uint32_t filter = instance->instance.filter;
1205                 uint32_t subdir_filter = instance->instance.subdir_filter;
1206                 int ret;
1207
1208                 /*
1209                  * This is a remote database. Pointers that we were
1210                  * given don't make sense locally. Initialize to NULL
1211                  * in case sys_notify_watch fails.
1212                  */
1213                 instances[i].sys_watch = NULL;
1214
1215                 ret = state->sys_notify_watch(
1216                         db, state->sys_notify_ctx, path,
1217                         &filter, &subdir_filter,
1218                         notifyd_sys_callback, state->msg_ctx,
1219                         &instance->sys_watch);
1220                 if (ret != 0) {
1221                         DEBUG(1, ("%s: inotify_watch returned %s\n",
1222                                   __func__, strerror(errno)));
1223                 }
1224         }
1225
1226         return 0;
1227 }
1228
1229 static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1230 {
1231         TDB_DATA key = dbwrap_record_get_key(rec);
1232         TDB_DATA value = dbwrap_record_get_value(rec);
1233         struct notifyd_instance *instances = NULL;
1234         size_t num_instances = 0;
1235         size_t i;
1236         bool ok;
1237
1238         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1239                                  &num_instances);
1240         if (!ok) {
1241                 DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n",
1242                           __func__, (int)key.dsize, (char *)key.dptr));
1243                 return 0;
1244         }
1245         for (i=0; i<num_instances; i++) {
1246                 TALLOC_FREE(instances[i].sys_watch);
1247         }
1248         return 0;
1249 }
1250
1251 static int notifyd_peer_destructor(struct notifyd_peer *p)
1252 {
1253         struct notifyd_state *state = p->state;
1254         size_t i;
1255
1256         if (p->db != NULL) {
1257                 dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
1258                                      NULL, NULL);
1259         }
1260
1261         for (i = 0; i<state->num_peers; i++) {
1262                 if (p == state->peers[i]) {
1263                         state->peers[i] = state->peers[state->num_peers-1];
1264                         state->num_peers -= 1;
1265                         break;
1266                 }
1267         }
1268         return 0;
1269 }
1270
1271 static struct notifyd_peer *notifyd_peer_new(
1272         struct notifyd_state *state, struct server_id pid)
1273 {
1274         struct notifyd_peer *p, **tmp;
1275
1276         tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1277                              state->num_peers+1);
1278         if (tmp == NULL) {
1279                 return NULL;
1280         }
1281         state->peers = tmp;
1282
1283         p = talloc_zero(state->peers, struct notifyd_peer);
1284         if (p == NULL) {
1285                 return NULL;
1286         }
1287         p->state = state;
1288         p->pid = pid;
1289
1290         state->peers[state->num_peers] = p;
1291         state->num_peers += 1;
1292
1293         talloc_set_destructor(p, notifyd_peer_destructor);
1294
1295         return p;
1296 }
1297
1298 static void notifyd_apply_reclog(struct notifyd_peer *peer,
1299                                  const uint8_t *msg, size_t msglen)
1300 {
1301         struct notifyd_state *state = peer->state;
1302         DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1303                            .length = msglen };
1304         struct server_id_buf idbuf;
1305         struct messaging_reclog *log;
1306         enum ndr_err_code ndr_err;
1307         uint32_t i;
1308
1309         if (peer->db == NULL) {
1310                 /*
1311                  * No db yet
1312                  */
1313                 return;
1314         }
1315
1316         log = talloc(peer, struct messaging_reclog);
1317         if (log == NULL) {
1318                 DEBUG(10, ("%s: talloc failed\n", __func__));
1319                 return;
1320         }
1321
1322         ndr_err = ndr_pull_struct_blob_all(
1323                 &blob, log, log,
1324                 (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1325         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1326                 DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n",
1327                            __func__, ndr_errstr(ndr_err)));
1328                 goto fail;
1329         }
1330
1331         DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__,
1332                    (unsigned)log->num_recs, (uintmax_t)log->rec_index,
1333                    server_id_str_buf(peer->pid, &idbuf)));
1334
1335         if (log->rec_index != peer->rec_index) {
1336                 DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n",
1337                           __func__, (uintmax_t)log->rec_index,
1338                           server_id_str_buf(peer->pid, &idbuf),
1339                           (uintmax_t)peer->rec_index));
1340                 goto fail;
1341         }
1342
1343         for (i=0; i<log->num_recs; i++) {
1344                 struct messaging_rec *r = log->recs[i];
1345                 struct notify_rec_change_msg *chg;
1346                 size_t pathlen;
1347                 bool ok;
1348
1349                 ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1350                                               &chg, &pathlen);
1351                 if (!ok) {
1352                         DEBUG(3, ("%s: notifyd_parse_rec_change failed\n",
1353                                   __func__));
1354                         goto fail;
1355                 }
1356
1357                 ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1358                                               &chg->instance, peer->db,
1359                                               state->sys_notify_watch,
1360                                               state->sys_notify_ctx,
1361                                               state->msg_ctx);
1362                 if (!ok) {
1363                         DEBUG(3, ("%s: notifyd_apply_rec_change failed\n",
1364                                   __func__));
1365                         goto fail;
1366                 }
1367         }
1368
1369         peer->rec_index += 1;
1370         peer->last_broadcast = time(NULL);
1371
1372         TALLOC_FREE(log);
1373         return;
1374
1375 fail:
1376         DEBUG(10, ("%s: Dropping peer %s\n", __func__,
1377                    server_id_str_buf(peer->pid, &idbuf)));
1378         TALLOC_FREE(peer);
1379 }
1380
1381 /*
1382  * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1383  * messages) broadcasts by other notifyds. Several cases:
1384  *
1385  * We don't know the source. This creates a new peer. Creating a peer
1386  * involves asking the peer for its full database. We assume ordered
1387  * messages, so the new database will arrive before the next broadcast
1388  * will.
1389  *
1390  * We know the source and the log index matches. We will apply the log
1391  * locally to our peer's db as if we had received it from a local
1392  * client.
1393  *
1394  * We know the source but the log index does not match. This means we
1395  * lost a message. We just drop the whole peer and wait for the next
1396  * broadcast, which will then trigger a fresh database pull.
1397  */
1398
1399 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
1400                                    uint64_t dst_srvid,
1401                                    const uint8_t *msg, size_t msglen,
1402                                    void *private_data)
1403 {
1404         struct notifyd_state *state = talloc_get_type_abort(
1405                 private_data, struct notifyd_state);
1406         struct server_id my_id = messaging_server_id(state->msg_ctx);
1407         struct notifyd_peer *p;
1408         uint32_t i;
1409         uint32_t msg_type;
1410         struct server_id src, dst;
1411         struct server_id_buf idbuf;
1412         NTSTATUS status;
1413
1414         if (msglen < MESSAGE_HDR_LENGTH) {
1415                 DEBUG(10, ("%s: Got short broadcast\n", __func__));
1416                 return 0;
1417         }
1418         message_hdr_get(&msg_type, &src, &dst, msg);
1419
1420         if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1421                 DEBUG(10, ("%s Got message %u, ignoring\n", __func__,
1422                            (unsigned)msg_type));
1423                 return 0;
1424         }
1425         if (server_id_equal(&src, &my_id)) {
1426                 DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__));
1427                 return 0;
1428         }
1429
1430         DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1431                    __func__, server_id_str_buf(src, &idbuf)));
1432
1433         for (i=0; i<state->num_peers; i++) {
1434                 if (server_id_equal(&state->peers[i]->pid, &src)) {
1435
1436                         DEBUG(10, ("%s: Applying changes to peer %u\n",
1437                                    __func__, (unsigned)i));
1438
1439                         notifyd_apply_reclog(state->peers[i],
1440                                              msg + MESSAGE_HDR_LENGTH,
1441                                              msglen - MESSAGE_HDR_LENGTH);
1442                         return 0;
1443                 }
1444         }
1445
1446         DEBUG(10, ("%s: Creating new peer for %s\n", __func__,
1447                    server_id_str_buf(src, &idbuf)));
1448
1449         p = notifyd_peer_new(state, src);
1450         if (p == NULL) {
1451                 DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__));
1452                 return 0;
1453         }
1454
1455         status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1456                                     NULL, 0);
1457         if (!NT_STATUS_IS_OK(status)) {
1458                 DEBUG(10, ("%s: messaging_send_buf failed: %s\n",
1459                            __func__, nt_errstr(status)));
1460                 TALLOC_FREE(p);
1461                 return 0;
1462         }
1463
1464         return 0;
1465 }
1466 #endif
1467
1468 struct notifyd_parse_db_state {
1469         bool (*fn)(const char *path,
1470                    struct server_id server,
1471                    const struct notify_instance *instance,
1472                    void *private_data);
1473         void *private_data;
1474 };
1475
1476 static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value,
1477                                     void *private_data)
1478 {
1479         struct notifyd_parse_db_state *state = private_data;
1480         char path[key.dsize+1];
1481         struct notifyd_instance *instances = NULL;
1482         size_t num_instances = 0;
1483         size_t i;
1484         bool ok;
1485
1486         memcpy(path, key.dptr, key.dsize);
1487         path[key.dsize] = 0;
1488
1489         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1490                                  &num_instances);
1491         if (!ok) {
1492                 DEBUG(10, ("%s: Could not parse entry for path %s\n",
1493                            __func__, path));
1494                 return true;
1495         }
1496
1497         for (i=0; i<num_instances; i++) {
1498                 ok = state->fn(path, instances[i].client,
1499                                &instances[i].instance,
1500                                state->private_data);
1501                 if (!ok) {
1502                         return false;
1503                 }
1504         }
1505
1506         return true;
1507 }
1508
1509 int notifyd_parse_db(const uint8_t *buf, size_t buflen,
1510                      uint64_t *log_index,
1511                      bool (*fn)(const char *path,
1512                                 struct server_id server,
1513                                 const struct notify_instance *instance,
1514                                 void *private_data),
1515                      void *private_data)
1516 {
1517         struct notifyd_parse_db_state state = {
1518                 .fn = fn, .private_data = private_data
1519         };
1520         NTSTATUS status;
1521
1522         if (buflen < 8) {
1523                 return EINVAL;
1524         }
1525         *log_index = BVAL(buf, 0);
1526
1527         buf += 8;
1528         buflen -= 8;
1529
1530         status = dbwrap_parse_marshall_buf(
1531                 buf, buflen, notifyd_parse_db_parser, &state);
1532         if (!NT_STATUS_IS_OK(status)) {
1533                 return map_errno_from_nt_status(status);
1534         }
1535
1536         return 0;
1537 }