6898de88bc21a325d13f92f2bb950b2be688daa2
[samba.git] / source3 / lib / g_lock.c
1 /*
2    Unix SMB/CIFS implementation.
3    global locks based on dbwrap and messaging
4    Copyright (C) 2009 by Volker Lendecke
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "lib/util/server_id.h"
23 #include "lib/util/debug.h"
24 #include "lib/util/talloc_stack.h"
25 #include "lib/util/samba_util.h"
26 #include "lib/util_path.h"
27 #include "dbwrap/dbwrap.h"
28 #include "dbwrap/dbwrap_open.h"
29 #include "dbwrap/dbwrap_watch.h"
30 #include "g_lock.h"
31 #include "util_tdb.h"
32 #include "../lib/util/tevent_ntstatus.h"
33 #include "messages.h"
34 #include "serverid.h"
35
/*
 * Global lock context: the watched lock database plus the messaging
 * context we use to derive our own server_id.
 */
struct g_lock_ctx {
	struct db_context *db;
	struct messaging_context *msg;
};
40
/*
 * Parsed view of one g_lock record. NOTE: "shared" and "data" point
 * into the buffer handed to g_lock_parse() -- they are aliases, not
 * copies, and are only valid as long as that buffer is.
 */
struct g_lock {
	struct server_id exclusive;	/* pid==0 means no exclusive holder */
	size_t num_shared;		/* number of shared (read) holders */
	uint8_t *shared;		/* packed server_ids, SERVER_ID_BUF_LENGTH each */
	uint64_t data_seqnum;		/* bumped whenever the payload changes */
	size_t datalen;
	uint8_t *data;			/* user payload stored after the lockers */
};
49
50 static bool g_lock_parse(uint8_t *buf, size_t buflen, struct g_lock *lck)
51 {
52         struct server_id exclusive;
53         size_t num_shared, shared_len;
54         uint64_t data_seqnum;
55
56         if (buflen < (SERVER_ID_BUF_LENGTH + /* exclusive */
57                       sizeof(uint64_t) +     /* seqnum */
58                       sizeof(uint32_t))) {   /* num_shared */
59                 struct g_lock ret = { .exclusive.pid = 0 };
60                 generate_random_buffer(
61                         (uint8_t *)&ret.data_seqnum,
62                         sizeof(ret.data_seqnum));
63                 *lck = ret;
64                 return true;
65         }
66
67         server_id_get(&exclusive, buf);
68         buf += SERVER_ID_BUF_LENGTH;
69         buflen -= SERVER_ID_BUF_LENGTH;
70
71         data_seqnum = BVAL(buf, 0);
72         buf += sizeof(uint64_t);
73         buflen -= sizeof(uint64_t);
74
75         num_shared = IVAL(buf, 0);
76         buf += sizeof(uint32_t);
77         buflen -= sizeof(uint32_t);
78
79         if (num_shared > buflen/SERVER_ID_BUF_LENGTH) {
80                 DBG_DEBUG("num_shared=%zu, buflen=%zu\n",
81                           num_shared,
82                           buflen);
83                 return false;
84         }
85
86         shared_len = num_shared * SERVER_ID_BUF_LENGTH;
87
88         *lck = (struct g_lock) {
89                 .exclusive = exclusive,
90                 .num_shared = num_shared,
91                 .shared = buf,
92                 .data_seqnum = data_seqnum,
93                 .datalen = buflen-shared_len,
94                 .data = buf+shared_len,
95         };
96
97         return true;
98 }
99
100 static void g_lock_get_shared(const struct g_lock *lck,
101                               size_t i,
102                               struct server_id *shared)
103 {
104         if (i >= lck->num_shared) {
105                 abort();
106         }
107         server_id_get(shared, lck->shared + i*SERVER_ID_BUF_LENGTH);
108 }
109
110 static void g_lock_del_shared(struct g_lock *lck, size_t i)
111 {
112         if (i >= lck->num_shared) {
113                 abort();
114         }
115         lck->num_shared -= 1;
116         if (i < lck->num_shared) {
117                 memcpy(lck->shared + i*SERVER_ID_BUF_LENGTH,
118                        lck->shared + lck->num_shared*SERVER_ID_BUF_LENGTH,
119                        SERVER_ID_BUF_LENGTH);
120         }
121 }
122
/*
 * Serialize *lck back into the database record, optionally appending
 * *new_shared as one more shared holder.
 *
 * The iovec order mirrors the on-disk layout g_lock_parse() expects:
 *   exclusive | seqnum | num_shared | existing shared ids |
 *   (optional new shared id) | data
 *
 * Using dbwrap_record_storev() avoids assembling a contiguous copy.
 * On success with new_shared != NULL, lck->num_shared is incremented
 * to match what was stored.
 */
static NTSTATUS g_lock_store(
	struct db_record *rec,
	struct g_lock *lck,
	struct server_id *new_shared)
{
	uint8_t exclusive[SERVER_ID_BUF_LENGTH];
	uint8_t seqnum_buf[sizeof(uint64_t)];
	uint8_t sizebuf[sizeof(uint32_t)];
	uint8_t new_shared_buf[SERVER_ID_BUF_LENGTH];

	struct TDB_DATA dbufs[] = {
		{ .dptr = exclusive, .dsize = sizeof(exclusive) },
		{ .dptr = seqnum_buf, .dsize = sizeof(seqnum_buf) },
		{ .dptr = sizebuf, .dsize = sizeof(sizebuf) },
		{ .dptr = lck->shared,
		  .dsize = lck->num_shared * SERVER_ID_BUF_LENGTH },
		{ 0 },		/* slot for new_shared, filled in below */
		{ .dptr = lck->data, .dsize = lck->datalen }
	};

	server_id_put(exclusive, lck->exclusive);
	SBVAL(seqnum_buf, 0, lck->data_seqnum);

