4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
/*
 * Bookkeeping for one persistent-store commit that is being rolled out
 * to other nodes: the daemon context, the database and client (both
 * marked as used by trans3_commit), the originating control request and
 * the reply counters.
 *
 * NOTE(review): the embedded numbering jumps from 33 to 37 and no closing
 * brace is shown. Code in this file also uses state->errormsg,
 * state->status and state->num_pending, so those members are presumably
 * declared on the elided lines - confirm against the full source.
 */
29 struct ctdb_persistent_state {
30 struct ctdb_context *ctdb;
31 struct ctdb_db_context *ctdb_db; /* used by trans3_commit */
32 struct ctdb_client *client; /* used by trans3_commit */
33 struct ctdb_req_control *c;
37 uint32_t num_failed, num_sent;
41 1) all nodes fail, and all nodes reply
42 2) some nodes fail, all nodes reply
48 called when a node has acknowledged a ctdb_control_update_record call
/*
 * Reply handler passed to ctdb_daemon_send_control() by the commit
 * controls in this file for each CTDB_CONTROL_UPDATE_RECORD they send.
 *
 * ctdb         - daemon context; its recovery_mode is read and may be set
 * status       - result reported for the remote update
 * data         - not used in the visible lines
 * private_data - the struct ctdb_persistent_state of the commit
 *
 * Visible behaviour:
 *  - while recovery_mode is not CTDB_RECOVERY_NORMAL the reply is only
 *    logged at DEBUG_INFO;
 *  - a failure is logged, copied into state->status and state->errormsg,
 *    and recovery_mode is set to CTDB_RECOVERY_ACTIVE;
 *  - otherwise the outcome is classified from num_failed and num_sent as
 *    ALLFAIL, SOMEFAIL or SUCCESS and sent with
 *    ctdb_request_control_reply().
 *
 * NOTE(review): the parameter list tail, the status test, the early
 * returns and the counter updates fall on lines missing from this
 * listing (numbering gaps 52-54, 62-65, 80-84, 86-88, 98 onwards) -
 * confirm against the full source before relying on the exact flow.
 */
50 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
51 int32_t status, TDB_DATA data,
55 struct ctdb_persistent_state *state = talloc_get_type(private_data,
56 struct ctdb_persistent_state);
57 enum ctdb_trans2_commit_error etype;
/* Replies that arrive while a recovery is in progress are ignored. */
59 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
60 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
61 "during recovery\n"));
/* Failure path: remember the status and message for the final reply. */
66 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
67 status, errormsg?errormsg:"no error message given"));
68 state->status = status;
69 state->errormsg = errormsg;
73 * If a node failed to complete the update_record control,
74 * then either a recovery is already running or something
75 * bad is going on. So trigger a recovery and let the
76 * recovery finish the transaction, sending back the reply
77 * for the trans3_commit control to the client.
79 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/* Replies still outstanding: presumably returns here - TODO confirm. */
85 if (state->num_pending != 0) {
/* All replies are in: classify the overall result. */
89 if (state->num_failed == state->num_sent) {
90 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
91 } else if (state->num_failed != 0) {
92 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
94 etype = CTDB_TRANS2_COMMIT_SUCCESS;
97 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
102 called if persistent store times out
/*
 * Timed-event handler armed by the commit controls in this file with
 * ctdb->tunable.control_timeout.
 *
 * private_data is the struct ctdb_persistent_state of the commit. While
 * recovery_mode is not CTDB_RECOVERY_NORMAL the timeout is only logged;
 * otherwise the client is answered with CTDB_TRANS2_COMMIT_TIMEOUT.
 * ev, te and t are not used in the visible lines.
 *
 * NOTE(review): the lines after 111 and 116 are missing from this
 * listing (presumably an early return and the release of state) -
 * confirm against the full source.
 */
104 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
105 struct timeval t, void *private_data)
107 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
109 if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
110 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
111 "timeout during recovery\n"));
115 ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
116 "timeout in ctdb_persistent_state");
122 * Finish pending trans3 commit controls, i.e. send
123 * reply to the client. This is called by the end-recovery
124 * control to fix the situation when a recovery interrupts
125 the usual progress of a transaction.
/*
 * Walks ctdb->db_list and, for every database that still has a
 * persistent_state attached, answers the stored control request with
 * CTDB_TRANS2_COMMIT_SOMEFAIL and the message
 * "trans3 commit ended by recovery".
 *
 * Nothing is replied while recovery_mode is not CTDB_RECOVERY_NORMAL;
 * that case is only logged.
 *
 * NOTE(review): the DEBUG text at 132-133 names
 * ctdb_persistent_store_timeout and talks about a timeout; it looks
 * copied from that handler and does not describe this function -
 * consider correcting the message.
 * NOTE(review): lines 134-136, 141-143 and 151 onwards are missing from
 * this listing (presumably return, continue and the free of state that
 * triggers the destructor mentioned at 150) - confirm.
 */
