4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tevent/tevent.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "../include/ctdb_private.h"
29 struct ctdb_persistent_state {
30 struct ctdb_context *ctdb;
31 struct ctdb_db_context *ctdb_db; /* used by trans3_commit */
32 struct ctdb_client *client; /* used by trans3_commit */
33 struct ctdb_req_control *c;
37 uint32_t num_failed, num_sent;
41 1) all nodes fail, and all nodes reply
42 2) some nodes fail, all nodes reply
48 called when a node has acknowledged a ctdb_control_update_record call
50 static void ctdb_persistent_callback(struct ctdb_context *ctdb,
51 int32_t status, TDB_DATA data,
55 struct ctdb_persistent_state *state = talloc_get_type(private_data,
56 struct ctdb_persistent_state);
57 enum ctdb_trans2_commit_error etype;
59 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
60 DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
61 "during recovery\n"));
66 DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
67 status, errormsg?errormsg:"no error message given"));
68 state->status = status;
69 state->errormsg = errormsg;
73 * If a node failed to complete the update_record control,
74 * then either a recovery is already running or something
75 * bad is going on. So trigger a recovery and let the
76 * recovery finish the transaction, sending back the reply
77 * for the trans3_commit control to the client.
79 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
85 if (state->num_pending != 0) {
89 if (state->num_failed == state->num_sent) {
90 etype = CTDB_TRANS2_COMMIT_ALLFAIL;
91 } else if (state->num_failed != 0) {
92 etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
94 etype = CTDB_TRANS2_COMMIT_SUCCESS;
97 ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
102 called if persistent store times out
104 static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed_event *te,
105 struct timeval t, void *private_data)
107 struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
109 if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
110 DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
111 "timeout during recovery\n"));
115 ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT,
116 "timeout in ctdb_persistent_state");
122 store a set of persistent records - called from a ctdb client when it has updated
123 some records in a persistent database. The client will have the record
124 locked for the duration of this call. The client is the dmaster when
127 int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
128 struct ctdb_req_control *c,
129 TDB_DATA recdata, bool *async_reply)
131 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
132 struct ctdb_persistent_state *state;
134 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
135 struct ctdb_db_context *ctdb_db;
137 ctdb_db = find_ctdb_db(ctdb, m->db_id);
138 if (ctdb_db == NULL) {
139 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_commit: "
140 "Unknown database db_id[0x%08x]\n", m->db_id));
144 if (client == NULL) {
145 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store to a client. Returning error\n"));
149 if (ctdb_db->unhealthy_reason) {
150 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_trans2_commit: %s\n",
151 ctdb_db->db_name, ctdb_db->unhealthy_reason));
155 /* handling num_persistent_updates is a bit strange -
157 1) very old clients, which never called CTDB_CONTROL_START_PERSISTENT_UPDATE
158 They don't expect num_persistent_updates to be used at all
160 2) less old clients, which uses CTDB_CONTROL_START_PERSISTENT_UPDATE, and expected
161 this commit to then decrement it
163 3) new clients which use TRANS2 commit functions, and
164 expect this function to increment the counter, and
165 then have it decremented in ctdb_control_trans2_error
166 or ctdb_control_trans2_finished
169 case CTDB_CONTROL_PERSISTENT_STORE:
170 if (ctdb_db->transaction_active) {
171 DEBUG(DEBUG_ERR, (__location__ " trans2_commit: a "
172 "transaction is active on database "
173 "db_id[0x%08x] - refusing persistent "
174 " store for client id[0x%08x]\n",
175 ctdb_db->db_id, client->client_id));
178 if (client->num_persistent_updates > 0) {
179 client->num_persistent_updates--;
182 case CTDB_CONTROL_TRANS2_COMMIT:
183 if (ctdb_db->transaction_active) {
184 DEBUG(DEBUG_ERR,(__location__ " trans2_commit: there is"
185 " already a transaction commit "
186 "active on db_id[0x%08x] - forbidding "
187 "client_id[0x%08x] to commit\n",
188 ctdb_db->db_id, client->client_id));
191 if (client->db_id != 0) {
192 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit: "
193 "client-db_id[0x%08x] != 0 "
194 "(client_id[0x%08x])\n",
195 client->db_id, client->client_id));
198 client->num_persistent_updates++;
199 ctdb_db->transaction_active = true;
200 client->db_id = m->db_id;
201 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started to"
202 " commit transaction on db id[0x%08x]\n",
203 client->client_id, client->db_id));
205 case CTDB_CONTROL_TRANS2_COMMIT_RETRY:
206 /* already updated from the first commit */
207 if (client->db_id != m->db_id) {
208 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans2_commit "
209 "retry: client-db_id[0x%08x] != "
210 "db_id[0x%08x] (client_id[0x%08x])\n",
212 m->db_id, client->client_id));
215 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] started "
216 "transaction commit retry on "
218 client->client_id, client->db_id));
222 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
223 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans2_commit when recovery active\n"));
227 state = talloc_zero(ctdb, struct ctdb_persistent_state);
228 CTDB_NO_MEMORY(ctdb, state);
233 for (i=0;i<ctdb->vnn_map->size;i++) {
234 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
237 /* only send to active nodes */
238 if (node->flags & NODE_FLAGS_INACTIVE) {
242 /* don't send to ourselves */
243 if (node->pnn == ctdb->pnn) {
247 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
248 c->client_id, 0, recdata,
249 ctdb_persistent_callback, state);
251 DEBUG(DEBUG_ERR,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
256 state->num_pending++;
260 if (state->num_pending == 0) {
265 /* we need to wait for the replies */
268 /* need to keep the control structure around */
269 talloc_steal(state, c);
271 /* but we won't wait forever */
272 event_add_timed(ctdb->ev, state,
273 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
274 ctdb_persistent_store_timeout, state);
279 static int ctdb_persistent_state_destructor(struct ctdb_persistent_state *state)
281 if (state->client != NULL) {
282 state->client->db_id = 0;
285 if (state->ctdb_db != NULL) {
286 state->ctdb_db->persistent_state = NULL;
293 * Store a set of persistent records.
294 * This is used to roll out a transaction to all nodes.
296 int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
297 struct ctdb_req_control *c,
298 TDB_DATA recdata, bool *async_reply)
300 struct ctdb_client *client;
301 struct ctdb_persistent_state *state;
303 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
304 struct ctdb_db_context *ctdb_db;
306 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
307 DEBUG(DEBUG_INFO,("rejecting ctdb_control_trans3_commit when recovery active\n"));
311 client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
312 if (client == NULL) {
313 DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
314 "to a client. Returning error\n"));
318 if (client->db_id != 0) {
319 DEBUG(DEBUG_ERR,(__location__ " ERROR: trans3_commit: "
320 "client-db_id[0x%08x] != 0 "
321 "(client_id[0x%08x]): trans3_commit active?\n",
322 client->db_id, client->client_id));
326 ctdb_db = find_ctdb_db(ctdb, m->db_id);
327 if (ctdb_db == NULL) {
328 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
329 "Unknown database db_id[0x%08x]\n", m->db_id));
333 if (ctdb_db->persistent_state != NULL) {
334 DEBUG(DEBUG_ERR, (__location__ " Error: "
335 "ctdb_control_trans3_commit "
336 "called while a transaction commit is "
337 "active. db_id[0x%08x]\n", m->db_id));
341 ctdb_db->persistent_state = talloc_zero(ctdb_db,
342 struct ctdb_persistent_state);
343 CTDB_NO_MEMORY(ctdb, ctdb_db->persistent_state);
345 client->db_id = m->db_id;
347 state = ctdb_db->persistent_state;
349 state->ctdb_db = ctdb_db;
351 state->client = client;
353 talloc_set_destructor(state, ctdb_persistent_state_destructor);
355 for (i = 0; i < ctdb->vnn_map->size; i++) {
356 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
359 /* only send to active nodes */
360 if (node->flags & NODE_FLAGS_INACTIVE) {
364 ret = ctdb_daemon_send_control(ctdb, node->pnn, 0,
365 CTDB_CONTROL_UPDATE_RECORD,
366 c->client_id, 0, recdata,
367 ctdb_persistent_callback,
370 DEBUG(DEBUG_ERR,("Unable to send "
371 "CTDB_CONTROL_UPDATE_RECORD "
372 "to pnn %u\n", node->pnn));
377 state->num_pending++;
381 if (state->num_pending == 0) {
386 /* we need to wait for the replies */
389 /* need to keep the control structure around */
390 talloc_steal(state, c);
392 /* but we won't wait forever */
393 event_add_timed(ctdb->ev, state,
394 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
395 ctdb_persistent_store_timeout, state);
401 struct ctdb_persistent_write_state {
402 struct ctdb_db_context *ctdb_db;
403 struct ctdb_marshall_buffer *m;
404 struct ctdb_req_control *c;
409 called from a child process to write the data
411 static int ctdb_persistent_store(struct ctdb_persistent_write_state *state)
414 struct ctdb_rec_data *rec = NULL;
415 struct ctdb_marshall_buffer *m = state->m;
417 ret = tdb_transaction_start(state->ctdb_db->ltdb->tdb);
419 DEBUG(DEBUG_ERR,("Failed to start transaction for db_id 0x%08x in ctdb_persistent_store\n",
420 state->ctdb_db->db_id));
424 for (i=0;i<m->count;i++) {
425 struct ctdb_ltdb_header oldheader;
426 struct ctdb_ltdb_header header;
427 TDB_DATA key, data, olddata;
428 TALLOC_CTX *tmp_ctx = talloc_new(state);
430 rec = ctdb_marshall_loop_next(m, rec, NULL, &header, &key, &data);
433 DEBUG(DEBUG_ERR,("Failed to get next record %d for db_id 0x%08x in ctdb_persistent_store\n",
434 i, state->ctdb_db->db_id));
435 talloc_free(tmp_ctx);
439 /* fetch the old header and ensure the rsn is less than the new rsn */
440 ret = ctdb_ltdb_fetch(state->ctdb_db, key, &oldheader, tmp_ctx, &olddata);
442 DEBUG(DEBUG_ERR,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
443 state->ctdb_db->db_id));
444 talloc_free(tmp_ctx);
448 if (oldheader.rsn >= header.rsn &&
449 (olddata.dsize != data.dsize ||
450 memcmp(olddata.dptr, data.dptr, data.dsize) != 0)) {
451 DEBUG(DEBUG_CRIT,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
452 state->ctdb_db->db_id,
453 (unsigned long long)oldheader.rsn, (unsigned long long)header.rsn));
454 talloc_free(tmp_ctx);
458 talloc_free(tmp_ctx);
460 ret = ctdb_ltdb_store(state->ctdb_db, key, &header, data);
462 DEBUG(DEBUG_CRIT,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
463 state->ctdb_db->db_id));
468 ret = tdb_transaction_commit(state->ctdb_db->ltdb->tdb);
470 DEBUG(DEBUG_ERR,("Failed to commit transaction for db_id 0x%08x in ctdb_persistent_store\n",
471 state->ctdb_db->db_id));
478 tdb_transaction_cancel(state->ctdb_db->ltdb->tdb);
484 called when we the child has completed the persistent write
487 static void ctdb_persistent_write_callback(int status, void *private_data)
489 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
490 struct ctdb_persistent_write_state);
493 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, status, NULL);
499 called if our lockwait child times out
501 static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
502 struct timeval t, void *private_data)
504 struct ctdb_persistent_write_state *state = talloc_get_type(private_data,
505 struct ctdb_persistent_write_state);
506 ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
510 struct childwrite_handle {
511 struct ctdb_context *ctdb;
512 struct ctdb_db_context *ctdb_db;
513 struct fd_event *fde;
517 void (*callback)(int, void *);
518 struct timeval start_time;
521 static int childwrite_destructor(struct childwrite_handle *h)
523 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
524 kill(h->child, SIGKILL);
528 /* called when the child process has finished writing the record to the
531 static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
532 uint16_t flags, void *private_data)
534 struct childwrite_handle *h = talloc_get_type(private_data,
535 struct childwrite_handle);
536 void *p = h->private_data;
537 void (*callback)(int, void *) = h->callback;
538 pid_t child = h->child;
539 TALLOC_CTX *tmp_ctx = talloc_new(ev);
543 CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
544 CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
546 /* the handle needs to go away when the context is gone - when
547 the handle goes away this implicitly closes the pipe, which
549 talloc_steal(tmp_ctx, h);
551 talloc_set_destructor(h, NULL);
553 ret = read(h->fd[0], &c, 1);
555 DEBUG(DEBUG_ERR, (__location__ " Read returned %d. Childwrite failed\n", ret));
561 kill(child, SIGKILL);
562 talloc_free(tmp_ctx);
565 /* this creates a child process which will take out a tdb transaction
566 and write the record to the database.
568 struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
569 void (*callback)(int, void *private_data),
570 struct ctdb_persistent_write_state *state)
572 struct childwrite_handle *result;
574 pid_t parent = getpid();
576 CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
577 CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
579 if (!(result = talloc_zero(state, struct childwrite_handle))) {
580 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
584 ret = pipe(result->fd);
588 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
592 result->child = ctdb_fork(ctdb_db->ctdb);
594 if (result->child == (pid_t)-1) {
595 close(result->fd[0]);
596 close(result->fd[1]);
598 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
602 result->callback = callback;
603 result->private_data = state;
604 result->ctdb = ctdb_db->ctdb;
605 result->ctdb_db = ctdb_db;
607 if (result->child == 0) {
610 close(result->fd[0]);
611 debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
612 ret = ctdb_persistent_store(state);
614 DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
618 write(result->fd[1], &c, 1);
620 /* make sure we die when our parent dies */
621 while (kill(parent, 0) == 0 || errno != ESRCH) {
627 close(result->fd[1]);
628 set_close_on_exec(result->fd[0]);
630 talloc_set_destructor(result, childwrite_destructor);
632 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
634 result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
635 EVENT_FD_READ, childwrite_handler,
637 if (result->fde == NULL) {
639 CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
642 tevent_fd_set_auto_close(result->fde);
644 result->start_time = timeval_current();
650 update a record on this node if the new record has a higher rsn than the
653 int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
654 struct ctdb_req_control *c, TDB_DATA recdata,
657 struct ctdb_db_context *ctdb_db;
658 struct ctdb_persistent_write_state *state;
659 struct childwrite_handle *handle;
660 struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
662 if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
663 DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
667 ctdb_db = find_ctdb_db(ctdb, m->db_id);
668 if (ctdb_db == NULL) {
669 DEBUG(DEBUG_ERR,("Unknown database 0x%08x in ctdb_control_update_record\n", m->db_id));
673 if (ctdb_db->unhealthy_reason) {
674 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_update_record: %s\n",
675 ctdb_db->db_name, ctdb_db->unhealthy_reason));
679 state = talloc(ctdb, struct ctdb_persistent_write_state);
680 CTDB_NO_MEMORY(ctdb, state);
682 state->ctdb_db = ctdb_db;
686 /* create a child process to take out a transaction and
689 handle = ctdb_childwrite(ctdb_db, ctdb_persistent_write_callback, state);
690 if (handle == NULL) {
691 DEBUG(DEBUG_ERR,("Failed to setup childwrite handler in ctdb_control_update_record\n"));
696 /* we need to wait for the replies */
699 /* need to keep the control structure around */
700 talloc_steal(state, c);
702 /* but we won't wait forever */
703 event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
704 ctdb_persistent_lock_timeout, state);
711 called when a client has finished a local commit in a transaction to
712 a persistent database
714 int32_t ctdb_control_trans2_finished(struct ctdb_context *ctdb,
715 struct ctdb_req_control *c)
717 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
718 struct ctdb_db_context *ctdb_db;
720 ctdb_db = find_ctdb_db(ctdb, client->db_id);
721 if (ctdb_db == NULL) {
722 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish "
723 "Unknown database 0x%08x\n", client->db_id));
726 if (!ctdb_db->transaction_active) {
727 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_finish: "
728 "Database 0x%08x has no transaction commit "
729 "started\n", client->db_id));
733 ctdb_db->transaction_active = false;
736 if (client->num_persistent_updates == 0) {
737 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
738 DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
739 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
742 client->num_persistent_updates--;
744 DEBUG(DEBUG_DEBUG, (__location__ " client id[0x%08x] finished "
745 "transaction commit db_id[0x%08x]\n",
746 client->client_id, ctdb_db->db_id));
752 called when a client gets an error committing its database
753 during a transaction commit
755 int32_t ctdb_control_trans2_error(struct ctdb_context *ctdb,
756 struct ctdb_req_control *c)
758 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
759 struct ctdb_db_context *ctdb_db;
761 ctdb_db = find_ctdb_db(ctdb, client->db_id);
762 if (ctdb_db == NULL) {
763 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
764 "Unknown database 0x%08x\n", client->db_id));
767 if (!ctdb_db->transaction_active) {
768 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans2_error: "
769 "Database 0x%08x has no transaction commit "
770 "started\n", client->db_id));
774 ctdb_db->transaction_active = false;
777 if (client->num_persistent_updates == 0) {
778 DEBUG(DEBUG_ERR, (__location__ " ERROR: num_persistent_updates == 0\n"));
780 client->num_persistent_updates--;
783 DEBUG(DEBUG_ERR,(__location__ " An error occurred during transaction on"
784 " db_id[0x%08x] - forcing recovery\n",
786 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
792 * Tell whether a transaction is active on this node on the give DB.
794 int32_t ctdb_control_trans2_active(struct ctdb_context *ctdb,
795 struct ctdb_req_control *c,
798 struct ctdb_db_context *ctdb_db;
799 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
801 ctdb_db = find_ctdb_db(ctdb, db_id);
803 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
807 if (client->db_id == db_id) {
811 if (ctdb_db->transaction_active) {
819 backwards compatibility:
821 start a persistent store operation. passing both the key, header and
822 data to the daemon. If the client disconnects before it has issued
823 a persistent_update call to the daemon we trigger a full recovery
824 to ensure the databases are brought back in sync.
825 for now we ignore the recdata that the client has passed to us.
827 int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
828 struct ctdb_req_control *c,
831 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
833 if (client == NULL) {
834 DEBUG(DEBUG_ERR,(__location__ " can not match start_persistent_update to a client. Returning error\n"));
838 client->num_persistent_updates++;
844 backwards compatibility:
846 called to tell ctdbd that it is no longer doing a persistent update
848 int32_t ctdb_control_cancel_persistent_update(struct ctdb_context *ctdb,
849 struct ctdb_req_control *c,
852 struct ctdb_client *client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
854 if (client == NULL) {
855 DEBUG(DEBUG_ERR,(__location__ " can not match cancel_persistent_update to a client. Returning error\n"));
859 if (client->num_persistent_updates > 0) {
860 client->num_persistent_updates--;
868 backwards compatibility:
870 single record varient of ctdb_control_trans2_commit for older clients
872 int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
873 struct ctdb_req_control *c,
874 TDB_DATA recdata, bool *async_reply)
876 struct ctdb_marshall_buffer *m;
877 struct ctdb_rec_data *rec = (struct ctdb_rec_data *)recdata.dptr;
880 if (recdata.dsize != offsetof(struct ctdb_rec_data, data) +
881 rec->keylen + rec->datalen) {
882 DEBUG(DEBUG_ERR, (__location__ " Bad data size in recdata\n"));
886 key.dptr = &rec->data[0];
887 key.dsize = rec->keylen;
888 data.dptr = &rec->data[rec->keylen];
889 data.dsize = rec->datalen;
891 m = ctdb_marshall_add(c, NULL, rec->reqid, rec->reqid, key, NULL, data);
892 CTDB_NO_MEMORY(ctdb, m);
894 return ctdb_control_trans2_commit(ctdb, c, ctdb_marshall_finish(m), async_reply);
897 static int32_t ctdb_get_db_seqnum(struct ctdb_context *ctdb,
902 struct ctdb_db_context *ctdb_db;
903 const char *keyname = CTDB_DB_SEQNUM_KEY;
906 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
908 ctdb_db = find_ctdb_db(ctdb, db_id);
910 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
915 key.dptr = (uint8_t *)discard_const(keyname);
916 key.dsize = strlen(keyname) + 1;
918 ret = (int32_t)ctdb_ltdb_fetch(ctdb_db, key, NULL, mem_ctx, &data);
923 if (data.dsize != sizeof(uint64_t)) {
928 *seqnum = *(uint64_t *)data.dptr;
931 talloc_free(mem_ctx);
936 * Get the sequence number of a persistent database.
938 int32_t ctdb_control_get_db_seqnum(struct ctdb_context *ctdb,
946 db_id = *(uint32_t *)indata.dptr;
947 ret = ctdb_get_db_seqnum(ctdb, db_id, &seqnum);
952 outdata->dsize = sizeof(uint64_t);
953 outdata->dptr = (uint8_t *)talloc_zero(outdata, uint64_t);
954 if (outdata->dptr == NULL) {
959 *(outdata->dptr) = seqnum;