4d885651bae89aa189e57b0c4a5c69468125c0dc
[kai/samba-autobuild/.git] / source3 / smbd / notify_internal.c
1 /*
2    Unix SMB/CIFS implementation.
3
4    Copyright (C) Andrew Tridgell 2006
5    Copyright (C) Volker Lendecke 2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22   this is the change notify database. It implements mechanisms for
23   storing current change notify waiters in a tdb, and checking if a
24   given event matches any of the stored notify waiters.
25 */
26
27 #include "includes.h"
28 #include "system/filesys.h"
29 #include "librpc/gen_ndr/ndr_notify.h"
30 #include "dbwrap/dbwrap.h"
31 #include "dbwrap/dbwrap_open.h"
32 #include "dbwrap/dbwrap_tdb.h"
33 #include "smbd/smbd.h"
34 #include "messages.h"
35 #include "lib/tdb_wrap/tdb_wrap.h"
36 #include "util_tdb.h"
37 #include "lib/param/param.h"
38 #include "lib/dbwrap/dbwrap_cache.h"
39 #include "ctdb_srvids.h"
40 #include "ctdbd_conn.h"
41 #include "ctdb_conn.h"
42 #include "lib/util/tevent_unix.h"
43
44 struct notify_list {
45         struct notify_list *next, *prev;
46         const char *path;
47         void (*callback)(void *, const struct notify_event *);
48         void *private_data;
49 };
50
51 struct notify_context {
52         struct messaging_context *msg;
53         struct notify_list *list;
54
55         /*
56          * The notify database is split up into two databases: One
57          * relatively static index db and the real notify db with the
58          * volatile entries.
59          */
60
61         /*
62          * "db_notify" is indexed by pathname. Per record it stores an
63          * array of notify_db_entry structs. These represent the
64          * notify records as requested by the smb client. This
65          * database is always held locally, it is never clustered.
66          */
67         struct db_context *db_notify;
68
69         /*
70          * "db_index" is indexed by pathname. The records are an array
71          * of VNNs which have any interest in notifies for this path
72          * name.
73          *
74          * In the non-clustered case this database is cached in RAM by
75          * means of db_cache_open, which maintains a cache per
76          * process. Cache consistency is maintained by the tdb
77          * sequence number.
78          *
79          * In the clustered case right now we can not use the tdb
80          * sequence number, but by means of read only records we
81          * should be able to avoid a lot of full migrations.
82          *
83          * In both cases, it is important to keep the update
84          * operations to db_index to a minimum. This is achieved by
85          * delayed deletion. When a db_notify is initially created,
86          * the db_index record is also created. When more notifies are
87          * added for a path, then only the db_notify record needs to be
88          * modified, the db_index record is not touched. When the last
89          * entry from the db_notify record is deleted, the db_index
90          * record is not immediately deleted. Instead, the db_notify
91          * record is replaced with a current timestamp. A regular
92          * cleanup process will delete all db_index records that are
93          * older than a minute.
94          */
95         struct db_context *db_index;
96 };
97
98 static void notify_trigger_local(struct notify_context *notify,
99                                  uint32_t action, uint32_t filter,
100                                  const char *path, size_t path_len,
101                                  bool recursive);
102 static NTSTATUS notify_send(struct notify_context *notify,
103                             struct server_id *pid,
104                             const char *path, uint32_t action,
105                             void *private_data);
106 static NTSTATUS notify_add_entry(struct db_record *rec,
107                                  const struct notify_db_entry *e,
108                                  bool *p_add_idx);
109 static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn);
110
111 static NTSTATUS notify_del_entry(struct db_record *rec,
112                                  const struct server_id *pid,
113                                  void *private_data);
114 static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn);
115
116 static int notify_context_destructor(struct notify_context *notify);
117
118 static void notify_handler(struct messaging_context *msg_ctx,
119                            void *private_data, uint32_t msg_type,
120                            struct server_id server_id, DATA_BLOB *data);
121
122 struct notify_context *notify_init(TALLOC_CTX *mem_ctx,
123                                    struct messaging_context *msg,
124                                    struct tevent_context *ev)
125 {
126         struct loadparm_context *lp_ctx;
127         struct notify_context *notify;
128
129         notify = talloc(mem_ctx, struct notify_context);
130         if (notify == NULL) {
131                 goto fail;
132         }
133         notify->msg = msg;
134         notify->list = NULL;
135
136         lp_ctx = loadparm_init_s3(notify, loadparm_s3_helpers());
137         notify->db_notify = db_open_tdb(
138                 notify, lp_ctx, lock_path("notify.tdb"),
139                 0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
140                 O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_2, DBWRAP_FLAG_NONE);
141                 talloc_unlink(notify, lp_ctx);
142         if (notify->db_notify == NULL) {
143                 goto fail;
144         }
145         notify->db_index = db_open(
146                 notify, lock_path("notify_index.tdb"),
147                 0, TDB_SEQNUM|TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
148                 O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_3, DBWRAP_FLAG_NONE);
149         if (notify->db_index == NULL) {
150                 goto fail;
151         }
152         if (!lp_clustering()) {
153                 notify->db_index = db_open_cache(notify, notify->db_index);
154                 if (notify->db_index == NULL) {
155                         goto fail;
156                 }
157         }
158
159         if (notify->msg != NULL) {
160                 NTSTATUS status;
161
162                 status = messaging_register(notify->msg, notify,
163                                             MSG_PVFS_NOTIFY, notify_handler);
164                 if (!NT_STATUS_IS_OK(status)) {
165                         DEBUG(1, ("messaging_register returned %s\n",
166                                   nt_errstr(status)));
167                         goto fail;
168                 }
169         }
170
171         talloc_set_destructor(notify, notify_context_destructor);
172
173         return notify;
174 fail:
175         TALLOC_FREE(notify);
176         return NULL;
177 }
178
179 static int notify_context_destructor(struct notify_context *notify)
180 {
181         DEBUG(10, ("notify_context_destructor called\n"));
182
183         if (notify->msg != NULL) {
184                 messaging_deregister(notify->msg, MSG_PVFS_NOTIFY, notify);
185         }
186
187         while (notify->list != NULL) {
188                 DEBUG(10, ("Removing private_data=%p\n",
189                            notify->list->private_data));
190                 notify_remove(notify, notify->list->private_data);
191         }
192         return 0;
193 }
194
195 NTSTATUS notify_add(struct notify_context *notify,
196                     const char *path, uint32_t filter, uint32_t subdir_filter,
197                     void (*callback)(void *, const struct notify_event *),
198                     void *private_data)
199 {
200         struct notify_db_entry e;
201         struct notify_list *listel;
202         struct db_record *notify_rec, *idx_rec;
203         bool add_idx;
204         NTSTATUS status;
205         TDB_DATA key, notify_copy;
206
207         if (notify == NULL) {
208                 return NT_STATUS_NOT_IMPLEMENTED;
209         }
210
211         DEBUG(10, ("notify_add: path=[%s], private_data=%p\n", path,
212                    private_data));
213
214         listel = talloc(notify, struct notify_list);
215         if (listel == NULL) {
216                 return NT_STATUS_NO_MEMORY;
217         }
218         listel->callback = callback;
219         listel->private_data = private_data;
220         listel->path = talloc_strdup(listel, path);
221         if (listel->path == NULL) {
222                 TALLOC_FREE(listel);
223                 return NT_STATUS_NO_MEMORY;
224         }
225         DLIST_ADD(notify->list, listel);
226
227         ZERO_STRUCT(e);
228         e.filter = filter;
229         e.subdir_filter = subdir_filter;
230         e.server = messaging_server_id(notify->msg);
231         e.private_data = private_data;
232
233         key = string_tdb_data(path);
234
235         notify_rec = dbwrap_fetch_locked(notify->db_notify,
236                                          talloc_tos(), key);
237         if (notify_rec == NULL) {
238                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
239                 goto fail;
240         }
241
242         /*
243          * Make a copy of the notify_rec for easy restore in case
244          * updating the index_rec fails;
245          */
246         notify_copy = dbwrap_record_get_value(notify_rec);
247         if (notify_copy.dsize != 0) {
248                 notify_copy.dptr = (uint8_t *)talloc_memdup(
249                         notify_rec, notify_copy.dptr,
250                         notify_copy.dsize);
251                 if (notify_copy.dptr == NULL) {
252                         TALLOC_FREE(notify_rec);
253                         status = NT_STATUS_NO_MEMORY;
254                         goto fail;
255                 }
256         }
257
258         if (DEBUGLEVEL >= 10) {
259                 NDR_PRINT_DEBUG(notify_db_entry, &e);
260         }
261
262         status = notify_add_entry(notify_rec, &e, &add_idx);
263         if (!NT_STATUS_IS_OK(status)) {
264                 goto fail;
265         }
266         if (!add_idx) {
267                 /*
268                  * Someone else has added the idx entry already
269                  */
270                 TALLOC_FREE(notify_rec);
271                 return NT_STATUS_OK;
272         }
273
274         idx_rec = dbwrap_fetch_locked(notify->db_index,
275                                       talloc_tos(), key);
276         if (idx_rec == NULL) {
277                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
278                 goto restore_notify;
279         }
280         status = notify_add_idx(idx_rec, get_my_vnn());
281         if (!NT_STATUS_IS_OK(status)) {
282                 goto restore_notify;
283         }
284
285         TALLOC_FREE(idx_rec);
286         TALLOC_FREE(notify_rec);
287         return NT_STATUS_OK;
288
289 restore_notify:
290         if (notify_copy.dsize != 0) {
291                 dbwrap_record_store(notify_rec, notify_copy, 0);
292         } else {
293                 dbwrap_record_delete(notify_rec);
294         }
295         TALLOC_FREE(notify_rec);
296 fail:
297         DLIST_REMOVE(notify->list, listel);
298         TALLOC_FREE(listel);
299         return status;
300 }
301
302 static NTSTATUS notify_add_entry(struct db_record *rec,
303                                  const struct notify_db_entry *e,
304                                  bool *p_add_idx)
305 {
306         TDB_DATA value = dbwrap_record_get_value(rec);
307         struct notify_db_entry *entries;
308         size_t num_entries;
309         bool add_idx = true;
310         NTSTATUS status;
311
312         if (value.dsize == sizeof(time_t)) {
313                 DEBUG(10, ("Re-using deleted entry\n"));
314                 value.dsize = 0;
315                 add_idx = false;
316         }
317
318         if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
319                 DEBUG(1, ("Invalid value.dsize = %u\n",
320                           (unsigned)value.dsize));
321                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
322         }
323         num_entries = value.dsize / sizeof(struct notify_db_entry);
324
325         if (num_entries != 0) {
326                 add_idx = false;
327         }
328
329         entries = talloc_array(rec, struct notify_db_entry, num_entries + 1);
330         if (entries == NULL) {
331                 return NT_STATUS_NO_MEMORY;
332         }
333         memcpy(entries, value.dptr, value.dsize);
334
335         entries[num_entries] = *e;
336         value = make_tdb_data((uint8_t *)entries, talloc_get_size(entries));
337         status = dbwrap_record_store(rec, value, 0);
338         TALLOC_FREE(entries);
339         if (!NT_STATUS_IS_OK(status)) {
340                 return status;
341         }
342         *p_add_idx = add_idx;
343         return NT_STATUS_OK;
344 }
345
346 static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn)
347 {
348         TDB_DATA value = dbwrap_record_get_value(rec);
349         uint32_t *vnns;
350         size_t i, num_vnns;
351         NTSTATUS status;
352
353         if ((value.dsize % sizeof(uint32_t)) != 0) {
354                 DEBUG(1, ("Invalid value.dsize = %u\n",
355                           (unsigned)value.dsize));
356                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
357         }
358         num_vnns = value.dsize / sizeof(uint32_t);
359         vnns = (uint32_t *)value.dptr;
360
361         for (i=0; i<num_vnns; i++) {
362                 if (vnns[i] == vnn) {
363                         return NT_STATUS_OK;
364                 }
365                 if (vnns[i] > vnn) {
366                         break;
367                 }
368         }
369
370         value.dptr = (uint8_t *)talloc_realloc(
371                 rec, value.dptr, uint32_t, num_vnns + 1);
372         if (value.dptr == NULL) {
373                 return NT_STATUS_NO_MEMORY;
374         }
375         value.dsize = talloc_get_size(value.dptr);
376
377         vnns = (uint32_t *)value.dptr;
378
379         memmove(&vnns[i+1], &vnns[i], sizeof(uint32_t) * (num_vnns - i));
380         vnns[i] = vnn;
381
382         status = dbwrap_record_store(rec, value, 0);
383         if (!NT_STATUS_IS_OK(status)) {
384                 return status;
385         }
386         return NT_STATUS_OK;
387 }
388
389 NTSTATUS notify_remove(struct notify_context *notify, void *private_data)
390 {
391         struct server_id pid;
392         struct notify_list *listel;
393         struct db_record *notify_rec;
394         NTSTATUS status;
395
396         if ((notify == NULL) || (notify->msg == NULL)) {
397                 return NT_STATUS_NOT_IMPLEMENTED;
398         }
399
400         DEBUG(10, ("notify_remove: private_data=%p\n", private_data));
401
402         pid = messaging_server_id(notify->msg);
403
404         for (listel=notify->list;listel;listel=listel->next) {
405                 if (listel->private_data == private_data) {
406                         DLIST_REMOVE(notify->list, listel);
407                         break;
408                 }
409         }
410         if (listel == NULL) {
411                 DEBUG(10, ("%p not found\n", private_data));
412                 return NT_STATUS_NOT_FOUND;
413         }
414         notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(),
415                                          string_tdb_data(listel->path));
416         TALLOC_FREE(listel);
417         if (notify_rec == NULL) {
418                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
419         }
420         status = notify_del_entry(notify_rec, &pid, private_data);
421         DEBUG(10, ("del_entry returned %s\n", nt_errstr(status)));
422         TALLOC_FREE(notify_rec);
423         return status;
424 }
425
426 static NTSTATUS notify_del_entry(struct db_record *rec,
427                                  const struct server_id *pid,
428                                  void *private_data)
429 {
430         TDB_DATA value = dbwrap_record_get_value(rec);
431         struct notify_db_entry *entries;
432         size_t i, num_entries;
433         time_t now;
434
435         DEBUG(10, ("del_entry called for %s %p\n", procid_str_static(pid),
436                    private_data));
437
438         if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
439                 DEBUG(1, ("Invalid value.dsize = %u\n",
440                           (unsigned)value.dsize));
441                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
442         }
443         num_entries = value.dsize / sizeof(struct notify_db_entry);
444         entries = (struct notify_db_entry *)value.dptr;
445
446         for (i=0; i<num_entries; i++) {
447                 struct notify_db_entry *e = &entries[i];
448
449                 if (DEBUGLEVEL >= 10) {
450                         NDR_PRINT_DEBUG(notify_db_entry, e);
451                 }
452
453                 if (e->private_data != private_data) {
454                         continue;
455                 }
456                 if (serverid_equal(&e->server, pid)) {
457                         break;
458                 }
459         }
460         if (i == num_entries) {
461                 return NT_STATUS_NOT_FOUND;
462         }
463         entries[i] = entries[num_entries-1];
464         value.dsize -= sizeof(struct notify_db_entry);
465
466         if (value.dsize == 0) {
467                 now = time(NULL);
468                 value.dptr = (uint8_t *)&now;
469                 value.dsize = sizeof(now);
470         }
471         return dbwrap_record_store(rec, value, 0);
472 }
473
474 struct notify_trigger_index_state {
475         TALLOC_CTX *mem_ctx;
476         uint32_t *vnns;
477         uint32_t my_vnn;
478         bool found_my_vnn;
479 };
480
481 static void notify_trigger_index_parser(TDB_DATA key, TDB_DATA data,
482                                         void *private_data)
483 {
484         struct notify_trigger_index_state *state =
485                 (struct notify_trigger_index_state *)private_data;
486         uint32_t *new_vnns;
487         size_t i, num_vnns, num_new_vnns, num_remote_vnns;
488
489         if ((data.dsize % sizeof(uint32_t)) != 0) {
490                 DEBUG(1, ("Invalid record size in notify index db: %u\n",
491                           (unsigned)data.dsize));
492                 return;
493         }
494         new_vnns = (uint32_t *)data.dptr;
495         num_new_vnns = data.dsize / sizeof(uint32_t);
496         num_remote_vnns = num_new_vnns;
497
498         for (i=0; i<num_new_vnns; i++) {
499                 if (new_vnns[i] == state->my_vnn) {
500                         state->found_my_vnn = true;
501                         num_remote_vnns -= 1;
502                 }
503         }
504         if (num_remote_vnns == 0) {
505                 return;
506         }
507
508         num_vnns = talloc_array_length(state->vnns);
509         state->vnns = talloc_realloc(state->mem_ctx, state->vnns, uint32_t,
510                                      num_vnns + num_remote_vnns);
511         if (state->vnns == NULL) {
512                 DEBUG(1, ("talloc_realloc failed\n"));
513                 return;
514         }
515
516         for (i=0; i<num_new_vnns; i++) {
517                 if (new_vnns[i] != state->my_vnn) {
518                         state->vnns[num_vnns] = new_vnns[i];
519                         num_vnns += 1;
520                 }
521         }
522 }
523
524 static int vnn_cmp(const void *p1, const void *p2)
525 {
526         const uint32_t *vnn1 = (const uint32_t *)p1;
527         const uint32_t *vnn2 = (const uint32_t *)p2;
528
529         if (*vnn1 < *vnn2) {
530                 return -1;
531         }
532         if (*vnn1 == *vnn2) {
533                 return 0;
534         }
535         return 1;
536 }
537
538 static bool notify_push_remote_blob(TALLOC_CTX *mem_ctx, uint32_t action,
539                                     uint32_t filter, const char *path,
540                                     uint8_t **pblob, size_t *pblob_len)
541 {
542         struct notify_remote_event ev;
543         DATA_BLOB data;
544         enum ndr_err_code ndr_err;
545
546         ev.action = action;
547         ev.filter = filter;
548         ev.path = path;
549
550         if (DEBUGLEVEL >= 10) {
551                 NDR_PRINT_DEBUG(notify_remote_event, &ev);
552         }
553
554         ndr_err = ndr_push_struct_blob(
555                 &data, mem_ctx, &ev,
556                 (ndr_push_flags_fn_t)ndr_push_notify_remote_event);
557         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
558                 return false;
559         }
560         *pblob = data.data;
561         *pblob_len = data.length;
562         return true;
563 }
564
565 static bool notify_pull_remote_blob(TALLOC_CTX *mem_ctx,
566                                     const uint8_t *blob, size_t blob_len,
567                                     uint32_t *paction, uint32_t *pfilter,
568                                     char **path)
569 {
570         struct notify_remote_event *ev;
571         enum ndr_err_code ndr_err;
572         DATA_BLOB data;
573         char *p;
574
575         data.data = discard_const_p(uint8_t, blob);
576         data.length = blob_len;
577
578         ev = talloc(mem_ctx, struct notify_remote_event);
579         if (ev == NULL) {
580                 return false;
581         }
582
583         ndr_err = ndr_pull_struct_blob(
584                 &data, ev, ev,
585                 (ndr_pull_flags_fn_t)ndr_pull_notify_remote_event);
586         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
587                 TALLOC_FREE(ev);
588                 return false;
589         }
590         if (DEBUGLEVEL >= 10) {
591                 NDR_PRINT_DEBUG(notify_remote_event, ev);
592         }
593         *paction = ev->action;
594         *pfilter = ev->filter;
595         p = discard_const_p(char, ev->path);
596         *path = talloc_move(mem_ctx, &p);
597
598         TALLOC_FREE(ev);
599         return true;
600 }
601
602 void notify_trigger(struct notify_context *notify,
603                     uint32_t action, uint32_t filter, const char *path)
604 {
605         struct ctdbd_connection *ctdbd_conn;
606         struct notify_trigger_index_state idx_state;
607         const char *p, *next_p;
608         size_t i, num_vnns;
609         uint32_t last_vnn;
610         uint8_t *remote_blob = NULL;
611         size_t remote_blob_len = 0;
612
613         DEBUG(10, ("notify_trigger called action=0x%x, filter=0x%x, "
614                    "path=%s\n", (unsigned)action, (unsigned)filter, path));
615
616         /* see if change notify is enabled at all */
617         if (notify == NULL) {
618                 return;
619         }
620
621         if (path[0] != '/') {
622                 /*
623                  * The rest of this routine assumes an absolute path.
624                  */
625                 return;
626         }
627
628         idx_state.mem_ctx = talloc_tos();
629         idx_state.vnns = NULL;
630         idx_state.found_my_vnn = false;
631         idx_state.my_vnn = get_my_vnn();
632
633         for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
634                 ptrdiff_t path_len = p - path;
635                 bool recursive;
636
637                 next_p = strchr(p+1, '/');
638                 recursive = (next_p != NULL);
639
640                 dbwrap_parse_record(
641                         notify->db_index,
642                         make_tdb_data(discard_const_p(uint8_t, path), path_len),
643                         notify_trigger_index_parser, &idx_state);
644
645                 if (idx_state.found_my_vnn) {
646                         notify_trigger_local(notify, action, filter,
647                                              path, path_len, recursive);
648                         idx_state.found_my_vnn = false;
649                 }
650         }
651
652         if (idx_state.vnns == NULL) {
653                 goto done;
654         }
655
656         ctdbd_conn = messaging_ctdbd_connection();
657         if (ctdbd_conn == NULL) {
658                 goto done;
659         }
660
661         num_vnns = talloc_array_length(idx_state.vnns);
662         qsort(idx_state.vnns, num_vnns, sizeof(uint32_t), vnn_cmp);
663
664         last_vnn = 0xffffffff;
665
666         if (!notify_push_remote_blob(talloc_tos(), action, filter, path,
667                                      &remote_blob, &remote_blob_len)) {
668                 DEBUG(1, ("notify_push_remote_blob failed\n"));
669                 goto done;
670         }
671
672         for (i=0; i<num_vnns; i++) {
673                 uint32_t vnn = idx_state.vnns[i];
674                 NTSTATUS status;
675
676                 if (vnn == last_vnn) {
677                         continue;
678                 }
679
680                 status = ctdbd_messaging_send_blob(
681                         ctdbd_conn, vnn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
682                         remote_blob, remote_blob_len);
683                 if (!NT_STATUS_IS_OK(status)) {
684                         DEBUG(10, ("ctdbd_messaging_send_blob to vnn %d "
685                                    "returned %s, ignoring\n", (int)vnn,
686                                    nt_errstr(status)));
687                 }
688
689                 last_vnn = vnn;
690         }
691
692 done:
693         TALLOC_FREE(remote_blob);
694         TALLOC_FREE(idx_state.vnns);
695 }
696
697 static void notify_trigger_local(struct notify_context *notify,
698                                  uint32_t action, uint32_t filter,
699                                  const char *path, size_t path_len,
700                                  bool recursive)
701 {
702         TDB_DATA data;
703         struct notify_db_entry *entries;
704         size_t i, num_entries;
705         NTSTATUS status;
706
707         DEBUG(10, ("notify_trigger_local called for %*s, path_len=%d, "
708                    "filter=%d\n", (int)path_len, path, (int)path_len,
709                    (int)filter));
710
711         status = dbwrap_fetch(
712                 notify->db_notify, talloc_tos(),
713                 make_tdb_data(discard_const_p(uint8_t, path), path_len), &data);
714         if (!NT_STATUS_IS_OK(status)) {
715                 DEBUG(10, ("dbwrap_fetch returned %s\n",
716                            nt_errstr(status)));
717                 return;
718         }
719         if (data.dsize == sizeof(time_t)) {
720                 DEBUG(10, ("Got deleted record\n"));
721                 goto done;
722         }
723         if ((data.dsize % sizeof(struct notify_db_entry)) != 0) {
724                 DEBUG(1, ("Invalid data.dsize = %u\n",
725                           (unsigned)data.dsize));
726                 goto done;
727         }
728
729         entries = (struct notify_db_entry *)data.dptr;
730         num_entries = data.dsize / sizeof(struct notify_db_entry);
731
732         DEBUG(10, ("recursive = %s pathlen=%d (%c)\n",
733                    recursive ? "true" : "false", (int)path_len,
734                    path[path_len]));
735
736         for (i=0; i<num_entries; i++) {
737                 struct notify_db_entry *e = &entries[i];
738                 uint32_t e_filter;
739
740                 if (DEBUGLEVEL >= 10) {
741                         NDR_PRINT_DEBUG(notify_db_entry, e);
742                 }
743
744                 e_filter = recursive ? e->subdir_filter : e->filter;
745
746                 if ((filter & e_filter) == 0) {
747                         continue;
748                 }
749
750                 if (!procid_is_local(&e->server)) {
751                         DEBUG(1, ("internal error: Non-local pid %s in "
752                                   "notify.tdb\n",
753                                   procid_str_static(&e->server)));
754                         continue;
755                 }
756
757                 status = notify_send(notify, &e->server, path + path_len + 1,
758                                      action, e->private_data);
759                 if (!NT_STATUS_IS_OK(status)) {
760                         DEBUG(10, ("notify_send returned %s\n",
761                                    nt_errstr(status)));
762                 }
763         }
764
765 done:
766         TALLOC_FREE(data.dptr);
767 }
768
769 static NTSTATUS notify_send(struct notify_context *notify,
770                             struct server_id *pid,
771                             const char *path, uint32_t action,
772                             void *private_data)
773 {
774         struct notify_event ev;
775         DATA_BLOB data;
776         NTSTATUS status;
777         enum ndr_err_code ndr_err;
778
779         ev.action = action;
780         ev.path = path;
781         ev.private_data = private_data;
782
783         ndr_err = ndr_push_struct_blob(
784                 &data, talloc_tos(), &ev,
785                 (ndr_push_flags_fn_t)ndr_push_notify_event);
786         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
787                 return ndr_map_error2ntstatus(ndr_err);
788         }
789         status = messaging_send(notify->msg, *pid, MSG_PVFS_NOTIFY,
790                                 &data);
791         TALLOC_FREE(data.data);
792         return status;
793 }
794
795 static void notify_handler(struct messaging_context *msg_ctx,
796                            void *private_data, uint32_t msg_type,
797                            struct server_id server_id, DATA_BLOB *data)
798 {
799         struct notify_context *notify = talloc_get_type_abort(
800                 private_data, struct notify_context);
801         enum ndr_err_code ndr_err;
802         struct notify_event *n;
803         struct notify_list *listel;
804
805         n = talloc(talloc_tos(), struct notify_event);
806         if (n == NULL) {
807                 DEBUG(1, ("talloc failed\n"));
808                 return;
809         }
810
811         ndr_err = ndr_pull_struct_blob(
812                 data, n, n, (ndr_pull_flags_fn_t)ndr_pull_notify_event);
813         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
814                 TALLOC_FREE(n);
815                 return;
816         }
817         if (DEBUGLEVEL >= 10) {
818                 NDR_PRINT_DEBUG(notify_event, n);
819         }
820
821         for (listel=notify->list;listel;listel=listel->next) {
822                 if (listel->private_data == n->private_data) {
823                         listel->callback(listel->private_data, n);
824                         break;
825                 }
826         }
827         TALLOC_FREE(n);
828 }
829
830 struct notify_walk_idx_state {
831         void (*fn)(const char *path,
832                    uint32_t *vnns, size_t num_vnns,
833                    void *private_data);
834         void *private_data;
835 };
836
837 static int notify_walk_idx_fn(struct db_record *rec, void *private_data)
838 {
839         struct notify_walk_idx_state *state =
840                 (struct notify_walk_idx_state *)private_data;
841         TDB_DATA key, value;
842         char *path;
843
844         key = dbwrap_record_get_key(rec);
845         value = dbwrap_record_get_value(rec);
846
847         if ((value.dsize % sizeof(uint32_t)) != 0) {
848                 DEBUG(1, ("invalid value size in notify index db: %u\n",
849                           (unsigned)(value.dsize)));
850                 return 0;
851         }
852
853         path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
854         if (path == NULL) {
855                 DEBUG(1, ("talloc_strndup failed\n"));
856                 return 0;
857         }
858         state->fn(path, (uint32_t *)value.dptr, value.dsize/sizeof(uint32_t),
859                   state->private_data);
860         TALLOC_FREE(path);
861         return 0;
862 }
863
864 void notify_walk_idx(struct notify_context *notify,
865                      void (*fn)(const char *path,
866                                 uint32_t *vnns, size_t num_vnns,
867                                 void *private_data),
868                      void *private_data)
869 {
870         struct notify_walk_idx_state state;
871         state.fn = fn;
872         state.private_data = private_data;
873         dbwrap_traverse_read(notify->db_index, notify_walk_idx_fn, &state,
874                              NULL);
875 }
876
877 struct notify_walk_state {
878         void (*fn)(const char *path,
879                    struct notify_db_entry *entries, size_t num_entries,
880                    time_t deleted_time, void *private_data);
881         void *private_data;
882 };
883
884 static int notify_walk_fn(struct db_record *rec, void *private_data)
885 {
886         struct notify_walk_state *state =
887                 (struct notify_walk_state *)private_data;
888         TDB_DATA key, value;
889         struct notify_db_entry *entries;
890         size_t num_entries;
891         time_t deleted_time;
892         char *path;
893
894         key = dbwrap_record_get_key(rec);
895         value = dbwrap_record_get_value(rec);
896
897         if (value.dsize == sizeof(deleted_time)) {
898                 memcpy(&deleted_time, value.dptr, sizeof(deleted_time));
899                 entries = NULL;
900                 num_entries = 0;
901         } else {
902                 if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
903                         DEBUG(1, ("invalid value size in notify db: %u\n",
904                                   (unsigned)(value.dsize)));
905                         return 0;
906                 }
907                 entries = (struct notify_db_entry *)value.dptr;
908                 num_entries = value.dsize / sizeof(struct notify_db_entry);
909                 deleted_time = 0;
910         }
911
912         path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
913         if (path == NULL) {
914                 DEBUG(1, ("talloc_strndup failed\n"));
915                 return 0;
916         }
917         state->fn(path, entries, num_entries, deleted_time,
918                   state->private_data);
919         TALLOC_FREE(path);
920         return 0;
921 }
922
923 void notify_walk(struct notify_context *notify,
924                  void (*fn)(const char *path,
925                             struct notify_db_entry *entries,
926                             size_t num_entries,
927                             time_t deleted_time, void *private_data),
928                  void *private_data)
929 {
930         struct notify_walk_state state;
931         state.fn = fn;
932         state.private_data = private_data;
933         dbwrap_traverse_read(notify->db_notify, notify_walk_fn, &state,
934                              NULL);
935 }
936
937 struct notify_cleanup_state {
938         TALLOC_CTX *mem_ctx;
939         time_t delete_before;
940         ssize_t array_size;
941         uint32_t num_paths;
942         char **paths;
943 };
944
945 static void notify_cleanup_collect(
946         const char *path, struct notify_db_entry *entries, size_t num_entries,
947         time_t deleted_time, void *private_data)
948 {
949         struct notify_cleanup_state *state =
950                 (struct notify_cleanup_state *)private_data;
951         char *p;
952
953         if (num_entries != 0) {
954                 return;
955         }
956         if (deleted_time >= state->delete_before) {
957                 return;
958         }
959
960         p = talloc_strdup(state->mem_ctx, path);
961         if (p == NULL) {
962                 DEBUG(1, ("talloc_strdup failed\n"));
963                 return;
964         }
965         add_to_large_array(state->mem_ctx, sizeof(p), (void *)&p,
966                            &state->paths, &state->num_paths,
967                            &state->array_size);
968         if (state->array_size == -1) {
969                 TALLOC_FREE(p);
970         }
971 }
972
973 static bool notify_cleanup_path(struct notify_context *notify,
974                               const char *path, time_t delete_before);
975
976 void notify_cleanup(struct notify_context *notify)
977 {
978         struct notify_cleanup_state state;
979         uint32_t failure_pool;
980
981         ZERO_STRUCT(state);
982         state.mem_ctx = talloc_stackframe();
983
984         state.delete_before = time(NULL)
985                 - lp_parm_int(-1, "smbd", "notify cleanup interval", 60);
986
987         notify_walk(notify, notify_cleanup_collect, &state);
988
989         failure_pool = state.num_paths;
990
991         while (state.num_paths != 0) {
992                 size_t idx;
993
994                 /*
995                  * This loop is designed to be as kind as possible to
996                  * ctdb. ctdb does not like it if many smbds hammer on a
997                  * single record. If on many nodes the cleanup process starts
998                  * running, it can happen that all of them need to clean up
999                  * records in the same order. This would generate a ctdb
1000                  * migrate storm on these records. Randomizing the load across
1001                  * multiple records reduces the load on the individual record.
1002                  */
1003
1004                 generate_random_buffer((uint8_t *)&idx, sizeof(idx));
1005                 idx = idx % state.num_paths;
1006
1007                 if (!notify_cleanup_path(notify, state.paths[idx],
1008                                          state.delete_before)) {
1009                         /*
1010                          * notify_cleanup_path failed, the most likely reason
1011                          * is that dbwrap_try_fetch_locked failed due to
1012                          * contention. We allow one failed attempt per deleted
1013                          * path on average before we give up.
1014                          */
1015                         failure_pool -= 1;
1016                         if (failure_pool == 0) {
1017                                 /*
1018                                  * Too many failures. We will come back here,
1019                                  * maybe next time there is less contention.
1020                                  */
1021                                 break;
1022                         }
1023                 }
1024
1025                 TALLOC_FREE(state.paths[idx]);
1026                 state.paths[idx] = state.paths[state.num_paths-1];
1027                 state.num_paths -= 1;
1028         }
1029         TALLOC_FREE(state.mem_ctx);
1030 }
1031
1032 static bool notify_cleanup_path(struct notify_context *notify,
1033                                 const char *path, time_t delete_before)
1034 {
1035         struct db_record *notify_rec = NULL;
1036         struct db_record *idx_rec = NULL;
1037         TDB_DATA key = string_tdb_data(path);
1038         TDB_DATA value;
1039         time_t deleted;
1040         NTSTATUS status;
1041
1042         notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(), key);
1043         if (notify_rec == NULL) {
1044                 DEBUG(10, ("Could not fetch notify_rec\n"));
1045                 return false;
1046         }
1047         value = dbwrap_record_get_value(notify_rec);
1048
1049         if (value.dsize != sizeof(deleted)) {
1050                 DEBUG(10, ("record %s has been re-used\n", path));
1051                 goto done;
1052         }
1053         memcpy(&deleted, value.dptr, sizeof(deleted));
1054
1055         if (deleted >= delete_before) {
1056                 DEBUG(10, ("record %s too young\n", path));
1057                 goto done;
1058         }
1059
1060         /*
1061          * Be kind to ctdb and only try one dmaster migration at most.
1062          */
1063         idx_rec = dbwrap_try_fetch_locked(notify->db_index, talloc_tos(), key);
1064         if (idx_rec == NULL) {
1065                 DEBUG(10, ("Could not fetch idx_rec\n"));
1066                 goto done;
1067         }
1068
1069         status = dbwrap_record_delete(notify_rec);
1070         if (!NT_STATUS_IS_OK(status)) {
1071                 DEBUG(10, ("Could not delete notify_rec: %s\n",
1072                            nt_errstr(status)));
1073         }
1074
1075         status = notify_del_idx(idx_rec, get_my_vnn());
1076         if (!NT_STATUS_IS_OK(status)) {
1077                 DEBUG(10, ("Could not delete idx_rec: %s\n",
1078                            nt_errstr(status)));
1079         }
1080
1081 done:
1082         TALLOC_FREE(idx_rec);
1083         TALLOC_FREE(notify_rec);
1084         return true;
1085 }
1086
1087 static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn)
1088 {
1089         TDB_DATA value = dbwrap_record_get_value(rec);
1090         uint32_t *vnns;
1091         size_t i, num_vnns;
1092
1093         if ((value.dsize % sizeof(uint32_t)) != 0) {
1094                 DEBUG(1, ("Invalid value.dsize = %u\n",
1095                           (unsigned)value.dsize));
1096                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
1097         }
1098         num_vnns = value.dsize / sizeof(uint32_t);
1099         vnns = (uint32_t *)value.dptr;
1100
1101         for (i=0; i<num_vnns; i++) {
1102                 if (vnns[i] == vnn) {
1103                         break;
1104                 }
1105         }
1106
1107         if (i == num_vnns) {
1108                 /*
1109                  * Not found. Should not happen, but okay...
1110                  */
1111                 return NT_STATUS_OK;
1112         }
1113
1114         memmove(&vnns[i], &vnns[i+1], sizeof(uint32_t) * (num_vnns - i - 1));
1115         value.dsize -= sizeof(uint32_t);
1116
1117         if (value.dsize == 0) {
1118                 return dbwrap_record_delete(rec);
1119         }
1120         return dbwrap_record_store(rec, value, 0);
1121 }
1122
1123 struct notify_cluster_proxy_state {
1124         struct tevent_context *ev;
1125         struct notify_context *notify;
1126         struct ctdb_msg_channel *chan;
1127 };
1128
1129 static void notify_cluster_proxy_got_chan(struct tevent_req *subreq);
1130 static void notify_cluster_proxy_got_msg(struct tevent_req *subreq);
1131 static void notify_cluster_proxy_trigger(struct notify_context *notify,
1132                                          uint32_t action, uint32_t filter,
1133                                          char *path);
1134
1135 struct tevent_req *notify_cluster_proxy_send(
1136         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1137         struct notify_context *notify)
1138 {
1139         struct tevent_req *req, *subreq;
1140         struct notify_cluster_proxy_state *state;
1141
1142         req = tevent_req_create(mem_ctx, &state,
1143                                 struct notify_cluster_proxy_state);
1144         if (req == NULL) {
1145                 return NULL;
1146         }
1147         state->ev = ev;
1148         state->notify = notify;
1149
1150         subreq = ctdb_msg_channel_init_send(
1151                 state, state->ev,  lp_ctdbd_socket(),
1152                 CTDB_SRVID_SAMBA_NOTIFY_PROXY);
1153         if (tevent_req_nomem(subreq, req)) {
1154                 return tevent_req_post(req, ev);
1155         }
1156         tevent_req_set_callback(subreq, notify_cluster_proxy_got_chan, req);
1157         return req;
1158 }
1159
1160 static void notify_cluster_proxy_got_chan(struct tevent_req *subreq)
1161 {
1162         struct tevent_req *req = tevent_req_callback_data(
1163                 subreq, struct tevent_req);
1164         struct notify_cluster_proxy_state *state = tevent_req_data(
1165                 req, struct notify_cluster_proxy_state);
1166         int ret;
1167
1168         ret = ctdb_msg_channel_init_recv(subreq, state, &state->chan);
1169         TALLOC_FREE(subreq);
1170         if (ret != 0) {
1171                 tevent_req_error(req, ret);
1172                 return;
1173         }
1174         subreq = ctdb_msg_read_send(state, state->ev, state->chan);
1175         if (tevent_req_nomem(subreq, req)) {
1176                 return;
1177         }
1178         tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
1179 }
1180
1181 static void notify_cluster_proxy_got_msg(struct tevent_req *subreq)
1182 {
1183         struct tevent_req *req = tevent_req_callback_data(
1184                 subreq, struct tevent_req);
1185         struct notify_cluster_proxy_state *state = tevent_req_data(
1186                 req, struct notify_cluster_proxy_state);
1187         uint8_t *msg;
1188         size_t msg_len;
1189         uint32_t action, filter;
1190         char *path;
1191         int ret;
1192         bool res;
1193
1194         ret = ctdb_msg_read_recv(subreq, talloc_tos(), &msg, &msg_len);
1195         TALLOC_FREE(subreq);
1196         if (ret != 0) {
1197                 tevent_req_error(req, ret);
1198                 return;
1199         }
1200
1201         res = notify_pull_remote_blob(talloc_tos(), msg, msg_len,
1202                                       &action, &filter, &path);
1203         TALLOC_FREE(msg);
1204         if (!res) {
1205                 tevent_req_error(req, EIO);
1206                 return;
1207         }
1208         notify_cluster_proxy_trigger(state->notify, action, filter, path);
1209         TALLOC_FREE(path);
1210
1211         subreq = ctdb_msg_read_send(state, state->ev, state->chan);
1212         if (tevent_req_nomem(subreq, req)) {
1213                 return;
1214         }
1215         tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
1216 }
1217
1218 static void notify_cluster_proxy_trigger(struct notify_context *notify,
1219                                          uint32_t action, uint32_t filter,
1220                                          char *path)
1221 {
1222         const char *p, *next_p;
1223
1224         for (p = path; p != NULL; p = next_p) {
1225                 ptrdiff_t path_len = p - path;
1226                 bool recursive;
1227
1228                 next_p = strchr(p+1, '/');
1229                 recursive = (next_p != NULL);
1230
1231                 notify_trigger_local(notify, action, filter,
1232                                      path, path_len, recursive);
1233         }
1234 }
1235
1236 int notify_cluster_proxy_recv(struct tevent_req *req)
1237 {
1238         int err;
1239
1240         if (tevent_req_is_unix_error(req, &err)) {
1241                 return err;
1242         }
1243         return 0;
1244 }