127 void ctdb_persistent_finish_trans3_commits(struct ctdb_context *ctdb)
129 struct ctdb_db_context *ctdb_db;
131 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
132 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
133 "timeout during recovery\n"));
137 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
138 struct ctdb_persistent_state *state;
/* Databases without a commit in flight need no reply. */
140 if (ctdb_db->persistent_state == NULL) {
144 state = ctdb_db->persistent_state;
146 ctdb_request_control_reply(ctdb, state->c, NULL,
147 CTDB_TRANS2_COMMIT_SOMEFAIL,
148 "trans3 commit ended by recovery");
150 /* The destructor sets ctdb_db->persistent_state to NULL. */
156 store a set of persistent records - called from a ctdb client when it has updated
157 some records in a persistent database. The client will have the record
158 locked for the duration of this call. The client is the dmaster when
/*
 * Control handler for a commit of marshalled records to a persistent
 * database.
 *
 * ctdb        - daemon context
 * c           - the control request; c->client_id identifies the client
 * recdata     - payload, interpreted as struct ctdb_marshall_buffer;
 *               m->db_id selects the database
 * async_reply - output flag; its assignment is not on a visible line
 *
 * Visible steps:
 *  1. look up the database and the client and log an error when either
 *     is unknown or the database has an unhealthy_reason;
 *  2. per opcode (PERSISTENT_STORE, TRANS2_COMMIT, TRANS2_COMMIT_RETRY)
 *     check and update client->num_persistent_updates,
 *     ctdb_db->transaction_active and client->db_id;
 *  3. refuse to proceed while recovery_mode is not CTDB_RECOVERY_NORMAL;
 *  4. send CTDB_CONTROL_UPDATE_RECORD with the same payload to every
 *     node in the vnn map that is not inactive and is not this node,
 *     with ctdb_persistent_callback as reply handler;
 *  5. re-parent c onto state and arm ctdb_persistent_store_timeout
 *     with ctdb->tunable.control_timeout.
 *
 * NOTE(review): m->db_id is read at 171 with no visible check that
 * recdata is large enough to hold the marshall buffer header - confirm
 * whether the caller validates the size.
 * NOTE(review): the switch statement, the return statements and several
 * assignments fall on lines missing from this listing; return values
 * cannot be documented from here.
 */
161 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
162 struct ctdb_req_control *c,
163 TDB_DATA recdata, bool *async_reply)
165 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
166 struct ctdb_persistent_state *state;
168 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
169 struct ctdb_db_context *ctdb_db;
171 ctdb_db = find_ctdb_db(ctdb, m->db_id);
172 if (ctdb_db == NULL) {
173 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
174 "Unknown database db_id[0x%08x]\n", m->db_id));
178 if (client == NULL) {
179 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
183 if (ctdb_db->unhealthy_reason) {
184 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
185 ctdb_db->db_name, ctdb_db->unhealthy_reason));
189 /* handling num_persistent_updates is a bit strange -
191 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
192 They don't expect num_persistent_updates to be used at all
194 2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
195 this commit to then decrement it
197 3) new clients which use TRANS2 commit functions, and
198 expect this function to increment the counter, and
199 then have it decremented in ctdb_control_trans2_error
200 or ctdb_control_trans2_finished
203 case CTDB_CONTROL_PERSISTENT_STORE:
204 if (ctdb_db->transaction_active) {
205 DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
206 "transaction is active on database "
207 "db_id[0x%08x] - refusing persistent "
208 " store for client id[0x%08x]\n",
209 ctdb_db->db_id, client->client_id));
212 if (client->num_persistent_updates > 0) {
213 client->num_persistent_updates--;
216 case CTDB_CONTROL_TRANS2_COMMIT:
217 if (ctdb_db->transaction_active) {
218 DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
219 " already a transaction commit "
220 "active on db_id[0x%08x] - forbidding "
221 "client_id[0x%08x] to commit\n",
222 ctdb_db->db_id, client->client_id));
225 if (client->db_id != 0) {
226 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
227 "client-db_id[0x%08x] != 0 "
228 "(client_id[0x%08x])\n",
229 client->db_id, client->client_id));
232 client->num_persistent_updates++;
233 ctdb_db->transaction_active = true;
234 client->db_id = m->db_id;
235 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
236 " commit transaction on db id[0x%08x]\n",
237 client->client_id, client->db_id));
239 case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
240 /* already updated from the first commit */
241 if (client->db_id != m->db_id) {
242 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
243 "retry: client-db_id[0x%08x] != "
244 "db_id[0x%08x] (client_id[0x%08x])\n",
246 m->db_id, client->client_id));
249 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
250 "transaction commit retry on "
252 client->client_id, client->db_id));
/* No commit is started while a recovery is running. */
256 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
257 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
/* state is a talloc child of ctdb and starts zero-filled. */
261 state = talloc_zero(ctdb, struct ctdb_persistent_state);
262 CTDB_NO_MEMORY(ctdb, state);
/* Fan the update out to the nodes listed in the vnn map. */
267 for (i=0;i<ctdb->vnn_map->size;i++) {
268 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
271 /* only send to active nodes */
272 if (node->flags & NODE_FLAGS_INACTIVE) {
276 /* don't send to ourselves */
277 if (node->pnn == ctdb->pnn) {
281 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
282 c->client_id, 0, recdata,
283 ctdb_persistent_callback, state);
285 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
290 state->num_pending++;
/* Nothing was sent: presumably finishes immediately - TODO confirm. */
294 if (state->num_pending == 0) {
299 /* we need to wait for the replies */
302 /* need to keep the control structure around */
303 talloc_steal(state, c);
305 /* but we won't wait forever */
306 event_add_timed(ctdb->ev, state,
307 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
308 ctdb_persistent_store_timeout, state);
/*
 * talloc destructor installed on the commit state by
 * ctdb_control_trans3_commit. It undoes the two back references that
 * commit sets up: the client db_id is reset to 0 and the database
 * persistent_state pointer is cleared, each only when the corresponding
 * pointer in state is non-NULL.
 *
 * NOTE(review): the return statement is on a line missing from this
 * listing.
 */
313 static int ctdb_persistent_state_destructor(struct ctdb_persistent_state *state)
315 if (state->client != NULL) {
316 state->client->db_id = 0;
319 if (state->ctdb_db != NULL) {
320 state->ctdb_db->persistent_state = NULL;
327 * Store a set of persistent records.
328 * This is used to roll out a transaction to all nodes.
/*
 * Control handler that rolls a transaction out to the cluster.
 *
 * ctdb        - daemon context
 * c           - the control request; c->client_id identifies the client
 * recdata     - payload, interpreted as struct ctdb_marshall_buffer;
 *               m->db_id selects the database
 * async_reply - output flag; its assignment is not on a visible line
 *
 * Visible steps:
 *  1. refuse while recovery_mode is not CTDB_RECOVERY_NORMAL;
 *  2. require a known client whose db_id is 0, a known database, and no
 *     persistent_state already attached to that database;
 *  3. allocate the state as a talloc child of ctdb_db, store it in
 *     ctdb_db->persistent_state, set client->db_id and install
 *     ctdb_persistent_state_destructor;
 *  4. send CTDB_CONTROL_UPDATE_RECORD to every node in the vnn map that
 *     is not inactive, with ctdb_persistent_callback as reply handler;
 *  5. re-parent c onto state and arm ctdb_persistent_store_timeout.
 *
 * NOTE(review): unlike ctdb_control_trans2_commit, no check that skips
 * the local node is visible in the send loop (numbering gap 395-397 is
 * only large enough to close the inactive check) - presumably the local
 * node is included on purpose; confirm.
 * NOTE(review): m->db_id is read with no visible size check on recdata.
 * NOTE(review): return statements and some assignments are on lines
 * missing from this listing.
 */
330 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
331 struct ctdb_req_control *c,
332 TDB_DATA recdata, bool *async_reply)
334 struct ctdb_client *client;
335 struct ctdb_persistent_state *state;
337 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
338 struct ctdb_db_context *ctdb_db;
340 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
341 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
345 client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
346 if (client == NULL) {
347 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
348 "to a client. Returning error\n"));
/* A non-zero db_id means this client already has a commit recorded. */
352 if (client->db_id != 0) {
353 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans3_commit: "
354 "client-db_id[0x%08x] != 0 "
355 "(client_id[0x%08x]): trans3_commit active?\n",
356 client->db_id, client->client_id));
360 ctdb_db = find_ctdb_db(ctdb, m->db_id);
361 if (ctdb_db == NULL) {
362 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
363 "Unknown database db_id[0x%08x]\n", m->db_id));
/* Only one commit state per database at a time. */
367 if (ctdb_db->persistent_state != NULL) {
368 DEBUG(DEBUG_ERR, (__location__ " Error: "
369 "ctdb_control_trans3_commit "
370 "called while a transaction commit is "
371 "active. db_id[0x%08x]\n", m->db_id));
375 ctdb_db->persistent_state = talloc_zero(ctdb_db,
376 struct ctdb_persistent_state);
377 CTDB_NO_MEMORY(ctdb, ctdb_db->persistent_state);
379 client->db_id = m->db_id;
381 state = ctdb_db->persistent_state;
383 state->ctdb_db = ctdb_db;
385 state->client = client;
/* From here on, freeing state resets client->db_id and persistent_state. */
387 talloc_set_destructor(state, ctdb_persistent_state_destructor);
389 for (i = 0; i < ctdb->vnn_map->size; i++) {
390 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
393 /* only send to active nodes */
394 if (node->flags & NODE_FLAGS_INACTIVE) {
398 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
399 CTDB_CONTROL_UPDATE_RECORD,
400 c->client_id, 0, recdata,
401 ctdb_persistent_callback,
404 DEBUG(DEBUG_ERR,("Unable to send "
405 "CTDB_CONTROL_UPDATE_RECORD "
406 "to pnn %u\n", node->pnn));
411 state->num_pending++;
415 if (state->num_pending == 0) {
420 /* we need to wait for the replies */
423 /* need to keep the control structure around */
424 talloc_steal(state, c);
426 /* but we won't wait forever */
427 event_add_timed(ctdb->ev, state,
428 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
429 ctdb_persistent_store_timeout, state);
/*
 * State for one local write of marshalled records: the target database,
 * the marshalled record buffer and the control request to answer once
 * the write has finished or timed out.
 *
 * NOTE(review): the closing brace is on a line missing from this
 * listing.
 */
435 struct ctdb_persistent_write_state {
436 struct ctdb_db_context *ctdb_db;
437 struct ctdb_marshall_buffer *m;
438 struct ctdb_req_control *c;
443 called from a child process to write the data
/*
 * Writes every record of state->m into the local tdb of state->ctdb_db
 * inside one tdb transaction. Called from ctdb_childwrite in the forked
 * child.
 *
 * For each of the m->count records the function
 *  - unpacks the next record (header, key, data) from the buffer,
 *  - fetches the currently stored header and data for that key,
 *  - logs at DEBUG_CRIT when the stored RSN is greater than or equal to
 *    the new RSN and the stored data differs in size or content,
 *  - stores the new header and data.
 * Afterwards the transaction is committed; tdb_transaction_cancel at 512
 * is presumably the shared failure exit - TODO confirm.
 *
 * A scratch talloc context is created per record and freed on every
 * visible path before the store.
 *
 * NOTE(review): the result tests, goto or return statements and the
 * final return value are on lines missing from this listing.
 */
445 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
448 struct ctdb_rec_data *rec = NULL;
449 struct ctdb_marshall_buffer *m = state->m;
451 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
453 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
454 state->ctdb_db->db_id));
458 for (i=0;i<m->count;i++) {
459 struct ctdb_ltdb_header oldheader;
460 struct ctdb_ltdb_header header;
461 TDB_DATA key, data, olddata;
462 TALLOC_CTX *tmp_ctx = talloc_new(state);
464 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
467 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
468 i, state->ctdb_db->db_id));
469 talloc_free(tmp_ctx);
473 /* fetch the old header and ensure the rsn is less than the new rsn */
474 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
476 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
477 state->ctdb_db->db_id));
478 talloc_free(tmp_ctx);
/* An equal or older RSN is tolerated only when the data is identical. */
482 if (oldheader.rsn >= header.rsn &&
483 (olddata.dsize != data.dsize ||
484 memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
485 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
486 state->ctdb_db->db_id,
487 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
488 talloc_free(tmp_ctx);
492 talloc_free(tmp_ctx);
494 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
496 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
497 state->ctdb_db->db_id));
502 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
504 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
505 state->ctdb_db->db_id));
512 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
518 called when the child has completed the persistent write
/*
 * Completion callback handed to ctdb_childwrite by
 * ctdb_control_update_record. private_data is the
 * struct ctdb_persistent_write_state of the write; status is forwarded
 * unchanged as the result of the control reply, with no data and no
 * error message.
 *
 * NOTE(review): the lines after 527 are missing from this listing
 * (presumably the release of state) - confirm.
 */