	if (new_shared != NULL) {
		/* num_shared is stored on disk as a uint32_t */
		if (lck->num_shared >= UINT32_MAX) {
			return NT_STATUS_BUFFER_OVERFLOW;
		}

		server_id_put(new_shared_buf, *new_shared);

		dbufs[4] = (TDB_DATA) {
			.dptr = new_shared_buf,
			.dsize = sizeof(new_shared_buf),
		};

		/* keep the in-memory view consistent with the store */
		lck->num_shared += 1;
	}

	SIVAL(sizebuf, 0, lck->num_shared);

	return dbwrap_record_storev(rec, dbufs, ARRAY_SIZE(dbufs), 0);
}
165
166 struct g_lock_ctx *g_lock_ctx_init_backend(
167         TALLOC_CTX *mem_ctx,
168         struct messaging_context *msg,
169         struct db_context **backend)
170 {
171         struct g_lock_ctx *result;
172
173         result = talloc(mem_ctx, struct g_lock_ctx);
174         if (result == NULL) {
175                 return NULL;
176         }
177         result->msg = msg;
178
179         result->db = db_open_watched(result, backend, msg);
180         if (result->db == NULL) {
181                 DBG_WARNING("db_open_watched failed\n");
182                 TALLOC_FREE(result);
183                 return NULL;
184         }
185         return result;
186 }
187
188 struct g_lock_ctx *g_lock_ctx_init(TALLOC_CTX *mem_ctx,
189                                    struct messaging_context *msg)
190 {
191         char *db_path = NULL;
192         struct db_context *backend = NULL;
193         struct g_lock_ctx *ctx = NULL;
194
195         db_path = lock_path(mem_ctx, "g_lock.tdb");
196         if (db_path == NULL) {
197                 return NULL;
198         }
199
200         backend = db_open(
201                 mem_ctx,
202                 db_path,
203                 0,
204                 TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
205                 O_RDWR|O_CREAT,
206                 0600,
207                 DBWRAP_LOCK_ORDER_3,
208                 DBWRAP_FLAG_NONE);
209         TALLOC_FREE(db_path);
210         if (backend == NULL) {
211                 DBG_WARNING("Could not open g_lock.tdb\n");
212                 return NULL;
213         }
214
215         ctx = g_lock_ctx_init_backend(mem_ctx, msg, &backend);
216         return ctx;
217 }
218
219 static NTSTATUS g_lock_cleanup_dead(
220         struct db_record *rec,
221         struct g_lock *lck,
222         struct server_id *dead_blocker)
223 {
224         bool modified = false;
225         bool exclusive_died;
226         NTSTATUS status = NT_STATUS_OK;
227         struct server_id_buf tmp;
228
229         if (dead_blocker == NULL) {
230                 return NT_STATUS_OK;
231         }
232
233         exclusive_died = server_id_equal(dead_blocker, &lck->exclusive);
234
235         if (exclusive_died) {
236                 DBG_DEBUG("Exclusive holder %s died\n",
237                           server_id_str_buf(lck->exclusive, &tmp));
238                 lck->exclusive.pid = 0;
239                 modified = true;
240         }
241
242         if (lck->num_shared != 0) {
243                 bool shared_died;
244                 struct server_id shared;
245
246                 g_lock_get_shared(lck, 0, &shared);
247                 shared_died = server_id_equal(dead_blocker, &shared);
248
249                 if (shared_died) {
250                         DBG_DEBUG("Shared holder %s died\n",
251                                   server_id_str_buf(shared, &tmp));
252                         g_lock_del_shared(lck, 0);
253                         modified = true;
254                 }
255         }
256
257         if (modified) {
258                 status = g_lock_store(rec, lck, NULL);
259                 if (!NT_STATUS_IS_OK(status)) {
260                         DBG_DEBUG("g_lock_store() failed: %s\n",
261                                   nt_errstr(status));
262                 }
263         }
264
265         return status;
266 }
267
268 static ssize_t g_lock_find_shared(
269         struct g_lock *lck,
270         const struct server_id *self)
271 {
272         size_t i;
273
274         for (i=0; i<lck->num_shared; i++) {
275                 struct server_id shared;
276                 bool same;
277
278                 g_lock_get_shared(lck, i, &shared);
279
280                 same = server_id_equal(self, &shared);
281                 if (same) {
282                         return i;
283                 }
284         }
285
286         return -1;
287 }
288
289 static void g_lock_cleanup_shared(struct g_lock *lck)
290 {
291         size_t i;
292         struct server_id check;
293         bool exists;
294
295         if (lck->num_shared == 0) {
296                 return;
297         }
298
299         /*
300          * Read locks can stay around forever if the process dies. Do
301          * a heuristic check for process existence: Check one random
302          * process for existence. Hopefully this will keep runaway
303          * read locks under control.
304          */
305         i = generate_random() % lck->num_shared;
306         g_lock_get_shared(lck, i, &check);
307
308         exists = serverid_exists(&check);
309         if (!exists) {
310                 struct server_id_buf tmp;
311                 DBG_DEBUG("Shared locker %s died -- removing\n",
312                           server_id_str_buf(check, &tmp));
313                 g_lock_del_shared(lck, i);
314         }
315 }
316
/* Per-request state for one async g_lock_lock operation. */
struct g_lock_lock_state {
	struct tevent_context *ev;
	struct g_lock_ctx *ctx;
	TDB_DATA key;
	enum g_lock_type type;
	bool retry;	/* true on re-attempts after waiting on the record */
};

/* Transient state for a single dbwrap_do_locked() invocation. */
struct g_lock_lock_fn_state {
	struct g_lock_lock_state *req_state;
	struct server_id *dead_blocker;	/* blocker known dead, or NULL */

