notifyd: Use messaging_register for MSG_SMB_NOTIFY_TRIGGER
[bbaumbach/samba-autobuild/.git] / source3 / smbd / notifyd / notifyd.c
1 /*
2  * Unix SMB/CIFS implementation.
3  *
4  * Copyright (C) Volker Lendecke 2014
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include <tevent.h>
22 #include "lib/util/server_id.h"
23 #include "lib/util/data_blob.h"
24 #include "librpc/gen_ndr/notify.h"
25 #include "librpc/gen_ndr/messaging.h"
26 #include "librpc/gen_ndr/server_id.h"
27 #include "lib/dbwrap/dbwrap.h"
28 #include "lib/dbwrap/dbwrap_rbt.h"
29 #include "messages.h"
30 #include "tdb.h"
31 #include "util_tdb.h"
32 #include "notifyd.h"
33 #include "lib/util/server_id_db.h"
34 #include "lib/util/tevent_unix.h"
35 #include "lib/util/tevent_ntstatus.h"
36 #include "ctdbd_conn.h"
37 #include "ctdb_srvids.h"
38 #include "server_id_db_util.h"
39 #include "lib/util/iov_buf.h"
40 #include "messages_util.h"
41
42 #ifdef CLUSTER_SUPPORT
43 #include "ctdb_protocol.h"
44 #endif
45
46 struct notifyd_peer;
47
48 /*
49  * All of notifyd's state
50  */
51
52 struct notifyd_state {
53         struct tevent_context *ev;
54         struct messaging_context *msg_ctx;
55         struct ctdbd_connection *ctdbd_conn;
56
57         /*
58          * Database of everything clients show interest in. Indexed by
59          * absolute path. The database keys are not 0-terminated
60          * because the criticial operation, notifyd_trigger, can walk
61          * the structure from the top without adding intermediate 0s.
62          * The database records contain an array of
63          *
64          * struct notifyd_instance
65          *
66          * to be maintained and parsed by notifyd_entry_parse()
67          */
68         struct db_context *entries;
69
70         /*
71          * In the cluster case, this is the place where we store a log
72          * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
73          * forward them to our peer notifyd's in the cluster once a
74          * second or when the log grows too large.
75          */
76
77         struct messaging_reclog *log;
78
79         /*
80          * Array of companion notifyd's in a cluster. Every notifyd
81          * broadcasts its messaging_reclog to every other notifyd in
82          * the cluster. This is done by making ctdb send a message to
83          * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
84          * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who
85          * had called register_with_ctdbd this srvid will receive the
86          * broadcasts.
87          *
88          * Database replication happens via these broadcasts. Also,
89          * they serve as liveness indication. If a notifyd receives a
90          * broadcast from an unknown peer, it will create one for this
91          * srvid. Also when we don't hear anything from a peer for a
92          * while, we will discard it.
93          */
94
95         struct notifyd_peer **peers;
96         size_t num_peers;
97
98         sys_notify_watch_fn sys_notify_watch;
99         struct sys_notify_context *sys_notify_ctx;
100 };
101
102 /*
103  * notifyd's representation of a notify instance
104  */
105 struct notifyd_instance {
106         struct server_id client;
107         struct notify_instance instance;
108
109         void *sys_watch; /* inotify/fam/etc handle */
110
111         /*
112          * Filters after sys_watch took responsibility of some bits
113          */
114         uint32_t internal_filter;
115         uint32_t internal_subdir_filter;
116 };
117
118 struct notifyd_peer {
119         struct notifyd_state *state;
120         struct server_id pid;
121         uint64_t rec_index;
122         struct db_context *db;
123         time_t last_broadcast;
124 };
125
126 static void notifyd_rec_change(struct messaging_context *msg_ctx,
127                                void *private_data, uint32_t msg_type,
128                                struct server_id src, DATA_BLOB *data);
129 static void notifyd_trigger(struct messaging_context *msg_ctx,
130                             void *private_data, uint32_t msg_type,
131                             struct server_id src, DATA_BLOB *data);
132 static bool notifyd_get_db(struct messaging_context *msg_ctx,
133                            struct messaging_rec **prec,
134                            void *private_data);
135
136 #ifdef CLUSTER_SUPPORT
137 static bool notifyd_got_db(struct messaging_context *msg_ctx,
138                            struct messaging_rec **prec,
139                            void *private_data);
140 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
141                                      struct server_id src,
142                                      struct messaging_reclog *log);
143 #endif
144 static void notifyd_sys_callback(struct sys_notify_context *ctx,
145                                  void *private_data, struct notify_event *ev,
146                                  uint32_t filter);
147
148 #ifdef CLUSTER_SUPPORT
149 static struct tevent_req *notifyd_broadcast_reclog_send(
150         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
151         struct ctdbd_connection *ctdbd_conn, struct server_id src,
152         struct messaging_reclog *log);
153 static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
154
155 static struct tevent_req *notifyd_clean_peers_send(
156         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
157         struct notifyd_state *notifyd);
158 static int notifyd_clean_peers_recv(struct tevent_req *req);
159 #endif
160
161 static int sys_notify_watch_dummy(
162         TALLOC_CTX *mem_ctx,
163         struct sys_notify_context *ctx,
164         const char *path,
165         uint32_t *filter,
166         uint32_t *subdir_filter,
167         void (*callback)(struct sys_notify_context *ctx,
168                          void *private_data,
169                          struct notify_event *ev,
170                          uint32_t filter),
171         void *private_data,
172         void *handle_p)
173 {
174         void **handle = handle_p;
175         *handle = NULL;
176         return 0;
177 }
178
179 static void notifyd_handler_done(struct tevent_req *subreq);
180
181 #ifdef CLUSTER_SUPPORT
182 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
183 static void notifyd_clean_peers_finished(struct tevent_req *subreq);
184 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
185                                    uint64_t dst_srvid,
186                                    const uint8_t *msg, size_t msglen,
187                                    void *private_data);
188 #endif
189
190 struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
191                                 struct messaging_context *msg_ctx,
192                                 struct ctdbd_connection *ctdbd_conn,
193                                 sys_notify_watch_fn sys_notify_watch,
194                                 struct sys_notify_context *sys_notify_ctx)
195 {
196         struct tevent_req *req, *subreq;
197         struct notifyd_state *state;
198         struct server_id_db *names_db;
199         NTSTATUS status;
200         int ret;
201
202         req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
203         if (req == NULL) {
204                 return NULL;
205         }
206         state->ev = ev;
207         state->msg_ctx = msg_ctx;
208         state->ctdbd_conn = ctdbd_conn;
209
210         if (sys_notify_watch == NULL) {
211                 sys_notify_watch = sys_notify_watch_dummy;
212         }
213
214         state->sys_notify_watch = sys_notify_watch;
215         state->sys_notify_ctx = sys_notify_ctx;
216
217         state->entries = db_open_rbt(state);
218         if (tevent_req_nomem(state->entries, req)) {
219                 return tevent_req_post(req, ev);
220         }
221
222         status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE,
223                                     notifyd_rec_change);
224         if (tevent_req_nterror(req, status)) {
225                 return tevent_req_post(req, ev);
226         }
227
228         status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER,
229                                     notifyd_trigger);
230         if (tevent_req_nterror(req, status)) {
231                 goto deregister_rec_change;
232         }
233
234         subreq = messaging_handler_send(state, ev, msg_ctx,
235                                         MSG_SMB_NOTIFY_GET_DB,
236                                         notifyd_get_db, state);
237         if (tevent_req_nomem(subreq, req)) {
238                 goto deregister_trigger;
239         }
240         tevent_req_set_callback(subreq, notifyd_handler_done, req);
241
242         names_db = messaging_names_db(msg_ctx);
243
244         ret = server_id_db_set_exclusive(names_db, "notify-daemon");
245         if (ret != 0) {
246                 DEBUG(10, ("%s: server_id_db_add failed: %s\n",
247                            __func__, strerror(ret)));
248                 tevent_req_error(req, ret);
249                 goto deregister_trigger;
250         }
251
252         if (ctdbd_conn == NULL) {
253                 /*
254                  * No cluster around, skip the database replication
255                  * engine
256                  */
257                 return req;
258         }
259
260 #ifdef CLUSTER_SUPPORT
261         subreq = messaging_handler_send(state, ev, msg_ctx,
262                                         MSG_SMB_NOTIFY_DB,
263                                         notifyd_got_db, state);
264         if (tevent_req_nomem(subreq, req)) {
265                 goto deregister_trigger;
266         }
267         tevent_req_set_callback(subreq, notifyd_handler_done, req);
268
269         state->log = talloc_zero(state, struct messaging_reclog);
270         if (tevent_req_nomem(state->log, req)) {
271                 goto deregister_trigger;
272         }
273
274         subreq = notifyd_broadcast_reclog_send(
275                 state->log, ev, ctdbd_conn,
276                 messaging_server_id(msg_ctx),
277                 state->log);
278         if (tevent_req_nomem(subreq, req)) {
279                 goto deregister_trigger;
280         }
281         tevent_req_set_callback(subreq,
282                                 notifyd_broadcast_reclog_finished,
283                                 req);
284
285         subreq = notifyd_clean_peers_send(state, ev, state);
286         if (tevent_req_nomem(subreq, req)) {
287                 goto deregister_trigger;
288         }
289         tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
290                                 req);
291
292         ret = register_with_ctdbd(ctdbd_conn,
293                                   CTDB_SRVID_SAMBA_NOTIFY_PROXY,
294                                   notifyd_snoop_broadcast, state);
295         if (ret != 0) {
296                 tevent_req_error(req, ret);
297                 goto deregister_trigger;
298         }
299 #endif
300
301         return req;
302
303 deregister_trigger:
304         messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state);
305 deregister_rec_change:
306         messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state);
307         return tevent_req_post(req, ev);
308 }
309
310 static void notifyd_handler_done(struct tevent_req *subreq)
311 {
312         struct tevent_req *req = tevent_req_callback_data(
313                 subreq, struct tevent_req);
314         int ret;
315
316         ret = messaging_handler_recv(subreq);
317         TALLOC_FREE(subreq);
318         tevent_req_error(req, ret);
319 }
320
321 #ifdef CLUSTER_SUPPORT
322
323 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
324 {
325         struct tevent_req *req = tevent_req_callback_data(
326                 subreq, struct tevent_req);
327         int ret;
328
329         ret = notifyd_broadcast_reclog_recv(subreq);
330         TALLOC_FREE(subreq);
331         tevent_req_error(req, ret);
332 }
333
334 static void notifyd_clean_peers_finished(struct tevent_req *subreq)
335 {
336         struct tevent_req *req = tevent_req_callback_data(
337                 subreq, struct tevent_req);
338         int ret;
339
340         ret = notifyd_clean_peers_recv(subreq);
341         TALLOC_FREE(subreq);
342         tevent_req_error(req, ret);
343 }
344
345 #endif
346
347 int notifyd_recv(struct tevent_req *req)
348 {
349         return tevent_req_simple_recv_unix(req);
350 }
351
352 /*
353  * Parse an entry in the notifyd_context->entries database
354  */
355
356 static bool notifyd_parse_entry(uint8_t *buf, size_t buflen,
357                                 struct notifyd_instance **instances,
358                                 size_t *num_instances)
359 {
360         if ((buflen % sizeof(struct notifyd_instance)) != 0) {
361                 DEBUG(1, ("%s: invalid buffer size: %u\n",
362                           __func__, (unsigned)buflen));
363                 return false;
364         }
365
366         if (instances != NULL) {
367                 *instances = (struct notifyd_instance *)buf;
368         }
369         if (num_instances != NULL) {
370                 *num_instances = buflen / sizeof(struct notifyd_instance);
371         }
372         return true;
373 }
374
375 static bool notifyd_apply_rec_change(
376         const struct server_id *client,
377         const char *path, size_t pathlen,
378         const struct notify_instance *chg,
379         struct db_context *entries,
380         sys_notify_watch_fn sys_notify_watch,
381         struct sys_notify_context *sys_notify_ctx,
382         struct messaging_context *msg_ctx)
383 {
384         struct db_record *rec;
385         struct notifyd_instance *instances;
386         size_t num_instances;
387         size_t i;
388         struct notifyd_instance *instance;
389         TDB_DATA value;
390         NTSTATUS status;
391         bool ok = false;
392
393         if (pathlen == 0) {
394                 DEBUG(1, ("%s: pathlen==0\n", __func__));
395                 return false;
396         }
397         if (path[pathlen-1] != '\0') {
398                 DEBUG(1, ("%s: path not 0-terminated\n", __func__));
399                 return false;
400         }
401
402         DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, "
403                    "private_data=%p\n", __func__, path,
404                    (unsigned)chg->filter, (unsigned)chg->subdir_filter,
405                    chg->private_data));
406
407         rec = dbwrap_fetch_locked(
408                 entries, entries,
409                 make_tdb_data((const uint8_t *)path, pathlen-1));
410
411         if (rec == NULL) {
412                 DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__));
413                 goto fail;
414         }
415
416         num_instances = 0;
417         value = dbwrap_record_get_value(rec);
418
419         if (value.dsize != 0) {
420                 if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
421                                          &num_instances)) {
422                         goto fail;
423                 }
424         }
425
426         /*
427          * Overallocate by one instance to avoid a realloc when adding
428          */
429         instances = talloc_array(rec, struct notifyd_instance,
430                                  num_instances + 1);
431         if (instances == NULL) {
432                 DEBUG(1, ("%s: talloc failed\n", __func__));
433                 goto fail;
434         }
435
436         if (value.dsize != 0) {
437                 memcpy(instances, value.dptr, value.dsize);
438         }
439
440         for (i=0; i<num_instances; i++) {
441                 instance = &instances[i];
442
443                 if (server_id_equal(&instance->client, client) &&
444                     (instance->instance.private_data == chg->private_data)) {
445                         break;
446                 }
447         }
448
449         if (i < num_instances) {
450                 instance->instance = *chg;
451         } else {
452                 /*
453                  * We've overallocated for one instance
454                  */
455                 instance = &instances[num_instances];
456
457                 *instance = (struct notifyd_instance) {
458                         .client = *client,
459                         .instance = *chg,
460                         .internal_filter = chg->filter,
461                         .internal_subdir_filter = chg->subdir_filter
462                 };
463
464                 num_instances += 1;
465         }
466
467         if ((instance->instance.filter != 0) ||
468             (instance->instance.subdir_filter != 0)) {
469                 int ret;
470
471                 TALLOC_FREE(instance->sys_watch);
472
473                 ret = sys_notify_watch(entries, sys_notify_ctx, path,
474                                        &instance->internal_filter,
475                                        &instance->internal_subdir_filter,
476                                        notifyd_sys_callback, msg_ctx,
477                                        &instance->sys_watch);
478                 if (ret != 0) {
479                         DEBUG(1, ("%s: inotify_watch returned %s\n",
480                                   __func__, strerror(errno)));
481                 }
482         }
483
484         if ((instance->instance.filter == 0) &&
485             (instance->instance.subdir_filter == 0)) {
486                 /* This is a delete request */
487                 TALLOC_FREE(instance->sys_watch);
488                 *instance = instances[num_instances-1];
489                 num_instances -= 1;
490         }
491
492         DEBUG(10, ("%s: %s has %u instances\n", __func__,
493                    path, (unsigned)num_instances));
494
495         if (num_instances == 0) {
496                 status = dbwrap_record_delete(rec);
497                 if (!NT_STATUS_IS_OK(status)) {
498                         DEBUG(1, ("%s: dbwrap_record_delete returned %s\n",
499                                   __func__, nt_errstr(status)));
500                         goto fail;
501                 }
502         } else {
503                 value = make_tdb_data(
504                         (uint8_t *)instances,
505                         sizeof(struct notifyd_instance) * num_instances);
506
507                 status = dbwrap_record_store(rec, value, 0);
508                 if (!NT_STATUS_IS_OK(status)) {
509                         DEBUG(1, ("%s: dbwrap_record_store returned %s\n",
510                                   __func__, nt_errstr(status)));
511                         goto fail;
512                 }
513         }
514
515         ok = true;
516 fail:
517         TALLOC_FREE(rec);
518         return ok;
519 }
520
521 static void notifyd_sys_callback(struct sys_notify_context *ctx,
522                                  void *private_data, struct notify_event *ev,
523                                  uint32_t filter)
524 {
525         struct messaging_context *msg_ctx = talloc_get_type_abort(
526                 private_data, struct messaging_context);
527         struct notify_trigger_msg msg;
528         struct iovec iov[4];
529         char slash = '/';
530
531         msg = (struct notify_trigger_msg) {
532                 .when = timespec_current(),
533                 .action = ev->action,
534                 .filter = filter,
535         };
536
537         iov[0].iov_base = &msg;
538         iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
539         iov[1].iov_base = discard_const_p(char, ev->dir);
540         iov[1].iov_len = strlen(ev->dir);
541         iov[2].iov_base = &slash;
542         iov[2].iov_len = 1;
543         iov[3].iov_base = discard_const_p(char, ev->path);
544         iov[3].iov_len = strlen(ev->path)+1;
545
546         messaging_send_iov(
547                 msg_ctx, messaging_server_id(msg_ctx),
548                 MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
549 }
550
551 static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
552                                      struct notify_rec_change_msg **pmsg,
553                                      size_t *pathlen)
554 {
555         struct notify_rec_change_msg *msg;
556
557         if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
558                 DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__,
559                           (unsigned)bufsize));
560                 return false;
561         }
562
563         *pmsg = msg = (struct notify_rec_change_msg *)buf;
564         *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
565
566         DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, "
567                    "private_data=%p, path=%.*s\n",
568                    __func__, (unsigned)msg->instance.filter,
569                    (unsigned)msg->instance.subdir_filter,
570                    msg->instance.private_data, (int)(*pathlen), msg->path));
571
572         return true;
573 }
574
575 static void notifyd_rec_change(struct messaging_context *msg_ctx,
576                                void *private_data, uint32_t msg_type,
577                                struct server_id src, DATA_BLOB *data)
578 {
579         struct notifyd_state *state = talloc_get_type_abort(
580                 private_data, struct notifyd_state);
581         struct server_id_buf idbuf;
582         struct notify_rec_change_msg *msg;
583         size_t pathlen;
584         bool ok;
585
586         DBG_DEBUG("Got %zu bytes from %s\n", data->length,
587                   server_id_str_buf(src, &idbuf));
588
589         ok = notifyd_parse_rec_change(data->data, data->length,
590                                       &msg, &pathlen);
591         if (!ok) {
592                 return;
593         }
594
595         ok = notifyd_apply_rec_change(
596                 &src, msg->path, pathlen, &msg->instance,
597                 state->entries, state->sys_notify_watch, state->sys_notify_ctx,
598                 state->msg_ctx);
599         if (!ok) {
600                 DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
601                           __func__));
602                 return;
603         }
604
605         if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
606                 return;
607         }
608
609 #ifdef CLUSTER_SUPPORT
610         {
611
612         struct messaging_rec **tmp;
613         struct messaging_reclog *log;
614         struct iovec iov = { .iov_base = data->data, .iov_len = data->length };
615
616         log = state->log;
617
618         tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
619                              log->num_recs+1);
620         if (tmp == NULL) {
621                 DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
622                 return;
623         }
624         log->recs = tmp;
625
626         log->recs[log->num_recs] = messaging_rec_create(
627                 log->recs, src, messaging_server_id(msg_ctx),
628                 msg_type, &iov, 1, NULL, 0);
629
630         if (log->recs[log->num_recs] == NULL) {
631                 DBG_WARNING("messaging_rec_create failed, ignoring\n");
632                 return;
633         }
634
635         log->num_recs += 1;
636
637         if (log->num_recs >= 100) {
638                 /*
639                  * Don't let the log grow too large
640                  */
641                 notifyd_broadcast_reclog(state->ctdbd_conn,
642                                          messaging_server_id(msg_ctx), log);
643         }
644
645         }
646 #endif
647 }
648
649 struct notifyd_trigger_state {
650         struct messaging_context *msg_ctx;
651         struct notify_trigger_msg *msg;
652         bool recursive;
653         bool covered_by_sys_notify;
654 };
655
656 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
657                                    void *private_data);
658
659 static void notifyd_trigger(struct messaging_context *msg_ctx,
660                             void *private_data, uint32_t msg_type,
661                             struct server_id src, DATA_BLOB *data)
662 {
663         struct notifyd_state *state = talloc_get_type_abort(
664                 private_data, struct notifyd_state);
665         struct server_id my_id = messaging_server_id(msg_ctx);
666         struct notifyd_trigger_state tstate;
667         const char *path;
668         const char *p, *next_p;
669
670         if (data->length < offsetof(struct notify_trigger_msg, path) + 1) {
671                 DBG_WARNING("message too short, ignoring: %zu\n",
672                             data->length);
673                 return;
674         }
675         if (data->data[data->length-1] != 0) {
676                 DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
677                 return;
678         }
679
680         tstate.msg_ctx = msg_ctx;
681
682         tstate.covered_by_sys_notify = (src.vnn == my_id.vnn);
683         tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id);
684
685         tstate.msg = (struct notify_trigger_msg *)data->data;
686         path = tstate.msg->path;
687
688         DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
689                    __func__, (unsigned)tstate.msg->action,
690                    (unsigned)tstate.msg->filter, path));
691
692         if (path[0] != '/') {
693                 DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
694                           __func__, path));
695                 return;
696         }
697
698         for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
699                 ptrdiff_t path_len = p - path;
700                 TDB_DATA key;
701                 uint32_t i;
702
703                 next_p = strchr(p+1, '/');
704                 tstate.recursive = (next_p != NULL);
705
706                 DEBUG(10, ("%s: Trying path %.*s\n", __func__,
707                            (int)path_len, path));
708
709                 key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
710                                    .dsize = path_len };
711
712                 dbwrap_parse_record(state->entries, key,
713                                     notifyd_trigger_parser, &tstate);
714
715                 if (state->peers == NULL) {
716                         continue;
717                 }
718
719                 if (src.vnn != my_id.vnn) {
720                         continue;
721                 }
722
723                 for (i=0; i<state->num_peers; i++) {
724                         if (state->peers[i]->db == NULL) {
725                                 /*
726                                  * Inactive peer, did not get a db yet
727                                  */
728                                 continue;
729                         }
730                         dbwrap_parse_record(state->peers[i]->db, key,
731                                             notifyd_trigger_parser, &tstate);
732                 }
733         }
734 }
735
736 static void notifyd_send_delete(struct messaging_context *msg_ctx,
737                                 TDB_DATA key,
738                                 struct notifyd_instance *instance);
739
740 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
741                                    void *private_data)
742
743 {
744         struct notifyd_trigger_state *tstate = private_data;
745         struct notify_event_msg msg = { .action = tstate->msg->action,
746                                         .when = tstate->msg->when };
747         struct iovec iov[2];
748         size_t path_len = key.dsize;
749         struct notifyd_instance *instances = NULL;
750         size_t num_instances = 0;
751         size_t i;
752
753         if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
754                                  &num_instances)) {
755                 DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__));
756                 return;
757         }
758
759         DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__,
760                    (unsigned)num_instances, (int)key.dsize,
761                    (char *)key.dptr));
762
763         iov[0].iov_base = &msg;
764         iov[0].iov_len = offsetof(struct notify_event_msg, path);
765         iov[1].iov_base = tstate->msg->path + path_len + 1;
766         iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
767
768         for (i=0; i<num_instances; i++) {
769                 struct notifyd_instance *instance = &instances[i];
770                 struct server_id_buf idbuf;
771                 uint32_t i_filter;
772                 NTSTATUS status;
773
774                 if (tstate->covered_by_sys_notify) {
775                         if (tstate->recursive) {
776                                 i_filter = instance->internal_subdir_filter;
777                         } else {
778                                 i_filter = instance->internal_filter;
779                         }
780                 } else {
781                         if (tstate->recursive) {
782                                 i_filter = instance->instance.subdir_filter;
783                         } else {
784                                 i_filter = instance->instance.filter;
785                         }
786                 }
787
788                 if ((i_filter & tstate->msg->filter) == 0) {
789                         continue;
790                 }
791
792                 msg.private_data = instance->instance.private_data;
793
794                 status = messaging_send_iov(
795                         tstate->msg_ctx, instance->client,
796                         MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
797
798                 DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n",
799                            __func__,
800                            server_id_str_buf(instance->client, &idbuf),
801                            nt_errstr(status)));
802
803                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
804                     procid_is_local(&instance->client)) {
805                         /*
806                          * That process has died
807                          */
808                         notifyd_send_delete(tstate->msg_ctx, key, instance);
809                         continue;
810                 }
811
812                 if (!NT_STATUS_IS_OK(status)) {
813                         DEBUG(1, ("%s: messaging_send_iov returned %s\n",
814                                   __func__, nt_errstr(status)));
815                 }
816         }
817 }
818
819 /*
820  * Send a delete request to ourselves to properly discard a notify
821  * record for an smbd that has died.
822  */
823
824 static void notifyd_send_delete(struct messaging_context *msg_ctx,
825                                 TDB_DATA key,
826                                 struct notifyd_instance *instance)
827 {
828         struct notify_rec_change_msg msg = {
829                 .instance.private_data = instance->instance.private_data
830         };
831         uint8_t nul = 0;
832         struct iovec iov[3];
833         int ret;
834
835         /*
836          * Send a rec_change to ourselves to delete a dead entry
837          */
838
839         iov[0] = (struct iovec) {
840                 .iov_base = &msg,
841                 .iov_len = offsetof(struct notify_rec_change_msg, path) };
842         iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
843         iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
844
845         ret = messaging_send_iov_from(
846                 msg_ctx, instance->client, messaging_server_id(msg_ctx),
847                 MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
848
849         if (ret != 0) {
850                 DEBUG(10, ("%s: messaging_send_iov_from returned %s\n",
851                            __func__, strerror(ret)));
852         }
853 }
854
855 static bool notifyd_get_db(struct messaging_context *msg_ctx,
856                            struct messaging_rec **prec,
857                            void *private_data)
858 {
859         struct notifyd_state *state = talloc_get_type_abort(
860                 private_data, struct notifyd_state);
861         struct messaging_rec *rec = *prec;
862         struct server_id_buf id1, id2;
863         NTSTATUS status;
864         uint64_t rec_index = UINT64_MAX;
865         uint8_t index_buf[sizeof(uint64_t)];
866         size_t dbsize;
867         uint8_t *buf;
868         struct iovec iov[2];
869
870         dbsize = dbwrap_marshall(state->entries, NULL, 0);
871
872         buf = talloc_array(rec, uint8_t, dbsize);
873         if (buf == NULL) {
874                 DEBUG(1, ("%s: talloc_array(%ju) failed\n",
875                           __func__, (uintmax_t)dbsize));
876                 return true;
877         }
878
879         dbsize = dbwrap_marshall(state->entries, buf, dbsize);
880
881         if (dbsize != talloc_get_size(buf)) {
882                 DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__,
883                           (uintmax_t)talloc_get_size(buf),
884                           (uintmax_t)dbsize));
885                 TALLOC_FREE(buf);
886                 return true;
887         }
888
889         if (state->log != NULL) {
890                 rec_index = state->log->rec_index;
891         }
892         SBVAL(index_buf, 0, rec_index);
893
894         iov[0] = (struct iovec) { .iov_base = index_buf,
895                                   .iov_len = sizeof(index_buf) };
896         iov[1] = (struct iovec) { .iov_base = buf,
897                                   .iov_len = dbsize };
898
899         DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
900                    (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
901                    server_id_str_buf(messaging_server_id(msg_ctx), &id1),
902                    server_id_str_buf(rec->src, &id2)));
903
904         status = messaging_send_iov(msg_ctx, rec->src, MSG_SMB_NOTIFY_DB,
905                                     iov, ARRAY_SIZE(iov), NULL, 0);
906         TALLOC_FREE(buf);
907         if (!NT_STATUS_IS_OK(status)) {
908                 DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
909                           __func__, nt_errstr(status)));
910         }
911
912         return true;
913 }
914
915 #ifdef CLUSTER_SUPPORT
916
917 static int notifyd_add_proxy_syswatches(struct db_record *rec,
918                                         void *private_data);
919
920 static bool notifyd_got_db(struct messaging_context *msg_ctx,
921                            struct messaging_rec **prec,
922                            void *private_data)
923 {
924         struct notifyd_state *state = talloc_get_type_abort(
925                 private_data, struct notifyd_state);
926         struct messaging_rec *rec = *prec;
927         struct notifyd_peer *p = NULL;
928         struct server_id_buf idbuf;
929         NTSTATUS status;
930         int count;
931         size_t i;
932
933         for (i=0; i<state->num_peers; i++) {
934                 if (server_id_equal(&rec->src, &state->peers[i]->pid)) {
935                         p = state->peers[i];
936                         break;
937                 }
938         }
939
940         if (p == NULL) {
941                 DEBUG(10, ("%s: Did not find peer for db from %s\n",
942                            __func__, server_id_str_buf(rec->src, &idbuf)));
943                 return true;
944         }
945
946         if (rec->buf.length < 8) {
947                 DEBUG(10, ("%s: Got short db length %u from %s\n", __func__,
948                            (unsigned)rec->buf.length,
949                            server_id_str_buf(rec->src, &idbuf)));
950                 TALLOC_FREE(p);
951                 return true;
952         }
953
954         p->rec_index = BVAL(rec->buf.data, 0);
955
956         p->db = db_open_rbt(p);
957         if (p->db == NULL) {
958                 DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
959                 TALLOC_FREE(p);
960                 return true;
961         }
962
963         status = dbwrap_unmarshall(p->db, rec->buf.data + 8,
964                                    rec->buf.length - 8);
965         if (!NT_STATUS_IS_OK(status)) {
966                 DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
967                            __func__, nt_errstr(status),
968                            server_id_str_buf(rec->src, &idbuf)));
969                 TALLOC_FREE(p);
970                 return true;
971         }
972
973         dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
974                              &count);
975
976         DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
977                    server_id_str_buf(rec->src, &idbuf), count));
978
979         return true;
980 }
981
982 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
983                                      struct server_id src,
984                                      struct messaging_reclog *log)
985 {
986         enum ndr_err_code ndr_err;
987         uint8_t msghdr[MESSAGE_HDR_LENGTH];
988         DATA_BLOB blob;
989         struct iovec iov[2];
990         int ret;
991
992         if (log == NULL) {
993                 return;
994         }
995
996         DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__,
997                    (uintmax_t)log->rec_index, (unsigned)log->num_recs));
998
999         message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
1000                         (struct server_id) {0 });
1001         iov[0] = (struct iovec) { .iov_base = msghdr,
1002                                   .iov_len = sizeof(msghdr) };
1003
1004         ndr_err = ndr_push_struct_blob(
1005                 &blob, log, log,
1006                 (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
1007         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1008                 DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n",
1009                           __func__, ndr_errstr(ndr_err)));
1010                 goto done;
1011         }
1012         iov[1] = (struct iovec) { .iov_base = blob.data,
1013                                   .iov_len = blob.length };
1014
1015         ret = ctdbd_messaging_send_iov(
1016                 ctdbd_conn, CTDB_BROADCAST_VNNMAP,
1017                 CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
1018         TALLOC_FREE(blob.data);
1019         if (ret != 0) {
1020                 DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n",
1021                           __func__, strerror(ret)));
1022                 goto done;
1023         }
1024
1025         log->rec_index += 1;
1026
1027 done:
1028         log->num_recs = 0;
1029         TALLOC_FREE(log->recs);
1030 }
1031
1032 struct notifyd_broadcast_reclog_state {
1033         struct tevent_context *ev;
1034         struct ctdbd_connection *ctdbd_conn;
1035         struct server_id src;
1036         struct messaging_reclog *log;
1037 };
1038
1039 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
1040
1041 static struct tevent_req *notifyd_broadcast_reclog_send(
1042         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1043         struct ctdbd_connection *ctdbd_conn, struct server_id src,
1044         struct messaging_reclog *log)
1045 {
1046         struct tevent_req *req, *subreq;
1047         struct notifyd_broadcast_reclog_state *state;
1048
1049         req = tevent_req_create(mem_ctx, &state,
1050                                 struct notifyd_broadcast_reclog_state);
1051         if (req == NULL) {
1052                 return NULL;
1053         }
1054         state->ev = ev;
1055         state->ctdbd_conn = ctdbd_conn;
1056         state->src = src;
1057         state->log = log;
1058
1059         subreq = tevent_wakeup_send(state, state->ev,
1060                                     timeval_current_ofs_msec(1000));
1061         if (tevent_req_nomem(subreq, req)) {
1062                 return tevent_req_post(req, ev);
1063         }
1064         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1065         return req;
1066 }
1067
1068 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1069 {
1070         struct tevent_req *req = tevent_req_callback_data(
1071                 subreq, struct tevent_req);
1072         struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1073                 req, struct notifyd_broadcast_reclog_state);
1074         bool ok;
1075
1076         ok = tevent_wakeup_recv(subreq);
1077         TALLOC_FREE(subreq);
1078         if (!ok) {
1079                 tevent_req_oom(req);
1080                 return;
1081         }
1082
1083         notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1084
1085         subreq = tevent_wakeup_send(state, state->ev,
1086                                     timeval_current_ofs_msec(1000));
1087         if (tevent_req_nomem(subreq, req)) {
1088                 return;
1089         }
1090         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1091 }
1092
1093 static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
1094 {
1095         return tevent_req_simple_recv_unix(req);
1096 }
1097
1098 struct notifyd_clean_peers_state {
1099         struct tevent_context *ev;
1100         struct notifyd_state *notifyd;
1101 };
1102
1103 static void notifyd_clean_peers_next(struct tevent_req *subreq);
1104
1105 static struct tevent_req *notifyd_clean_peers_send(
1106         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1107         struct notifyd_state *notifyd)
1108 {
1109         struct tevent_req *req, *subreq;
1110         struct notifyd_clean_peers_state *state;
1111
1112         req = tevent_req_create(mem_ctx, &state,
1113                                 struct notifyd_clean_peers_state);
1114         if (req == NULL) {
1115                 return NULL;
1116         }
1117         state->ev = ev;
1118         state->notifyd = notifyd;
1119
1120         subreq = tevent_wakeup_send(state, state->ev,
1121                                     timeval_current_ofs_msec(30000));
1122         if (tevent_req_nomem(subreq, req)) {
1123                 return tevent_req_post(req, ev);
1124         }
1125         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1126         return req;
1127 }
1128
1129 static void notifyd_clean_peers_next(struct tevent_req *subreq)
1130 {
1131         struct tevent_req *req = tevent_req_callback_data(
1132                 subreq, struct tevent_req);
1133         struct notifyd_clean_peers_state *state = tevent_req_data(
1134                 req, struct notifyd_clean_peers_state);
1135         struct notifyd_state *notifyd = state->notifyd;
1136         size_t i;
1137         bool ok;
1138         time_t now = time(NULL);
1139
1140         ok = tevent_wakeup_recv(subreq);
1141         TALLOC_FREE(subreq);
1142         if (!ok) {
1143                 tevent_req_oom(req);
1144                 return;
1145         }
1146
1147         i = 0;
1148         while (i < notifyd->num_peers) {
1149                 struct notifyd_peer *p = notifyd->peers[i];
1150
1151                 if ((now - p->last_broadcast) > 60) {
1152                         struct server_id_buf idbuf;
1153
1154                         /*
1155                          * Haven't heard for more than 60 seconds. Call this
1156                          * peer dead
1157                          */
1158
1159                         DEBUG(10, ("%s: peer %s died\n", __func__,
1160                                    server_id_str_buf(p->pid, &idbuf)));
1161                         /*
1162                          * This implicitly decrements notifyd->num_peers
1163                          */
1164                         TALLOC_FREE(p);
1165                 } else {
1166                         i += 1;
1167                 }
1168         }
1169
1170         subreq = tevent_wakeup_send(state, state->ev,
1171                                     timeval_current_ofs_msec(30000));
1172         if (tevent_req_nomem(subreq, req)) {
1173                 return;
1174         }
1175         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1176 }
1177
1178 static int notifyd_clean_peers_recv(struct tevent_req *req)
1179 {
1180         return tevent_req_simple_recv_unix(req);
1181 }
1182
1183 static int notifyd_add_proxy_syswatches(struct db_record *rec,
1184                                         void *private_data)
1185 {
1186         struct notifyd_state *state = talloc_get_type_abort(
1187                 private_data, struct notifyd_state);
1188         struct db_context *db = dbwrap_record_get_db(rec);
1189         TDB_DATA key = dbwrap_record_get_key(rec);
1190         TDB_DATA value = dbwrap_record_get_value(rec);
1191         struct notifyd_instance *instances = NULL;
1192         size_t num_instances = 0;
1193         size_t i;
1194         char path[key.dsize+1];
1195         bool ok;
1196
1197         memcpy(path, key.dptr, key.dsize);
1198         path[key.dsize] = '\0';
1199
1200         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1201                                  &num_instances);
1202         if (!ok) {
1203                 DEBUG(1, ("%s: Could not parse notifyd entry for %s\n",
1204                           __func__, path));
1205                 return 0;
1206         }
1207
1208         for (i=0; i<num_instances; i++) {
1209                 struct notifyd_instance *instance = &instances[i];
1210                 uint32_t filter = instance->instance.filter;
1211                 uint32_t subdir_filter = instance->instance.subdir_filter;
1212                 int ret;
1213
1214                 /*
1215                  * This is a remote database. Pointers that we were
1216                  * given don't make sense locally. Initialize to NULL
1217                  * in case sys_notify_watch fails.
1218                  */
1219                 instances[i].sys_watch = NULL;
1220
1221                 ret = state->sys_notify_watch(
1222                         db, state->sys_notify_ctx, path,
1223                         &filter, &subdir_filter,
1224                         notifyd_sys_callback, state->msg_ctx,
1225                         &instance->sys_watch);
1226                 if (ret != 0) {
1227                         DEBUG(1, ("%s: inotify_watch returned %s\n",
1228                                   __func__, strerror(errno)));
1229                 }
1230         }
1231
1232         return 0;
1233 }
1234
1235 static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1236 {
1237         TDB_DATA key = dbwrap_record_get_key(rec);
1238         TDB_DATA value = dbwrap_record_get_value(rec);
1239         struct notifyd_instance *instances = NULL;
1240         size_t num_instances = 0;
1241         size_t i;
1242         bool ok;
1243
1244         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1245                                  &num_instances);
1246         if (!ok) {
1247                 DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n",
1248                           __func__, (int)key.dsize, (char *)key.dptr));
1249                 return 0;
1250         }
1251         for (i=0; i<num_instances; i++) {
1252                 TALLOC_FREE(instances[i].sys_watch);
1253         }
1254         return 0;
1255 }
1256
1257 static int notifyd_peer_destructor(struct notifyd_peer *p)
1258 {
1259         struct notifyd_state *state = p->state;
1260         size_t i;
1261
1262         if (p->db != NULL) {
1263                 dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
1264                                      NULL, NULL);
1265         }
1266
1267         for (i = 0; i<state->num_peers; i++) {
1268                 if (p == state->peers[i]) {
1269                         state->peers[i] = state->peers[state->num_peers-1];
1270                         state->num_peers -= 1;
1271                         break;
1272                 }
1273         }
1274         return 0;
1275 }
1276
1277 static struct notifyd_peer *notifyd_peer_new(
1278         struct notifyd_state *state, struct server_id pid)
1279 {
1280         struct notifyd_peer *p, **tmp;
1281
1282         tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1283                              state->num_peers+1);
1284         if (tmp == NULL) {
1285                 return NULL;
1286         }
1287         state->peers = tmp;
1288
1289         p = talloc_zero(state->peers, struct notifyd_peer);
1290         if (p == NULL) {
1291                 return NULL;
1292         }
1293         p->state = state;
1294         p->pid = pid;
1295
1296         state->peers[state->num_peers] = p;
1297         state->num_peers += 1;
1298
1299         talloc_set_destructor(p, notifyd_peer_destructor);
1300
1301         return p;
1302 }
1303
1304 static void notifyd_apply_reclog(struct notifyd_peer *peer,
1305                                  const uint8_t *msg, size_t msglen)
1306 {
1307         struct notifyd_state *state = peer->state;
1308         DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1309                            .length = msglen };
1310         struct server_id_buf idbuf;
1311         struct messaging_reclog *log;
1312         enum ndr_err_code ndr_err;
1313         uint32_t i;
1314
1315         if (peer->db == NULL) {
1316                 /*
1317                  * No db yet
1318                  */
1319                 return;
1320         }
1321
1322         log = talloc(peer, struct messaging_reclog);
1323         if (log == NULL) {
1324                 DEBUG(10, ("%s: talloc failed\n", __func__));
1325                 return;
1326         }
1327
1328         ndr_err = ndr_pull_struct_blob_all(
1329                 &blob, log, log,
1330                 (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1331         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1332                 DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n",
1333                            __func__, ndr_errstr(ndr_err)));
1334                 goto fail;
1335         }
1336
1337         DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__,
1338                    (unsigned)log->num_recs, (uintmax_t)log->rec_index,
1339                    server_id_str_buf(peer->pid, &idbuf)));
1340
1341         if (log->rec_index != peer->rec_index) {
1342                 DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n",
1343                           __func__, (uintmax_t)log->rec_index,
1344                           server_id_str_buf(peer->pid, &idbuf),
1345                           (uintmax_t)peer->rec_index));
1346                 goto fail;
1347         }
1348
1349         for (i=0; i<log->num_recs; i++) {
1350                 struct messaging_rec *r = log->recs[i];
1351                 struct notify_rec_change_msg *chg;
1352                 size_t pathlen;
1353                 bool ok;
1354
1355                 ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1356                                               &chg, &pathlen);
1357                 if (!ok) {
1358                         DEBUG(3, ("%s: notifyd_parse_rec_change failed\n",
1359                                   __func__));
1360                         goto fail;
1361                 }
1362
1363                 ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1364                                               &chg->instance, peer->db,
1365                                               state->sys_notify_watch,
1366                                               state->sys_notify_ctx,
1367                                               state->msg_ctx);
1368                 if (!ok) {
1369                         DEBUG(3, ("%s: notifyd_apply_rec_change failed\n",
1370                                   __func__));
1371                         goto fail;
1372                 }
1373         }
1374
1375         peer->rec_index += 1;
1376         peer->last_broadcast = time(NULL);
1377
1378         TALLOC_FREE(log);
1379         return;
1380
1381 fail:
1382         DEBUG(10, ("%s: Dropping peer %s\n", __func__,
1383                    server_id_str_buf(peer->pid, &idbuf)));
1384         TALLOC_FREE(peer);
1385 }
1386
1387 /*
1388  * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1389  * messages) broadcasts by other notifyds. Several cases:
1390  *
1391  * We don't know the source. This creates a new peer. Creating a peer
1392  * involves asking the peer for its full database. We assume ordered
1393  * messages, so the new database will arrive before the next broadcast
1394  * will.
1395  *
1396  * We know the source and the log index matches. We will apply the log
1397  * locally to our peer's db as if we had received it from a local
1398  * client.
1399  *
1400  * We know the source but the log index does not match. This means we
1401  * lost a message. We just drop the whole peer and wait for the next
1402  * broadcast, which will then trigger a fresh database pull.
1403  */
1404
1405 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
1406                                    uint64_t dst_srvid,
1407                                    const uint8_t *msg, size_t msglen,
1408                                    void *private_data)
1409 {
1410         struct notifyd_state *state = talloc_get_type_abort(
1411                 private_data, struct notifyd_state);
1412         struct server_id my_id = messaging_server_id(state->msg_ctx);
1413         struct notifyd_peer *p;
1414         uint32_t i;
1415         uint32_t msg_type;
1416         struct server_id src, dst;
1417         struct server_id_buf idbuf;
1418         NTSTATUS status;
1419
1420         if (msglen < MESSAGE_HDR_LENGTH) {
1421                 DEBUG(10, ("%s: Got short broadcast\n", __func__));
1422                 return 0;
1423         }
1424         message_hdr_get(&msg_type, &src, &dst, msg);
1425
1426         if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1427                 DEBUG(10, ("%s Got message %u, ignoring\n", __func__,
1428                            (unsigned)msg_type));
1429                 return 0;
1430         }
1431         if (server_id_equal(&src, &my_id)) {
1432                 DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__));
1433                 return 0;
1434         }
1435
1436         DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1437                    __func__, server_id_str_buf(src, &idbuf)));
1438
1439         for (i=0; i<state->num_peers; i++) {
1440                 if (server_id_equal(&state->peers[i]->pid, &src)) {
1441
1442                         DEBUG(10, ("%s: Applying changes to peer %u\n",
1443                                    __func__, (unsigned)i));
1444
1445                         notifyd_apply_reclog(state->peers[i],
1446                                              msg + MESSAGE_HDR_LENGTH,
1447                                              msglen - MESSAGE_HDR_LENGTH);
1448                         return 0;
1449                 }
1450         }
1451
1452         DEBUG(10, ("%s: Creating new peer for %s\n", __func__,
1453                    server_id_str_buf(src, &idbuf)));
1454
1455         p = notifyd_peer_new(state, src);
1456         if (p == NULL) {
1457                 DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__));
1458                 return 0;
1459         }
1460
1461         status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1462                                     NULL, 0);
1463         if (!NT_STATUS_IS_OK(status)) {
1464                 DEBUG(10, ("%s: messaging_send_buf failed: %s\n",
1465                            __func__, nt_errstr(status)));
1466                 TALLOC_FREE(p);
1467                 return 0;
1468         }
1469
1470         return 0;
1471 }
1472 #endif
1473
1474 struct notifyd_parse_db_state {
1475         bool (*fn)(const char *path,
1476                    struct server_id server,
1477                    const struct notify_instance *instance,
1478                    void *private_data);
1479         void *private_data;
1480 };
1481
1482 static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value,
1483                                     void *private_data)
1484 {
1485         struct notifyd_parse_db_state *state = private_data;
1486         char path[key.dsize+1];
1487         struct notifyd_instance *instances = NULL;
1488         size_t num_instances = 0;
1489         size_t i;
1490         bool ok;
1491
1492         memcpy(path, key.dptr, key.dsize);
1493         path[key.dsize] = 0;
1494
1495         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1496                                  &num_instances);
1497         if (!ok) {
1498                 DEBUG(10, ("%s: Could not parse entry for path %s\n",
1499                            __func__, path));
1500                 return true;
1501         }
1502
1503         for (i=0; i<num_instances; i++) {
1504                 ok = state->fn(path, instances[i].client,
1505                                &instances[i].instance,
1506                                state->private_data);
1507                 if (!ok) {
1508                         return false;
1509                 }
1510         }
1511
1512         return true;
1513 }
1514
1515 int notifyd_parse_db(const uint8_t *buf, size_t buflen,
1516                      uint64_t *log_index,
1517                      bool (*fn)(const char *path,
1518                                 struct server_id server,
1519                                 const struct notify_instance *instance,
1520                                 void *private_data),
1521                      void *private_data)
1522 {
1523         struct notifyd_parse_db_state state = {
1524                 .fn = fn, .private_data = private_data
1525         };
1526         NTSTATUS status;
1527
1528         if (buflen < 8) {
1529                 return EINVAL;
1530         }
1531         *log_index = BVAL(buf, 0);
1532
1533         buf += 8;
1534         buflen -= 8;
1535
1536         status = dbwrap_parse_marshall_buf(
1537                 buf, buflen, notifyd_parse_db_parser, &state);
1538         if (!NT_STATUS_IS_OK(status)) {
1539                 return map_errno_from_nt_status(status);
1540         }
1541
1542         return 0;
1543 }