lib: Add g_lock_watch_data_send/recv()
authorVolker Lendecke <vl@samba.org>
Wed, 30 Oct 2019 15:12:11 +0000 (16:12 +0100)
committerJeremy Allison <jra@samba.org>
Fri, 15 May 2020 00:48:32 +0000 (00:48 +0000)
Same concept as dbwrap_watched_watch_send/recv: Get informed if the
underlying data of a record changes. This utilizes the watched
database that g_lock is based upon anyway. To avoid spurious wakeups
by pure g_lock operations this patch adds a sequence number for the
data that is stored in the g_lock data field.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/include/g_lock.h
source3/lib/g_lock.c

index 54f60e410bade5f8b85dcd3e97f5f1cb28c13328..98321b629ec66d0c417682233d296d68396cb1f9 100644 (file)
@@ -65,4 +65,15 @@ NTSTATUS g_lock_dump(struct g_lock_ctx *ctx,
                                void *private_data),
                     void *private_data);
 
+struct tevent_req *g_lock_watch_data_send(
+       TALLOC_CTX *mem_ctx,
+       struct tevent_context *ev,
+       struct g_lock_ctx *ctx,
+       TDB_DATA key,
+       struct server_id blocker);
+NTSTATUS g_lock_watch_data_recv(
+       struct tevent_req *req,
+       bool *blockerdead,
+       struct server_id *blocker);
+
 #endif
index 4bf30188a43e2c568dc6495ebefe81ed232a3724..fc6232b7dc571e672691fe298ff820eacafc93d6 100644 (file)
@@ -42,6 +42,7 @@ struct g_lock {
        struct server_id exclusive;
        size_t num_shared;
        uint8_t *shared;
+       uint64_t data_seqnum;
        size_t datalen;
        uint8_t *data;
 };