	struct tevent_req *watch_req;	/* set when we have to keep waiting */
	NTSTATUS status;
};

static int g_lock_lock_state_destructor(struct g_lock_lock_state *s);
334
/*
 * Core locking state machine, invoked under the record lock from
 * within dbwrap_do_locked().
 *
 * Return values:
 *   NT_STATUS_OK               - lock granted
 *   NT_STATUS_LOCK_NOT_GRANTED - blocked; *blocker names one holder
 *                                for the caller to watch
 *   anything else              - hard failure, abort the request
 */
static NTSTATUS g_lock_trylock(
	struct db_record *rec,
	struct g_lock_lock_fn_state *state,
	TDB_DATA data,
	struct server_id *blocker)
{
	struct g_lock_lock_state *req_state = state->req_state;
	struct server_id self = messaging_server_id(req_state->ctx->msg);
	enum g_lock_type type = req_state->type;
	bool retry = req_state->retry;
	struct g_lock lck = { .exclusive.pid = 0 };
	struct server_id_buf tmp;
	NTSTATUS status;
	bool ok;

	ok = g_lock_parse(data.dptr, data.dsize, &lck);
	if (!ok) {
		DBG_DEBUG("g_lock_parse failed\n");
		return NT_STATUS_INTERNAL_DB_CORRUPTION;
	}

	/* Prune the blocker we were waiting on if it died meanwhile */
	status = g_lock_cleanup_dead(rec, &lck, state->dead_blocker);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_DEBUG("g_lock_cleanup_dead() failed: %s\n",
			  nt_errstr(status));
		return status;
	}

	if (lck.exclusive.pid != 0) {
		bool self_exclusive = server_id_equal(&self, &lck.exclusive);

		if (!self_exclusive) {
			bool exists = serverid_exists(&lck.exclusive);
			if (!exists) {
				/* stale exclusive holder: treat as free */
				lck.exclusive = (struct server_id) { .pid=0 };
				goto noexclusive;
			}

			DBG_DEBUG("%s has an exclusive lock\n",
				  server_id_str_buf(lck.exclusive, &tmp));

			if (type == G_LOCK_DOWNGRADE) {
				struct server_id_buf tmp2;
				/* can't downgrade a lock we don't hold */
				DBG_DEBUG("%s: Trying to downgrade %s\n",
					  server_id_str_buf(self, &tmp),
					  server_id_str_buf(
						  lck.exclusive, &tmp2));
				return NT_STATUS_NOT_LOCKED;
			}

			if (type == G_LOCK_UPGRADE) {
				ssize_t shared_idx;
				shared_idx = g_lock_find_shared(&lck, &self);

				if (shared_idx == -1) {
					DBG_DEBUG("Trying to upgrade %s "
						  "without "
						  "existing shared lock\n",
						  server_id_str_buf(
							  self, &tmp));
					return NT_STATUS_NOT_LOCKED;
				}

				/*
				 * We're trying to upgrade, and the
				 * exlusive lock is taken by someone
				 * else. This means that someone else
				 * is waiting for us to give up our
				 * shared lock. If we now also wait
				 * for someone to give their shared
				 * lock, we will deadlock.
				 */

				DBG_DEBUG("Trying to upgrade %s while "
					  "someone else is also "
					  "trying to upgrade\n",
					  server_id_str_buf(self, &tmp));
				return NT_STATUS_POSSIBLE_DEADLOCK;
			}

			*blocker = lck.exclusive;
			return NT_STATUS_LOCK_NOT_GRANTED;
		}

		/* From here on, we ourselves hold the exclusive slot */

		if (type == G_LOCK_DOWNGRADE) {
			DBG_DEBUG("Downgrading %s from WRITE to READ\n",
				  server_id_str_buf(self, &tmp));

			lck.exclusive = (struct server_id) { .pid = 0 };
			goto do_shared;
		}

		if (!retry) {
			DBG_DEBUG("%s already locked by self\n",
				  server_id_str_buf(self, &tmp));
			return NT_STATUS_WAS_LOCKED;
		}

		if (lck.num_shared != 0) {
			/* still waiting for shared holders to leave */
			g_lock_get_shared(&lck, 0, blocker);

			DBG_DEBUG("Continue waiting for shared lock %s\n",
				  server_id_str_buf(*blocker, &tmp));

			return NT_STATUS_LOCK_NOT_GRANTED;
		}

		/* lock complete: cleanup-on-abandon no longer needed */
		talloc_set_destructor(req_state, NULL);

		/*
		 * Retry after a conflicting lock was released
		 */
		return NT_STATUS_OK;
	}

noexclusive:

	if (type == G_LOCK_UPGRADE) {
		ssize_t shared_idx = g_lock_find_shared(&lck, &self);

		if (shared_idx == -1) {
			DBG_DEBUG("Trying to upgrade %s without "
				  "existing shared lock\n",
				  server_id_str_buf(self, &tmp));
			return NT_STATUS_NOT_LOCKED;
		}

		/* drop our shared entry, then acquire as WRITE below */
		g_lock_del_shared(&lck, shared_idx);
		type = G_LOCK_WRITE;
	}

	if (type == G_LOCK_WRITE) {
		ssize_t shared_idx = g_lock_find_shared(&lck, &self);

		if (shared_idx != -1) {
			DBG_DEBUG("Trying to writelock existing shared %s\n",
				  server_id_str_buf(self, &tmp));
			return NT_STATUS_WAS_LOCKED;
		}

		/*
		 * Claim the exclusive slot right away; remaining
		 * shared holders block completion until they leave.
		 */
		lck.exclusive = self;

		status = g_lock_store(rec, &lck, NULL);
		if (!NT_STATUS_IS_OK(status)) {
			DBG_DEBUG("g_lock_store() failed: %s\n",
				  nt_errstr(status));
			return status;
		}

		if (lck.num_shared != 0) {
			/*
			 * Make sure the pending exclusive marker is
			 * removed again if this request is abandoned.
			 */
			talloc_set_destructor(
				req_state, g_lock_lock_state_destructor);

			g_lock_get_shared(&lck, 0, blocker);

			DBG_DEBUG("Waiting for %zu shared locks, "
				  "picking blocker %s\n",
				  lck.num_shared,
				  server_id_str_buf(*blocker, &tmp));

			return NT_STATUS_LOCK_NOT_GRANTED;
		}

		talloc_set_destructor(req_state, NULL);

		return NT_STATUS_OK;
	}

