2 Unix SMB/CIFS implementation.
3 global locks based on dbwrap and messaging
4 Copyright (C) 2009 by Volker Lendecke
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/filesys.h"
22 #include "lib/util/server_id.h"
23 #include "lib/util/debug.h"
24 #include "lib/util/talloc_stack.h"
25 #include "lib/util/samba_util.h"
26 #include "lib/util_path.h"
27 #include "dbwrap/dbwrap.h"
28 #include "dbwrap/dbwrap_open.h"
29 #include "dbwrap/dbwrap_watch.h"
32 #include "../lib/util/tevent_ntstatus.h"
37 struct db_context *db;
38 struct messaging_context *msg;
42 struct server_id exclusive;
/*
 * Decode a serialized lock record (buf, buflen) into *lck.
 *
 * Layout as read here: exclusive holder (SERVER_ID_BUF_LENGTH bytes),
 * 64-bit data sequence number, 32-bit count of shared holders, then
 * the shared holder array followed by the user data.
 *
 * A buffer shorter than that fixed header is handled in a separate
 * branch which builds a g_lock without exclusive holder and with a
 * randomly generated data_seqnum.
 *
 * num_shared is validated against the remaining length before
 * shared_len is computed, so the shared array cannot run past the
 * buffer. lck->data points into buf (buf+shared_len); nothing is
 * copied.
 *
 * NOTE(review): the return statements are not visible in this view;
 * callers log a parse failure based on the result -- confirm.
 */