@@ -50,9 +51,16 @@ static bool g_lock_parse(uint8_t *buf, size_t buflen, struct g_lock *lck)
 {
        struct server_id exclusive;
        size_t num_shared, shared_len;
-
-       if (buflen < (SERVER_ID_BUF_LENGTH + sizeof(uint32_t))) {
-               *lck = (struct g_lock) { .exclusive.pid = 0 };
+       uint64_t data_seqnum;
+
+       if (buflen < (SERVER_ID_BUF_LENGTH + /* exclusive */
+                     sizeof(uint64_t) +     /* seqnum */
+                     sizeof(uint32_t))) {   /* num_shared */
+               struct g_lock ret = { .exclusive.pid = 0 };
+               generate_random_buffer(
+                       (uint8_t *)&ret.data_seqnum,
+                       sizeof(ret.data_seqnum));
+               *lck = ret;
                return true;
        }
 
@@ -60,11 +68,18 @@ static bool g_lock_parse(uint8_t *buf, size_t buflen, struct g_lock *lck)
        buf += SERVER_ID_BUF_LENGTH;
        buflen -= SERVER_ID_BUF_LENGTH;
 
+       data_seqnum = BVAL(buf, 0);
+       buf += sizeof(uint64_t);
+       buflen -= sizeof(uint64_t);
+
        num_shared = IVAL(buf, 0);
        buf += sizeof(uint32_t);
        buflen -= sizeof(uint32_t);
 
        if (num_shared > buflen/SERVER_ID_BUF_LENGTH) {
+               DBG_DEBUG("num_shared=%zu, buflen=%zu\n",
+                         num_shared,
+                         buflen);
                return false;
        }
 
@@ -74,6 +89,7 @@ static bool g_lock_parse(uint8_t *buf, size_t buflen, struct g_lock *lck)
                .exclusive = exclusive,
                .num_shared = num_shared,
                .shared = buf,
+               .data_seqnum = data_seqnum,
                .datalen = buflen-shared_len,
                .data = buf+shared_len,
        };
@@ -110,11 +126,13 @@ static NTSTATUS g_lock_store(
        struct server_id *new_shared)
 {
        uint8_t exclusive[SERVER_ID_BUF_LENGTH];
+       uint8_t seqnum_buf[sizeof(uint64_t)];
        uint8_t sizebuf[sizeof(uint32_t)];
-       uint8_t shared[SERVER_ID_BUF_LENGTH];
+       uint8_t new_shared_buf[SERVER_ID_BUF_LENGTH];
 
        struct TDB_DATA dbufs[] = {
                { .dptr = exclusive, .dsize = sizeof(exclusive) },
+               { .dptr = seqnum_buf, .dsize = sizeof(seqnum_buf) },
                { .dptr = sizebuf, .dsize = sizeof(sizebuf) },
                { .dptr = lck->shared,
                  .dsize = lck->num_shared * SERVER_ID_BUF_LENGTH },
@@ -123,16 +141,18 @@ static NTSTATUS g_lock_store(
        };
 
        server_id_put(exclusive, lck->exclusive);
+       SBVAL(seqnum_buf, 0, lck->data_seqnum);
 
        if (new_shared != NULL) {
                if (lck->num_shared >= UINT32_MAX) {
                        return NT_STATUS_BUFFER_OVERFLOW;
                }
 
-               server_id_put(shared, *new_shared);
+               server_id_put(new_shared_buf, *new_shared);
 
-               dbufs[3] = (TDB_DATA) {
-                       .dptr = shared, .dsize = sizeof(shared),
+               dbufs[4] = (TDB_DATA) {
+                       .dptr = new_shared_buf,
+                       .dsize = sizeof(new_shared_buf),
                };
 
                lck->num_shared += 1;
@@ -896,6 +916,7 @@ static void g_lock_write_data_fn(
                return;
        }
 
+       lck.data_seqnum += 1;
        lck.data = discard_const_p(uint8_t, state->data);
        lck.datalen = state->datalen;
        state->status = g_lock_store(rec, &lck, NULL);
@@ -1042,3 +1063,181 @@ NTSTATUS g_lock_dump(struct g_lock_ctx *ctx, TDB_DATA key,
        }
        return NT_STATUS_OK;
 }
+
+struct g_lock_watch_data_state {
+       struct tevent_context *ev;
+       struct g_lock_ctx *ctx;
+       TDB_DATA key;
+       struct server_id blocker;
+       bool blockerdead;
+       uint64_t data_seqnum;
+       NTSTATUS status;
+};
+
+static void g_lock_watch_data_done(struct tevent_req *subreq);
+
+static void g_lock_watch_data_send_fn(
+       struct db_record *rec,
+       TDB_DATA value,
+       void *private_data)
+{
+       struct tevent_req *req = talloc_get_type_abort(
+               private_data, struct tevent_req);
+       struct g_lock_watch_data_state *state = tevent_req_data(
+               req, struct g_lock_watch_data_state);
+       struct tevent_req *subreq = NULL;
+       struct g_lock lck;
+       bool ok;
+
+       ok = g_lock_parse(value.dptr, value.dsize, &lck);
+       if (!ok) {
+               state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
+               return;
+       }
+       state->data_seqnum = lck.data_seqnum;
+
+       DBG_DEBUG("state->data_seqnum=%"PRIu64"\n", state->data_seqnum);
+
+       subreq = dbwrap_watched_watch_send(
+               state, state->ev, rec, state->blocker);
+       if (subreq == NULL) {
+               state->status = NT_STATUS_NO_MEMORY;
+               return;
+       }
+       tevent_req_set_callback(subreq, g_lock_watch_data_done, req);
+
+       state->status = NT_STATUS_EVENT_PENDING;
+}
+
+struct tevent_req *g_lock_watch_data_send(
+       TALLOC_CTX *mem_ctx,
+       struct tevent_context *ev,
+       struct g_lock_ctx *ctx,
+       TDB_DATA key,
+       struct server_id blocker)
+{
+       struct tevent_req *req = NULL;
+       struct g_lock_watch_data_state *state = NULL;
+       NTSTATUS status;
+
+       req = tevent_req_create(
+               mem_ctx, &state, struct g_lock_watch_data_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->ev = ev;
+       state->ctx = ctx;
+       state->blocker = blocker;
+
+       state->key = tdb_data_talloc_copy(state, key);
+       if (tevent_req_nomem(state->key.dptr, req)) {
+               return tevent_req_post(req, ev);
+       }
+
+       status = dbwrap_do_locked(
+               ctx->db, key, g_lock_watch_data_send_fn, req);
+       if (tevent_req_nterror(req, status)) {
+               DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
+               return tevent_req_post(req, ev);
+       }
+
+       if (NT_STATUS_IS_OK(state->status)) {
+               tevent_req_done(req);
+               return tevent_req_post(req, ev);
+       }
+
+       return req;
+}
+
+static void g_lock_watch_data_done_fn(
+       struct db_record *rec,
+       TDB_DATA value,
+       void *private_data)
+{
+       struct tevent_req *req = talloc_get_type_abort(
+               private_data, struct tevent_req);
+       struct g_lock_watch_data_state *state = tevent_req_data(
+               req, struct g_lock_watch_data_state);
+       struct tevent_req *subreq = NULL;
+       struct g_lock lck;
+       bool ok;
+
+       ok = g_lock_parse(value.dptr, value.dsize, &lck);
+       if (!ok) {
+               state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
+               return;
+       }
+
+       if (lck.data_seqnum != state->data_seqnum) {
+               DBG_DEBUG("lck.data_seqnum=%"PRIu64", "
+                         "state->data_seqnum=%"PRIu64"\n",
+                         lck.data_seqnum,
+                         state->data_seqnum);
+               state->status = NT_STATUS_OK;
+               return;
+       }
+
+       subreq = dbwrap_watched_watch_send(
+               state, state->ev, rec, state->blocker);
+       if (subreq == NULL) {
+               state->status = NT_STATUS_NO_MEMORY;
+               return;
+       }
+       tevent_req_set_callback(subreq, g_lock_watch_data_done, req);
+
+       state->status = NT_STATUS_EVENT_PENDING;
+}
+
+static void g_lock_watch_data_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct g_lock_watch_data_state *state = tevent_req_data(
+               req, struct g_lock_watch_data_state);
+       NTSTATUS status;
+
+       status = dbwrap_watched_watch_recv(
+               subreq, &state->blockerdead, &state->blocker);
+       TALLOC_FREE(subreq);
+       if (tevent_req_nterror(req, status)) {
+               DBG_DEBUG("dbwrap_watched_watch_recv returned %s\n",
+                         nt_errstr(status));
+               return;
+       }
+
+       status = dbwrap_do_locked(
+               state->ctx->db, state->key, g_lock_watch_data_done_fn, req);
+       if (tevent_req_nterror(req, status)) {
+               DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status));
+               return;
+       }
+       if (NT_STATUS_EQUAL(state->status, NT_STATUS_EVENT_PENDING)) {
+               return;
+       }
+       if (tevent_req_nterror(req, state->status)) {
+               return;
+       }
+       tevent_req_done(req);
+}
+
+NTSTATUS g_lock_watch_data_recv(
+       struct tevent_req *req,
+       bool *blockerdead,
+       struct server_id *blocker)
+{
+       struct g_lock_watch_data_state *state = tevent_req_data(
+               req, struct g_lock_watch_data_state);
+       NTSTATUS status;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               return status;
+       }
+       if (blockerdead != NULL) {
+               *blockerdead = state->blockerdead;
+       }
+       if (blocker != NULL) {
+               *blocker = state->blocker;
+       }
+
+       return NT_STATUS_OK;
+}