do_shared:

	if (lck.num_shared == 0) {
		/* no other shared holders: just append ourselves */
		status = g_lock_store(rec, &lck, &self);
		if (!NT_STATUS_IS_OK(status)) {
			DBG_DEBUG("g_lock_store() failed: %s\n",
				  nt_errstr(status));
		}

		return status;
	}

	/* opportunistically garbage-collect one dead shared holder */
	g_lock_cleanup_shared(&lck);

	status = g_lock_store(rec, &lck, &self);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_DEBUG("g_lock_store() failed: %s\n",
			  nt_errstr(status));
		return status;
	}

	return NT_STATUS_OK;
}
526
527 static void g_lock_lock_fn(
528         struct db_record *rec,
529         TDB_DATA value,
530         void *private_data)
531 {
532         struct g_lock_lock_fn_state *state = private_data;
533         struct server_id blocker = {0};
534
535         state->status = g_lock_trylock(rec, state, value, &blocker);
536         if (!NT_STATUS_EQUAL(state->status, NT_STATUS_LOCK_NOT_GRANTED)) {
537                 return;
538         }
539
540         state->watch_req = dbwrap_watched_watch_send(
541                 state->req_state, state->req_state->ev, rec, blocker);
542         if (state->watch_req == NULL) {
543                 state->status = NT_STATUS_NO_MEMORY;
544         }
545 }
546
547 static int g_lock_lock_state_destructor(struct g_lock_lock_state *s)
548 {
549         NTSTATUS status = g_lock_unlock(s->ctx, s->key);
550         if (!NT_STATUS_IS_OK(status)) {
551                 DBG_DEBUG("g_lock_unlock failed: %s\n", nt_errstr(status));
552         }
553         return 0;
554 }
555
556 static void g_lock_lock_retry(struct tevent_req *subreq);
557
/*
 * Async lock request: try to take "key" with the given lock type.
 * If blocked, watch the record until it changes, with a randomized
 * 5-10s re-check timeout to catch silently-dead holders. Collect
 * the result with g_lock_lock_recv().
 */
struct tevent_req *g_lock_lock_send(TALLOC_CTX *mem_ctx,
				    struct tevent_context *ev,
				    struct g_lock_ctx *ctx,
				    TDB_DATA key,
				    enum g_lock_type type)
{
	struct tevent_req *req;
	struct g_lock_lock_state *state;
	struct g_lock_lock_fn_state fn_state;
	NTSTATUS status;
	bool ok;

	req = tevent_req_create(mem_ctx, &state, struct g_lock_lock_state);
	if (req == NULL) {
		return NULL;
	}
	state->ev = ev;
	state->ctx = ctx;
	state->key = key;
	state->type = type;

	fn_state = (struct g_lock_lock_fn_state) {
		.req_state = state,
	};

	status = dbwrap_do_locked(ctx->db, key, g_lock_lock_fn, &fn_state);
	if (tevent_req_nterror(req, status)) {
		DBG_DEBUG("dbwrap_do_locked failed: %s\n",
			  nt_errstr(status));
		return tevent_req_post(req, ev);
	}

	if (NT_STATUS_IS_OK(fn_state.status)) {
		/* got the lock on the first attempt */
		tevent_req_done(req);
		return tevent_req_post(req, ev);
	}
	if (!NT_STATUS_EQUAL(fn_state.status, NT_STATUS_LOCK_NOT_GRANTED)) {
		tevent_req_nterror(req, fn_state.status);
		return tevent_req_post(req, ev);
	}

	/* blocked: g_lock_lock_fn set up watch_req for us */
	if (tevent_req_nomem(fn_state.watch_req, req)) {
		return tevent_req_post(req, ev);
	}

