source3/lib/g_lock.c
/*
   Unix SMB/CIFS implementation.
   global locks based on dbwrap and messaging
   Copyright (C) 2009 by Volker Lendecke

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "replace.h"
#include "system/filesys.h"
#include "lib/util/server_id.h"
#include "lib/util/debug.h"
#include "lib/util/talloc_stack.h"
#include "lib/util/samba_util.h"
#include "lib/util_path.h"
#include "dbwrap/dbwrap.h"
#include "dbwrap/dbwrap_open.h"
#include "dbwrap/dbwrap_watch.h"
#include "g_lock.h"
#include "util_tdb.h"
#include "../lib/util/tevent_ntstatus.h"
#include "messages.h"
#include "serverid.h"

struct g_lock_ctx {
        struct db_context *db;
        struct messaging_context *msg;
        enum dbwrap_lock_order lock_order;
        bool busy;
};

struct g_lock {
        struct server_id exclusive;
        size_t num_shared;
        uint8_t *shared;
        uint64_t unique_lock_epoch;
        uint64_t unique_data_epoch;
        size_t datalen;
        uint8_t *data;
};

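/*
 * On-disk record layout, as read by g_lock_parse() and written by
 * g_lock_store():
 *
 *   SERVER_ID_BUF_LENGTH bytes               exclusive lock holder
 *   8 bytes                                  unique_lock_epoch
 *   8 bytes                                  unique_data_epoch
 *   4 bytes                                  num_shared
 *   num_shared * SERVER_ID_BUF_LENGTH bytes  shared lock holders
 *   remainder                                caller-supplied data
 *
 * A record shorter than the fixed header is treated as an empty,
 * unlocked record with fresh epochs.
 */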
static bool g_lock_parse(uint8_t *buf, size_t buflen, struct g_lock *lck)
{
        struct server_id exclusive;
        size_t num_shared, shared_len;
        uint64_t unique_lock_epoch;
        uint64_t unique_data_epoch;

        if (buflen < (SERVER_ID_BUF_LENGTH +  /* exclusive */
                      sizeof(uint64_t) +      /* unique_lock_epoch */
                      sizeof(uint64_t) +      /* unique_data_epoch */
                      sizeof(uint32_t))) {    /* num_shared */
                struct g_lock ret = {
                        .exclusive.pid = 0,
                        .unique_lock_epoch = generate_unique_u64(0),
                        .unique_data_epoch = generate_unique_u64(0),
                };
                *lck = ret;
                return true;
        }

        server_id_get(&exclusive, buf);
        buf += SERVER_ID_BUF_LENGTH;
        buflen -= SERVER_ID_BUF_LENGTH;

        unique_lock_epoch = BVAL(buf, 0);
        buf += sizeof(uint64_t);
        buflen -= sizeof(uint64_t);

        unique_data_epoch = BVAL(buf, 0);
        buf += sizeof(uint64_t);
        buflen -= sizeof(uint64_t);

        num_shared = IVAL(buf, 0);
        buf += sizeof(uint32_t);
        buflen -= sizeof(uint32_t);

        if (num_shared > buflen/SERVER_ID_BUF_LENGTH) {
                DBG_DEBUG("num_shared=%zu, buflen=%zu\n",
                          num_shared,
                          buflen);
                return false;
        }

        shared_len = num_shared * SERVER_ID_BUF_LENGTH;

        *lck = (struct g_lock) {
                .exclusive = exclusive,
                .num_shared = num_shared,
                .shared = buf,
                .unique_lock_epoch = unique_lock_epoch,
                .unique_data_epoch = unique_data_epoch,
                .datalen = buflen-shared_len,
                .data = buf+shared_len,
        };

        return true;
}

static void g_lock_get_shared(const struct g_lock *lck,
                              size_t i,
                              struct server_id *shared)
{
        if (i >= lck->num_shared) {
                abort();
        }
        server_id_get(shared, lck->shared + i*SERVER_ID_BUF_LENGTH);
}

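/*
 * Remove shared holder i. Order is not preserved: the last entry is
 * copied into slot i, so deletion is O(1).
 */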
static void g_lock_del_shared(struct g_lock *lck, size_t i)
{
        if (i >= lck->num_shared) {
                abort();
        }
        lck->num_shared -= 1;
        if (i < lck->num_shared) {
                memcpy(lck->shared + i*SERVER_ID_BUF_LENGTH,
                       lck->shared + lck->num_shared*SERVER_ID_BUF_LENGTH,
                       SERVER_ID_BUF_LENGTH);
        }
}

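/*
 * Write the record back via dbwrap_record_storev(). The iovec slots are:
 *   [0] exclusive holder, [1] both epochs, [2] num_shared,
 *   [3] existing shared array, [4] optional new shared entry,
 *   [5] existing data, [6..] optional new data buffers.
 * Appending the optional new shared entry as a separate slot avoids
 * reallocating the shared array.
 */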
static NTSTATUS g_lock_store(
        struct db_record *rec,
        struct g_lock *lck,
        struct server_id *new_shared,
        const TDB_DATA *new_dbufs,
        size_t num_new_dbufs)
{
        uint8_t exclusive[SERVER_ID_BUF_LENGTH];
        uint8_t seqnum_buf[sizeof(uint64_t)*2];
        uint8_t sizebuf[sizeof(uint32_t)];
        uint8_t new_shared_buf[SERVER_ID_BUF_LENGTH];

        struct TDB_DATA dbufs[6 + num_new_dbufs];

        dbufs[0] = (TDB_DATA) {
                .dptr = exclusive, .dsize = sizeof(exclusive),
        };
        dbufs[1] = (TDB_DATA) {
                .dptr = seqnum_buf, .dsize = sizeof(seqnum_buf),
        };
        dbufs[2] = (TDB_DATA) {
                .dptr = sizebuf, .dsize = sizeof(sizebuf),
        };
        dbufs[3] = (TDB_DATA) {
                .dptr = lck->shared,
                .dsize = lck->num_shared * SERVER_ID_BUF_LENGTH,
        };
        dbufs[4] = (TDB_DATA) { 0 };
        dbufs[5] = (TDB_DATA) {
                .dptr = lck->data, .dsize = lck->datalen,
        };

        if (num_new_dbufs != 0) {
                memcpy(&dbufs[6],
                       new_dbufs,
                       num_new_dbufs * sizeof(TDB_DATA));
        }

        server_id_put(exclusive, lck->exclusive);
        SBVAL(seqnum_buf, 0, lck->unique_lock_epoch);
        SBVAL(seqnum_buf, 8, lck->unique_data_epoch);

        if (new_shared != NULL) {
                if (lck->num_shared >= UINT32_MAX) {
                        return NT_STATUS_BUFFER_OVERFLOW;
                }

                server_id_put(new_shared_buf, *new_shared);

                dbufs[4] = (TDB_DATA) {
                        .dptr = new_shared_buf,
                        .dsize = sizeof(new_shared_buf),
                };

                lck->num_shared += 1;
        }

        SIVAL(sizebuf, 0, lck->num_shared);

        return dbwrap_record_storev(rec, dbufs, ARRAY_SIZE(dbufs), 0);
}

struct g_lock_ctx *g_lock_ctx_init_backend(
        TALLOC_CTX *mem_ctx,
        struct messaging_context *msg,
        struct db_context **backend)
{
        struct g_lock_ctx *result;

        result = talloc_zero(mem_ctx, struct g_lock_ctx);
        if (result == NULL) {
                return NULL;
        }
        result->msg = msg;
        result->lock_order = DBWRAP_LOCK_ORDER_NONE;

        result->db = db_open_watched(result, backend, msg);
        if (result->db == NULL) {
                DBG_WARNING("db_open_watched failed\n");
                TALLOC_FREE(result);
                return NULL;
        }
        return result;
}

void g_lock_set_lock_order(struct g_lock_ctx *ctx,
                           enum dbwrap_lock_order lock_order)
{
        ctx->lock_order = lock_order;
}

struct g_lock_ctx *g_lock_ctx_init(TALLOC_CTX *mem_ctx,
                                   struct messaging_context *msg)
{
        char *db_path = NULL;
        struct db_context *backend = NULL;
        struct g_lock_ctx *ctx = NULL;

        db_path = lock_path(mem_ctx, "g_lock.tdb");
        if (db_path == NULL) {
                return NULL;
        }

        backend = db_open(
                mem_ctx,
                db_path,
                0,
                TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH|TDB_VOLATILE,
                O_RDWR|O_CREAT,
                0600,
                DBWRAP_LOCK_ORDER_3,
                DBWRAP_FLAG_NONE);
        TALLOC_FREE(db_path);
        if (backend == NULL) {
                DBG_WARNING("Could not open g_lock.tdb\n");
                return NULL;
        }

        ctx = g_lock_ctx_init_backend(mem_ctx, msg, &backend);
        return ctx;
}
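
/*
 * A minimal usage sketch for the API above, assuming an existing
 * struct messaging_context *msg and the Samba helpers
 * string_term_tdb_data() and timeval_set(); the key name is made up
 * and data/datalen stand for the caller's payload:
 *
 *   struct g_lock_ctx *ctx = g_lock_ctx_init(talloc_tos(), msg);
 *   TDB_DATA key = string_term_tdb_data("example_key");
 *   NTSTATUS status;
 *
 *   status = g_lock_lock(ctx, key, G_LOCK_WRITE,
 *                        timeval_set(10, 0), NULL, NULL);
 *   if (NT_STATUS_IS_OK(status)) {
 *           status = g_lock_write_data(ctx, key, data, datalen);
 *           g_lock_unlock(ctx, key);
 *   }
 */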

static void g_lock_cleanup_dead(
        struct g_lock *lck,
        struct server_id *dead_blocker)
{
        bool exclusive_died;
        struct server_id_buf tmp;

        if (dead_blocker == NULL) {
                return;
        }

        exclusive_died = server_id_equal(dead_blocker, &lck->exclusive);

        if (exclusive_died) {
                DBG_DEBUG("Exclusive holder %s died\n",
                          server_id_str_buf(lck->exclusive, &tmp));
                lck->exclusive.pid = 0;
        }

        if (lck->num_shared != 0) {
                bool shared_died;
                struct server_id shared;

                g_lock_get_shared(lck, 0, &shared);
                shared_died = server_id_equal(dead_blocker, &shared);

                if (shared_died) {
                        DBG_DEBUG("Shared holder %s died\n",
                                  server_id_str_buf(shared, &tmp));
                        g_lock_del_shared(lck, 0);
                }
        }
}

static ssize_t g_lock_find_shared(
        struct g_lock *lck,
        const struct server_id *self)
{
        size_t i;

        for (i=0; i<lck->num_shared; i++) {
                struct server_id shared;
                bool same;

                g_lock_get_shared(lck, i, &shared);

                same = server_id_equal(self, &shared);
                if (same) {
                        return i;
                }
        }

        return -1;
}

static void g_lock_cleanup_shared(struct g_lock *lck)
{
        size_t i;
        struct server_id check;
        bool exists;

        if (lck->num_shared == 0) {
                return;
        }

        /*
         * Read locks can stay around forever if the process dies. Do
         * a heuristic check for process existence: Check one random
         * process for existence. Hopefully this will keep runaway
         * read locks under control.
         */
        i = generate_random() % lck->num_shared;
        g_lock_get_shared(lck, i, &check);

        exists = serverid_exists(&check);
        if (!exists) {
                struct server_id_buf tmp;
                DBG_DEBUG("Shared locker %s died -- removing\n",
                          server_id_str_buf(check, &tmp));
                g_lock_del_shared(lck, i);
        }
}

struct g_lock_lock_cb_state {
        struct g_lock_ctx *ctx;
        struct db_record *rec;
        struct g_lock *lck;
        struct server_id *new_shared;
        g_lock_lock_cb_fn_t cb_fn;
        void *cb_private;
        TALLOC_CTX *update_mem_ctx;
        TDB_DATA updated_data;
        bool existed;
        bool modified;
        bool unlock;
};

NTSTATUS g_lock_lock_cb_dump(struct g_lock_lock_cb_state *cb_state,
                             void (*fn)(struct server_id exclusive,
                                        size_t num_shared,
                                        const struct server_id *shared,
                                        const uint8_t *data,
                                        size_t datalen,
                                        void *private_data),
                             void *private_data)
{
        struct g_lock *lck = cb_state->lck;

        /* We allow a cb_fn only for G_LOCK_WRITE for now... */
        SMB_ASSERT(lck->num_shared == 0);

        fn(lck->exclusive,
           0, /* num_shared */
           NULL, /* shared */
           lck->data,
           lck->datalen,
           private_data);

        return NT_STATUS_OK;
}

NTSTATUS g_lock_lock_cb_writev(struct g_lock_lock_cb_state *cb_state,
                               const TDB_DATA *dbufs,
                               size_t num_dbufs)
{
        NTSTATUS status;

        status = dbwrap_merge_dbufs(&cb_state->updated_data,
                                    cb_state->update_mem_ctx,
                                    dbufs, num_dbufs);
        if (!NT_STATUS_IS_OK(status)) {
                return status;
        }

        cb_state->modified = true;
        cb_state->lck->data = cb_state->updated_data.dptr;
        cb_state->lck->datalen = cb_state->updated_data.dsize;

        return NT_STATUS_OK;
}

void g_lock_lock_cb_unlock(struct g_lock_lock_cb_state *cb_state)
{
        cb_state->unlock = true;
}

struct g_lock_lock_cb_watch_data_state {
        struct tevent_context *ev;
        struct g_lock_ctx *ctx;
        TDB_DATA key;
        struct server_id blocker;
        bool blockerdead;
        uint64_t unique_lock_epoch;
        uint64_t unique_data_epoch;
        uint64_t watch_instance;
        NTSTATUS status;
};

static void g_lock_lock_cb_watch_data_done(struct tevent_req *subreq);

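/*
 * Wait until the record's user data changes, i.e. until
 * unique_data_epoch differs from the value captured here. Pure lock
 * (lock epoch) changes only re-queue the watcher, they do not
 * complete the request.
 */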
struct tevent_req *g_lock_lock_cb_watch_data_send(
        TALLOC_CTX *mem_ctx,
        struct tevent_context *ev,
        struct g_lock_lock_cb_state *cb_state,
        struct server_id blocker)
{
        struct tevent_req *req = NULL;
        struct g_lock_lock_cb_watch_data_state *state = NULL;
        struct tevent_req *subreq = NULL;
        TDB_DATA key = dbwrap_record_get_key(cb_state->rec);

        req = tevent_req_create(
                mem_ctx, &state, struct g_lock_lock_cb_watch_data_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->ctx = cb_state->ctx;
        state->blocker = blocker;

        state->key = tdb_data_talloc_copy(state, key);
        if (tevent_req_nomem(state->key.dptr, req)) {
                return tevent_req_post(req, ev);
        }

        state->unique_lock_epoch = cb_state->lck->unique_lock_epoch;
        state->unique_data_epoch = cb_state->lck->unique_data_epoch;

        DBG_DEBUG("state->unique_data_epoch=%"PRIu64"\n",
                  state->unique_data_epoch);

        subreq = dbwrap_watched_watch_send(
                state, state->ev, cb_state->rec, 0, state->blocker);
        if (tevent_req_nomem(subreq, req)) {
                return tevent_req_post(req, ev);
        }
        tevent_req_set_callback(subreq, g_lock_lock_cb_watch_data_done, req);

        return req;
}

static void g_lock_lock_cb_watch_data_done_fn(
        struct db_record *rec,
        TDB_DATA value,
        void *private_data)
{
        struct tevent_req *req = talloc_get_type_abort(
                private_data, struct tevent_req);
        struct g_lock_lock_cb_watch_data_state *state = tevent_req_data(
                req, struct g_lock_lock_cb_watch_data_state);
        struct tevent_req *subreq = NULL;
        struct g_lock lck;
        bool ok;

        ok = g_lock_parse(value.dptr, value.dsize, &lck);
        if (!ok) {
                dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
                state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
                return;
        }

        if (lck.unique_data_epoch != state->unique_data_epoch) {
                dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
                DBG_DEBUG("lck.unique_data_epoch=%"PRIu64", "
                          "state->unique_data_epoch=%"PRIu64"\n",
                          lck.unique_data_epoch,
                          state->unique_data_epoch);
                state->status = NT_STATUS_OK;
                return;
        }

        /*
         * If the lock epoch changed, we'd better remove ourselves
         * from the waiter list (most likely the first position)
         * and re-add ourselves at the end of the list.
         *
         * This gives other lock waiters a chance to make progress.
         *
         * Otherwise we keep our waiter instance alive and
         * keep waiting (most likely at the first position).
         */
        if (lck.unique_lock_epoch != state->unique_lock_epoch) {
                dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
                state->watch_instance = dbwrap_watched_watch_add_instance(rec);
                state->unique_lock_epoch = lck.unique_lock_epoch;
        }

        subreq = dbwrap_watched_watch_send(
                state, state->ev, rec, state->watch_instance, state->blocker);
        if (subreq == NULL) {
                dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
                state->status = NT_STATUS_NO_MEMORY;
                return;
        }
        tevent_req_set_callback(subreq, g_lock_lock_cb_watch_data_done, req);

        state->status = NT_STATUS_EVENT_PENDING;
}

static void g_lock_lock_cb_watch_data_done(struct tevent_req *subreq)
{
        struct tevent_req *req = tevent_req_callback_data(
                subreq, struct tevent_req);
        struct g_lock_lock_cb_watch_data_state *state = tevent_req_data(
                req, struct g_lock_lock_cb_watch_data_state);
        NTSTATUS status;
        uint64_t instance = 0;

        status = dbwrap_watched_watch_recv(
                subreq, &instance, &state->blockerdead, &state->blocker);
        TALLOC_FREE(subreq);
        if (tevent_req_nterror(req, status)) {
                DBG_DEBUG("dbwrap_watched_watch_recv returned %s\n",
                          nt_errstr(status));
                return;
        }

        state->watch_instance = instance;

        status = dbwrap_do_locked(
                state->ctx->db, state->key,
                g_lock_lock_cb_watch_data_done_fn, req);
        if (tevent_req_nterror(req, status)) {
                DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
                return;
        }
        if (NT_STATUS_EQUAL(state->status, NT_STATUS_EVENT_PENDING)) {
                return;
        }
        if (tevent_req_nterror(req, state->status)) {
                return;
        }
        tevent_req_done(req);
}

NTSTATUS g_lock_lock_cb_watch_data_recv(
        struct tevent_req *req,
        bool *blockerdead,
        struct server_id *blocker)
{
        struct g_lock_lock_cb_watch_data_state *state = tevent_req_data(
                req, struct g_lock_lock_cb_watch_data_state);
        NTSTATUS status;

        if (tevent_req_is_nterror(req, &status)) {
                return status;
        }
        if (blockerdead != NULL) {
                *blockerdead = state->blockerdead;
        }
        if (blocker != NULL) {
                *blocker = state->blocker;
        }

        return NT_STATUS_OK;
}

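/*
 * Bump unique_data_epoch so that pending
 * g_lock_lock_cb_watch_data_send() waiters see a data change and
 * complete, even though the stored user data itself is not modified.
 */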
void g_lock_lock_cb_wake_watchers(struct g_lock_lock_cb_state *cb_state)
{
        struct g_lock *lck = cb_state->lck;

        lck->unique_data_epoch = generate_unique_u64(lck->unique_data_epoch);
        cb_state->modified = true;
}

static NTSTATUS g_lock_lock_cb_run_and_store(struct g_lock_lock_cb_state *cb_state)
{
        struct g_lock *lck = cb_state->lck;
        NTSTATUS success_status = NT_STATUS_OK;
        NTSTATUS status;

        if (cb_state->cb_fn != NULL) {

                SMB_ASSERT(lck->num_shared == 0);
                SMB_ASSERT(cb_state->new_shared == NULL);

                if (cb_state->ctx->lock_order != DBWRAP_LOCK_ORDER_NONE) {
                        const char *name = dbwrap_name(cb_state->ctx->db);
                        dbwrap_lock_order_lock(name, cb_state->ctx->lock_order);
                }

                cb_state->ctx->busy = true;
                cb_state->cb_fn(cb_state, cb_state->cb_private);
                cb_state->ctx->busy = false;

                if (cb_state->ctx->lock_order != DBWRAP_LOCK_ORDER_NONE) {
                        const char *name = dbwrap_name(cb_state->ctx->db);
                        dbwrap_lock_order_unlock(name, cb_state->ctx->lock_order);
                }
        }

        if (cb_state->unlock) {
                /*
                 * Unlocking should wake up watchers.
                 *
                 * We no longer need the lock, so
                 * force a wakeup of the next watchers,
                 * even if we don't do any update.
                 */
                dbwrap_watched_watch_reset_alerting(cb_state->rec);
                dbwrap_watched_watch_force_alerting(cb_state->rec);
                if (!cb_state->modified) {
                        /*
                         * The record was not changed at
                         * all, so we can also avoid
                         * storing the lck.unique_lock_epoch
                         * change
                         */
                        return NT_STATUS_WAS_UNLOCKED;
                }
                lck->exclusive = (struct server_id) { .pid = 0 };
                cb_state->new_shared = NULL;

                if (lck->datalen == 0) {
                        if (!cb_state->existed) {
                                return NT_STATUS_WAS_UNLOCKED;
                        }

                        status = dbwrap_record_delete(cb_state->rec);
                        if (!NT_STATUS_IS_OK(status)) {
                                DBG_WARNING("dbwrap_record_delete() failed: %s\n",
                                    nt_errstr(status));
                                return status;
                        }
                        return NT_STATUS_WAS_UNLOCKED;
                }

                success_status = NT_STATUS_WAS_UNLOCKED;
        }

        status = g_lock_store(cb_state->rec,
                              cb_state->lck,
                              cb_state->new_shared,
                              NULL, 0);
        if (!NT_STATUS_IS_OK(status)) {
                DBG_WARNING("g_lock_store() failed: %s\n",
                            nt_errstr(status));
                return status;
        }

        return success_status;
}

struct g_lock_lock_state {
        struct tevent_context *ev;
        struct g_lock_ctx *ctx;
        TDB_DATA key;
        enum g_lock_type type;
        bool retry;
        g_lock_lock_cb_fn_t cb_fn;
        void *cb_private;
};

struct g_lock_lock_fn_state {
        struct g_lock_lock_state *req_state;
        struct server_id *dead_blocker;

        struct tevent_req *watch_req;
        uint64_t watch_instance;
        NTSTATUS status;
};

static int g_lock_lock_state_destructor(struct g_lock_lock_state *s);

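/*
 * Single attempt to take the lock within a dbwrap_do_locked()
 * callback. Returns NT_STATUS_OK (or NT_STATUS_WAS_UNLOCKED via the
 * callback path) when the lock was taken, NT_STATUS_LOCK_NOT_GRANTED
 * with *blocker set when the caller has to keep watching the record,
 * and NT_STATUS_WAS_LOCKED, NT_STATUS_NOT_LOCKED or
 * NT_STATUS_POSSIBLE_DEADLOCK for the various error cases.
 */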
static NTSTATUS g_lock_trylock(
        struct db_record *rec,
        struct g_lock_lock_fn_state *state,
        TDB_DATA data,
        struct server_id *blocker)
{
        struct g_lock_lock_state *req_state = state->req_state;
        struct server_id self = messaging_server_id(req_state->ctx->msg);
        enum g_lock_type type = req_state->type;
        bool retry = req_state->retry;
        struct g_lock lck = { .exclusive.pid = 0 };
        struct g_lock_lock_cb_state cb_state = {
                .ctx = req_state->ctx,
                .rec = rec,
                .lck = &lck,
                .cb_fn = req_state->cb_fn,
                .cb_private = req_state->cb_private,
                .existed = data.dsize != 0,
                .update_mem_ctx = talloc_tos(),
        };
        struct server_id_buf tmp;
        NTSTATUS status;
        bool ok;

        ok = g_lock_parse(data.dptr, data.dsize, &lck);
        if (!ok) {
                dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
                DBG_DEBUG("g_lock_parse failed\n");
                return NT_STATUS_INTERNAL_DB_CORRUPTION;
        }

        g_lock_cleanup_dead(&lck, state->dead_blocker);

        lck.unique_lock_epoch = generate_unique_u64(lck.unique_lock_epoch);

        if (lck.exclusive.pid != 0) {
                bool self_exclusive = server_id_equal(&self, &lck.exclusive);

                if (!self_exclusive) {
                        bool exists = serverid_exists(&lck.exclusive);
                        if (!exists) {
                                lck.exclusive = (struct server_id) { .pid=0 };
                                goto noexclusive;
                        }

                        DBG_DEBUG("%s has an exclusive lock\n",
                                  server_id_str_buf(lck.exclusive, &tmp));

                        if (type == G_LOCK_DOWNGRADE) {
                                struct server_id_buf tmp2;

                                dbwrap_watched_watch_remove_instance(rec,
                                                state->watch_instance);

                                DBG_DEBUG("%s: Trying to downgrade %s\n",
                                          server_id_str_buf(self, &tmp),
                                          server_id_str_buf(
                                                  lck.exclusive, &tmp2));
                                return NT_STATUS_NOT_LOCKED;
                        }

                        if (type == G_LOCK_UPGRADE) {
                                ssize_t shared_idx;

                                dbwrap_watched_watch_remove_instance(rec,
                                                state->watch_instance);

                                shared_idx = g_lock_find_shared(&lck, &self);

                                if (shared_idx == -1) {
                                        DBG_DEBUG("Trying to upgrade %s "
                                                  "without "
                                                  "existing shared lock\n",
                                                  server_id_str_buf(
                                                          self, &tmp));
                                        return NT_STATUS_NOT_LOCKED;
                                }

                                /*
                                 * We're trying to upgrade, and the
                                 * exclusive lock is taken by someone
                                 * else. This means that someone else
                                 * is waiting for us to give up our
                                 * shared lock. If we now also wait
                                 * for someone to give up their shared
                                 * lock, we will deadlock.
                                 */

                                DBG_DEBUG("Trying to upgrade %s while "
                                          "someone else is also "
                                          "trying to upgrade\n",
                                          server_id_str_buf(self, &tmp));
                                return NT_STATUS_POSSIBLE_DEADLOCK;
                        }

                        DBG_DEBUG("Waiting for lck.exclusive=%s\n",
                                  server_id_str_buf(lck.exclusive, &tmp));

                        /*
                         * We will return NT_STATUS_LOCK_NOT_GRANTED
                         * and need to monitor the record.
                         *
                         * If we don't have a watcher instance yet,
                         * we should add one.
                         */
                        if (state->watch_instance == 0) {
                                state->watch_instance =
                                        dbwrap_watched_watch_add_instance(rec);
                        }

                        *blocker = lck.exclusive;
                        return NT_STATUS_LOCK_NOT_GRANTED;
                }

                if (type == G_LOCK_DOWNGRADE) {
                        DBG_DEBUG("Downgrading %s from WRITE to READ\n",
                                  server_id_str_buf(self, &tmp));

                        lck.exclusive = (struct server_id) { .pid = 0 };
                        goto do_shared;
                }

                if (!retry) {
                        dbwrap_watched_watch_remove_instance(rec,
                                                state->watch_instance);

                        DBG_DEBUG("%s already locked by self\n",
                                  server_id_str_buf(self, &tmp));
                        return NT_STATUS_WAS_LOCKED;
                }

                g_lock_cleanup_shared(&lck);

                if (lck.num_shared != 0) {
                        g_lock_get_shared(&lck, 0, blocker);

                        DBG_DEBUG("Continue waiting for shared lock %s\n",
                                  server_id_str_buf(*blocker, &tmp));

                        /*
                         * We will return NT_STATUS_LOCK_NOT_GRANTED
                         * and need to monitor the record.
                         *
                         * If we don't have a watcher instance yet,
                         * we should add one.
                         */
                        if (state->watch_instance == 0) {
                                state->watch_instance =
                                        dbwrap_watched_watch_add_instance(rec);
                        }

                        return NT_STATUS_LOCK_NOT_GRANTED;
                }

                /*
                 * Retry after a conflicting lock was released.
                 * All pending readers are gone, so we got the lock.
                 */
                goto got_lock;
        }

noexclusive:

        if (type == G_LOCK_UPGRADE) {
                ssize_t shared_idx = g_lock_find_shared(&lck, &self);

                if (shared_idx == -1) {
                        dbwrap_watched_watch_remove_instance(rec,
                                                state->watch_instance);

                        DBG_DEBUG("Trying to upgrade %s without "
                                  "existing shared lock\n",
                                  server_id_str_buf(self, &tmp));
                        return NT_STATUS_NOT_LOCKED;
                }

                g_lock_del_shared(&lck, shared_idx);
                type = G_LOCK_WRITE;
        }

        if (type == G_LOCK_WRITE) {
                ssize_t shared_idx = g_lock_find_shared(&lck, &self);

                if (shared_idx != -1) {
                        dbwrap_watched_watch_remove_instance(rec,
                                                state->watch_instance);
                        DBG_DEBUG("Trying to writelock existing shared %s\n",
                                  server_id_str_buf(self, &tmp));
                        return NT_STATUS_WAS_LOCKED;
                }

                lck.exclusive = self;

                g_lock_cleanup_shared(&lck);

                if (lck.num_shared == 0) {
                        /*
                         * We can store ourselves as the exclusive
                         * writer without any pending readers ...
                         */
                        goto got_lock;
                }

                if (state->watch_instance == 0) {
                        /*
                         * Here we have lck.num_shared != 0.
                         *
                         * We will return NT_STATUS_LOCK_NOT_GRANTED
                         * below.
                         *
                         * And don't have a watcher instance yet!
                         *
                         * We add it here before g_lock_store()
                         * in order to trigger just one
                         * low level dbwrap_do_locked() call.
                         */
                        state->watch_instance =
                                dbwrap_watched_watch_add_instance(rec);
                }

                status = g_lock_store(rec, &lck, NULL, NULL, 0);
                if (!NT_STATUS_IS_OK(status)) {
                        DBG_DEBUG("g_lock_store() failed: %s\n",
                                  nt_errstr(status));
                        return status;
                }

                talloc_set_destructor(
                        req_state, g_lock_lock_state_destructor);

                g_lock_get_shared(&lck, 0, blocker);

                DBG_DEBUG("Waiting for %zu shared locks, "
                          "picking blocker %s\n",
                          lck.num_shared,
                          server_id_str_buf(*blocker, &tmp));

                return NT_STATUS_LOCK_NOT_GRANTED;
        }

do_shared:

        g_lock_cleanup_shared(&lck);
        cb_state.new_shared = &self;
        goto got_lock;

got_lock:
        /*
         * We got the lock we asked for, so we no
         * longer need to monitor the record.
         */
        dbwrap_watched_watch_remove_instance(rec, state->watch_instance);

        status = g_lock_lock_cb_run_and_store(&cb_state);
        if (!NT_STATUS_IS_OK(status) &&
            !NT_STATUS_EQUAL(status, NT_STATUS_WAS_UNLOCKED))
        {
                DBG_WARNING("g_lock_lock_cb_run_and_store() failed: %s\n",
                            nt_errstr(status));
                return status;
        }

        talloc_set_destructor(req_state, NULL);
        return status;
}

static void g_lock_lock_fn(
        struct db_record *rec,
        TDB_DATA value,
        void *private_data)
{
        struct g_lock_lock_fn_state *state = private_data;
        struct server_id blocker = {0};

        /*
         * We're trying to get a lock and if we are
         * successful in doing that, we should not
         * wake up any other waiters, all they would
         * find is that we're holding a lock they
         * are conflicting with.
         */
        dbwrap_watched_watch_skip_alerting(rec);

        state->status = g_lock_trylock(rec, state, value, &blocker);
        if (!NT_STATUS_IS_OK(state->status)) {
                DBG_DEBUG("g_lock_trylock returned %s\n",
                          nt_errstr(state->status));
        }
        if (!NT_STATUS_EQUAL(state->status, NT_STATUS_LOCK_NOT_GRANTED)) {
                return;
        }

        state->watch_req = dbwrap_watched_watch_send(
                state->req_state, state->req_state->ev, rec,
                state->watch_instance, blocker);
        if (state->watch_req == NULL) {
                state->status = NT_STATUS_NO_MEMORY;
        }
}

static int g_lock_lock_state_destructor(struct g_lock_lock_state *s)
{
        NTSTATUS status = g_lock_unlock(s->ctx, s->key);
        if (!NT_STATUS_IS_OK(status)) {
                DBG_DEBUG("g_lock_unlock failed: %s\n", nt_errstr(status));
        }
        return 0;
}

static void g_lock_lock_retry(struct tevent_req *subreq);

struct tevent_req *g_lock_lock_send(TALLOC_CTX *mem_ctx,
                                    struct tevent_context *ev,
                                    struct g_lock_ctx *ctx,
                                    TDB_DATA key,
                                    enum g_lock_type type,
                                    g_lock_lock_cb_fn_t cb_fn,
                                    void *cb_private)
{
        struct tevent_req *req;
        struct g_lock_lock_state *state;
        struct g_lock_lock_fn_state fn_state;
        NTSTATUS status;
        bool ok;

        SMB_ASSERT(!ctx->busy);

        req = tevent_req_create(mem_ctx, &state, struct g_lock_lock_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->ctx = ctx;
        state->key = key;
        state->type = type;
        state->cb_fn = cb_fn;
        state->cb_private = cb_private;

        fn_state = (struct g_lock_lock_fn_state) {
                .req_state = state,
        };

        /*
         * We allow a cb_fn only for G_LOCK_WRITE for now.
         *
         * It's all we currently need and it makes a few things
         * easier to implement.
         */
        if (unlikely(cb_fn != NULL && type != G_LOCK_WRITE)) {
                tevent_req_nterror(req, NT_STATUS_INVALID_PARAMETER_6);
                return tevent_req_post(req, ev);
        }

        status = dbwrap_do_locked(ctx->db, key, g_lock_lock_fn, &fn_state);
        if (tevent_req_nterror(req, status)) {
                DBG_DEBUG("dbwrap_do_locked failed: %s\n",
                          nt_errstr(status));
                return tevent_req_post(req, ev);
        }

        if (NT_STATUS_IS_OK(fn_state.status)) {
                tevent_req_done(req);
                return tevent_req_post(req, ev);
        }
        if (!NT_STATUS_EQUAL(fn_state.status, NT_STATUS_LOCK_NOT_GRANTED)) {
                tevent_req_nterror(req, fn_state.status);
                return tevent_req_post(req, ev);
        }

        if (tevent_req_nomem(fn_state.watch_req, req)) {
                return tevent_req_post(req, ev);
        }

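        /*
         * Watch with a random 5..10s end time as a safety net: if a
         * wakeup gets lost, the NT_STATUS_IO_TIMEOUT from the expired
         * watch makes g_lock_lock_retry() try again.
         */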
        ok = tevent_req_set_endtime(
                fn_state.watch_req,
                state->ev,
                timeval_current_ofs(5 + generate_random() % 5, 0));
        if (!ok) {
                tevent_req_oom(req);
                return tevent_req_post(req, ev);
        }
        tevent_req_set_callback(fn_state.watch_req, g_lock_lock_retry, req);

        return req;
}

static void g_lock_lock_retry(struct tevent_req *subreq)
{
        struct tevent_req *req = tevent_req_callback_data(
                subreq, struct tevent_req);
        struct g_lock_lock_state *state = tevent_req_data(
                req, struct g_lock_lock_state);
        struct g_lock_lock_fn_state fn_state;
        struct server_id blocker = { .pid = 0 };
        bool blockerdead = false;
        NTSTATUS status;
        uint64_t instance = 0;

        status = dbwrap_watched_watch_recv(subreq, &instance, &blockerdead,
                                           &blocker);
        DBG_DEBUG("watch_recv returned %s\n", nt_errstr(status));
        TALLOC_FREE(subreq);

        if (!NT_STATUS_IS_OK(status) &&
            !NT_STATUS_EQUAL(status, NT_STATUS_IO_TIMEOUT)) {
                tevent_req_nterror(req, status);
                return;
        }

        state->retry = true;

        fn_state = (struct g_lock_lock_fn_state) {
                .req_state = state,
                .dead_blocker = blockerdead ? &blocker : NULL,
                .watch_instance = instance,
        };

        status = dbwrap_do_locked(state->ctx->db, state->key,
                                  g_lock_lock_fn, &fn_state);
        if (tevent_req_nterror(req, status)) {
                DBG_DEBUG("dbwrap_do_locked failed: %s\n",
                          nt_errstr(status));
                return;
        }

        if (NT_STATUS_IS_OK(fn_state.status)) {
                tevent_req_done(req);
                return;
        }
        if (!NT_STATUS_EQUAL(fn_state.status, NT_STATUS_LOCK_NOT_GRANTED)) {
                tevent_req_nterror(req, fn_state.status);
                return;
        }

        if (tevent_req_nomem(fn_state.watch_req, req)) {
                return;
        }

        if (!tevent_req_set_endtime(
                    fn_state.watch_req, state->ev,
                    timeval_current_ofs(5 + generate_random() % 5, 0))) {
                tevent_req_oom(req);
                return;
        }
        tevent_req_set_callback(fn_state.watch_req, g_lock_lock_retry, req);
}

NTSTATUS g_lock_lock_recv(struct tevent_req *req)
{
        struct g_lock_lock_state *state = tevent_req_data(
                req, struct g_lock_lock_state);
        struct g_lock_ctx *ctx = state->ctx;
        NTSTATUS status;

        if (tevent_req_is_nterror(req, &status)) {
                if (NT_STATUS_EQUAL(status, NT_STATUS_WAS_UNLOCKED)) {
                        return NT_STATUS_OK;
                }
                return status;
        }

        if ((ctx->lock_order != DBWRAP_LOCK_ORDER_NONE) &&
            ((state->type == G_LOCK_READ) ||
             (state->type == G_LOCK_WRITE))) {
                const char *name = dbwrap_name(ctx->db);
                dbwrap_lock_order_lock(name, ctx->lock_order);
        }

        return NT_STATUS_OK;
}

struct g_lock_lock_simple_state {
        struct g_lock_ctx *ctx;
        struct server_id me;
        enum g_lock_type type;
        NTSTATUS status;
        g_lock_lock_cb_fn_t cb_fn;
        void *cb_private;
};

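/*
 * Fast path for the uncontended case: grab the lock in a single
 * dbwrap_do_locked() round trip, without setting up a tevent
 * request. Contended cases fall back to g_lock_lock_send().
 */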
static void g_lock_lock_simple_fn(
        struct db_record *rec,
        TDB_DATA value,
        void *private_data)
{
        struct g_lock_lock_simple_state *state = private_data;
        struct server_id_buf buf;
        struct g_lock lck = { .exclusive.pid = 0 };
        struct g_lock_lock_cb_state cb_state = {
                .ctx = state->ctx,
                .rec = rec,
                .lck = &lck,
                .cb_fn = state->cb_fn,
                .cb_private = state->cb_private,
                .existed = value.dsize != 0,
                .update_mem_ctx = talloc_tos(),
        };
        bool ok;

        ok = g_lock_parse(value.dptr, value.dsize, &lck);
        if (!ok) {
                DBG_DEBUG("g_lock_parse failed\n");
                state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
                return;
        }

        if (lck.exclusive.pid != 0) {
                DBG_DEBUG("locked by %s\n",
                          server_id_str_buf(lck.exclusive, &buf));
                goto not_granted;
        }

        if (state->type == G_LOCK_WRITE) {
                if (lck.num_shared != 0) {
                        DBG_DEBUG("num_shared=%zu\n", lck.num_shared);
                        goto not_granted;
                }
                lck.exclusive = state->me;
        } else if (state->type == G_LOCK_READ) {
                g_lock_cleanup_shared(&lck);
                cb_state.new_shared = &state->me;
        } else {
                smb_panic(__location__);
        }

        lck.unique_lock_epoch = generate_unique_u64(lck.unique_lock_epoch);

        /*
         * We are going to store ourselves as owner,
         * so we got what we were waiting for.
         *
         * So we no longer need to monitor the
         * record.
         */
        dbwrap_watched_watch_skip_alerting(rec);

        state->status = g_lock_lock_cb_run_and_store(&cb_state);
        if (!NT_STATUS_IS_OK(state->status) &&
            !NT_STATUS_EQUAL(state->status, NT_STATUS_WAS_UNLOCKED))
        {
                DBG_WARNING("g_lock_lock_cb_run_and_store() failed: %s\n",
                            nt_errstr(state->status));
                return;
        }

        return;

not_granted:
        state->status = NT_STATUS_LOCK_NOT_GRANTED;
}

NTSTATUS g_lock_lock(struct g_lock_ctx *ctx, TDB_DATA key,
                     enum g_lock_type type, struct timeval timeout,
                     g_lock_lock_cb_fn_t cb_fn,
                     void *cb_private)
{
        TALLOC_CTX *frame;
        struct tevent_context *ev;
        struct tevent_req *req;
        struct timeval end;
        NTSTATUS status;

        SMB_ASSERT(!ctx->busy);

        /*
         * We allow a cb_fn only for G_LOCK_WRITE for now.
         *
         * It's all we currently need and it makes a few things
         * easier to implement.
         */
        if (unlikely(cb_fn != NULL && type != G_LOCK_WRITE)) {
                return NT_STATUS_INVALID_PARAMETER_5;
        }

        if ((type == G_LOCK_READ) || (type == G_LOCK_WRITE)) {
                /*
                 * This is an abstraction violation: Normally we do
                 * the sync wrappers around async functions with full
                 * nested event contexts. However, this is used in
                 * very hot code paths, so avoid the event context
                 * creation for the good path where there's no lock
                 * contention. My benchmark gave a factor of 2
                 * improvement for lock/unlock.
                 */
                struct g_lock_lock_simple_state state = {
                        .ctx = ctx,
                        .me = messaging_server_id(ctx->msg),
                        .type = type,
                        .cb_fn = cb_fn,
                        .cb_private = cb_private,
                };
                status = dbwrap_do_locked(
                        ctx->db, key, g_lock_lock_simple_fn, &state);
                if (!NT_STATUS_IS_OK(status)) {
                        DBG_DEBUG("dbwrap_do_locked() failed: %s\n",
                                  nt_errstr(status));
                        return status;
                }

                DBG_DEBUG("status=%s, state.status=%s\n",
                          nt_errstr(status),
                          nt_errstr(state.status));

                if (NT_STATUS_IS_OK(state.status)) {
                        if (ctx->lock_order != DBWRAP_LOCK_ORDER_NONE) {
                                const char *name = dbwrap_name(ctx->db);
                                dbwrap_lock_order_lock(name, ctx->lock_order);
                        }
                        return NT_STATUS_OK;
                }
                if (NT_STATUS_EQUAL(state.status, NT_STATUS_WAS_UNLOCKED)) {
                        /* without dbwrap_lock_order_lock() */
                        return NT_STATUS_OK;
                }
                if (!NT_STATUS_EQUAL(
                            state.status, NT_STATUS_LOCK_NOT_GRANTED)) {
                        return state.status;
                }

                if (timeval_is_zero(&timeout)) {
                        return NT_STATUS_LOCK_NOT_GRANTED;
                }

                /*
                 * Fall back to the full g_lock_trylock logic,
                 * g_lock_lock_simple_fn() called above only covers
                 * the uncontended path.
                 */
        }

        frame = talloc_stackframe();
        status = NT_STATUS_NO_MEMORY;

        ev = samba_tevent_context_init(frame);
        if (ev == NULL) {
                goto fail;
        }
        req = g_lock_lock_send(frame, ev, ctx, key, type, cb_fn, cb_private);
        if (req == NULL) {
                goto fail;
        }
        end = timeval_current_ofs(timeout.tv_sec, timeout.tv_usec);
        if (!tevent_req_set_endtime(req, ev, end)) {
                goto fail;
        }
        if (!tevent_req_poll_ntstatus(req, ev, &status)) {
                goto fail;
        }
        status = g_lock_lock_recv(req);
 fail:
        TALLOC_FREE(frame);
        return status;
}

struct g_lock_unlock_state {
        struct server_id self;
        NTSTATUS status;
};

static void g_lock_unlock_fn(
        struct db_record *rec,
        TDB_DATA value,
        void *private_data)
{
        struct g_lock_unlock_state *state = private_data;
        struct server_id_buf tmp1, tmp2;
        struct g_lock lck;
        size_t i;
        bool ok, exclusive;

        ok = g_lock_parse(value.dptr, value.dsize, &lck);
        if (!ok) {
                DBG_DEBUG("g_lock_parse() failed\n");
                state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
                return;
        }

        exclusive = server_id_equal(&state->self, &lck.exclusive);

        for (i=0; i<lck.num_shared; i++) {
                struct server_id shared;
                g_lock_get_shared(&lck, i, &shared);
                if (server_id_equal(&state->self, &shared)) {
                        break;
                }
        }

        if (i < lck.num_shared) {
                if (exclusive) {
                        DBG_DEBUG("%s both exclusive and shared (%zu)\n",
                                  server_id_str_buf(state->self, &tmp1),
                                  i);
                        state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
                        return;
                }
                g_lock_del_shared(&lck, i);
        } else {
                if (!exclusive) {
                        DBG_DEBUG("Lock not found, self=%s, lck.exclusive=%s, "
                                  "num_shared=%zu\n",
                                  server_id_str_buf(state->self, &tmp1),
                                  server_id_str_buf(lck.exclusive, &tmp2),
                                  lck.num_shared);
                        state->status = NT_STATUS_NOT_FOUND;
                        return;
                }
                lck.exclusive = (struct server_id) { .pid = 0 };
        }

        if ((lck.exclusive.pid == 0) &&
            (lck.num_shared == 0) &&
            (lck.datalen == 0)) {
                state->status = dbwrap_record_delete(rec);
                return;
        }

        if (!exclusive && lck.exclusive.pid != 0) {
                /*
                 * We only had a read lock and there's
                 * someone waiting for an exclusive lock.
                 *
                 * Don't alert the exclusive lock waiter
                 * if there are still other read lock holders.
                 */
                g_lock_cleanup_shared(&lck);
                if (lck.num_shared != 0) {
                        dbwrap_watched_watch_skip_alerting(rec);
                }
        }

        lck.unique_lock_epoch = generate_unique_u64(lck.unique_lock_epoch);

        state->status = g_lock_store(rec, &lck, NULL, NULL, 0);
}

NTSTATUS g_lock_unlock(struct g_lock_ctx *ctx, TDB_DATA key)
{
        struct g_lock_unlock_state state = {
                .self = messaging_server_id(ctx->msg),
        };
        NTSTATUS status;

        SMB_ASSERT(!ctx->busy);

        status = dbwrap_do_locked(ctx->db, key, g_lock_unlock_fn, &state);
        if (!NT_STATUS_IS_OK(status)) {
                DBG_WARNING("dbwrap_do_locked failed: %s\n",
                            nt_errstr(status));
                return status;
        }
        if (!NT_STATUS_IS_OK(state.status)) {
                DBG_WARNING("g_lock_unlock_fn failed: %s\n",
                            nt_errstr(state.status));
                return state.status;
        }

        if (ctx->lock_order != DBWRAP_LOCK_ORDER_NONE) {
                const char *name = dbwrap_name(ctx->db);
                dbwrap_lock_order_unlock(name, ctx->lock_order);
        }

        return NT_STATUS_OK;
}

struct g_lock_writev_data_state {
        TDB_DATA key;
        struct server_id self;
        const TDB_DATA *dbufs;
        size_t num_dbufs;
        NTSTATUS status;
};

static void g_lock_writev_data_fn(
        struct db_record *rec,
        TDB_DATA value,
        void *private_data)
{
        struct g_lock_writev_data_state *state = private_data;
        struct g_lock lck;
        bool exclusive;
        bool ok;

        /*
         * We're holding an exclusive write lock.
         *
         * Now we're updating the content of the record.
         *
         * We should not wake up any other waiters, all they
         * would find is that we're still holding a lock they
         * are conflicting with.
         */
        dbwrap_watched_watch_skip_alerting(rec);

        ok = g_lock_parse(value.dptr, value.dsize, &lck);
        if (!ok) {
                DBG_DEBUG("g_lock_parse for %s failed\n",
                          tdb_data_dbg(state->key));
                state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
                return;
        }

        exclusive = server_id_equal(&state->self, &lck.exclusive);

        /*
         * Make sure we're really exclusive: we are already marked as
         * the exclusive holder while we are still waiting for shared
         * holders to go away.
         */
        exclusive &= (lck.num_shared == 0);

        if (!exclusive) {
                struct server_id_buf buf1, buf2;
                DBG_DEBUG("Not locked by us: self=%s, lck.exclusive=%s, "
                          "lck.num_shared=%zu\n",
                          server_id_str_buf(state->self, &buf1),
                          server_id_str_buf(lck.exclusive, &buf2),
                          lck.num_shared);
                state->status = NT_STATUS_NOT_LOCKED;
                return;
        }

        lck.unique_data_epoch = generate_unique_u64(lck.unique_data_epoch);
        lck.data = NULL;
        lck.datalen = 0;
        state->status = g_lock_store(
                rec, &lck, NULL, state->dbufs, state->num_dbufs);
}
1502
1503 NTSTATUS g_lock_writev_data(
1504         struct g_lock_ctx *ctx,
1505         TDB_DATA key,
1506         const TDB_DATA *dbufs,
1507         size_t num_dbufs)
1508 {
1509         struct g_lock_writev_data_state state = {
1510                 .key = key,
1511                 .self = messaging_server_id(ctx->msg),
1512                 .dbufs = dbufs,
1513                 .num_dbufs = num_dbufs,
1514         };
1515         NTSTATUS status;
1516
1517         SMB_ASSERT(!ctx->busy);
1518
1519         status = dbwrap_do_locked(
1520                 ctx->db, key, g_lock_writev_data_fn, &state);
1521         if (!NT_STATUS_IS_OK(status)) {
1522                 DBG_WARNING("dbwrap_do_locked failed: %s\n",
1523                             nt_errstr(status));
1524                 return status;
1525         }
1526         if (!NT_STATUS_IS_OK(state.status)) {
1527                 DBG_WARNING("g_lock_writev_data_fn failed: %s\n",
1528                             nt_errstr(state.status));
1529                 return state.status;
1530         }
1531
1532         return NT_STATUS_OK;
1533 }
1534
1535 NTSTATUS g_lock_write_data(struct g_lock_ctx *ctx, TDB_DATA key,
1536                            const uint8_t *buf, size_t buflen)
1537 {
1538         TDB_DATA dbuf = {
1539                 .dptr = discard_const_p(uint8_t, buf),
1540                 .dsize = buflen,
1541         };
1542         return g_lock_writev_data(ctx, key, &dbuf, 1);
1543 }
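
/*
 * Illustrative sketch (not upstream code): g_lock_write_data() is the
 * single-buffer special case of g_lock_writev_data(); the two calls
 * below store the same record value. The caller must already hold the
 * exclusive lock on "key", otherwise NT_STATUS_NOT_LOCKED is returned
 * by the writev callback above. "example_write_payload" is a
 * hypothetical helper.
 */
static NTSTATUS example_write_payload(struct g_lock_ctx *ctx, TDB_DATA key)
{
        static const uint8_t payload[] = "foobar";
        TDB_DATA dbufs[2] = {
                { .dptr = discard_const_p(uint8_t, payload), .dsize = 3 },
                { .dptr = discard_const_p(uint8_t, payload + 3),
                  .dsize = 3 },
        };
        NTSTATUS status;

        /* "foo" + "bar" as two buffers ... */
        status = g_lock_writev_data(ctx, key, dbufs, ARRAY_SIZE(dbufs));
        if (!NT_STATUS_IS_OK(status)) {
                return status;
        }

        /* ... stores the same value as "foobar" in one buffer. */
        return g_lock_write_data(ctx, key, payload, 6);
}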
1544
1545 struct g_lock_locks_state {
1546         int (*fn)(TDB_DATA key, void *private_data);
1547         void *private_data;
1548 };
1549
1550 static int g_lock_locks_fn(struct db_record *rec, void *priv)
1551 {
1552         TDB_DATA key;
1553         struct g_lock_locks_state *state = (struct g_lock_locks_state *)priv;
1554
1555         key = dbwrap_record_get_key(rec);
1556         return state->fn(key, state->private_data);
1557 }
1558
1559 int g_lock_locks(struct g_lock_ctx *ctx,
1560                  int (*fn)(TDB_DATA key, void *private_data),
1561                  void *private_data)
1562 {
1563         struct g_lock_locks_state state;
1564         NTSTATUS status;
1565         int count;
1566
1567         SMB_ASSERT(!ctx->busy);
1568
1569         state.fn = fn;
1570         state.private_data = private_data;
1571
1572         status = dbwrap_traverse_read(ctx->db, g_lock_locks_fn, &state, &count);
1573         if (!NT_STATUS_IS_OK(status)) {
1574                 return -1;
1575         }
1576         return count;
1577 }
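
/*
 * Illustrative sketch (not upstream code): count all g_lock records.
 * Returning 0 from the callback continues the traverse; a nonzero
 * return is expected to stop it early, following tdb traverse
 * semantics. "example_count_fn" and "example_count_locks" are
 * hypothetical helpers.
 */
static int example_count_fn(TDB_DATA key, void *private_data)
{
        unsigned *count = private_data;

        *count += 1;
        return 0;
}

static int example_count_locks(struct g_lock_ctx *ctx)
{
        unsigned count = 0;
        int ret = g_lock_locks(ctx, example_count_fn, &count);

        return (ret < 0) ? ret : (int)count;
}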
1578
1579 struct g_lock_dump_state {
1580         TALLOC_CTX *mem_ctx;
1581         TDB_DATA key;
1582         void (*fn)(struct server_id exclusive,
1583                    size_t num_shared,
1584                    const struct server_id *shared,
1585                    const uint8_t *data,
1586                    size_t datalen,
1587                    void *private_data);
1588         void *private_data;
1589         NTSTATUS status;
1590         enum dbwrap_req_state req_state;
1591 };
1592
1593 static void g_lock_dump_fn(TDB_DATA key, TDB_DATA data,
1594                            void *private_data)
1595 {
1596         struct g_lock_dump_state *state = private_data;
1597         struct g_lock lck = (struct g_lock) { .exclusive.pid = 0 };
1598         struct server_id *shared = NULL;
1599         size_t i;
1600         bool ok;
1601
1602         ok = g_lock_parse(data.dptr, data.dsize, &lck);
1603         if (!ok) {
1604                 DBG_DEBUG("g_lock_parse failed for %s\n",
1605                           tdb_data_dbg(state->key));
1606                 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1607                 return;
1608         }
1609
1610         if (lck.num_shared > 0) {
1611                 shared = talloc_array(
1612                         state->mem_ctx, struct server_id, lck.num_shared);
1613                 if (shared == NULL) {
1614                         DBG_DEBUG("talloc failed\n");
1615                         state->status = NT_STATUS_NO_MEMORY;
1616                         return;
1617                 }
1618         }
1619
1620         for (i=0; i<lck.num_shared; i++) {
1621                 g_lock_get_shared(&lck, i, &shared[i]);
1622         }
1623
1624         state->fn(lck.exclusive,
1625                   lck.num_shared,
1626                   shared,
1627                   lck.data,
1628                   lck.datalen,
1629                   state->private_data);
1630
1631         TALLOC_FREE(shared);
1632
1633         state->status = NT_STATUS_OK;
1634 }
1635
1636 NTSTATUS g_lock_dump(struct g_lock_ctx *ctx, TDB_DATA key,
1637                      void (*fn)(struct server_id exclusive,
1638                                 size_t num_shared,
1639                                 const struct server_id *shared,
1640                                 const uint8_t *data,
1641                                 size_t datalen,
1642                                 void *private_data),
1643                      void *private_data)
1644 {
1645         struct g_lock_dump_state state = {
1646                 .mem_ctx = ctx, .key = key,
1647                 .fn = fn, .private_data = private_data
1648         };
1649         NTSTATUS status;
1650
1651         SMB_ASSERT(!ctx->busy);
1652
1653         status = dbwrap_parse_record(ctx->db, key, g_lock_dump_fn, &state);
1654         if (!NT_STATUS_IS_OK(status)) {
1655                 DBG_DEBUG("dbwrap_parse_record returned %s\n",
1656                           nt_errstr(status));
1657                 return status;
1658         }
1659         if (!NT_STATUS_IS_OK(state.status)) {
1660                 DBG_DEBUG("g_lock_dump_fn returned %s\n",
1661                           nt_errstr(state.status));
1662                 return state.status;
1663         }
1664         return NT_STATUS_OK;
1665 }
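
/*
 * Illustrative sketch (not upstream code): log one record's holders
 * via g_lock_dump(). The callback runs from within
 * dbwrap_parse_record(), so "shared" and "data" are only valid for
 * the duration of the call and must be copied if needed later.
 * "example_dump_fn" is a hypothetical callback.
 */
static void example_dump_fn(struct server_id exclusive,
                            size_t num_shared,
                            const struct server_id *shared,
                            const uint8_t *data,
                            size_t datalen,
                            void *private_data)
{
        struct server_id_buf buf;

        DBG_NOTICE("exclusive=%s num_shared=%zu datalen=%zu\n",
                   server_id_str_buf(exclusive, &buf),
                   num_shared,
                   datalen);
}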
1666
1667 static void g_lock_dump_done(struct tevent_req *subreq);
1668
1669 struct tevent_req *g_lock_dump_send(
1670         TALLOC_CTX *mem_ctx,
1671         struct tevent_context *ev,
1672         struct g_lock_ctx *ctx,
1673         TDB_DATA key,
1674         void (*fn)(struct server_id exclusive,
1675                    size_t num_shared,
1676                    const struct server_id *shared,
1677                    const uint8_t *data,
1678                    size_t datalen,
1679                    void *private_data),
1680         void *private_data)
1681 {
1682         struct tevent_req *req = NULL, *subreq = NULL;
1683         struct g_lock_dump_state *state = NULL;
1684
1685         SMB_ASSERT(!ctx->busy);
1686
1687         req = tevent_req_create(mem_ctx, &state, struct g_lock_dump_state);
1688         if (req == NULL) {
1689                 return NULL;
1690         }
1691         state->mem_ctx = state;
1692         state->key = key;
1693         state->fn = fn;
1694         state->private_data = private_data;
1695
1698         subreq = dbwrap_parse_record_send(
1699                 state,
1700                 ev,
1701                 ctx->db,
1702                 key,
1703                 g_lock_dump_fn,
1704                 state,
1705                 &state->req_state);
1706         if (tevent_req_nomem(subreq, req)) {
1707                 return tevent_req_post(req, ev);
1708         }
1709         tevent_req_set_callback(subreq, g_lock_dump_done, req);
1710         return req;
1711 }
1712
1713 static void g_lock_dump_done(struct tevent_req *subreq)
1714 {
1715         struct tevent_req *req = tevent_req_callback_data(
1716                 subreq, struct tevent_req);
1717         struct g_lock_dump_state *state = tevent_req_data(
1718                 req, struct g_lock_dump_state);
1719         NTSTATUS status;
1720
1721         status = dbwrap_parse_record_recv(subreq);
1722         TALLOC_FREE(subreq);
1723         if (tevent_req_nterror(req, status) ||
1724             tevent_req_nterror(req, state->status)) {
1725                 return;
1726         }
1727         tevent_req_done(req);
1728 }
1729
1730 NTSTATUS g_lock_dump_recv(struct tevent_req *req)
1731 {
1732         return tevent_req_simple_recv_ntstatus(req);
1733 }
1734
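/*
 * Illustrative sketch (not upstream code): drive the async dump
 * synchronously with tevent_req_poll_ntstatus() from
 * tevent_ntstatus.h, included above. "example_dump_fn" is the
 * hypothetical callback sketched after g_lock_dump().
 */
static NTSTATUS example_dump_async(struct tevent_context *ev,
                                   struct g_lock_ctx *ctx,
                                   TDB_DATA key)
{
        struct tevent_req *req = NULL;
        NTSTATUS status = NT_STATUS_NO_MEMORY;

        req = g_lock_dump_send(ev, ev, ctx, key, example_dump_fn, NULL);
        if (req == NULL) {
                return NT_STATUS_NO_MEMORY;
        }
        if (!tevent_req_poll_ntstatus(req, ev, &status)) {
                TALLOC_FREE(req);
                return status;
        }
        status = g_lock_dump_recv(req);
        TALLOC_FREE(req);
        return status;
}
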
1735 int g_lock_seqnum(struct g_lock_ctx *ctx)
1736 {
1737         return dbwrap_get_seqnum(ctx->db);
1738 }
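
/*
 * Illustrative sketch (not upstream code): g_lock_seqnum() allows
 * cheap change detection; callers re-read records only after the
 * database sequence number has moved. "example_db_changed" and the
 * caller-side cached value are hypothetical.
 */
static bool example_db_changed(struct g_lock_ctx *ctx, int *cached_seqnum)
{
        int seqnum = g_lock_seqnum(ctx);

        if (seqnum == *cached_seqnum) {
                return false;
        }
        *cached_seqnum = seqnum;
        return true;
}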
1739
1740 struct g_lock_watch_data_state {
1741         struct tevent_context *ev;
1742         struct g_lock_ctx *ctx;
1743         TDB_DATA key;
1744         struct server_id blocker;
1745         bool blockerdead;
1746         uint64_t unique_lock_epoch;
1747         uint64_t unique_data_epoch;
1748         uint64_t watch_instance;
1749         NTSTATUS status;
1750 };
1751
1752 static void g_lock_watch_data_done(struct tevent_req *subreq);
1753
1754 static void g_lock_watch_data_send_fn(
1755         struct db_record *rec,
1756         TDB_DATA value,
1757         void *private_data)
1758 {
1759         struct tevent_req *req = talloc_get_type_abort(
1760                 private_data, struct tevent_req);
1761         struct g_lock_watch_data_state *state = tevent_req_data(
1762                 req, struct g_lock_watch_data_state);
1763         struct tevent_req *subreq = NULL;
1764         struct g_lock lck;
1765         bool ok;
1766
1767         ok = g_lock_parse(value.dptr, value.dsize, &lck);
1768         if (!ok) {
1769                 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1770                 return;
1771         }
1772         state->unique_lock_epoch = lck.unique_lock_epoch;
1773         state->unique_data_epoch = lck.unique_data_epoch;
1774
1775         DBG_DEBUG("state->unique_data_epoch=%"PRIu64"\n", state->unique_data_epoch);
1776
1777         subreq = dbwrap_watched_watch_send(
1778                 state, state->ev, rec, 0, state->blocker);
1779         if (subreq == NULL) {
1780                 state->status = NT_STATUS_NO_MEMORY;
1781                 return;
1782         }
1783         tevent_req_set_callback(subreq, g_lock_watch_data_done, req);
1784
1785         state->status = NT_STATUS_EVENT_PENDING;
1786 }
1787
1788 struct tevent_req *g_lock_watch_data_send(
1789         TALLOC_CTX *mem_ctx,
1790         struct tevent_context *ev,
1791         struct g_lock_ctx *ctx,
1792         TDB_DATA key,
1793         struct server_id blocker)
1794 {
1795         struct tevent_req *req = NULL;
1796         struct g_lock_watch_data_state *state = NULL;
1797         NTSTATUS status;
1798
1799         SMB_ASSERT(!ctx->busy);
1800
1801         req = tevent_req_create(
1802                 mem_ctx, &state, struct g_lock_watch_data_state);
1803         if (req == NULL) {
1804                 return NULL;
1805         }
1806         state->ev = ev;
1807         state->ctx = ctx;
1808         state->blocker = blocker;
1809
1810         state->key = tdb_data_talloc_copy(state, key);
1811         if (tevent_req_nomem(state->key.dptr, req)) {
1812                 return tevent_req_post(req, ev);
1813         }
1814
1815         status = dbwrap_do_locked(
1816                 ctx->db, key, g_lock_watch_data_send_fn, req);
1817         if (tevent_req_nterror(req, status)) {
1818                 DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
1819                 return tevent_req_post(req, ev);
1820         }
1821
1822         if (NT_STATUS_EQUAL(state->status, NT_STATUS_EVENT_PENDING)) {
1823                 return req;
1824         }
1825         if (tevent_req_nterror(req, state->status)) {
1826                 return tevent_req_post(req, ev);
1827         }
1828         tevent_req_done(req);
1829         return tevent_req_post(req, ev);
1830 }
1831
1832 static void g_lock_watch_data_done_fn(
1833         struct db_record *rec,
1834         TDB_DATA value,
1835         void *private_data)
1836 {
1837         struct tevent_req *req = talloc_get_type_abort(
1838                 private_data, struct tevent_req);
1839         struct g_lock_watch_data_state *state = tevent_req_data(
1840                 req, struct g_lock_watch_data_state);
1841         struct tevent_req *subreq = NULL;
1842         struct g_lock lck;
1843         bool ok;
1844
1845         ok = g_lock_parse(value.dptr, value.dsize, &lck);
1846         if (!ok) {
1847                 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
1848                 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1849                 return;
1850         }
1851
1852         if (lck.unique_data_epoch != state->unique_data_epoch) {
1853                 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
1854                 DBG_DEBUG("lck.unique_data_epoch=%"PRIu64", "
1855                           "state->unique_data_epoch=%"PRIu64"\n",
1856                           lck.unique_data_epoch,
1857                           state->unique_data_epoch);
1858                 state->status = NT_STATUS_OK;
1859                 return;
1860         }
1861
1862         /*
1863          * If the lock epoch changed, we'd better remove
1864          * ourselves from the waiter list (most likely at
1865          * the first position) and re-add ourselves at the
1866          * end of the list.
1867          *
1868          * This gives other lock waiters a chance to make
1869          * progress.
1870          *
1871          * Otherwise we keep our waiter instance alive and
1872          * keep waiting (most likely at the first position).
1873          */
1874         if (lck.unique_lock_epoch != state->unique_lock_epoch) {
1875                 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
1876                 state->watch_instance = dbwrap_watched_watch_add_instance(rec);
1877                 state->unique_lock_epoch = lck.unique_lock_epoch;
1878         }
1879
1880         subreq = dbwrap_watched_watch_send(
1881                 state, state->ev, rec, state->watch_instance, state->blocker);
1882         if (subreq == NULL) {
1883                 dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
1884                 state->status = NT_STATUS_NO_MEMORY;
1885                 return;
1886         }
1887         tevent_req_set_callback(subreq, g_lock_watch_data_done, req);
1888
1889         state->status = NT_STATUS_EVENT_PENDING;
1890 }
1891
1892 static void g_lock_watch_data_done(struct tevent_req *subreq)
1893 {
1894         struct tevent_req *req = tevent_req_callback_data(
1895                 subreq, struct tevent_req);
1896         struct g_lock_watch_data_state *state = tevent_req_data(
1897                 req, struct g_lock_watch_data_state);
1898         NTSTATUS status;
1899         uint64_t instance = 0;
1900
1901         status = dbwrap_watched_watch_recv(
1902                 subreq, &instance, &state->blockerdead, &state->blocker);
1903         TALLOC_FREE(subreq);
1904         if (tevent_req_nterror(req, status)) {
1905                 DBG_DEBUG("dbwrap_watched_watch_recv returned %s\n",
1906                           nt_errstr(status));
1907                 return;
1908         }
1909
1910         state->watch_instance = instance;
1911
1912         status = dbwrap_do_locked(
1913                 state->ctx->db, state->key, g_lock_watch_data_done_fn, req);
1914         if (tevent_req_nterror(req, status)) {
1915                 DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
1916                 return;
1917         }
1918         if (NT_STATUS_EQUAL(state->status, NT_STATUS_EVENT_PENDING)) {
1919                 return;
1920         }
1921         if (tevent_req_nterror(req, state->status)) {
1922                 return;
1923         }
1924         tevent_req_done(req);
1925 }
1926
1927 NTSTATUS g_lock_watch_data_recv(
1928         struct tevent_req *req,
1929         bool *blockerdead,
1930         struct server_id *blocker)
1931 {
1932         struct g_lock_watch_data_state *state = tevent_req_data(
1933                 req, struct g_lock_watch_data_state);
1934         NTSTATUS status;
1935
1936         if (tevent_req_is_nterror(req, &status)) {
1937                 return status;
1938         }
1939         if (blockerdead != NULL) {
1940                 *blockerdead = state->blockerdead;
1941         }
1942         if (blocker != NULL) {
1943                 *blocker = state->blocker;
1944         }
1945
1946         return NT_STATUS_OK;
1947 }
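
/*
 * Illustrative sketch (not upstream code): block until the record's
 * data changes, i.e. until unique_data_epoch moves via
 * g_lock_writev_data() or g_lock_wake_watchers(). Passing a zeroed
 * server_id as blocker is an assumption here, meaning "no particular
 * process to watch for death"; callers waiting on a specific holder
 * pass that holder's id instead. "example_wait_for_change" is a
 * hypothetical helper.
 */
static NTSTATUS example_wait_for_change(struct tevent_context *ev,
                                        struct g_lock_ctx *ctx,
                                        TDB_DATA key)
{
        struct server_id nobody = { .pid = 0 };
        struct tevent_req *req = NULL;
        NTSTATUS status = NT_STATUS_NO_MEMORY;

        req = g_lock_watch_data_send(ev, ev, ctx, key, nobody);
        if (req == NULL) {
                return NT_STATUS_NO_MEMORY;
        }
        if (!tevent_req_poll_ntstatus(req, ev, &status)) {
                TALLOC_FREE(req);
                return status;
        }
        status = g_lock_watch_data_recv(req, NULL, NULL);
        TALLOC_FREE(req);
        return status;
}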
1948
1949 static void g_lock_wake_watchers_fn(
1950         struct db_record *rec,
1951         TDB_DATA value,
1952         void *private_data)
1953 {
1954         struct g_lock lck = { .exclusive.pid = 0 };
1955         NTSTATUS status;
1956         bool ok;
1957
1958         ok = g_lock_parse(value.dptr, value.dsize, &lck);
1959         if (!ok) {
1960                 DBG_WARNING("g_lock_parse failed\n");
1961                 return;
1962         }
1963
1964         lck.unique_data_epoch = generate_unique_u64(lck.unique_data_epoch);
1965
1966         status = g_lock_store(rec, &lck, NULL, NULL, 0);
1967         if (!NT_STATUS_IS_OK(status)) {
1968                 DBG_WARNING("g_lock_store failed: %s\n", nt_errstr(status));
1969                 return;
1970         }
1971 }
1972
1973 void g_lock_wake_watchers(struct g_lock_ctx *ctx, TDB_DATA key)
1974 {
1975         NTSTATUS status;
1976
1977         SMB_ASSERT(!ctx->busy);
1978
1979         status = dbwrap_do_locked(ctx->db, key, g_lock_wake_watchers_fn, NULL);
1980         if (!NT_STATUS_IS_OK(status)) {
1981                 DBG_DEBUG("dbwrap_do_locked returned %s\n",
1982                           nt_errstr(status));
1983         }
1984 }
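
/*
 * Illustrative sketch (not upstream code): the producer side of the
 * watch above. g_lock_wake_watchers() bumps unique_data_epoch without
 * touching the stored payload, so it signals data watchers even when
 * the data itself is unchanged. "example_notify_watchers" is a
 * hypothetical helper.
 */
static void example_notify_watchers(struct g_lock_ctx *ctx,
                                    const char *name)
{
        g_lock_wake_watchers(ctx, string_term_tdb_data(name));
}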