521 static void ctdb_persistent_write_callback(int status, void *private_data)
523 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
524 struct ctdb_persistent_write_state);
527 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
533 called if our lockwait child times out
/*
 * Timed-event handler armed by ctdb_control_update_record. private_data
 * is the struct ctdb_persistent_write_state of the write. The pending
 * control is answered with status -1 and the message
 * "timeout in ctdb_persistent_lock". ev, te and t are not used in the
 * visible lines.
 *
 * NOTE(review): the lines after 540 are missing from this listing
 * (presumably the release of state) - confirm.
 */
535 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
536 struct timeval t, void *private_data)
538 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
539 struct ctdb_persistent_write_state);
540 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
/*
 * Handle for one forked writer child: daemon and database context, the
 * fd event watching the pipe, the completion callback and the start
 * time used for the latency statistic.
 *
 * NOTE(review): the numbering jumps from 547 to 551 and no closing brace
 * is shown. Code in this file also uses h->fd[0], h->fd[1], h->child and
 * h->private_data, so those members are presumably declared on the
 * elided lines - confirm.
 */
544 struct childwrite_handle {
545 struct ctdb_context *ctdb;
546 struct ctdb_db_context *ctdb_db;
547 struct fd_event *fde;
551 void (*callback)(int, void *);
552 struct timeval start_time;
/*
 * talloc destructor installed on the handle by ctdb_childwrite. It
 * decrements the pending_childwrite_calls statistic and sends SIGKILL
 * to the child recorded in the handle.
 *
 * NOTE(review): the return statement is on a line missing from this
 * listing.
 */
555 static int childwrite_destructor(struct childwrite_handle *h)
557 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
558 kill(h->child, SIGKILL);
562 /* called when the child process has finished writing the record to the
// fd-event handler registered by ctdb_childwrite on the read end of the
// pipe to the writer child. private_data is the struct childwrite_handle.
//
// Visible behaviour:
//  - the callback, its argument and the child pid are copied out of the
//    handle first, because the handle is re-parented onto a scratch
//    context that is freed at the end;
//  - the latency statistic is updated and pending_childwrite_calls is
//    decremented here, and the destructor on the handle is cleared, so
//    childwrite_destructor does not repeat the decrement and the kill;
//  - one byte is read from the pipe and a read problem is logged;
//  - the child is sent SIGKILL and the scratch context is freed.
//
// NOTE(review): the call of the saved callback and the tests around the
// read result are on lines missing from this listing (numbering gaps
// 574-576, 588, 590-594) - confirm the status passed to the callback.
// NOTE(review): the comment opened at 580 has lost its closing line in
// this listing.
565 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
566 uint16_t flags, void *private_data)
568 struct childwrite_handle *h = talloc_get_type(private_data,
569 struct childwrite_handle);
570 void *p = h->private_data;
571 void (*callback)(int, void *) = h->callback;
572 pid_t child = h->child;
573 TALLOC_CTX *tmp_ctx = talloc_new(ev);
577 CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
578 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
580 /* the handle needs to go away when the context is gone - when
581 the handle goes away this implicitly closes the pipe, which
583 talloc_steal(tmp_ctx, h);
585 talloc_set_destructor(h, NULL);
587 ret = read(h->fd[0], &c, 1);
589 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
595 kill(child, SIGKILL);
596 talloc_free(tmp_ctx);
599 /* this creates a child process which will take out a tdb transaction
600 and write the record to the database.
// Forks a child that runs ctdb_persistent_store(state) and reports back
// over a pipe; the parent watches the read end with childwrite_handler.
//
// ctdb_db  - database to write; also supplies the daemon context
// callback - stored in the handle together with state as its argument
// state    - write state; also the talloc parent of the returned handle
//
// Visible behaviour:
//  - childwrite_calls and pending_childwrite_calls are incremented up
//    front; pending_childwrite_calls is decremented again on each
//    visible failure path (allocation, pipe, fork, event_add_fd);
//  - in the child: the read end is closed, debug_extra is set to a
//    string built from the database name, the store is run, one byte is
//    written to the pipe, and the child then polls for its parent with
//    kill(parent, 0);
//  - in the parent: the write end is closed, the read end is marked
//    close-on-exec, childwrite_destructor is installed, the fd event is
//    added with auto-close, and start_time is recorded.
//
// NOTE(review): the result of write() at 652 is not checked.
// NOTE(review): childwrite_destructor is installed at 664 and also
// decrements pending_childwrite_calls. If the line missing at 672 frees
// result, the failure path at 671-673 would decrement the statistic
// twice and signal the child - confirm against the full source.
// NOTE(review): return statements, the value written to the pipe and
// the child exit call are on lines missing from this listing.
602 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
603 void (*callback)(int, void *private_data),
604 struct ctdb_persistent_write_state *state)
606 struct childwrite_handle *result;
608 pid_t parent = getpid();
610 CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
611 CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
613 if (!(result = talloc_zero(state, struct childwrite_handle))) {
614 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
618 ret = pipe(result->fd);
622 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
626 result->child = ctdb_fork(ctdb_db->ctdb);
628 if (result->child == (pid_t)-1) {
629 close(result->fd[0]);
630 close(result->fd[1]);
632 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
636 result->callback = callback;
637 result->private_data = state;
638 result->ctdb = ctdb_db->ctdb;
639 result->ctdb_db = ctdb_db;
641 if (result->child == 0) {
644 close(result->fd[0]);
645 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
646 ret = ctdb_persistent_store(state);
648 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
652 write(result->fd[1], &c, 1);
654 /* make sure we die when our parent dies */
655 while (kill(parent, 0) == 0 || errno != ESRCH) {
661 close(result->fd[1]);
662 set_close_on_exec(result->fd[0]);
664 talloc_set_destructor(result, childwrite_destructor);
666 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
668 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
669 EVENT_FD_READ, childwrite_handler,
671 if (result->fde == NULL) {
673 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
676 tevent_fd_set_auto_close(result->fde);
678 result->start_time = timeval_current();
684 update a record on this node if the new record has a higher rsn than the
/*
 * Control handler for CTDB_CONTROL_UPDATE_RECORD: applies a marshalled
 * set of records to the local copy of a persistent database through a
 * forked writer child.
 *
 * ctdb    - daemon context
 * c       - the control request, answered later by the write callback
 *           or by the timeout handler
 * recdata - payload, interpreted as struct ctdb_marshall_buffer;
 *           m->db_id selects the database
 *
 * Visible steps: refuse while recovery_mode is not
 * CTDB_RECOVERY_NORMAL; require a known database without an
 * unhealthy_reason; allocate the write state as a talloc child of ctdb;
 * start the child with ctdb_childwrite and
 * ctdb_persistent_write_callback; re-parent c onto state and arm
 * ctdb_persistent_lock_timeout with ctdb->tunable.control_timeout.
 *
 * NOTE(review): state comes from talloc(), not talloc_zero(), and only
 * the ctdb_db assignment is visible; the m and c members are presumably
 * set on the lines missing at 717-719 - confirm, since
 * ctdb_persistent_store reads state->m.
 * NOTE(review): m->db_id is read with no visible size check on recdata.
 * NOTE(review): the last parameter, the return statements and the
 * cleanup after a failed ctdb_childwrite are on lines missing from this
 * listing.
 */
687 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
688 struct ctdb_req_control *c, TDB_DATA recdata,
691 struct ctdb_db_context *ctdb_db;
692 struct ctdb_persistent_write_state *state;
693 struct childwrite_handle *handle;
694 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
696 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
697 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
701 ctdb_db = find_ctdb_db(ctdb, m->db_id);
702 if (ctdb_db == NULL) {
703 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
707 if (ctdb_db->unhealthy_reason) {
708 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
709 ctdb_db->db_name, ctdb_db->unhealthy_reason));
713 state = talloc(ctdb, struct ctdb_persistent_write_state);
714 CTDB_NO_MEMORY(ctdb, state);
716 state->ctdb_db = ctdb_db;
720 /* create a child process to take out a transaction and
723 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
724 if (handle == NULL) {
725 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
730 /* we need to wait for the replies */
733 /* need to keep the control structure around */
734 talloc_steal(state, c);
736 /* but we won't wait forever */
737 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
738 ctdb_persistent_lock_timeout, state);
745 called when a client has finished a local commit in a transaction to
746 a persistent database
/*
 * Control handler run when a client reports that its transaction commit
 * has finished.
 *
 * The database is looked up through client->db_id. When it is known and
 * has transaction_active set, the flag is cleared and
 * client->num_persistent_updates is decremented. A counter that is
 * already 0 is logged as an error and recovery_mode is set to
 * CTDB_RECOVERY_ACTIVE.
 *
 * NOTE(review): client is dereferenced at 754 with no NULL check after
 * ctdb_reqid_find(); the numbering (751, 752, 754) leaves no room for
 * one. ctdb_control_trans2_commit in this file does check
 * client == NULL for the same lookup - a guard looks necessary here.
 * NOTE(review): return statements, and whether client->db_id is reset,
 * are on lines missing from this listing.
 */
748 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
749 struct ctdb_req_control *c)
751 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
752 struct ctdb_db_context *ctdb_db;
754 ctdb_db = find_ctdb_db(ctdb, client->db_id);
755 if (ctdb_db == NULL) {
756 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
757 "Unknown database 0x%08x\n", client->db_id));
760 if (!ctdb_db->transaction_active) {
761 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
762 "Database 0x%08x has no transaction commit "
763 "started\n", client->db_id));
767 ctdb_db->transaction_active = false;
/* A zero counter means the bookkeeping is inconsistent: force recovery. */
770 if (client->num_persistent_updates == 0) {
771 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
772 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
773 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
776 client->num_persistent_updates--;
778 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
779 "transaction commit db_id[0x%08x]\n",
780 client->client_id, ctdb_db->db_id));
786 called when a client gets an error committing its database
787 during a transaction commit
/*
 * Control handler run when a client reports an error while committing
 * its transaction.
 *
 * The database is looked up through client->db_id. When it is known and
 * has transaction_active set, the flag is cleared, the client update
 * counter is adjusted, the error is logged and recovery_mode is set to
 * CTDB_RECOVERY_ACTIVE.
 *
 * NOTE(review): client is dereferenced at 795 with no NULL check after
 * ctdb_reqid_find(); the numbering (792, 793, 795) leaves no room for
 * one - a guard looks necessary here.
 * NOTE(review): the else branch around 814, the missing format argument
 * at 819 and the return statements are on lines missing from this
 * listing.
 */