50 static bool g_lock_parse(uint8_t *buf, size_t buflen, struct g_lock *lck)
52 struct server_id exclusive;
53 size_t num_shared, shared_len;
56 if (buflen < (SERVER_ID_BUF_LENGTH + /* exclusive */
57 sizeof(uint64_t) + /* seqnum */
58 sizeof(uint32_t))) { /* num_shared */
59 struct g_lock ret = { .exclusive.pid = 0 };
60 generate_random_buffer(
61 (uint8_t *)&ret.data_seqnum,
62 sizeof(ret.data_seqnum));
67 server_id_get(&exclusive, buf);
68 buf += SERVER_ID_BUF_LENGTH;
69 buflen -= SERVER_ID_BUF_LENGTH;
71 data_seqnum = BVAL(buf, 0);
72 buf += sizeof(uint64_t);
73 buflen -= sizeof(uint64_t);
75 num_shared = IVAL(buf, 0);
76 buf += sizeof(uint32_t);
77 buflen -= sizeof(uint32_t);
79 if (num_shared > buflen/SERVER_ID_BUF_LENGTH) {
80 DBG_DEBUG("num_shared=%zu, buflen=%zu\n",
86 shared_len = num_shared * SERVER_ID_BUF_LENGTH;
88 *lck = (struct g_lock) {
89 .exclusive = exclusive,
90 .num_shared = num_shared,
92 .data_seqnum = data_seqnum,
93 .datalen = buflen-shared_len,
94 .data = buf+shared_len,
/*
 * Decode shared holder number i of lck into *shared.
 *
 * Entries are SERVER_ID_BUF_LENGTH bytes each, stored back to back in
 * lck->shared. The index is compared against lck->num_shared first.
 * NOTE(review): what happens for an out-of-range index is not visible
 * in this view -- confirm.
 */
100 static void g_lock_get_shared(const struct g_lock *lck,
102 struct server_id *shared)
104 if (i >= lck->num_shared) {
107 server_id_get(shared, lck->shared + i*SERVER_ID_BUF_LENGTH);
/*
 * Remove shared holder number i from lck (in memory only).
 *
 * The count is decremented and, unless i was the last entry, the
 * former last entry is copied into slot i. The order of the remaining
 * entries is therefore not preserved. The index is checked against
 * lck->num_shared up front.
 */
110 static void g_lock_del_shared(struct g_lock *lck, size_t i)
112 if (i >= lck->num_shared) {
115 lck->num_shared -= 1;
116 if (i < lck->num_shared) {
117 memcpy(lck->shared + i*SERVER_ID_BUF_LENGTH,
118 lck->shared + lck->num_shared*SERVER_ID_BUF_LENGTH,
119 SERVER_ID_BUF_LENGTH);
/*
 * Serialize lck into rec with a single dbwrap_record_storev() call.
 *
 * The record is assembled from separate buffers without building a
 * contiguous copy: exclusive holder, data sequence number, shared
 * count, the existing shared array and the user data.
 *
 * If new_shared is not NULL it is encoded into its own buffer, placed
 * in dbufs[4], and lck->num_shared is incremented before the count is
 * written, so the lck passed in is modified. Adding a holder is
 * refused with NT_STATUS_BUFFER_OVERFLOW when num_shared has already
 * reached UINT32_MAX, because the count is stored as 32 bits.
 *
 * NOTE(review): the initializer entry that dbufs[4] overwrites is not
 * visible in this view -- confirm the index against the full array.
 */
123 static NTSTATUS g_lock_store(
124 struct db_record *rec,
126 struct server_id *new_shared)
128 uint8_t exclusive[SERVER_ID_BUF_LENGTH];
129 uint8_t seqnum_buf[sizeof(uint64_t)];
130 uint8_t sizebuf[sizeof(uint32_t)];
131 uint8_t new_shared_buf[SERVER_ID_BUF_LENGTH];
133 struct TDB_DATA dbufs[] = {
134 { .dptr = exclusive, .dsize = sizeof(exclusive) },
135 { .dptr = seqnum_buf, .dsize = sizeof(seqnum_buf) },
136 { .dptr = sizebuf, .dsize = sizeof(sizebuf) },
137 { .dptr = lck->shared,
138 .dsize = lck->num_shared * SERVER_ID_BUF_LENGTH },
140 { .dptr = lck->data, .dsize = lck->datalen }
143 server_id_put(exclusive, lck->exclusive);
144 SBVAL(seqnum_buf, 0, lck->data_seqnum);
146 if (new_shared != NULL) {
147 if (lck->num_shared >= UINT32_MAX) {
148 return NT_STATUS_BUFFER_OVERFLOW;
151 server_id_put(new_shared_buf, *new_shared);
153 dbufs[4] = (TDB_DATA) {
154 .dptr = new_shared_buf,
155 .dsize = sizeof(new_shared_buf),
158 lck->num_shared += 1;
161 SIVAL(sizebuf, 0, lck->num_shared);
163 return dbwrap_record_storev(rec, dbufs, ARRAY_SIZE(dbufs), 0);
/*
 * Create a g_lock_ctx on top of an already opened backend database.
 *
 * The context is allocated with talloc on mem_ctx and the backend is
 * wrapped with db_open_watched(), with the new context as talloc
 * parent of the wrapped database. A warning is logged when
 * db_open_watched() returns NULL.
 *
 * NOTE(review): the error returns and the assignment of msg are not
 * visible in this view.
 */
166 struct g_lock_ctx *g_lock_ctx_init_backend(
168 struct messaging_context *msg,
169 struct db_context **backend)
171 struct g_lock_ctx *result;
173 result = talloc(mem_ctx, struct g_lock_ctx);
174 if (result == NULL) {
179 result->db = db_open_watched(result, backend, msg);
180 if (result->db == NULL) {
181 DBG_WARNING("db_open_watched failed\n");
/*
 * Create a g_lock_ctx backed by g_lock.tdb in the lock directory.
 *
 * The path is built with lock_path() and freed again once the backend
 * has been opened. The visible open flags are
 * TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH. The opened backend is then
 * handed to g_lock_ctx_init_backend().
 *
 * NOTE(review): the open call itself and the return statements are
 * not visible in this view.
 */
188 struct g_lock_ctx *g_lock_ctx_init(TALLOC_CTX *mem_ctx,
189 struct messaging_context *msg)
191 char *db_path = NULL;
192 struct db_context *backend = NULL;
193 struct g_lock_ctx *ctx = NULL;
195 db_path = lock_path(mem_ctx, "g_lock.tdb");
196 if (db_path == NULL) {
204 TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
209 TALLOC_FREE(db_path);
210 if (backend == NULL) {
211 DBG_WARNING("Could not open g_lock.tdb\n");
215 ctx = g_lock_ctx_init_backend(mem_ctx, msg, &backend);
/*
 * Drop dead_blocker from lck if it is recorded as a lock holder, and
 * write the result back to rec.
 *
 * dead_blocker is compared with the exclusive holder and, when shared
 * holders exist, with the first shared holder (index 0) only. A dead
 * exclusive holder is cleared by zeroing exclusive.pid; a dead shared
 * holder is removed with g_lock_del_shared(). The record is rewritten
 * with g_lock_store(rec, lck, NULL).
 *
 * NOTE(review): the handling of dead_blocker == NULL and the
 * conditions guarding the store (a modified flag is declared) are not
 * visible in this view.
 */
219 static NTSTATUS g_lock_cleanup_dead(
220 struct db_record *rec,
222 struct server_id *dead_blocker)
224 bool modified = false;
226 NTSTATUS status = NT_STATUS_OK;
227 struct server_id_buf tmp;
229 if (dead_blocker == NULL) {
233 exclusive_died = server_id_equal(dead_blocker, &lck->exclusive);
235 if (exclusive_died) {
236 DBG_DEBUG("Exclusive holder %s died\n",
237 server_id_str_buf(lck->exclusive, &tmp));
238 lck->exclusive.pid = 0;
242 if (lck->num_shared != 0) {
244 struct server_id shared;
246 g_lock_get_shared(lck, 0, &shared);
247 shared_died = server_id_equal(dead_blocker, &shared);
250 DBG_DEBUG("Shared holder %s died\n",
251 server_id_str_buf(shared, &tmp));
252 g_lock_del_shared(lck, 0);
258 status = g_lock_store(rec, lck, NULL);
259 if (!NT_STATUS_IS_OK(status)) {
260 DBG_DEBUG("g_lock_store() failed: %s\n",
/*
 * Linear search through the shared holders of lck for self, comparing
 * with server_id_equal().
 *
 * NOTE(review): the return statements are not visible in this view;
 * callers in this file compare the result against -1 to detect that
 * self holds no shared lock.
 */
268 static ssize_t g_lock_find_shared(
270 const struct server_id *self)
274 for (i=0; i<lck->num_shared; i++) {
275 struct server_id shared;
278 g_lock_get_shared(lck, i, &shared);
280 same = server_id_equal(self, &shared);
/*
 * Heuristic garbage collection of stale shared holders (in memory
 * only; the caller stores the record).
 *
 * One randomly chosen shared holder is checked with serverid_exists()
 * and removed with g_lock_del_shared() when it is reported dead. The
 * case num_shared == 0 is tested first, before the random index is
 * taken modulo num_shared.
 *
 * NOTE(review): the early return and the condition on the existence
 * check are not visible in this view.
 */
289 static void g_lock_cleanup_shared(struct g_lock *lck)
292 struct server_id check;
295 if (lck->num_shared == 0) {
300 * Read locks can stay around forever if the process dies. Do
301 * a heuristic check for process existence: Check one random
302 * process for existence. Hopefully this will keep runaway
303 * read locks under control.
305 i = generate_random() % lck->num_shared;
306 g_lock_get_shared(lck, i, &check);
308 exists = serverid_exists(&check);
310 struct server_id_buf tmp;
311 DBG_DEBUG("Shared locker %s died -- removing\n",
312 server_id_str_buf(check, &tmp));
313 g_lock_del_shared(lck, i);
/*
 * State of one asynchronous lock request created by
 * g_lock_lock_send(): the event context used for watches, the g_lock
 * context and the requested lock type.
 * NOTE(review): further members (key and retry are used elsewhere in
 * this file) are not visible in this view.
 */
317 struct g_lock_lock_state {
318 struct tevent_context *ev;
319 struct g_lock_ctx *ctx;
321 enum g_lock_type type;
/*
 * Arguments and results exchanged with g_lock_lock_fn() through
 * dbwrap_do_locked(): the owning request state, an optional blocker
 * that was reported dead, and the watch request started when the lock
 * could not be granted.
 * NOTE(review): a status member is used by g_lock_lock_fn() but is not
 * visible in this view.
 */
325 struct g_lock_lock_fn_state {
326 struct g_lock_lock_state *req_state;
327 struct server_id *dead_blocker;
329 struct tevent_req *watch_req;
/* Forward declaration: g_lock_trylock() installs this as a talloc destructor. */
333 static int g_lock_lock_state_destructor(struct g_lock_lock_state *s);
/*
 * Core lock state machine, run with the record locked.
 *
 * Parses the record, removes state->dead_blocker via
 * g_lock_cleanup_dead() and then tries to grant the requested type.
 * On NT_STATUS_LOCK_NOT_GRANTED *blocker names a holder to watch.
 *
 * Exclusive holder present and not self:
 *  - its existence is checked with serverid_exists() and a cleared
 *    server_id is assigned when appropriate;
 *  - G_LOCK_DOWNGRADE fails with NT_STATUS_NOT_LOCKED;
 *  - G_LOCK_UPGRADE fails with NT_STATUS_NOT_LOCKED when self has no
 *    shared entry, otherwise with NT_STATUS_POSSIBLE_DEADLOCK;
 *  - otherwise the exclusive holder becomes the blocker.
 *
 * Exclusive holder is self:
 *  - G_LOCK_DOWNGRADE clears the exclusive holder;
 *  - a repeated request is answered with NT_STATUS_WAS_LOCKED;
 *  - with shared holders still present the first one is returned as
 *    blocker (waiting for readers to leave).
 *
 * G_LOCK_UPGRADE requires an existing shared entry of self, which is
 * removed. G_LOCK_WRITE with an existing shared entry of self returns
 * NT_STATUS_WAS_LOCKED. After self is stored as exclusive holder and
 * shared holders remain, g_lock_lock_state_destructor is installed on
 * req_state and the first shared holder is returned as blocker; the
 * destructor is reset to NULL on the paths where no wait follows.
 *
 * For the shared case self is appended through
 * g_lock_store(rec, &lck, &self), after g_lock_cleanup_shared() when
 * other shared holders already exist.
 *
 * NOTE(review): several branch conditions, the data parameter
 * declaration and the final returns are not visible in this view; the
 * summary above follows the visible statements and log messages only.
 */
335 static NTSTATUS g_lock_trylock(
336 struct db_record *rec,
337 struct g_lock_lock_fn_state *state,
339 struct server_id *blocker)
341 struct g_lock_lock_state *req_state = state->req_state;
342 struct server_id self = messaging_server_id(req_state->ctx->msg);
343 enum g_lock_type type = req_state->type;
344 bool retry = req_state->retry;
345 struct g_lock lck = { .exclusive.pid = 0 };
346 struct server_id_buf tmp;
350 ok = g_lock_parse(data.dptr, data.dsize, &lck);
352 DBG_DEBUG("g_lock_parse failed\n");
353 return NT_STATUS_INTERNAL_DB_CORRUPTION;
356 status = g_lock_cleanup_dead(rec, &lck, state->dead_blocker);
357 if (!NT_STATUS_IS_OK(status)) {
358 DBG_DEBUG("g_lock_cleanup_dead() failed: %s\n",
363 if (lck.exclusive.pid != 0) {
364 bool self_exclusive = server_id_equal(&self, &lck.exclusive);
366 if (!self_exclusive) {
367 bool exists = serverid_exists(&lck.exclusive);
369 lck.exclusive = (struct server_id) { .pid=0 };
373 DBG_DEBUG("%s has an exclusive lock\n",
374 server_id_str_buf(lck.exclusive, &tmp));
376 if (type == G_LOCK_DOWNGRADE) {
377 struct server_id_buf tmp2;
378 DBG_DEBUG("%s: Trying to downgrade %s\n",
379 server_id_str_buf(self, &tmp),
381 lck.exclusive, &tmp2));
382 return NT_STATUS_NOT_LOCKED;
385 if (type == G_LOCK_UPGRADE) {
387 shared_idx = g_lock_find_shared(&lck, &self);
389 if (shared_idx == -1) {
390 DBG_DEBUG("Trying to upgrade %s "
392 "existing shared lock\n",
395 return NT_STATUS_NOT_LOCKED;
399 * We're trying to upgrade, and the
400 * exlusive lock is taken by someone
401 * else. This means that someone else
402 * is waiting for us to give up our
403 * shared lock. If we now also wait
404 * for someone to give their shared
405 * lock, we will deadlock.
408 DBG_DEBUG("Trying to upgrade %s while "
409 "someone else is also "
410 "trying to upgrade\n",
411 server_id_str_buf(self, &tmp));
412 return NT_STATUS_POSSIBLE_DEADLOCK;
415 *blocker = lck.exclusive;
416 return NT_STATUS_LOCK_NOT_GRANTED;
419 if (type == G_LOCK_DOWNGRADE) {
420 DBG_DEBUG("Downgrading %s from WRITE to READ\n",
421 server_id_str_buf(self, &tmp));
423 lck.exclusive = (struct server_id) { .pid = 0 };
428 DBG_DEBUG("%s already locked by self\n",
429 server_id_str_buf(self, &tmp));
430 return NT_STATUS_WAS_LOCKED;
433 if (lck.num_shared != 0) {
434 g_lock_get_shared(&lck, 0, blocker);
436 DBG_DEBUG("Continue waiting for shared lock %s\n",
437 server_id_str_buf(*blocker, &tmp));
439 return NT_STATUS_LOCK_NOT_GRANTED;
442 talloc_set_destructor(req_state, NULL);
445 * Retry after a conflicting lock was released
452 if (type == G_LOCK_UPGRADE) {
453 ssize_t shared_idx = g_lock_find_shared(&lck, &self);
455 if (shared_idx == -1) {
456 DBG_DEBUG("Trying to upgrade %s without "
457 "existing shared lock\n",
458 server_id_str_buf(self, &tmp));
459 return NT_STATUS_NOT_LOCKED;
462 g_lock_del_shared(&lck, shared_idx);
466 if (type == G_LOCK_WRITE) {
467 ssize_t shared_idx = g_lock_find_shared(&lck, &self);
469 if (shared_idx != -1) {
470 DBG_DEBUG("Trying to writelock existing shared %s\n",
471 server_id_str_buf(self, &tmp));
472 return NT_STATUS_WAS_LOCKED;
475 lck.exclusive = self;
477 status = g_lock_store(rec, &lck, NULL);
478 if (!NT_STATUS_IS_OK(status)) {
479 DBG_DEBUG("g_lock_store() failed: %s\n",
484 if (lck.num_shared != 0) {
485 talloc_set_destructor(
486 req_state, g_lock_lock_state_destructor);
488 g_lock_get_shared(&lck, 0, blocker);
490 DBG_DEBUG("Waiting for %zu shared locks, "
491 "picking blocker %s\n",
493 server_id_str_buf(*blocker, &tmp));
495 return NT_STATUS_LOCK_NOT_GRANTED;
498 talloc_set_destructor(req_state, NULL);
505 if (lck.num_shared == 0) {
506 status = g_lock_store(rec, &lck, &self);
507 if (!NT_STATUS_IS_OK(status)) {
508 DBG_DEBUG("g_lock_store() failed: %s\n",
515 g_lock_cleanup_shared(&lck);
517 status = g_lock_store(rec, &lck, &self);
518 if (!NT_STATUS_IS_OK(status)) {
519 DBG_DEBUG("g_lock_store() failed: %s\n",
/*
 * dbwrap_do_locked() callback for lock requests.
 *
 * Runs g_lock_trylock() and stores its result in state->status. Only
 * when that result is NT_STATUS_LOCK_NOT_GRANTED is a watch on the
 * record started for the returned blocker, while the record is still
 * locked. A failed watch allocation is reported as
 * NT_STATUS_NO_MEMORY in state->status.
 */
527 static void g_lock_lock_fn(
528 struct db_record *rec,
532 struct g_lock_lock_fn_state *state = private_data;
533 struct server_id blocker = {0};
535 state->status = g_lock_trylock(rec, state, value, &blocker);
536 if (!NT_STATUS_EQUAL(state->status, NT_STATUS_LOCK_NOT_GRANTED)) {
540 state->watch_req = dbwrap_watched_watch_send(
541 state->req_state, state->req_state->ev, rec, blocker);
542 if (state->watch_req == NULL) {
543 state->status = NT_STATUS_NO_MEMORY;
/*
 * talloc destructor for a pending lock request: releases whatever the
 * request has recorded for s->key by calling g_lock_unlock() and logs
 * a failure. g_lock_trylock() installs it while self is stored as
 * exclusive holder but shared holders still have to go away.
 * NOTE(review): the return value is not visible in this view.
 */
547 static int g_lock_lock_state_destructor(struct g_lock_lock_state *s)
549 NTSTATUS status = g_lock_unlock(s->ctx, s->key);
550 if (!NT_STATUS_IS_OK(status)) {
551 DBG_DEBUG("g_lock_unlock failed: %s\n", nt_errstr(status));
/* Forward declaration: watch callback set by g_lock_lock_send(). */
556 static void g_lock_lock_retry(struct tevent_req *subreq);
/*
 * Start an asynchronous lock request of the given type.
 *
 * The first attempt is made right away through dbwrap_do_locked()
 * with g_lock_lock_fn():
 *  - success completes the request immediately;
 *  - any status other than NT_STATUS_LOCK_NOT_GRANTED fails it;
 *  - NT_STATUS_LOCK_NOT_GRANTED leaves a watch request in fn_state,
 *    which gets an end time of 5 + generate_random() % 5 seconds from
 *    now and g_lock_lock_retry() as callback.
 *
 * NOTE(review): the initialisation of state, the key parameter and
 * the final return are not visible in this view.
 */
558 struct tevent_req *g_lock_lock_send(TALLOC_CTX *mem_ctx,
559 struct tevent_context *ev,
560 struct g_lock_ctx *ctx,
562 enum g_lock_type type)
564 struct tevent_req *req;
565 struct g_lock_lock_state *state;
566 struct g_lock_lock_fn_state fn_state;
570 req = tevent_req_create(mem_ctx, &state, struct g_lock_lock_state);
579 fn_state = (struct g_lock_lock_fn_state) {
583 status = dbwrap_do_locked(ctx->db, key, g_lock_lock_fn, &fn_state);
584 if (tevent_req_nterror(req, status)) {
585 DBG_DEBUG("dbwrap_do_locked failed: %s\n",
587 return tevent_req_post(req, ev);
590 if (NT_STATUS_IS_OK(fn_state.status)) {
591 tevent_req_done(req);
592 return tevent_req_post(req, ev);
594 if (!NT_STATUS_EQUAL(fn_state.status, NT_STATUS_LOCK_NOT_GRANTED)) {
595 tevent_req_nterror(req, fn_state.status);
596 return tevent_req_post(req, ev);
599 if (tevent_req_nomem(fn_state.watch_req, req)) {
600 return tevent_req_post(req, ev);
603 ok = tevent_req_set_endtime(
606 timeval_current_ofs(5 + generate_random() % 5, 0));
609 return tevent_req_post(req, ev);
611 tevent_req_set_callback(fn_state.watch_req, g_lock_lock_retry, req);
/*
 * Callback of the record watch: the record changed, the blocker died
 * or the watch timed out.
 *
 * NT_STATUS_IO_TIMEOUT from dbwrap_watched_watch_recv() is tolerated
 * and leads to another attempt; any other error fails the request.
 * The attempt is repeated with g_lock_lock_fn(), passing the blocker
 * as dead_blocker when the watch reported it dead. If the lock is
 * still not granted the new watch request again gets an end time of
 * 5 + generate_random() % 5 seconds and this function as callback.
 *
 * NOTE(review): the returns after the error paths are not visible in
 * this view.
 */
616 static void g_lock_lock_retry(struct tevent_req *subreq)
618 struct tevent_req *req = tevent_req_callback_data(
619 subreq, struct tevent_req);
620 struct g_lock_lock_state *state = tevent_req_data(
621 req, struct g_lock_lock_state);
622 struct g_lock_lock_fn_state fn_state;
623 struct server_id blocker;
627 status = dbwrap_watched_watch_recv(subreq, &blockerdead, &blocker);
628 DBG_DEBUG("watch_recv returned %s\n", nt_errstr(status));
631 if (!NT_STATUS_IS_OK(status) &&
632 !NT_STATUS_EQUAL(status, NT_STATUS_IO_TIMEOUT)) {
633 tevent_req_nterror(req, status);
639 fn_state = (struct g_lock_lock_fn_state) {
641 .dead_blocker = blockerdead ? &blocker : NULL,
644 status = dbwrap_do_locked(state->ctx->db, state->key,
645 g_lock_lock_fn, &fn_state);
646 if (tevent_req_nterror(req, status)) {
647 DBG_DEBUG("dbwrap_do_locked failed: %s\n",
652 if (NT_STATUS_IS_OK(fn_state.status)) {
653 tevent_req_done(req);
656 if (!NT_STATUS_EQUAL(fn_state.status, NT_STATUS_LOCK_NOT_GRANTED)) {
657 tevent_req_nterror(req, fn_state.status);
661 if (tevent_req_nomem(fn_state.watch_req, req)) {
665 if (!tevent_req_set_endtime(
666 fn_state.watch_req, state->ev,
667 timeval_current_ofs(5 + generate_random() % 5, 0))) {
670 tevent_req_set_callback(fn_state.watch_req, g_lock_lock_retry, req);
/*
 * Collect the result of a request started with g_lock_lock_send().
 * Returns whatever tevent_req_simple_recv_ntstatus() reports for req.
 */
673 NTSTATUS g_lock_lock_recv(struct tevent_req *req)
675 return tevent_req_simple_recv_ntstatus(req);
/*
 * State passed to g_lock_lock_simple_fn() by g_lock_lock().
 * NOTE(review): members me and status are used by that callback but
 * are not visible in this view.
 */
678 struct g_lock_lock_simple_state {
680 enum g_lock_type type;
/*
 * dbwrap_do_locked() callback for the fast path of g_lock_lock().
 *
 * Handles only the simple cases visible below: for G_LOCK_WRITE the
 * caller is stored as exclusive holder, with a preceding test for
 * existing shared holders; for G_LOCK_READ one stale shared holder may
 * be cleaned up and the caller is appended as shared holder. The last
 * statement sets NT_STATUS_LOCK_NOT_GRANTED, which g_lock_lock() uses
 * to fall back to the full logic.
 *
 * NOTE(review): the jumps and returns connecting these branches are
 * not visible in this view.
 */
684 static void g_lock_lock_simple_fn(
685 struct db_record *rec,
689 struct g_lock_lock_simple_state *state = private_data;
690 struct g_lock lck = { .exclusive.pid = 0 };
693 ok = g_lock_parse(value.dptr, value.dsize, &lck);
695 DBG_DEBUG("g_lock_parse failed\n");
696 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
700 if (lck.exclusive.pid != 0) {
704 if (state->type == G_LOCK_WRITE) {
705 if (lck.num_shared != 0) {
708 lck.exclusive = state->me;
709 state->status = g_lock_store(rec, &lck, NULL);
713 if (state->type == G_LOCK_READ) {
714 g_lock_cleanup_shared(&lck);
715 state->status = g_lock_store(rec, &lck, &state->me);
720 state->status = NT_STATUS_LOCK_NOT_GRANTED;
/*
 * Synchronous lock call with timeout.
 *
 * For G_LOCK_READ and G_LOCK_WRITE a fast path is tried first: one
 * dbwrap_do_locked() call with g_lock_lock_simple_fn(), without
 * creating an event context. Only when that reports
 * NT_STATUS_LOCK_NOT_GRANTED does the function fall through to the
 * generic path.
 *
 * The generic path creates a temporary event context on a talloc
 * stackframe, starts g_lock_lock_send(), limits the request with an
 * end time of now + timeout and polls until it finishes.
 *
 * NOTE(review): the returns of the fast path and the cleanup at the
 * end are not visible in this view.
 */
723 NTSTATUS g_lock_lock(struct g_lock_ctx *ctx, TDB_DATA key,
724 enum g_lock_type type, struct timeval timeout)
727 struct tevent_context *ev;
728 struct tevent_req *req;
732 if ((type == G_LOCK_READ) || (type == G_LOCK_WRITE)) {
734 * This is an abstraction violation: Normally we do
735 * the sync wrappers around async functions with full
736 * nested event contexts. However, this is used in
737 * very hot code paths, so avoid the event context
738 * creation for the good path where there's no lock
739 * contention. My benchmark gave a factor of 2
740 * improvement for lock/unlock.
742 struct g_lock_lock_simple_state state = {
743 .me = messaging_server_id(ctx->msg),
746 status = dbwrap_do_locked(
747 ctx->db, key, g_lock_lock_simple_fn, &state);
748 if (!NT_STATUS_IS_OK(status)) {
749 DBG_DEBUG("dbwrap_do_locked() failed: %s\n",
753 if (NT_STATUS_IS_OK(state.status)) {
756 if (!NT_STATUS_EQUAL(
757 state.status, NT_STATUS_LOCK_NOT_GRANTED)) {
762 * Fall back to the full g_lock_trylock logic,
763 * g_lock_lock_simple_fn() called above only covers
764 * the uncontended path.
768 frame = talloc_stackframe();
769 status = NT_STATUS_NO_MEMORY;
771 ev = samba_tevent_context_init(frame);
775 req = g_lock_lock_send(frame, ev, ctx, key, type);
779 end = timeval_current_ofs(timeout.tv_sec, timeout.tv_usec);
780 if (!tevent_req_set_endtime(req, ev, end)) {
783 if (!tevent_req_poll_ntstatus(req, ev, &status)) {
786 status = g_lock_lock_recv(req);
/*
 * State passed to g_lock_unlock_fn(): the server_id whose lock is to
 * be released.
 * NOTE(review): a status member is used by the callback but is not
 * visible in this view.
 */
792 struct g_lock_unlock_state {
793 struct server_id self;
/*
 * dbwrap_do_locked() callback releasing the lock held by state->self.
 *
 * self is compared with the exclusive holder and searched among the
 * shared holders. Visible outcomes:
 *  - self found as shared holder: removed with g_lock_del_shared();
 *    being recorded as both exclusive and shared is reported as
 *    NT_STATUS_INTERNAL_DB_CORRUPTION;
 *  - self not found at all: NT_STATUS_NOT_FOUND;
 *  - self is exclusive holder: the exclusive holder is cleared.
 *
 * When neither holders nor data remain the record is deleted,
 * otherwise it is stored back with g_lock_store().
 *
 * NOTE(review): the conditions separating these branches are only
 * partly visible in this view.
 */
797 static void g_lock_unlock_fn(
798 struct db_record *rec,
802 struct g_lock_unlock_state *state = private_data;
803 struct server_id_buf tmp;
808 ok = g_lock_parse(value.dptr, value.dsize, &lck);
810 DBG_DEBUG("g_lock_parse() failed\n");
811 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
815 exclusive = server_id_equal(&state->self, &lck.exclusive);
817 for (i=0; i<lck.num_shared; i++) {
818 struct server_id shared;
819 g_lock_get_shared(&lck, i, &shared);
820 if (server_id_equal(&state->self, &shared)) {
825 if (i < lck.num_shared) {
827 DBG_DEBUG("%s both exclusive and shared (%zu)\n",
828 server_id_str_buf(state->self, &tmp),
830 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
833 g_lock_del_shared(&lck, i);
836 DBG_DEBUG("Lock %s not found, num_rec=%zu\n",
837 server_id_str_buf(state->self, &tmp),
839 state->status = NT_STATUS_NOT_FOUND;
842 lck.exclusive = (struct server_id) { .pid = 0 };
845 if ((lck.exclusive.pid == 0) &&
846 (lck.num_shared == 0) &&
847 (lck.datalen == 0)) {
848 state->status = dbwrap_record_delete(rec);
852 state->status = g_lock_store(rec, &lck, NULL);
/*
 * Release the lock the calling process (messaging_server_id of
 * ctx->msg) holds on key.
 *
 * Runs g_lock_unlock_fn() under dbwrap_do_locked() and logs a warning
 * both when the database call fails and when the callback reports a
 * failure.
 * NOTE(review): the return statements are not visible in this view.
 */
855 NTSTATUS g_lock_unlock(struct g_lock_ctx *ctx, TDB_DATA key)
857 struct g_lock_unlock_state state = {
858 .self = messaging_server_id(ctx->msg),
862 status = dbwrap_do_locked(ctx->db, key, g_lock_unlock_fn, &state);
863 if (!NT_STATUS_IS_OK(status)) {
864 DBG_WARNING("dbwrap_do_locked failed: %s\n",
868 if (!NT_STATUS_IS_OK(state.status)) {
869 DBG_WARNING("g_lock_unlock_fn failed: %s\n",
870 nt_errstr(state.status));
/*
 * State passed to g_lock_write_data_fn().
 * NOTE(review): members key, data, datalen and status are set or used
 * elsewhere in this file but are not visible in this view.
 */
877 struct g_lock_write_data_state {
879 struct server_id self;
/*
 * dbwrap_do_locked() callback replacing the user data of a lock
 * record.
 *
 * The write is only meant for the real exclusive holder: self must
 * equal the recorded exclusive holder and no shared holders may be
 * left, since a process is already recorded as exclusive while it is
 * still waiting for readers. Otherwise the status is
 * NT_STATUS_NOT_LOCKED.
 *
 * On success data_seqnum is incremented and the record is stored with
 * data/datalen pointing at the caller buffer (const is discarded for
 * the store only).
 */
885 static void g_lock_write_data_fn(
886 struct db_record *rec,
890 struct g_lock_write_data_state *state = private_data;
895 ok = g_lock_parse(value.dptr, value.dsize, &lck);
897 DBG_DEBUG("g_lock_parse for %s failed\n",
898 hex_encode_talloc(talloc_tos(),
901 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
905 exclusive = server_id_equal(&state->self, &lck.exclusive);
908 * Make sure we're really exclusive. We are marked as
909 * exclusive when we are waiting for an exclusive lock
911 exclusive &= (lck.num_shared == 0);
914 DBG_DEBUG("Not locked by us\n");
915 state->status = NT_STATUS_NOT_LOCKED;
919 lck.data_seqnum += 1;
920 lck.data = discard_const_p(uint8_t, state->data);
921 lck.datalen = state->datalen;
922 state->status = g_lock_store(rec, &lck, NULL);
/*
 * Store buf/buflen as the user data attached to the lock on key.
 *
 * Runs g_lock_write_data_fn() under dbwrap_do_locked() on behalf of
 * the calling process and logs a warning when either the database
 * call or the callback fails.
 * NOTE(review): the return statements are not visible in this view.
 */
925 NTSTATUS g_lock_write_data(struct g_lock_ctx *ctx, TDB_DATA key,
926 const uint8_t *buf, size_t buflen)
928 struct g_lock_write_data_state state = {
929 .key = key, .self = messaging_server_id(ctx->msg),
930 .data = buf, .datalen = buflen
934 status = dbwrap_do_locked(ctx->db, key,
935 g_lock_write_data_fn, &state);
936 if (!NT_STATUS_IS_OK(status)) {
937 DBG_WARNING("dbwrap_do_locked failed: %s\n",
941 if (!NT_STATUS_IS_OK(state.status)) {
942 DBG_WARNING("g_lock_write_data_fn failed: %s\n",
943 nt_errstr(state.status));
/*
 * Traverse state for g_lock_locks(): the user callback invoked once
 * per record key.
 * NOTE(review): a private_data member is used elsewhere in this file
 * but is not visible in this view.
 */
950 struct g_lock_locks_state {
951 int (*fn)(TDB_DATA key, void *private_data);
/*
 * Traverse callback: forwards the key of rec to the user callback and
 * returns whatever that callback returns.
 */
955 static int g_lock_locks_fn(struct db_record *rec, void *priv)
958 struct g_lock_locks_state *state = (struct g_lock_locks_state *)priv;
960 key = dbwrap_record_get_key(rec);
961 return state->fn(key, state->private_data);
/*
 * Call fn for the key of every record in the lock database, using a
 * read-only traverse (dbwrap_traverse_read).
 * NOTE(review): the return values are not visible in this view; a
 * count is collected from the traverse.
 */
964 int g_lock_locks(struct g_lock_ctx *ctx,
965 int (*fn)(TDB_DATA key, void *private_data),
968 struct g_lock_locks_state state;
973 state.private_data = private_data;
975 status = dbwrap_traverse_read(ctx->db, g_lock_locks_fn, &state, &count);
976 if (!NT_STATUS_IS_OK(status)) {
/*
 * State passed to g_lock_dump_fn(): holds the user callback that
 * receives the decoded record.
 * NOTE(review): members mem_ctx, key, private_data and status are used
 * elsewhere in this file but are not visible in this view.
 */
982 struct g_lock_dump_state {
985 void (*fn)(struct server_id exclusive,
987 struct server_id *shared,
/*
 * dbwrap_parse_record() callback for g_lock_dump().
 *
 * Parses the record, decodes all shared holders into a temporary
 * talloc array on state->mem_ctx, calls the user callback and frees
 * the array again, so the callback must not keep the shared pointer.
 * state->status reports NT_STATUS_INTERNAL_DB_CORRUPTION for a parse
 * failure, NT_STATUS_NO_MEMORY for a failed allocation and
 * NT_STATUS_OK otherwise.
 */
995 static void g_lock_dump_fn(TDB_DATA key, TDB_DATA data,
998 struct g_lock_dump_state *state = private_data;
999 struct g_lock lck = (struct g_lock) { .exclusive.pid = 0 };
1000 struct server_id *shared = NULL;
1004 ok = g_lock_parse(data.dptr, data.dsize, &lck);
1006 DBG_DEBUG("g_lock_parse failed for %s\n",
1007 hex_encode_talloc(talloc_tos(),
1010 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1014 shared = talloc_array(
1015 state->mem_ctx, struct server_id, lck.num_shared);
1016 if (shared == NULL) {
1017 DBG_DEBUG("talloc failed\n");
1018 state->status = NT_STATUS_NO_MEMORY;
1022 for (i=0; i<lck.num_shared; i++) {
1023 g_lock_get_shared(&lck, i, &shared[i]);
1026 state->fn(lck.exclusive,
1031 state->private_data);
1033 TALLOC_FREE(shared);
1035 state->status = NT_STATUS_OK;
/*
 * Read the lock record for key and pass its decoded contents to fn.
 *
 * The record is read with dbwrap_parse_record() and decoded by
 * g_lock_dump_fn(); ctx serves as talloc parent for the temporary
 * shared holder array. A failure reported by the callback state is
 * returned to the caller, otherwise NT_STATUS_OK.
 * NOTE(review): the return after a failed dbwrap_parse_record() is
 * not visible in this view.
 */
1038 NTSTATUS g_lock_dump(struct g_lock_ctx *ctx, TDB_DATA key,
1039 void (*fn)(struct server_id exclusive,
1041 struct server_id *shared,
1042 const uint8_t *data,
1044 void *private_data),
1047 struct g_lock_dump_state state = {
1048 .mem_ctx = ctx, .key = key,
1049 .fn = fn, .private_data = private_data
1053 status = dbwrap_parse_record(ctx->db, key, g_lock_dump_fn, &state);
1054 if (!NT_STATUS_IS_OK(status)) {
1055 DBG_DEBUG("dbwrap_parse_record returned %s\n",
1059 if (!NT_STATUS_IS_OK(state.status)) {
1060 DBG_DEBUG("g_lock_dump_fn returned %s\n",
1061 nt_errstr(state.status));
1062 return state.status;
1064 return NT_STATUS_OK;
/*
 * State of a g_lock_watch_data_send() request: event context, g_lock
 * context, the blocker handed to the record watch and the data
 * sequence number seen when the watch was started.
 * NOTE(review): members key, blockerdead and status are used elsewhere
 * in this file but are not visible in this view.
 */
1067 struct g_lock_watch_data_state {
1068 struct tevent_context *ev;
1069 struct g_lock_ctx *ctx;
1071 struct server_id blocker;
1073 uint64_t data_seqnum;
/* Forward declaration: watch callback used by the send and done callbacks below. */
1077 static void g_lock_watch_data_done(struct tevent_req *subreq);
/*
 * dbwrap_do_locked() callback starting a data watch.
 *
 * Remembers the current data_seqnum of the record in the request
 * state and starts a record watch with g_lock_watch_data_done() as
 * callback. state->status becomes NT_STATUS_EVENT_PENDING when the
 * watch is running, NT_STATUS_NO_MEMORY when it could not be created
 * and NT_STATUS_INTERNAL_DB_CORRUPTION when the record does not parse.
 */
1079 static void g_lock_watch_data_send_fn(
1080 struct db_record *rec,
1084 struct tevent_req *req = talloc_get_type_abort(
1085 private_data, struct tevent_req);
1086 struct g_lock_watch_data_state *state = tevent_req_data(
1087 req, struct g_lock_watch_data_state);
1088 struct tevent_req *subreq = NULL;
1092 ok = g_lock_parse(value.dptr, value.dsize, &lck);
1094 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1097 state->data_seqnum = lck.data_seqnum;
1099 DBG_DEBUG("state->data_seqnum=%"PRIu64"\n", state->data_seqnum);
1101 subreq = dbwrap_watched_watch_send(
1102 state, state->ev, rec, state->blocker);
1103 if (subreq == NULL) {
1104 state->status = NT_STATUS_NO_MEMORY;
1107 tevent_req_set_callback(subreq, g_lock_watch_data_done, req);
1109 state->status = NT_STATUS_EVENT_PENDING;
/*
 * Start waiting for a change of the user data stored under key.
 *
 * The key is copied into the request state so it stays valid for the
 * lifetime of the request. The watch itself is set up by
 * g_lock_watch_data_send_fn() under dbwrap_do_locked(). A failing
 * dbwrap_do_locked() fails the request; a status of NT_STATUS_OK left
 * in the state completes it immediately.
 *
 * NOTE(review): the handling of other state->status values and the
 * final return are not visible in this view.
 */
1112 struct tevent_req *g_lock_watch_data_send(
1113 TALLOC_CTX *mem_ctx,
1114 struct tevent_context *ev,
1115 struct g_lock_ctx *ctx,
1117 struct server_id blocker)
1119 struct tevent_req *req = NULL;
1120 struct g_lock_watch_data_state *state = NULL;
1123 req = tevent_req_create(
1124 mem_ctx, &state, struct g_lock_watch_data_state);
1130 state->blocker = blocker;
1132 state->key = tdb_data_talloc_copy(state, key);
1133 if (tevent_req_nomem(state->key.dptr, req)) {
1134 return tevent_req_post(req, ev);
1137 status = dbwrap_do_locked(
1138 ctx->db, key, g_lock_watch_data_send_fn, req);
1139 if (tevent_req_nterror(req, status)) {
1140 DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
1141 return tevent_req_post(req, ev);
1144 if (NT_STATUS_IS_OK(state->status)) {
1145 tevent_req_done(req);
1146 return tevent_req_post(req, ev);
/*
 * dbwrap_do_locked() callback run after a data watch fired.
 *
 * Compares the data_seqnum of the record with the value remembered in
 * the request state. A different value sets state->status to
 * NT_STATUS_OK. Otherwise a new record watch is started with
 * g_lock_watch_data_done() as callback and the status becomes
 * NT_STATUS_EVENT_PENDING (or NT_STATUS_NO_MEMORY when the watch
 * could not be created).
 */
1152 static void g_lock_watch_data_done_fn(
1153 struct db_record *rec,
1157 struct tevent_req *req = talloc_get_type_abort(
1158 private_data, struct tevent_req);
1159 struct g_lock_watch_data_state *state = tevent_req_data(
1160 req, struct g_lock_watch_data_state);
1161 struct tevent_req *subreq = NULL;
1165 ok = g_lock_parse(value.dptr, value.dsize, &lck);
1167 state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
1171 if (lck.data_seqnum != state->data_seqnum) {
1172 DBG_DEBUG("lck.data_seqnum=%"PRIu64", "
1173 "state->data_seqnum=%"PRIu64"\n",
1175 state->data_seqnum);
1176 state->status = NT_STATUS_OK;
1180 subreq = dbwrap_watched_watch_send(
1181 state, state->ev, rec, state->blocker);
1182 if (subreq == NULL) {
1183 state->status = NT_STATUS_NO_MEMORY;
1186 tevent_req_set_callback(subreq, g_lock_watch_data_done, req);
1188 state->status = NT_STATUS_EVENT_PENDING;
/*
 * Callback of the record watch used for data watches.
 *
 * Stores blockerdead and blocker from dbwrap_watched_watch_recv() in
 * the request state and frees the finished subrequest. A failed watch
 * fails the request. Otherwise g_lock_watch_data_done_fn() is run
 * under dbwrap_do_locked() to evaluate the record; the resulting
 * state->status is tested for NT_STATUS_EVENT_PENDING before it is
 * used to either fail or complete the request.
 *
 * NOTE(review): the returns after each test are not visible in this
 * view.
 */
1191 static void g_lock_watch_data_done(struct tevent_req *subreq)
1193 struct tevent_req *req = tevent_req_callback_data(
1194 subreq, struct tevent_req);
1195 struct g_lock_watch_data_state *state = tevent_req_data(
1196 req, struct g_lock_watch_data_state);
1199 status = dbwrap_watched_watch_recv(
1200 subreq, &state->blockerdead, &state->blocker);
1201 TALLOC_FREE(subreq);
1202 if (tevent_req_nterror(req, status)) {
1203 DBG_DEBUG("dbwrap_watched_watch_recv returned %s\n",
1208 status = dbwrap_do_locked(
1209 state->ctx->db, state->key, g_lock_watch_data_done_fn, req);
1210 if (tevent_req_nterror(req, status)) {
1211 DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
1214 if (NT_STATUS_EQUAL(state->status, NT_STATUS_EVENT_PENDING)) {
1217 if (tevent_req_nterror(req, state->status)) {
1220 tevent_req_done(req);
/*
 * Collect the result of a request started with
 * g_lock_watch_data_send().
 *
 * On success the blockerdead flag and the blocker stored in the
 * request state are copied to the output pointers; either pointer may
 * be NULL when the caller is not interested.
 * NOTE(review): the return on the error path is not visible in this
 * view.
 */
1223 NTSTATUS g_lock_watch_data_recv(
1224 struct tevent_req *req,
1226 struct server_id *blocker)
1228 struct g_lock_watch_data_state *state = tevent_req_data(
1229 req, struct g_lock_watch_data_state);
1232 if (tevent_req_is_nterror(req, &status)) {
1235 if (blockerdead != NULL) {
1236 *blockerdead = state->blockerdead;
1238 if (blocker != NULL) {
1239 *blocker = state->blocker;
1242 return NT_STATUS_OK;
/*
 * dbwrap_do_locked() callback for g_lock_wake_watchers().
 *
 * Increments data_seqnum and stores the record back without touching
 * holders or data. The changed sequence number is what
 * g_lock_watch_data_done_fn() compares against. Failures are only
 * logged; nothing is reported back to the caller.
 */
1245 static void g_lock_wake_watchers_fn(
1246 struct db_record *rec,
1250 struct g_lock lck = { .exclusive.pid = 0 };
1254 ok = g_lock_parse(value.dptr, value.dsize, &lck);
1256 DBG_WARNING("g_lock_parse failed\n");
1260 lck.data_seqnum += 1;
1262 status = g_lock_store(rec, &lck, NULL);
1263 if (!NT_STATUS_IS_OK(status)) {
1264 DBG_WARNING("g_lock_store failed: %s\n", nt_errstr(status));
1269 void g_lock_wake_watchers(struct g_lock_ctx *ctx, TDB_DATA key)
1273 status = dbwrap_do_locked(ctx->db, key, g_lock_wake_watchers_fn, NULL);
1274 if (!NT_STATUS_IS_OK(status)) {
1275 DBG_DEBUG("dbwrap_do_locked returned %s\n",