dbwrap: Convert backend store to storev
[metze/samba/wip.git] / source3 / lib / dbwrap / dbwrap_watch.c
1 /*
2    Unix SMB/CIFS implementation.
3    Watch dbwrap record changes
4    Copyright (C) Volker Lendecke 2012
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "lib/util/server_id.h"
23 #include "dbwrap/dbwrap.h"
24 #include "dbwrap_watch.h"
25 #include "dbwrap_open.h"
26 #include "lib/util/util_tdb.h"
27 #include "lib/util/tevent_ntstatus.h"
28 #include "server_id_watch.h"
29 #include "lib/dbwrap/dbwrap_private.h"
30
31 static ssize_t dbwrap_record_watchers_key(struct db_context *db,
32                                           struct db_record *rec,
33                                           uint8_t *wkey, size_t wkey_len)
34 {
35         size_t db_id_len = dbwrap_db_id(db, NULL, 0);
36         uint8_t db_id[db_id_len];
37         size_t needed;
38         TDB_DATA key;
39
40         dbwrap_db_id(db, db_id, db_id_len);
41
42         key = dbwrap_record_get_key(rec);
43
44         needed = sizeof(uint32_t) + db_id_len;
45         if (needed < sizeof(uint32_t)) {
46                 return -1;
47         }
48
49         needed += key.dsize;
50         if (needed < key.dsize) {
51                 return -1;
52         }
53
54         if (wkey_len >= needed) {
55                 SIVAL(wkey, 0, db_id_len);
56                 memcpy(wkey + sizeof(uint32_t), db_id, db_id_len);
57                 memcpy(wkey + sizeof(uint32_t) + db_id_len,
58                        key.dptr, key.dsize);
59         }
60
61         return needed;
62 }
63
64 static bool dbwrap_record_watchers_key_parse(
65         TDB_DATA wkey, uint8_t **p_db_id, size_t *p_db_id_len, TDB_DATA *key)
66 {
67         size_t db_id_len;
68
69         if (wkey.dsize < sizeof(uint32_t)) {
70                 DBG_WARNING("Invalid watchers key, dsize=%zu\n", wkey.dsize);
71                 return false;
72         }
73         db_id_len = IVAL(wkey.dptr, 0);
74         if (db_id_len > (wkey.dsize - sizeof(uint32_t))) {
75                 DBG_WARNING("Invalid watchers key, wkey.dsize=%zu, "
76                             "db_id_len=%zu\n", wkey.dsize, db_id_len);
77                 return false;
78         }
79         if (p_db_id != NULL) {
80                 *p_db_id = wkey.dptr + sizeof(uint32_t);
81         }
82         if (p_db_id_len != NULL) {
83                 *p_db_id_len = db_id_len;
84         }
85         if (key != NULL) {
86                 key->dptr = wkey.dptr + sizeof(uint32_t) + db_id_len;
87                 key->dsize = wkey.dsize - sizeof(uint32_t) - db_id_len;
88         }
89         return true;
90 }
91
92 /*
93  * Watched records contain a header of:
94  *
95  * [uint32] num_records | deleted bit
96  * 0 [SERVER_ID_BUF_LENGTH]                   \
97  * 1 [SERVER_ID_BUF_LENGTH]                   |
98  * ..                                         |- Array of watchers
99  * (num_records-1)[SERVER_ID_BUF_LENGTH]      /
100  *
101  * [Remainder of record....]
102  *
103  * If this header is absent then this is a
104  * fresh record of length zero (no watchers).
105  *
106  * Note that a record can be deleted with
107  * watchers present. If so the deleted bit
108  * is set and the watcher server_id's are
109  * woken to allow them to remove themselves
110  * from the watcher array. The record is left
111  * present marked with the deleted bit until all
112  * watchers are removed, then the record itself
113  * is deleted.
114  */
115
116 #define NUM_WATCHERS_DELETED_BIT (1UL<<31)
117 #define NUM_WATCHERS_MASK (NUM_WATCHERS_DELETED_BIT-1)
118
119 static ssize_t dbwrap_watched_parse(TDB_DATA data, struct server_id *ids,
120                                     size_t num_ids, bool *pdeleted,
121                                     TDB_DATA *pdata)
122 {
123         size_t i, num_watchers;
124         bool deleted;
125
126         if (data.dsize < sizeof(uint32_t)) {
127                 /* Fresh or invalid record */
128                 return -1;
129         }
130
131         num_watchers = IVAL(data.dptr, 0);
132
133         deleted = num_watchers & NUM_WATCHERS_DELETED_BIT;
134         num_watchers &= NUM_WATCHERS_MASK;
135
136         data.dptr += sizeof(uint32_t);
137         data.dsize -= sizeof(uint32_t);
138
139         if (num_watchers > data.dsize/SERVER_ID_BUF_LENGTH) {
140                 /* Invalid record */
141                 return -1;
142         }
143
144         if (num_watchers > num_ids) {
145                 /*
146                  * Not enough space to store the watchers server_id's.
147                  * Just move past all of them to allow the remaining part
148                  * of the record to be returned.
149                  */
150                 data.dptr += num_watchers * SERVER_ID_BUF_LENGTH;
151                 data.dsize -= num_watchers * SERVER_ID_BUF_LENGTH;
152                 goto done;
153         }
154
155         /*
156          * Note, even if marked deleted we still must
157          * return the id's array to allow awoken
158          * watchers to remove themselves.
159          */
160
161         for (i=0; i<num_watchers; i++) {
162                 server_id_get(&ids[i], data.dptr);
163                 data.dptr += SERVER_ID_BUF_LENGTH;
164                 data.dsize -= SERVER_ID_BUF_LENGTH;
165         }
166
167 done:
168         if (deleted) {
169                 data = (TDB_DATA) {0};
170         }
171         if (pdata != NULL) {
172                 *pdata = data;
173         }
174         if (pdeleted != NULL) {
175                 *pdeleted = deleted;
176         }
177
178         return num_watchers;
179 }
180
181 static ssize_t dbwrap_watched_unparse(const struct server_id *watchers,
182                                       size_t num_watchers, bool deleted,
183                                       TDB_DATA data,
184                                       uint8_t *buf, size_t buflen)
185 {
186         size_t i, len, ofs;
187         uint32_t num_watchers_buf;
188
189         if (num_watchers > UINT32_MAX/SERVER_ID_BUF_LENGTH) {
190                 return -1;
191         }
192
193         len = num_watchers * SERVER_ID_BUF_LENGTH;
194
195         len += sizeof(uint32_t);
196         if (len < sizeof(uint32_t)) {
197                 return -1;
198         }
199
200         len += data.dsize;
201         if (len < data.dsize) {
202                 return -1;
203         }
204
205         if (len > buflen) {
206                 return len;
207         }
208
209         num_watchers_buf = num_watchers;
210         if (deleted) {
211                 num_watchers_buf |= NUM_WATCHERS_DELETED_BIT;
212         }
213
214         ofs = 0;
215         SIVAL(buf, ofs, num_watchers_buf);
216         ofs += 4;
217
218         for (i=0; i<num_watchers; i++) {
219                 server_id_put(buf+ofs, watchers[i]);
220                 ofs += SERVER_ID_BUF_LENGTH;
221         }
222
223         if ((data.dptr != NULL) && (data.dsize != 0)) {
224                 memcpy(buf + ofs, data.dptr, data.dsize);
225         }
226
227         return len;
228 }
229
230 struct db_watched_ctx {
231         struct db_context *backend;
232         struct messaging_context *msg;
233 };
234
235 struct db_watched_subrec {
236         struct db_record *subrec;
237         struct server_id *watchers;
238         bool deleted;
239 };
240
241 static NTSTATUS dbwrap_watched_storev(struct db_record *rec,
242                                       const TDB_DATA *dbufs, int num_dbufs,
243                                       int flag);
244 static NTSTATUS dbwrap_watched_delete(struct db_record *rec);
245
246 static struct db_record *dbwrap_watched_fetch_locked(
247         struct db_context *db, TALLOC_CTX *mem_ctx, TDB_DATA key)
248 {
249         struct db_watched_ctx *ctx = talloc_get_type_abort(
250                 db->private_data, struct db_watched_ctx);
251         struct db_record *rec;
252         struct db_watched_subrec *subrec;
253         TDB_DATA subrec_value;
254         ssize_t num_watchers;
255
256         rec = talloc_zero(mem_ctx, struct db_record);
257         if (rec == NULL) {
258                 return NULL;
259         }
260         subrec = talloc_zero(rec, struct db_watched_subrec);
261         if (subrec == NULL) {
262                 TALLOC_FREE(rec);
263                 return NULL;
264         }
265         rec->private_data = subrec;
266
267         subrec->subrec = dbwrap_fetch_locked(ctx->backend, subrec, key);
268         if (subrec->subrec == NULL) {
269                 TALLOC_FREE(rec);
270                 return NULL;
271         }
272
273         rec->db = db;
274         rec->key = dbwrap_record_get_key(subrec->subrec);
275         rec->storev = dbwrap_watched_storev;
276         rec->delete_rec = dbwrap_watched_delete;
277
278         subrec_value = dbwrap_record_get_value(subrec->subrec);
279
280         num_watchers = dbwrap_watched_parse(subrec_value, NULL, 0, NULL, NULL);
281         if (num_watchers == -1) {
282                 /* Fresh or invalid record */
283                 rec->value = (TDB_DATA) { 0 };
284                 return rec;
285         }
286
287         subrec->watchers = talloc_array(subrec, struct server_id,
288                                         num_watchers);
289         if (subrec->watchers == NULL) {
290                 TALLOC_FREE(rec);
291                 return NULL;
292         }
293
294         dbwrap_watched_parse(subrec_value, subrec->watchers, num_watchers,
295                              &subrec->deleted, &rec->value);
296
297         return rec;
298 }
299
300 static void dbwrap_watched_wakeup(struct db_record *rec,
301                                   struct db_watched_subrec *subrec)
302 {
303         struct db_context *db = dbwrap_record_get_db(rec);
304         struct db_watched_ctx *ctx = talloc_get_type_abort(
305                 db->private_data, struct db_watched_ctx);
306         size_t i, num_watchers;
307         size_t db_id_len = dbwrap_db_id(db, NULL, 0);
308         uint8_t db_id[db_id_len];
309         uint8_t len_buf[4];
310         struct iovec iov[3];
311
312         SIVAL(len_buf, 0, db_id_len);
313
314         iov[0] = (struct iovec) { .iov_base = len_buf, .iov_len = 4 };
315         iov[1] = (struct iovec) { .iov_base = db_id, .iov_len = db_id_len };
316         iov[2] = (struct iovec) { .iov_base = rec->key.dptr,
317                                   .iov_len = rec->key.dsize };
318
319         dbwrap_db_id(db, db_id, db_id_len);
320
321         num_watchers = talloc_array_length(subrec->watchers);
322
323         i = 0;
324
325         while (i < num_watchers) {
326                 NTSTATUS status;
327                 struct server_id_buf tmp;
328
329                 DBG_DEBUG("Alerting %s\n",
330                           server_id_str_buf(subrec->watchers[i], &tmp));
331
332                 status = messaging_send_iov(ctx->msg, subrec->watchers[i],
333                                             MSG_DBWRAP_MODIFIED,
334                                             iov, ARRAY_SIZE(iov), NULL, 0);
335                 if (!NT_STATUS_IS_OK(status)) {
336                         DBG_DEBUG("messaging_send_iov to %s failed: %s\n",
337                                   server_id_str_buf(subrec->watchers[i], &tmp),
338                                   nt_errstr(status));
339                 }
340                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
341                         subrec->watchers[i] = subrec->watchers[num_watchers-1];
342                         num_watchers -= 1;
343
344                         subrec->watchers = talloc_realloc(
345                                 subrec, subrec->watchers, struct server_id,
346                                 num_watchers);
347                         continue;
348                 }
349
350                 i += 1;
351         }
352 }
353
354 static NTSTATUS dbwrap_watched_save(struct db_watched_subrec *subrec,
355                                     TDB_DATA data, int flag)
356 {
357         size_t num_watchers;
358         ssize_t len;
359         uint8_t *buf;
360         NTSTATUS status;
361
362         num_watchers = talloc_array_length(subrec->watchers);
363
364         len = dbwrap_watched_unparse(subrec->watchers, num_watchers,
365                                      subrec->deleted, data, NULL, 0);
366         if (len == -1) {
367                 return NT_STATUS_INSUFFICIENT_RESOURCES;
368         }
369
370         buf = talloc_array(subrec, uint8_t, len);
371         if (buf == NULL) {
372                 return NT_STATUS_NO_MEMORY;
373         }
374
375         dbwrap_watched_unparse(subrec->watchers, num_watchers,
376                                subrec->deleted, data, buf, len);
377
378         status = dbwrap_record_store(
379                 subrec->subrec, (TDB_DATA) { .dptr = buf, .dsize = len },
380                 flag);
381
382         TALLOC_FREE(buf);
383
384         return status;
385 }
386
387 static NTSTATUS dbwrap_watched_storev(struct db_record *rec,
388                                       const TDB_DATA *dbufs, int num_dbufs,
389                                       int flag)
390 {
391         struct db_watched_subrec *subrec = talloc_get_type_abort(
392                 rec->private_data, struct db_watched_subrec);
393         NTSTATUS status;
394         TDB_DATA data;
395
396         data = dbwrap_merge_dbufs(rec, dbufs, num_dbufs);
397         if (data.dptr == NULL) {
398                 return NT_STATUS_NO_MEMORY;
399         }
400
401         dbwrap_watched_wakeup(rec, subrec);
402
403         subrec->deleted = false;
404
405         status = dbwrap_watched_save(subrec, data, flag);
406
407         TALLOC_FREE(data.dptr);
408
409         return status;
410 }
411
412 static NTSTATUS dbwrap_watched_delete(struct db_record *rec)
413 {
414         struct db_watched_subrec *subrec = talloc_get_type_abort(
415                 rec->private_data, struct db_watched_subrec);
416         size_t num_watchers;
417
418         dbwrap_watched_wakeup(rec, subrec);
419
420         num_watchers = talloc_array_length(subrec->watchers);
421         if (num_watchers == 0) {
422                 return dbwrap_record_delete(subrec->subrec);
423         }
424
425         subrec->deleted = true;
426
427         return dbwrap_watched_save(subrec, (TDB_DATA) {0}, 0);
428 }
429
430 struct dbwrap_watched_traverse_state {
431         int (*fn)(struct db_record *rec, void *private_data);
432         void *private_data;
433 };
434
435 static int dbwrap_watched_traverse_fn(struct db_record *rec,
436                                       void *private_data)
437 {
438         struct dbwrap_watched_traverse_state *state = private_data;
439         ssize_t num_watchers;
440         struct db_record prec = *rec;
441         bool deleted;
442
443         num_watchers = dbwrap_watched_parse(rec->value, NULL, 0, &deleted,
444                                             &prec.value);
445
446         if ((num_watchers == -1) || deleted) {
447                 return 0;
448         }
449
450         return state->fn(&prec, state->private_data);
451 }
452
453 static int dbwrap_watched_traverse(struct db_context *db,
454                                    int (*fn)(struct db_record *rec,
455                                              void *private_data),
456                                    void *private_data)
457 {
458         struct db_watched_ctx *ctx = talloc_get_type_abort(
459                 db->private_data, struct db_watched_ctx);
460         struct dbwrap_watched_traverse_state state = {
461                 .fn = fn, .private_data = private_data };
462         NTSTATUS status;
463         int ret;
464
465         status = dbwrap_traverse(
466                 ctx->backend, dbwrap_watched_traverse_fn, &state, &ret);
467         if (!NT_STATUS_IS_OK(status)) {
468                 return -1;
469         }
470         return ret;
471 }
472
473 static int dbwrap_watched_traverse_read(struct db_context *db,
474                                         int (*fn)(struct db_record *rec,
475                                                   void *private_data),
476                                         void *private_data)
477 {
478         struct db_watched_ctx *ctx = talloc_get_type_abort(
479                 db->private_data, struct db_watched_ctx);
480         struct dbwrap_watched_traverse_state state = {
481                 .fn = fn, .private_data = private_data };
482         NTSTATUS status;
483         int ret;
484
485         status = dbwrap_traverse_read(
486                 ctx->backend, dbwrap_watched_traverse_fn, &state, &ret);
487         if (!NT_STATUS_IS_OK(status)) {
488                 return -1;
489         }
490         return ret;
491 }
492
493 static int dbwrap_watched_get_seqnum(struct db_context *db)
494 {
495         struct db_watched_ctx *ctx = talloc_get_type_abort(
496                 db->private_data, struct db_watched_ctx);
497         return dbwrap_get_seqnum(ctx->backend);
498 }
499
500 static int dbwrap_watched_transaction_start(struct db_context *db)
501 {
502         struct db_watched_ctx *ctx = talloc_get_type_abort(
503                 db->private_data, struct db_watched_ctx);
504         return dbwrap_transaction_start(ctx->backend);
505 }
506
507 static int dbwrap_watched_transaction_commit(struct db_context *db)
508 {
509         struct db_watched_ctx *ctx = talloc_get_type_abort(
510                 db->private_data, struct db_watched_ctx);
511         return dbwrap_transaction_commit(ctx->backend);
512 }
513
514 static int dbwrap_watched_transaction_cancel(struct db_context *db)
515 {
516         struct db_watched_ctx *ctx = talloc_get_type_abort(
517                 db->private_data, struct db_watched_ctx);
518         return dbwrap_transaction_cancel(ctx->backend);
519 }
520
521 struct dbwrap_watched_parse_record_state {
522         void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data);
523         void *private_data;
524         bool deleted;
525 };
526
527 static void dbwrap_watched_parse_record_parser(TDB_DATA key, TDB_DATA data,
528                                                void *private_data)
529 {
530         struct dbwrap_watched_parse_record_state *state = private_data;
531         ssize_t num_watchers;
532         TDB_DATA userdata;
533
534         num_watchers = dbwrap_watched_parse(data, NULL, 0, &state->deleted,
535                                             &userdata);
536         if (num_watchers == -1) {
537                 state->deleted = true;
538         }
539         if (state->deleted) {
540                 return;
541         }
542         state->parser(key, userdata, state->private_data);
543 }
544
545 static NTSTATUS dbwrap_watched_parse_record(
546         struct db_context *db, TDB_DATA key,
547         void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data),
548         void *private_data)
549 {
550         struct db_watched_ctx *ctx = talloc_get_type_abort(
551                 db->private_data, struct db_watched_ctx);
552         struct dbwrap_watched_parse_record_state state = {
553                 .parser = parser,
554                 .private_data = private_data,
555                 .deleted = false
556         };
557         NTSTATUS status;
558
559         status = dbwrap_parse_record(
560                 ctx->backend, key, dbwrap_watched_parse_record_parser, &state);
561         if (!NT_STATUS_IS_OK(status)) {
562                 return status;
563         }
564         if (state.deleted) {
565                 return NT_STATUS_NOT_FOUND;
566         }
567         return NT_STATUS_OK;
568 }
569
570 static void dbwrap_watched_parse_record_done(struct tevent_req *subreq);
571
572 static struct tevent_req *dbwrap_watched_parse_record_send(
573         TALLOC_CTX *mem_ctx,
574         struct tevent_context *ev,
575         struct db_context *db,
576         TDB_DATA key,
577         void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data),
578         void *private_data,
579         enum dbwrap_req_state *req_state)
580 {
581         struct db_watched_ctx *ctx = talloc_get_type_abort(
582                 db->private_data, struct db_watched_ctx);
583         struct tevent_req *req = NULL;
584         struct tevent_req *subreq = NULL;
585         struct dbwrap_watched_parse_record_state *state = NULL;
586
587         req = tevent_req_create(mem_ctx, &state,
588                                 struct dbwrap_watched_parse_record_state);
589         if (req == NULL) {
590                 *req_state = DBWRAP_REQ_ERROR;
591                 return NULL;
592         }
593
594         *state = (struct dbwrap_watched_parse_record_state) {
595                 .parser = parser,
596                 .private_data = private_data,
597                 .deleted = false,
598         };
599
600         subreq = dbwrap_parse_record_send(state,
601                                           ev,
602                                           ctx->backend,
603                                           key,
604                                           dbwrap_watched_parse_record_parser,
605                                           state,
606                                           req_state);
607         if (tevent_req_nomem(subreq, req)) {
608                 *req_state = DBWRAP_REQ_ERROR;
609                 return tevent_req_post(req, ev);
610         }
611
612         tevent_req_set_callback(subreq, dbwrap_watched_parse_record_done, req);
613         return req;
614 }
615
616 static void dbwrap_watched_parse_record_done(struct tevent_req *subreq)
617 {
618         struct tevent_req *req = tevent_req_callback_data(
619                 subreq, struct tevent_req);
620         struct dbwrap_watched_parse_record_state *state = tevent_req_data(
621                 req, struct dbwrap_watched_parse_record_state);
622         NTSTATUS status;
623
624         status = dbwrap_parse_record_recv(subreq);
625         TALLOC_FREE(subreq);
626         if (tevent_req_nterror(req, status)) {
627                 return;
628         }
629
630         if (state->deleted) {
631                 tevent_req_nterror(req, NT_STATUS_NOT_FOUND);
632                 return;
633         }
634
635         tevent_req_done(req);
636         return;
637 }
638
639 static NTSTATUS dbwrap_watched_parse_record_recv(struct tevent_req *req)
640 {
641         NTSTATUS status;
642
643         if (tevent_req_is_nterror(req, &status)) {
644                 tevent_req_received(req);
645                 return status;
646         }
647
648         tevent_req_received(req);
649         return NT_STATUS_OK;
650 }
651
652 static int dbwrap_watched_exists(struct db_context *db, TDB_DATA key)
653 {
654         struct db_watched_ctx *ctx = talloc_get_type_abort(
655                 db->private_data, struct db_watched_ctx);
656
657         return dbwrap_exists(ctx->backend, key);
658 }
659
660 static size_t dbwrap_watched_id(struct db_context *db, uint8_t *id,
661                                 size_t idlen)
662 {
663         struct db_watched_ctx *ctx = talloc_get_type_abort(
664                 db->private_data, struct db_watched_ctx);
665
666         return dbwrap_db_id(ctx->backend, id, idlen);
667 }
668
669 struct db_context *db_open_watched(TALLOC_CTX *mem_ctx,
670                                    struct db_context *backend,
671                                    struct messaging_context *msg)
672 {
673         struct db_context *db;
674         struct db_watched_ctx *ctx;
675
676         db = talloc_zero(mem_ctx, struct db_context);
677         if (db == NULL) {
678                 return NULL;
679         }
680         ctx = talloc_zero(db, struct db_watched_ctx);
681         if (ctx == NULL) {
682                 TALLOC_FREE(db);
683                 return NULL;
684         }
685         db->private_data = ctx;
686
687         ctx->msg = msg;
688
689         db->lock_order = backend->lock_order;
690         backend->lock_order = DBWRAP_LOCK_ORDER_NONE;
691         ctx->backend = talloc_move(ctx, &backend);
692
693         db->fetch_locked = dbwrap_watched_fetch_locked;
694         db->traverse = dbwrap_watched_traverse;
695         db->traverse_read = dbwrap_watched_traverse_read;
696         db->get_seqnum = dbwrap_watched_get_seqnum;
697         db->transaction_start = dbwrap_watched_transaction_start;
698         db->transaction_commit = dbwrap_watched_transaction_commit;
699         db->transaction_cancel = dbwrap_watched_transaction_cancel;
700         db->parse_record = dbwrap_watched_parse_record;
701         db->parse_record_send = dbwrap_watched_parse_record_send;
702         db->parse_record_recv = dbwrap_watched_parse_record_recv;
703         db->exists = dbwrap_watched_exists;
704         db->id = dbwrap_watched_id;
705         db->name = dbwrap_name(ctx->backend);
706
707         return db;
708 }
709
710 struct dbwrap_watched_watch_state {
711         struct db_context *db;
712         struct server_id me;
713         TDB_DATA w_key;
714         struct server_id blocker;
715         bool blockerdead;
716 };
717
718 static bool dbwrap_watched_msg_filter(struct messaging_rec *rec,
719                                       void *private_data);
720 static void dbwrap_watched_watch_done(struct tevent_req *subreq);
721 static void dbwrap_watched_watch_blocker_died(struct tevent_req *subreq);
722 static int dbwrap_watched_watch_state_destructor(
723         struct dbwrap_watched_watch_state *state);
724
725 struct tevent_req *dbwrap_watched_watch_send(TALLOC_CTX *mem_ctx,
726                                              struct tevent_context *ev,
727                                              struct db_record *rec,
728                                              struct server_id blocker)
729 {
730         struct db_watched_subrec *subrec = talloc_get_type_abort(
731                 rec->private_data, struct db_watched_subrec);
732         struct db_context *db = dbwrap_record_get_db(rec);
733         struct db_watched_ctx *ctx = talloc_get_type_abort(
734                 db->private_data, struct db_watched_ctx);
735
736         struct tevent_req *req, *subreq;
737         struct dbwrap_watched_watch_state *state;
738         ssize_t needed;
739         size_t num_watchers;
740         struct server_id *tmp;
741         NTSTATUS status;
742
743         req = tevent_req_create(mem_ctx, &state,
744                                 struct dbwrap_watched_watch_state);
745         if (req == NULL) {
746                 return NULL;
747         }
748         state->db = db;
749         state->blocker = blocker;
750
751         if (ctx->msg == NULL) {
752                 tevent_req_nterror(req, NT_STATUS_NOT_SUPPORTED);
753                 return tevent_req_post(req, ev);
754         }
755
756         state->me = messaging_server_id(ctx->msg);
757
758         needed = dbwrap_record_watchers_key(db, rec, NULL, 0);
759         if (needed == -1) {
760                 tevent_req_nterror(req, NT_STATUS_INSUFFICIENT_RESOURCES);
761                 return tevent_req_post(req, ev);
762         }
763         state->w_key.dsize = needed;
764
765         state->w_key.dptr = talloc_array(state, uint8_t, state->w_key.dsize);
766         if (tevent_req_nomem(state->w_key.dptr, req)) {
767                 return tevent_req_post(req, ev);
768         }
769         dbwrap_record_watchers_key(db, rec, state->w_key.dptr,
770                                    state->w_key.dsize);
771
772         subreq = messaging_filtered_read_send(
773                 state, ev, ctx->msg, dbwrap_watched_msg_filter, state);
774         if (tevent_req_nomem(subreq, req)) {
775                 return tevent_req_post(req, ev);
776         }
777         tevent_req_set_callback(subreq, dbwrap_watched_watch_done, req);
778
779         num_watchers = talloc_array_length(subrec->watchers);
780
781         tmp = talloc_realloc(subrec, subrec->watchers, struct server_id,
782                              num_watchers + 1);
783         if (tevent_req_nomem(tmp, req)) {
784                 return tevent_req_post(req, ev);
785         }
786         subrec->watchers = tmp;
787         subrec->watchers[num_watchers] = state->me;
788
789         status = dbwrap_watched_save(subrec, rec->value, 0);
790         if (tevent_req_nterror(req, status)) {
791                 return tevent_req_post(req, ev);
792         }
793
794         talloc_set_destructor(state, dbwrap_watched_watch_state_destructor);
795
796         if (blocker.pid != 0) {
797                 subreq = server_id_watch_send(state, ev, ctx->msg, blocker);
798                 if (tevent_req_nomem(subreq, req)) {
799                         return tevent_req_post(req, ev);
800                 }
801                 tevent_req_set_callback(
802                         subreq, dbwrap_watched_watch_blocker_died, req);
803         }
804
805         return req;
806 }
807
808 static void dbwrap_watched_watch_blocker_died(struct tevent_req *subreq)
809 {
810         struct tevent_req *req = tevent_req_callback_data(
811                 subreq, struct tevent_req);
812         struct dbwrap_watched_watch_state *state = tevent_req_data(
813                 req, struct dbwrap_watched_watch_state);
814         int ret;
815
816         ret = server_id_watch_recv(subreq, NULL);
817         TALLOC_FREE(subreq);
818         if (ret != 0) {
819                 tevent_req_nterror(req, map_nt_error_from_unix(ret));
820                 return;
821         }
822         state->blockerdead = true;
823         tevent_req_done(req);
824 }
825
826 static bool dbwrap_watched_remove_waiter(struct db_watched_subrec *subrec,
827                                          struct server_id id)
828 {
829         size_t i, num_watchers;
830
831         num_watchers = talloc_array_length(subrec->watchers);
832
833         for (i=0; i<num_watchers; i++) {
834                 if (server_id_equal(&id, &subrec->watchers[i])) {
835                         break;
836                 }
837         }
838
839         if (i == num_watchers) {
840                 struct server_id_buf buf;
841                 DBG_WARNING("Did not find %s in state->watchers\n",
842                             server_id_str_buf(id, &buf));
843                 return false;
844         }
845
846         subrec->watchers[i] = subrec->watchers[num_watchers-1];
847         subrec->watchers = talloc_realloc(subrec, subrec->watchers,
848                                           struct server_id, num_watchers-1);
849
850         return true;
851 }
852
853 static int dbwrap_watched_watch_state_destructor(
854         struct dbwrap_watched_watch_state *state)
855 {
856         struct db_record *rec;
857         struct db_watched_subrec *subrec;
858         TDB_DATA key;
859         bool ok;
860
861         ok = dbwrap_record_watchers_key_parse(state->w_key, NULL, NULL, &key);
862         if (!ok) {
863                 DBG_WARNING("dbwrap_record_watchers_key_parse failed\n");
864                 return 0;
865         }
866
867         rec = dbwrap_fetch_locked(state->db, state, key);
868         if (rec == NULL) {
869                 DBG_WARNING("dbwrap_fetch_locked failed\n");
870                 return 0;
871         }
872
873         subrec = talloc_get_type_abort(
874                 rec->private_data, struct db_watched_subrec);
875
876         ok = dbwrap_watched_remove_waiter(subrec, state->me);
877         if (ok) {
878                 NTSTATUS status;
879                 status = dbwrap_watched_save(subrec, rec->value, 0);
880                 if (!NT_STATUS_IS_OK(status)) {
881                         DBG_WARNING("dbwrap_watched_save failed: %s\n",
882                                     nt_errstr(status));
883                 }
884         }
885
886         TALLOC_FREE(rec);
887         return 0;
888 }
889
890 static bool dbwrap_watched_msg_filter(struct messaging_rec *rec,
891                                       void *private_data)
892 {
893         struct dbwrap_watched_watch_state *state = talloc_get_type_abort(
894                 private_data, struct dbwrap_watched_watch_state);
895         int cmp;
896
897         if (rec->msg_type != MSG_DBWRAP_MODIFIED) {
898                 return false;
899         }
900         if (rec->num_fds != 0) {
901                 return false;
902         }
903         if (rec->buf.length != state->w_key.dsize) {
904                 return false;
905         }
906
907         cmp = memcmp(rec->buf.data, state->w_key.dptr, rec->buf.length);
908
909         return (cmp == 0);
910 }
911
912 static void dbwrap_watched_watch_done(struct tevent_req *subreq)
913 {
914         struct tevent_req *req = tevent_req_callback_data(
915                 subreq, struct tevent_req);
916         struct messaging_rec *rec;
917         int ret;
918
919         ret = messaging_filtered_read_recv(subreq, talloc_tos(), &rec);
920         TALLOC_FREE(subreq);
921         if (ret != 0) {
922                 tevent_req_nterror(req, map_nt_error_from_unix(ret));
923                 return;
924         }
925         tevent_req_done(req);
926 }
927
928 NTSTATUS dbwrap_watched_watch_recv(struct tevent_req *req,
929                                    TALLOC_CTX *mem_ctx,
930                                    struct db_record **prec,
931                                    bool *blockerdead,
932                                    struct server_id *blocker)
933 {
934         struct dbwrap_watched_watch_state *state = tevent_req_data(
935                 req, struct dbwrap_watched_watch_state);
936         struct db_watched_subrec *subrec;
937         NTSTATUS status;
938         TDB_DATA key;
939         struct db_record *rec;
940         bool ok;
941
942         if (tevent_req_is_nterror(req, &status)) {
943                 return status;
944         }
945         if (blockerdead != NULL) {
946                 *blockerdead = state->blockerdead;
947         }
948         if (blocker != NULL) {
949                 *blocker = state->blocker;
950         }
951         if (prec == NULL) {
952                 return NT_STATUS_OK;
953         }
954
955         ok = dbwrap_record_watchers_key_parse(state->w_key, NULL, NULL, &key);
956         if (!ok) {
957                 return NT_STATUS_INTERNAL_DB_ERROR;
958         }
959
960         rec = dbwrap_fetch_locked(state->db, mem_ctx, key);
961         if (rec == NULL) {
962                 return NT_STATUS_INTERNAL_DB_ERROR;
963         }
964
965         talloc_set_destructor(state, NULL);
966
967         subrec = talloc_get_type_abort(
968                 rec->private_data, struct db_watched_subrec);
969
970         ok = dbwrap_watched_remove_waiter(subrec, state->me);
971         if (ok) {
972                 status = dbwrap_watched_save(subrec, rec->value, 0);
973                 if (!NT_STATUS_IS_OK(status)) {
974                         DBG_WARNING("dbwrap_watched_save failed: %s\n",
975                                     nt_errstr(status));
976                 }
977         }
978
979         *prec = rec;
980         return NT_STATUS_OK;
981 }