	ok = tevent_req_set_endtime(
		fn_state.watch_req,
		state->ev,
		timeval_current_ofs(5 + generate_random() % 5, 0));
	if (!ok) {
		tevent_req_oom(req);
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(fn_state.watch_req, g_lock_lock_retry, req);

	return req;
}
615
616 static void g_lock_lock_retry(struct tevent_req *subreq)
617 {
618         struct tevent_req *req = tevent_req_callback_data(
619                 subreq, struct tevent_req);
620         struct g_lock_lock_state *state = tevent_req_data(
621                 req, struct g_lock_lock_state);
622         struct g_lock_lock_fn_state fn_state;
623         struct server_id blocker;
624         bool blockerdead;
625         NTSTATUS status;
626
627         status = dbwrap_watched_watch_recv(subreq, &blockerdead, &blocker);
628         DBG_DEBUG("watch_recv returned %s\n", nt_errstr(status));
629         TALLOC_FREE(subreq);
630
631         if (!NT_STATUS_IS_OK(status) &&
632             !NT_STATUS_EQUAL(status, NT_STATUS_IO_TIMEOUT)) {
633                 tevent_req_nterror(req, status);
634                 return;
635         }
636
637         state->retry = true;
638
639         fn_state = (struct g_lock_lock_fn_state) {
640                 .req_state = state,
641                 .dead_blocker = blockerdead ? &blocker : NULL,
642         };
643
644         status = dbwrap_do_locked(state->ctx->db, state->key,
645                                   g_lock_lock_fn, &fn_state);
646         if (tevent_req_nterror(req, status)) {
647                 DBG_DEBUG("dbwrap_do_locked failed: %s\n",
648                           nt_errstr(status));
649                 return;
650         }
651
652         if (NT_STATUS_IS_OK(fn_state.status)) {
653                 tevent_req_done(req);
654                 return;
655         }
656         if (!NT_STATUS_EQUAL(fn_state.status, NT_STATUS_LOCK_NOT_GRANTED)) {
657                 tevent_req_nterror(req, fn_state.status);
658                 return;
659         }
660
661         if (tevent_req_nomem(fn_state.watch_req, req)) {
662                 return;
663         }
664
665         if (!tevent_req_set_endtime(
666                     fn_state.watch_req, state->ev,
667                     timeval_current_ofs(5 + generate_random() % 5, 0))) {
668                 return;
669         }
670         tevent_req_set_callback(fn_state.watch_req, g_lock_lock_retry, req);
671 }
672
673 NTSTATUS g_lock_lock_recv(struct tevent_req *req)
674 {
675         return tevent_req_simple_recv_ntstatus(req);
676 }
677
/* State for the uncontended fast path in g_lock_lock(). */
struct g_lock_lock_simple_state {
	struct server_id me;	/* our own server_id */
	enum g_lock_type type;	/* G_LOCK_READ or G_LOCK_WRITE only */
	NTSTATUS status;	/* result reported back to the caller */
};
683
684 static void g_lock_lock_simple_fn(
685         struct db_record *rec,
686         TDB_DATA value,
687         void *private_data)
688 {
689         struct g_lock_lock_simple_state *state = private_data;
690         struct g_lock lck = { .exclusive.pid = 0 };
691         bool ok;
692
693         ok = g_lock_parse(value.dptr, value.dsize, &lck);
694         if (!ok) {
695                 DBG_DEBUG("g_lock_parse failed\n");
696                 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
697                 return;
698         }
699
700         if (lck.exclusive.pid != 0) {
701                 goto not_granted;
702         }
703
704         if (state->type == G_LOCK_WRITE) {
705                 if (lck.num_shared != 0) {
706                         goto not_granted;
707                 }
708                 lck.exclusive = state->me;
709                 state->status = g_lock_store(rec, &lck, NULL);
710                 return;
711         }
712
713         if (state->type == G_LOCK_READ) {
714                 g_lock_cleanup_shared(&lck);
715                 state->status = g_lock_store(rec, &lck, &state->me);
716                 return;
717         }
718
719 not_granted:
720         state->status = NT_STATUS_LOCK_NOT_GRANTED;
721 }
722
/*
 * Synchronous lock: take "key" with lock type "type", waiting at most
 * "timeout". Plain READ/WRITE requests try an uncontended fast path
 * first; everything else (and any contention) goes through the full
 * async implementation under a private event context.
 */
NTSTATUS g_lock_lock(struct g_lock_ctx *ctx, TDB_DATA key,
		     enum g_lock_type type, struct timeval timeout)
{
	TALLOC_CTX *frame;
	struct tevent_context *ev;
	struct tevent_req *req;
	struct timeval end;
	NTSTATUS status;

	if ((type == G_LOCK_READ) || (type == G_LOCK_WRITE)) {
		/*
		 * This is an abstraction violation: Normally we do
		 * the sync wrappers around async functions with full
		 * nested event contexts. However, this is used in
		 * very hot code paths, so avoid the event context
		 * creation for the good path where there's no lock
		 * contention. My benchmark gave a factor of 2
		 * improvement for lock/unlock.
		 */
		struct g_lock_lock_simple_state state = {
			.me = messaging_server_id(ctx->msg),
			.type = type,
		};
		status = dbwrap_do_locked(
			ctx->db, key, g_lock_lock_simple_fn, &state);
		if (!NT_STATUS_IS_OK(status)) {
			DBG_DEBUG("dbwrap_do_locked() failed: %s\n",
				  nt_errstr(status));
			return status;
		}
		if (NT_STATUS_IS_OK(state.status)) {
			return NT_STATUS_OK;
		}
		if (!NT_STATUS_EQUAL(
			    state.status, NT_STATUS_LOCK_NOT_GRANTED)) {
			return state.status;
		}

		/*
		 * Fall back to the full g_lock_trylock logic,
		 * g_lock_lock_simple_fn() called above only covers
		 * the uncontended path.
		 */
	}

	frame = talloc_stackframe();
	status = NT_STATUS_NO_MEMORY;

