s3:dbwrap_ctdb: add a function db_ctdb_ltdb_fetch()
[amitay/samba.git] / source3 / lib / dbwrap_ctdb.c
index e38f76fcf6e910d39885229475a177495182918e..145c0f65ea831bd9def68af6dcc21d3949df5470 100644 (file)
 struct db_ctdb_transaction_handle {
        struct db_ctdb_ctx *ctx;
        bool in_replay;
-       /* we store the reads and writes done under a transaction one
-          list stores both reads and writes, the other just writes
-       */
+       /*
+        * we store the reads and writes done under a transaction:
+        * - one list stores both reads and writes (m_all),
+        * - the other just writes (m_write)
+        */
        struct ctdb_marshall_buffer *m_all;
        struct ctdb_marshall_buffer *m_write;
        uint32_t nesting;
@@ -73,6 +75,91 @@ static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
 }
 
 
+/**
+ * fetch a record from the tdb, separating out the header
+ * information and returning the body of the record.
+ */
+static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
+                                  TDB_DATA key,
+                                  struct ctdb_ltdb_header *header,
+                                  TALLOC_CTX *mem_ctx,
+                                  TDB_DATA *data)
+{
+       TDB_DATA rec;
+       NTSTATUS status;
+
+       rec = tdb_fetch(db->wtdb->tdb, key);
+       if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
+               status = NT_STATUS_NOT_FOUND;
+               if (data) {
+                       ZERO_STRUCTP(data);
+               }
+               if (header) {
+                       header->dmaster = (uint32_t)-1;
+                       header->rsn = 0;
+               }
+               goto done;
+       }
+
+       if (header) {
+               *header = *(struct ctdb_ltdb_header *)rec.dptr;
+       }
+
+       if (data) {
+               data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
+               if (data->dsize == 0) {
+                       data->dptr = NULL;
+               } else {
+                       data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
+                                       rec.dptr
+                                        + sizeof(struct ctdb_ltdb_header),
+                                       data->dsize);
+                       if (data->dptr == NULL) {
+                               status = NT_STATUS_NO_MEMORY;
+                               goto done;
+                       }
+               }
+       }
+
+       status = NT_STATUS_OK;
+
+done:
+       SAFE_FREE(rec.dptr);
+       return status;
+}
+
+/*
+ * Store a record together with the ctdb record header
+ * in the local copy of the database.
+ */
+static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
+                                  TDB_DATA key,
+                                  struct ctdb_ltdb_header *header,
+                                  TDB_DATA data)
+{
+       TALLOC_CTX *tmp_ctx = talloc_stackframe();
+       TDB_DATA rec;
+       int ret;
+
+       rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
+       rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
+
+       if (rec.dptr == NULL) {
+               talloc_free(tmp_ctx);
+               return NT_STATUS_NO_MEMORY;
+       }
+
+       memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
+       memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
+
+       ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
+
+       talloc_free(tmp_ctx);
+
+       return (ret == 0) ? NT_STATUS_OK
+                         : tdb_error_to_ntstatus(db->wtdb->tdb);
+
+}
 
 /*
   form a ctdb_rec_data record from a key/data pair
@@ -470,6 +557,7 @@ static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
        int ret;
        TDB_DATA rec;
        struct ctdb_ltdb_header header;
+       NTSTATUS status;
 
        /* we need the header so we can update the RSN */
        rec = tdb_fetch(h->ctx->wtdb->tdb, key);
@@ -478,7 +566,6 @@ static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
                   This is only safe because we are in a transaction and this
                   is a persistent database */
                ZERO_STRUCT(header);
-               header.dmaster = get_my_vnn();
        } else {
                memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
                rec.dsize -= sizeof(struct ctdb_ltdb_header);
@@ -492,6 +579,7 @@ static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
                SAFE_FREE(rec.dptr);
        }
 
+       header.dmaster = get_my_vnn();
        header.rsn++;
 
        if (!h->in_replay) {
@@ -510,17 +598,12 @@ static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
                return -1;
        }
 
-       rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
-       rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
-       if (rec.dptr == NULL) {
-               DEBUG(0,(__location__ " Failed to alloc record\n"));
-               talloc_free(tmp_ctx);
-               return -1;
+       status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
+       if (NT_STATUS_IS_OK(status)) {
+               ret = 0;
+       } else {
+               ret = -1;
        }
-       memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
-       memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
-
-       ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
 
        talloc_free(tmp_ctx);
 
@@ -784,24 +867,8 @@ static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
 {
        struct db_ctdb_rec *crec = talloc_get_type_abort(
                rec->private_data, struct db_ctdb_rec);
-       TDB_DATA cdata;
-       int ret;
 
-       cdata.dsize = sizeof(crec->header) + data.dsize;
-
-       if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
-               return NT_STATUS_NO_MEMORY;
-       }
-
-       memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
-       memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
-
-       ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
-
-       SAFE_FREE(cdata.dptr);
-
-       return (ret == 0) ? NT_STATUS_OK
-                         : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
+       return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
 }