ctdb-tests: Implement transaction control in fake_ctdbd
authorAmitay Isaacs <amitay@gmail.com>
Mon, 5 Feb 2018 03:03:27 +0000 (14:03 +1100)
committerAmitay Isaacs <amitay@samba.org>
Tue, 27 Mar 2018 02:27:24 +0000 (04:27 +0200)
Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
ctdb/tests/src/fake_ctdbd.c

index 474a6de6dc4abd7bd7c1f0e8c0b1a72950b82574..854551e453fa46e47227bd7a17bd756e8b2647a3 100644 (file)
@@ -969,6 +969,68 @@ static int database_seqnum(struct database *db, uint64_t *seqnum)
        return ret;
 }
 
+static int ltdb_transaction_update(uint32_t reqid,
+                                  struct ctdb_ltdb_header *no_header,
+                                  TDB_DATA key, TDB_DATA data,
+                                  void *private_data)
+{
+       struct database *db = (struct database *)private_data;
+       TALLOC_CTX *tmp_ctx = talloc_new(db);
+       struct ctdb_ltdb_header header = { 0 }, oldheader;
+       TDB_DATA olddata;
+       int ret;
+
+       if (db->tdb == NULL) {
+               return EINVAL;
+       }
+
+       ret = ctdb_ltdb_header_extract(&data, &header);
+       if (ret != 0) {
+               return ret;
+       }
+
+       ret = ltdb_fetch(db, key, &oldheader, tmp_ctx, &olddata);
+       if (ret != 0) {
+               return ret;
+       }
+
+       if (olddata.dsize > 0) {
+               if (oldheader.rsn > header.rsn ||
+                   (oldheader.rsn == header.rsn &&
+                    olddata.dsize != data.dsize)) {
+                       return -1;
+               }
+       }
+
+       talloc_free(tmp_ctx);
+
+       ret = ltdb_store(db, key, &header, data);
+       return ret;
+}
+
+static int ltdb_transaction(struct database *db,
+                           struct ctdb_rec_buffer *recbuf)
+{
+       int ret;
+
+       if (db->tdb == NULL) {
+               return EINVAL;
+       }
+
+       ret = tdb_transaction_start(db->tdb);
+       if (ret == -1) {
+               return ret;
+       }
+
+       ret = ctdb_rec_buffer_traverse(recbuf, ltdb_transaction_update, db);
+       if (ret != 0) {
+               tdb_transaction_cancel(db->tdb);
+       }
+
+       ret = tdb_transaction_commit(db->tdb);
+       return ret;
+}
+
 static bool public_ips_parse(struct ctdbd_context *ctdb,
                             uint32_t numnodes)
 {
@@ -2780,6 +2842,49 @@ fail:
        reply.errmsg = "Failed to ban node";
 }
 
+static void control_trans3_commit(TALLOC_CTX *mem_ctx,
+                                 struct tevent_req *req,
+                                 struct ctdb_req_header *header,
+                                 struct ctdb_req_control *request)
+{
+       struct client_state *state = tevent_req_data(
+               req, struct client_state);
+       struct ctdbd_context *ctdb = state->ctdb;
+       struct ctdb_reply_control reply;
+       struct database *db;
+       int ret;
+
+       reply.rdata.opcode = request->opcode;
+
+       db = database_find(ctdb->db_map, request->rdata.data.recbuf->db_id);
+       if (db == NULL) {
+               reply.status = -1;
+               reply.errmsg = "Unknown database";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+       if (! (db->flags &
+              (CTDB_DB_FLAGS_PERSISTENT|CTDB_DB_FLAGS_REPLICATED))) {
+               reply.status = -1;
+               reply.errmsg = "Transactions on volatile database";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+       ret = ltdb_transaction(db, request->rdata.data.recbuf);
+       if (ret != 0) {
+               reply.status = -1;
+               reply.errmsg = "Transaction failed";
+               client_send_control(req, header, &reply);
+               return;
+       }
+
+       reply.status = 0;
+       reply.errmsg = NULL;
+       client_send_control(req, header, &reply);
+}
+
 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
                               struct tevent_req *req,
                               struct ctdb_req_header *header,
@@ -3923,6 +4028,10 @@ static void client_process_control(struct tevent_req *req,
                control_set_ban_state(mem_ctx, req, &header, &request);
                break;
 
+       case CTDB_CONTROL_TRANS3_COMMIT:
+               control_trans3_commit(mem_ctx, req, &header, &request);
+               break;
+
        case CTDB_CONTROL_GET_DB_SEQNUM:
                control_get_db_seqnum(mem_ctx, req, &header, &request);
                break;