	ev = samba_tevent_context_init(frame);
	if (ev == NULL) {
		goto fail;
	}
	req = g_lock_lock_send(frame, ev, ctx, key, type);
	if (req == NULL) {
		goto fail;
	}
	end = timeval_current_ofs(timeout.tv_sec, timeout.tv_usec);
	if (!tevent_req_set_endtime(req, ev, end)) {
		goto fail;
	}
	if (!tevent_req_poll_ntstatus(req, ev, &status)) {
		goto fail;
	}
	status = g_lock_lock_recv(req);
 fail:
	TALLOC_FREE(frame);
	return status;
}
791
/* State for g_lock_unlock()'s dbwrap_do_locked() callback. */
struct g_lock_unlock_state {
	struct server_id self;	/* whose lock entry to remove */
	NTSTATUS status;	/* result reported back to the caller */
};
796
797 static void g_lock_unlock_fn(
798         struct db_record *rec,
799         TDB_DATA value,
800         void *private_data)
801 {
802         struct g_lock_unlock_state *state = private_data;
803         struct server_id_buf tmp;
804         struct g_lock lck;
805         size_t i;
806         bool ok, exclusive;
807
808         ok = g_lock_parse(value.dptr, value.dsize, &lck);
809         if (!ok) {
810                 DBG_DEBUG("g_lock_parse() failed\n");
811                 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
812                 return;
813         }
814
815         exclusive = server_id_equal(&state->self, &lck.exclusive);
816
817         for (i=0; i<lck.num_shared; i++) {
818                 struct server_id shared;
819                 g_lock_get_shared(&lck, i, &shared);
820                 if (server_id_equal(&state->self, &shared)) {
821                         break;
822                 }
823         }
824
825         if (i < lck.num_shared) {
826                 if (exclusive) {
827                         DBG_DEBUG("%s both exclusive and shared (%zu)\n",
828                                   server_id_str_buf(state->self, &tmp),
829                                   i);
830                         state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
831                         return;
832                 }
833                 g_lock_del_shared(&lck, i);
834         } else {
835                 if (!exclusive) {
836                         DBG_DEBUG("Lock %s not found, num_rec=%zu\n",
837                                   server_id_str_buf(state->self, &tmp),
838                                   lck.num_shared);
839                         state->status = NT_STATUS_NOT_FOUND;
840                         return;
841                 }
842                 lck.exclusive = (struct server_id) { .pid = 0 };
843         }
844
845         if ((lck.exclusive.pid == 0) &&
846             (lck.num_shared == 0) &&
847             (lck.datalen == 0)) {
848                 state->status = dbwrap_record_delete(rec);
849                 return;
850         }
851
852         state->status = g_lock_store(rec, &lck, NULL);
853 }
854
855 NTSTATUS g_lock_unlock(struct g_lock_ctx *ctx, TDB_DATA key)
856 {
857         struct g_lock_unlock_state state = {
858                 .self = messaging_server_id(ctx->msg),
859         };
860         NTSTATUS status;
861
862         status = dbwrap_do_locked(ctx->db, key, g_lock_unlock_fn, &state);
863         if (!NT_STATUS_IS_OK(status)) {
864                 DBG_WARNING("dbwrap_do_locked failed: %s\n",
865                             nt_errstr(status));
866                 return status;
867         }
868         if (!NT_STATUS_IS_OK(state.status)) {
869                 DBG_WARNING("g_lock_unlock_fn failed: %s\n",
870                             nt_errstr(state.status));
871                 return state.status;
872         }
873
874         return NT_STATUS_OK;
875 }
876
/* State for g_lock_write_data()'s dbwrap_do_locked() callback. */
struct g_lock_write_data_state {
	TDB_DATA key;		/* record key, used only for error reporting */
	struct server_id self;
	const uint8_t *data;	/* new payload (caller-owned) */
	size_t datalen;
	NTSTATUS status;	/* result reported back to the caller */
};
884
885 static void g_lock_write_data_fn(
886         struct db_record *rec,
887         TDB_DATA value,
888         void *private_data)
889 {
890         struct g_lock_write_data_state *state = private_data;
891         struct g_lock lck;
892         bool exclusive;
893         bool ok;
894
895         ok = g_lock_parse(value.dptr, value.dsize, &lck);
896         if (!ok) {
897                 DBG_DEBUG("g_lock_parse for %s failed\n",
898                           hex_encode_talloc(talloc_tos(),
899                                             state->key.dptr,
900                                             state->key.dsize));
901                 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
902                 return;
903         }
904
905         exclusive = server_id_equal(&state->self, &lck.exclusive);
906
907         /*
908          * Make sure we're really exclusive. We are marked as
909          * exclusive when we are waiting for an exclusive lock
910          */
911         exclusive &= (lck.num_shared == 0);
912
913         if (!exclusive) {
914                 DBG_DEBUG("Not locked by us\n");
915                 state->status = NT_STATUS_NOT_LOCKED;
916                 return;
917         }
918
919         lck.data_seqnum += 1;
920         lck.data = discard_const_p(uint8_t, state->data);
921         lck.datalen = state->datalen;
922         state->status = g_lock_store(rec, &lck, NULL);
923 }
924
925 NTSTATUS g_lock_write_data(struct g_lock_ctx *ctx, TDB_DATA key,
926                            const uint8_t *buf, size_t buflen)
927 {
928         struct g_lock_write_data_state state = {
929                 .key = key, .self = messaging_server_id(ctx->msg),
930                 .data = buf, .datalen = buflen
931         };
932         NTSTATUS status;
933
934         status = dbwrap_do_locked(ctx->db, key,
935                                   g_lock_write_data_fn, &state);
936         if (!NT_STATUS_IS_OK(status)) {
937                 DBG_WARNING("dbwrap_do_locked failed: %s\n",
938                             nt_errstr(status));
939                 return status;
940         }
941         if (!NT_STATUS_IS_OK(state.status)) {
942                 DBG_WARNING("g_lock_write_data_fn failed: %s\n",
943                             nt_errstr(state.status));
944                 return state.status;
945         }
946
947         return NT_STATUS_OK;
948 }
949
/*
 * Adapter state for g_lock_locks(): carries the user's callback and
 * its private_data through dbwrap_traverse_read().
 */
struct g_lock_locks_state {
	int (*fn)(TDB_DATA key, void *private_data);	/* per-record user callback */
	void *private_data;				/* opaque argument for "fn" */
};
954
955 static int g_lock_locks_fn(struct db_record *rec, void *priv)
956 {
957         TDB_DATA key;
958         struct g_lock_locks_state *state = (struct g_lock_locks_state *)priv;
959
960         key = dbwrap_record_get_key(rec);
961         return state->fn(key, state->private_data);
962 }
963
964 int g_lock_locks(struct g_lock_ctx *ctx,
965                  int (*fn)(TDB_DATA key, void *private_data),
966                  void *private_data)
967 {
968         struct g_lock_locks_state state;
969         NTSTATUS status;
970         int count;
971
972         state.fn = fn;
973         state.private_data = private_data;
974
975         status = dbwrap_traverse_read(ctx->db, g_lock_locks_fn, &state, &count);
976         if (!NT_STATUS_IS_OK(status)) {
977                 return -1;
978         }
979         return count;
980 }
981
/*
 * Parse-time state for g_lock_dump(): passes the user callback into
 * dbwrap_parse_record() and carries the result back out.
 */
struct g_lock_dump_state {
	TALLOC_CTX *mem_ctx;	/* allocation context for the temporary shared array */
	TDB_DATA key;		/* record key, only used for debug output */
	void (*fn)(struct server_id exclusive,
		   size_t num_shared,
		   struct server_id *shared,
		   const uint8_t *data,
		   size_t datalen,
		   void *private_data);	/* callback receiving the parsed lock state */
	void *private_data;	/* opaque argument for "fn" */
	NTSTATUS status;	/* result of the parse operation */
};
994
995 static void g_lock_dump_fn(TDB_DATA key, TDB_DATA data,
996                            void *private_data)
997 {
998         struct g_lock_dump_state *state = private_data;
999         struct g_lock lck = (struct g_lock) { .exclusive.pid = 0 };
1000         struct server_id *shared = NULL;
1001         size_t i;
1002         bool ok;
1003
1004         ok = g_lock_parse(data.dptr, data.dsize, &lck);
1005         if (!ok) {
1006                 DBG_DEBUG("g_lock_parse failed for %s\n",
1007                           hex_encode_talloc(talloc_tos(),
1008                                             state->key.dptr,
1009                                             state->key.dsize));
1010                 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1011                 return;
1012         }
1013
1014         shared = talloc_array(
1015                 state->mem_ctx, struct server_id, lck.num_shared);
1016         if (shared == NULL) {
1017                 DBG_DEBUG("talloc failed\n");
1018                 state->status = NT_STATUS_NO_MEMORY;
1019                 return;
1020         }
1021
1022         for (i=0; i<lck.num_shared; i++) {
1023                 g_lock_get_shared(&lck, i, &shared[i]);
1024         }
1025
1026         state->fn(lck.exclusive,
1027                   lck.num_shared,
1028                   shared,
1029                   lck.data,
1030                   lck.datalen,
1031                   state->private_data);
1032
1033         TALLOC_FREE(shared);
1034
1035         state->status = NT_STATUS_OK;
1036 }
1037
1038 NTSTATUS g_lock_dump(struct g_lock_ctx *ctx, TDB_DATA key,
1039                      void (*fn)(struct server_id exclusive,
1040                                 size_t num_shared,
1041                                 struct server_id *shared,
1042                                 const uint8_t *data,
1043                                 size_t datalen,
1044                                 void *private_data),
1045                      void *private_data)
1046 {
1047         struct g_lock_dump_state state = {
1048                 .mem_ctx = ctx, .key = key,
1049                 .fn = fn, .private_data = private_data
1050         };
1051         NTSTATUS status;
1052
1053         status = dbwrap_parse_record(ctx->db, key, g_lock_dump_fn, &state);
1054         if (!NT_STATUS_IS_OK(status)) {
1055                 DBG_DEBUG("dbwrap_parse_record returned %s\n",
1056                           nt_errstr(status));
1057                 return status;
1058         }
1059         if (!NT_STATUS_IS_OK(state.status)) {
1060                 DBG_DEBUG("g_lock_dump_fn returned %s\n",
1061                           nt_errstr(state.status));
1062                 return state.status;
1063         }
1064         return NT_STATUS_OK;
1065 }
1066
/*
 * Async state for g_lock_watch_data_send/recv: wait until the data
 * blob of a g_lock record changes (data_seqnum bump) or until
 * "blocker" dies.
 */
struct g_lock_watch_data_state {
	struct tevent_context *ev;	/* event context driving the watch */
	struct g_lock_ctx *ctx;		/* g_lock context holding the db */
	TDB_DATA key;			/* talloc-copied key being watched */
	struct server_id blocker;	/* server whose death also wakes us */
	bool blockerdead;		/* true if "blocker" died while watching */
	uint64_t data_seqnum;		/* seqnum snapshot when the watch started */
	NTSTATUS status;		/* NT_STATUS_EVENT_PENDING while a watch is in flight */
};
1076
1077 static void g_lock_watch_data_done(struct tevent_req *subreq);
1078
1079 static void g_lock_watch_data_send_fn(
1080         struct db_record *rec,
1081         TDB_DATA value,
1082         void *private_data)
1083 {
1084         struct tevent_req *req = talloc_get_type_abort(
1085                 private_data, struct tevent_req);
1086         struct g_lock_watch_data_state *state = tevent_req_data(
1087                 req, struct g_lock_watch_data_state);
1088         struct tevent_req *subreq = NULL;
1089         struct g_lock lck;
1090         bool ok;
1091
1092         ok = g_lock_parse(value.dptr, value.dsize, &lck);
1093         if (!ok) {
1094                 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1095                 return;
1096         }
1097         state->data_seqnum = lck.data_seqnum;
1098
1099         DBG_DEBUG("state->data_seqnum=%"PRIu64"\n", state->data_seqnum);
1100
1101         subreq = dbwrap_watched_watch_send(
1102                 state, state->ev, rec, state->blocker);
1103         if (subreq == NULL) {
1104                 state->status = NT_STATUS_NO_MEMORY;
1105                 return;
1106         }
1107         tevent_req_set_callback(subreq, g_lock_watch_data_done, req);
1108
1109         state->status = NT_STATUS_EVENT_PENDING;
1110 }
1111
1112 struct tevent_req *g_lock_watch_data_send(
1113         TALLOC_CTX *mem_ctx,
1114         struct tevent_context *ev,
1115         struct g_lock_ctx *ctx,
1116         TDB_DATA key,
1117         struct server_id blocker)
1118 {
1119         struct tevent_req *req = NULL;
1120         struct g_lock_watch_data_state *state = NULL;
1121         NTSTATUS status;
1122
1123         req = tevent_req_create(
1124                 mem_ctx, &state, struct g_lock_watch_data_state);
1125         if (req == NULL) {
1126                 return NULL;
1127         }
1128         state->ev = ev;
1129         state->ctx = ctx;
1130         state->blocker = blocker;
1131
1132         state->key = tdb_data_talloc_copy(state, key);
1133         if (tevent_req_nomem(state->key.dptr, req)) {
1134                 return tevent_req_post(req, ev);
1135         }
1136
1137         status = dbwrap_do_locked(
1138                 ctx->db, key, g_lock_watch_data_send_fn, req);
1139         if (tevent_req_nterror(req, status)) {
1140                 DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
1141                 return tevent_req_post(req, ev);
1142         }
1143
1144         if (NT_STATUS_IS_OK(state->status)) {
1145                 tevent_req_done(req);
1146                 return tevent_req_post(req, ev);
1147         }
1148
1149         return req;
1150 }
1151
1152 static void g_lock_watch_data_done_fn(
1153         struct db_record *rec,
1154         TDB_DATA value,
1155         void *private_data)
1156 {
1157         struct tevent_req *req = talloc_get_type_abort(
1158                 private_data, struct tevent_req);
1159         struct g_lock_watch_data_state *state = tevent_req_data(
1160                 req, struct g_lock_watch_data_state);
1161         struct tevent_req *subreq = NULL;
1162         struct g_lock lck;
1163         bool ok;
1164
1165         ok = g_lock_parse(value.dptr, value.dsize, &lck);
1166         if (!ok) {
1167                 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1168                 return;
1169         }
1170
1171         if (lck.data_seqnum != state->data_seqnum) {
1172                 DBG_DEBUG("lck.data_seqnum=%"PRIu64", "
1173                           "state->data_seqnum=%"PRIu64"\n",
1174                           lck.data_seqnum,
1175                           state->data_seqnum);
1176                 state->status = NT_STATUS_OK;
1177                 return;
1178         }
1179
1180         subreq = dbwrap_watched_watch_send(
1181                 state, state->ev, rec, state->blocker);
1182         if (subreq == NULL) {
1183                 state->status = NT_STATUS_NO_MEMORY;
1184                 return;
1185         }
1186         tevent_req_set_callback(subreq, g_lock_watch_data_done, req);
1187
1188         state->status = NT_STATUS_EVENT_PENDING;
1189 }
1190
/*
 * Completion callback for the dbwrap watch subrequest: re-inspect the
 * record under lock. g_lock_watch_data_done_fn either finishes the
 * request (data changed or corruption) or re-arms the watch and
 * leaves state->status at NT_STATUS_EVENT_PENDING.
 */
static void g_lock_watch_data_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct g_lock_watch_data_state *state = tevent_req_data(
		req, struct g_lock_watch_data_state);
	NTSTATUS status;