789 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
790 struct ctdb_req_control *c)
792 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
793 struct ctdb_db_context *ctdb_db;
795 ctdb_db = find_ctdb_db(ctdb, client->db_id);
796 if (ctdb_db == NULL) {
797 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
798 "Unknown database 0x%08x\n", client->db_id));
801 if (!ctdb_db->transaction_active) {
802 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
803 "Database 0x%08x has no transaction commit "
804 "started\n", client->db_id));
808 ctdb_db->transaction_active = false;
811 if (client->num_persistent_updates == 0) {
812 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
814 client->num_persistent_updates--;
817 DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
818 " db_id[0x%08x] - forcing recovery\n",
820 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
826 * Tell whether a transaction is active on this node on the given DB.
/*
 * Control handler that reports on the transaction state of database
 * db_id for the calling client. The visible code looks up the database,
 * compares client->db_id with db_id and tests
 * ctdb_db->transaction_active.
 *
 * NOTE(review): every return statement, the db_id parameter declaration
 * and the NULL test on ctdb_db are on lines missing from this listing,
 * so the returned values cannot be documented from here.
 * NOTE(review): client is dereferenced at 841 and no NULL check after
 * ctdb_reqid_find() is visible - confirm and guard.
 */
828 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
829 struct ctdb_req_control *c,
832 struct ctdb_db_context *ctdb_db;
833 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
835 ctdb_db = find_ctdb_db(ctdb, db_id);
837 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
841 if (client->db_id == db_id) {
845 if (ctdb_db->transaction_active) {
853 backwards compatibility:
855 start a persistent store operation. passing both the key, header and
856 data to the daemon. If the client disconnects before it has issued
857 a persistent_update call to the daemon we trigger a full recovery
858 to ensure the databases are brought back in sync.
859 for now we ignore the recdata that the client has passed to us.
/*
 * Control handler that marks the start of a persistent update for the
 * calling client: the client is looked up by c->client_id and its
 * num_persistent_updates counter is incremented. An unknown client is
 * logged as an error.
 *
 * NOTE(review): the last parameter and the return statements are on
 * lines missing from this listing.
 */
861 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
862 struct ctdb_req_control *c,
865 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
867 if (client == NULL) {
868 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
872 client->num_persistent_updates++;
878 backwards compatibility:
880 called to tell ctdbd that it is no longer doing a persistent update
/*
 * Control handler that cancels a persistent update for the calling
 * client: the client is looked up by c->client_id and its
 * num_persistent_updates counter is decremented when it is above 0.
 * An unknown client is logged as an error.
 *
 * NOTE(review): the last parameter, the branch taken when the counter
 * is already 0 and the return statements are on lines missing from this
 * listing.
 */
882 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
883 struct ctdb_req_control *c,
886 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
888 if (client == NULL) {
889 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
893 if (client->num_persistent_updates > 0) {
894 client->num_persistent_updates--;
902 backwards compatibility:
904 single record variant of ctdb_control_trans2_commit for older clients
/*
 * Control handler that accepts a single record and forwards it to
 * ctdb_control_trans2_commit.
 *
 * recdata is interpreted as one struct ctdb_rec_data. Its total size
 * must equal the header size plus keylen plus datalen; key and data are
 * then taken from rec->data, wrapped into a marshall buffer allocated
 * on c, and the finished buffer is passed on together with ctdb, c and
 * async_reply. The result of ctdb_control_trans2_commit is returned.
 *
 * NOTE(review): rec->reqid is passed twice to ctdb_marshall_add at 925;
 * the meaning of those two arguments is not visible here - confirm.
 * NOTE(review): rec->keylen and rec->datalen are read at 915 before
 * anything shows that recdata.dsize covers the record header, and the
 * sum is not visibly protected against wrap-around - confirm whether
 * the caller validates the payload.
 * NOTE(review): the declarations of key and data and the error return
 * are on lines missing from this listing.
 */
906 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
907 struct ctdb_req_control *c,
908 TDB_DATA recdata, bool *async_reply)
910 struct ctdb_marshall_buffer *m;
911 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
914 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
915 rec->keylen + rec->datalen) {
916 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
/* The key bytes come first in rec->data, the value follows them. */
920 key.dptr = &rec->data[0];
921 key.dsize = rec->keylen;
922 data.dptr = &rec->data[rec->keylen];
923 data.dsize = rec->datalen;
925 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
926 CTDB_NO_MEMORY(ctdb, m);
928 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
/*
 * Reads the sequence number record of database db_id into *seqnum.
 *
 * The record key is the string CTDB_DB_SEQNUM_KEY including its
 * terminating NUL byte. The value is fetched into a scratch talloc
 * context and is only used when its size is exactly sizeof(uint64_t);
 * the scratch context is freed at 965.
 *
 * NOTE(review): the remaining parameters, the handling of a failed
 * fetch or a wrong value size, and the return value are on lines
 * missing from this listing.
 * NOTE(review): the value is read at 962 through a uint64_t pointer
 * cast of data.dptr; alignment of that buffer is not established by
 * the visible code.
 */
931 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
936 struct ctdb_db_context *ctdb_db;
937 const char *keyname = CTDB_DB_SEQNUM_KEY;
940 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
942 ctdb_db = find_ctdb_db(ctdb, db_id);
944 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
/* The key length includes the terminating NUL byte. */
949 key.dptr = (uint8_t *)discard_const(keyname);
950 key.dsize = strlen(keyname) + 1;
952 ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
957 if (data.dsize != sizeof(uint64_t)) {
962 *seqnum = *(uint64_t *)data.dptr;
965 talloc_free(mem_ctx);
970 * Get the sequence number of a persistent database.
/*
 * Control handler that returns the sequence number of a persistent
 * database. The database id is read from indata as a uint32_t, the
 * number is obtained with ctdb_get_db_seqnum, and outdata is given a
 * zero-filled uint64_t sized buffer allocated on outdata.
 *
 * NOTE(review): at 993 the value is stored through outdata->dptr
 * without a cast. dptr is assigned from a (uint8_t *) cast at 987, so
 * if it is a uint8_t pointer this writes only one byte of the 64-bit
 * sequence number into the buffer - confirm the type of TDB_DATA.dptr;
 * copying all sizeof(uint64_t) bytes looks like the intent.
 * NOTE(review): indata.dptr is read at 980 with no visible check that
 * indata holds at least sizeof(uint32_t) bytes.
 * NOTE(review): the remaining parameters, the error handling and the
 * end of the function are on lines missing from this listing.
 */
972 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
980 db_id = *(uint32_t *)indata.dptr;
981 ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
986 outdata->dsize = sizeof(uint64_t);
987 outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
988 if (outdata->dptr == NULL) {
993 *(outdata->dptr) = seqnum;