	/* Collect whether we woke up due to a change or blocker death */
	status = dbwrap_watched_watch_recv(
		subreq, &state->blockerdead, &state->blocker);
	TALLOC_FREE(subreq);
	if (tevent_req_nterror(req, status)) {
		DBG_DEBUG("dbwrap_watched_watch_recv returned %s\n",
			  nt_errstr(status));
		return;
	}

	status = dbwrap_do_locked(
		state->ctx->db, state->key, g_lock_watch_data_done_fn, req);
	if (tevent_req_nterror(req, status)) {
		DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
		return;
	}
	if (NT_STATUS_EQUAL(state->status, NT_STATUS_EVENT_PENDING)) {
		/* Watch re-armed, we will be called again */
		return;
	}
	if (tevent_req_nterror(req, state->status)) {
		return;
	}
	tevent_req_done(req);
}
1222
1223 NTSTATUS g_lock_watch_data_recv(
1224         struct tevent_req *req,
1225         bool *blockerdead,
1226         struct server_id *blocker)
1227 {
1228         struct g_lock_watch_data_state *state = tevent_req_data(
1229                 req, struct g_lock_watch_data_state);
1230         NTSTATUS status;
1231
1232         if (tevent_req_is_nterror(req, &status)) {
1233                 return status;
1234         }
1235         if (blockerdead != NULL) {
1236                 *blockerdead = state->blockerdead;
1237         }
1238         if (blocker != NULL) {
1239                 *blocker = state->blocker;
1240         }
1241
1242         return NT_STATUS_OK;
1243 }
1244
1245 static void g_lock_wake_watchers_fn(
1246         struct db_record *rec,
1247         TDB_DATA value,
1248         void *private_data)
1249 {
1250         struct g_lock lck = { .exclusive.pid = 0 };
1251         NTSTATUS status;
1252         bool ok;
1253
1254         ok = g_lock_parse(value.dptr, value.dsize, &lck);
1255         if (!ok) {
1256                 DBG_WARNING("g_lock_parse failed\n");
1257                 return;
1258         }
1259
1260         lck.data_seqnum += 1;
1261
1262         status = g_lock_store(rec, &lck, NULL);
1263         if (!NT_STATUS_IS_OK(status)) {
1264                 DBG_WARNING("g_lock_store failed: %s\n", nt_errstr(status));
1265                 return;
1266         }
1267 }
1268
1269 void g_lock_wake_watchers(struct g_lock_ctx *ctx, TDB_DATA key)
1270 {
1271         NTSTATUS status;
1272
1273         status = dbwrap_do_locked(ctx->db, key, g_lock_wake_watchers_fn, NULL);
1274         if (!NT_STATUS_IS_OK(status)) {
1275                 DBG_DEBUG("dbwrap_do_locked returned %s\n",
1276                           nt_errstr(status));
1277         }
1278 }