4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
43 #include "ctdb_cluster_mutex.h"
/*
 * Control handler: return this node's VNN map in wire format.
 *
 * Allocates a struct ctdb_vnn_map_wire as a talloc child of outdata, large
 * enough for ctdb->vnn_map->size entries, copies the generation, size and
 * map array from ctdb->vnn_map into it and points outdata->dptr at it.
 * NOTE(review): CHECK_CONTROL_DATA_SIZE(0) presumably rejects a request that
 * carries a payload - confirm against the macro definition.
 */
46 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
48 struct ctdb_vnn_map_wire *map;
51 CHECK_CONTROL_DATA_SIZE(0);
/* bytes up to the 'map' member plus one uint32_t per map entry */
53 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
54 map = talloc_size(outdata, len);
55 CTDB_NO_MEMORY(ctdb, map);
57 map->generation = ctdb->vnn_map->generation;
58 map->size = ctdb->vnn_map->size;
59 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
62 outdata->dptr = (uint8_t *)map;
68 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
70 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
72 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
73 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
77 talloc_free(ctdb->vnn_map);
79 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
80 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
82 ctdb->vnn_map->generation = map->generation;
83 ctdb->vnn_map->size = map->size;
84 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
85 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
87 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
 * Control handler: report the id and flags of every database on
 * ctdb->db_list.
 *
 * The reply is a zero-initialised struct ctdb_dbid_map_old allocated as a
 * talloc child of outdata with room for 'len' entries in dbs[]; each entry
 * receives db_id and db_flags from the corresponding ctdb_db_context.
 */
93 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
96 struct ctdb_db_context *ctdb_db;
97 struct ctdb_dbid_map_old *dbid_map;
99 CHECK_CONTROL_DATA_SIZE(0);
/* first pass over the database list.
 * NOTE(review): presumably counts the databases into len - confirm */
102 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
/* header up to the dbs[] member plus one dbs[] element per database */
107 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
108 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
109 if (!outdata->dptr) {
110 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
114 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
/* second pass: fill one dbs[] slot per database, in list order */
116 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
117 dbid_map->dbs[i].db_id = ctdb_db->db_id;
118 dbid_map->dbs[i].flags = ctdb_db->db_flags;
/*
 * Control handler: return the node list as a node map.
 *
 * The map is produced by ctdb_node_list_to_map() from ctdb->nodes and
 * handed back through outdata->dptr; a NULL result is checked for.
 */
125 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
127 CHECK_CONTROL_DATA_SIZE(0);
129 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
132 if (outdata->dptr == NULL) {
/* the reply length is the talloc size of the returned buffer */
136 outdata->dsize = talloc_get_size(outdata->dptr);
142 reload the nodes file
/*
 * Control handler: re-read the nodes file and reconcile it with the nodes
 * currently known.
 *
 * The existing node array is parked under a temporary talloc context while
 * ctdb_load_nodes_file() builds a new one. For every slot of the new array
 * whose address matches the old node at the same index, the freshly loaded
 * node is discarded and the old node object is moved back in. Other nodes
 * are registered with the transport through methods->add_node() and
 * methods->connect_node(); a failure of either is fatal.
 * Finally the recovery daemon is sent CTDB_SRVID_RELOAD_NODES and the
 * temporary context is freed.
 */
145 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
147 unsigned int i, num_nodes;
149 struct ctdb_node **nodes;
151 tmp_ctx = talloc_new(ctdb);
153 /* steal the old nodes file for a while */
154 talloc_steal(tmp_ctx, ctdb->nodes);
/* remember the old node count; used below to bound lookups in nodes[] */
157 num_nodes = ctdb->num_nodes;
160 /* load the new nodes file */
161 ctdb_load_nodes_file(ctdb);
163 for (i=0; i<ctdb->num_nodes; i++) {
164 /* keep any identical pre-existing nodes and connections */
165 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
166 talloc_free(ctdb->nodes[i]);
167 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
171 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
175 /* any new or different nodes must be added */
176 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
177 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
178 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
180 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
181 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
182 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
186 /* tell the recovery daemon to reload the nodes file too */
187 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
/* releases whatever is still parented to the temporary context */
189 talloc_free(tmp_ctx);
195 a traverse function for pulling all relevant records from pulldb
/* State shared between ctdb_control_pull_db and its traverse callback
 * traverse_pulldb. */
198 struct ctdb_context *ctdb;
199 struct ctdb_db_context *ctdb_db;
/* marshall buffer that records are appended to */
200 struct ctdb_marshall_buffer *pulldata;
/* number of bytes currently allocated for pulldata */
202 uint32_t allocated_len;
/*
 * tdb traverse callback used by ctdb_control_pull_db.
 *
 * Marshals one key/data pair with ctdb_marshall_record() and appends it to
 * the pull buffer held in the struct pulldb_data passed through p, growing
 * the buffer when the record does not fit. Logs an error when a record is
 * larger than the db_record_size_warn tunable (if that tunable is non-zero).
 */
206 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
208 struct pulldb_data *params = (struct pulldb_data *)p;
209 struct ctdb_rec_data_old *rec;
210 struct ctdb_context *ctdb = params->ctdb;
211 struct ctdb_db_context *ctdb_db = params->ctdb_db;
213 /* add the record to the blob */
214 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
216 params->failed = true;
/* grow the buffer, adding pulldb_preallocation_size bytes of headroom so
 * that not every record triggers a reallocation */
219 if (params->len + rec->length >= params->allocated_len) {
220 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
221 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
/* running out of memory here is treated as fatal for the daemon */
223 if (params->pulldata == NULL) {
224 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
225 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
/* append the marshalled record at the current end of the buffer */
227 params->pulldata->count++;
228 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
229 params->len += rec->length;
231 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
232 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
241 pull a bunch of records from a ltdb, filtering by lmaster
243 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
245 struct ctdb_pulldb *pull;
246 struct ctdb_db_context *ctdb_db;
247 struct pulldb_data params;
248 struct ctdb_marshall_buffer *reply;
250 pull = (struct ctdb_pulldb *)indata.dptr;
252 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
254 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
258 if (!ctdb_db_frozen(ctdb_db)) {
260 ("rejecting ctdb_control_pull_db when not frozen\n"));
264 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
265 CTDB_NO_MEMORY(ctdb, reply);
267 reply->db_id = pull->db_id;
270 params.ctdb_db = ctdb_db;
271 params.pulldata = reply;
272 params.len = offsetof(struct ctdb_marshall_buffer, data);
273 params.allocated_len = params.len;
274 params.failed = false;
276 if (ctdb_db->unhealthy_reason) {
277 /* this is just a warning, as the tdb should be empty anyway */
278 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
279 ctdb_db->db_name, ctdb_db->unhealthy_reason));
282 /* If the records are invalid, we are done */
283 if (ctdb_db->invalid_records) {
287 if (ctdb_lockdb_mark(ctdb_db) != 0) {
288 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
292 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
293 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
294 ctdb_lockdb_unmark(ctdb_db);
295 talloc_free(params.pulldata);
299 ctdb_lockdb_unmark(ctdb_db);
302 outdata->dptr = (uint8_t *)params.pulldata;
303 outdata->dsize = params.len;
305 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
306 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
308 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
309 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
/* State shared between ctdb_control_db_pull and its traverse callback
 * traverse_db_pull. */
316 struct db_pull_state {
317 struct ctdb_context *ctdb;
318 struct ctdb_db_context *ctdb_db;
/* records marshalled so far and not yet sent; NULL when nothing is pending */
319 struct ctdb_marshall_buffer *recs;
/* running total of records that have been sent */
322 uint32_t num_records;
/*
 * tdb traverse callback used by ctdb_control_db_pull.
 *
 * Adds one key/data pair to the pending marshall buffer in the
 * struct db_pull_state passed as private_data. Once the buffer has grown
 * to at least the rec_buffer_size_limit tunable, it is finished, sent as a
 * message to state->pnn on state->srvid, counted into state->num_records
 * and released.
 */
325 static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
326 TDB_DATA data, void *private_data)
328 struct db_pull_state *state = (struct db_pull_state *)private_data;
329 struct ctdb_marshall_buffer *recs;
331 recs = ctdb_marshall_add(state->ctdb, state->recs,
332 state->ctdb_db->db_id, 0, key, NULL, data);
334 TALLOC_FREE(state->recs);
/* flush the batch once it reaches the configured size limit */
339 if (talloc_get_size(state->recs) >=
340 state->ctdb->tunable.rec_buffer_size_limit) {
344 buffer = ctdb_marshall_finish(state->recs);
345 ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
346 state->srvid, buffer);
348 TALLOC_FREE(state->recs);
352 state->num_records += state->recs->count;
353 TALLOC_FREE(state->recs);
/*
 * Control handler: pull the records of one local database and stream them
 * to the requesting node as messages.
 *
 * indata carries a struct ctdb_pulldb_ext with the database id and the
 * srvid the batches are sent to; the destination node is the sender of the
 * control (c->hdr.srcnode). The database must exist and be frozen. The tdb
 * is traversed with traverse_db_pull() under the database lock mark, and
 * any records still pending afterwards are sent as a final batch.
 *
 * The reply payload is a single uint32_t: the number of records sent.
 */
359 int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
360 struct ctdb_req_control_old *c,
361 TDB_DATA indata, TDB_DATA *outdata)
363 struct ctdb_pulldb_ext *pulldb_ext;
364 struct ctdb_db_context *ctdb_db;
365 struct db_pull_state state;
368 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
370 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
371 if (ctdb_db == NULL) {
372 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
377 if (!ctdb_db_frozen(ctdb_db)) {
379 ("rejecting ctdb_control_pull_db when not frozen\n"));
383 if (ctdb_db->unhealthy_reason) {
384 /* this is just a warning, as the tdb should be empty anyway */
386 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
387 ctdb_db->db_name, ctdb_db->unhealthy_reason));
391 state.ctdb_db = ctdb_db;
/* batches go back to the node that issued this control */
393 state.pnn = c->hdr.srcnode;
394 state.srvid = pulldb_ext->srvid;
395 state.num_records = 0;
397 /* If the records are invalid, we are done */
398 if (ctdb_db->invalid_records) {
402 if (ctdb_lockdb_mark(ctdb_db) != 0) {
404 (__location__ " Failed to get lock on entire db - failing\n"));
408 ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
411 (__location__ " Failed to get traverse db '%s'\n",
413 ctdb_lockdb_unmark(ctdb_db);
417 /* Last few records */
418 if (state.recs != NULL) {
421 buffer = ctdb_marshall_finish(state.recs);
422 ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
423 state.srvid, buffer);
425 TALLOC_FREE(state.recs);
426 ctdb_lockdb_unmark(ctdb_db);
430 state.num_records += state.recs->count;
431 TALLOC_FREE(state.recs);
434 ctdb_lockdb_unmark(ctdb_db);
/* reply with the total number of records sent */
437 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
438 if (outdata->dptr == NULL) {
439 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
443 memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
444 outdata->dsize = sizeof(uint32_t);
450 push a bunch of records into a ltdb, filtering by rsn
/*
 * Control handler: store a marshalled batch of records into a local
 * database.
 *
 * indata is a struct ctdb_marshall_buffer; it must be at least as large as
 * the buffer header. The target database (reply->db_id) must exist and be
 * frozen, and the database lock mark is taken for the duration.
 *
 * Each record is split into key and data, the data must start with a
 * struct ctdb_ltdb_header, the read-only flags (CTDB_REC_RO_FLAGS) are
 * cleared from that header, and the record is written with
 * ctdb_ltdb_store(). The cursor then advances by rec->length bytes.
 *
 * For databases with read-only support the tracking database (rottdb) is
 * wiped afterwards; if wiping fails it is closed and read-only support is
 * reset. All entries on revokechild_active are freed.
 */
452 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
454 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
455 struct ctdb_db_context *ctdb_db;
458 struct ctdb_rec_data_old *rec;
460 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
461 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
465 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
467 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
471 if (!ctdb_db_frozen(ctdb_db)) {
473 ("rejecting ctdb_control_push_db when not frozen\n"));
477 if (ctdb_lockdb_mark(ctdb_db) != 0) {
478 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
482 rec = (struct ctdb_rec_data_old *)&reply->data[0];
484 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
485 reply->count, reply->db_id));
487 for (i=0;i<reply->count;i++) {
489 struct ctdb_ltdb_header *hdr;
491 key.dptr = &rec->data[0];
492 key.dsize = rec->keylen;
493 data.dptr = &rec->data[key.dsize];
494 data.dsize = rec->datalen;
496 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
497 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
500 hdr = (struct ctdb_ltdb_header *)data.dptr;
501 /* strip off any read only record flags. All readonly records
502 are revoked implicitely by a recovery
504 hdr->flags &= ~CTDB_REC_RO_FLAGS;
506 data.dptr += sizeof(*hdr);
507 data.dsize -= sizeof(*hdr);
509 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
511 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
515 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
518 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
519 reply->count, reply->db_id));
521 if (ctdb_db_readonly(ctdb_db)) {
522 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
524 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
525 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
526 tdb_close(ctdb_db->rottdb);
527 ctdb_db->rottdb = NULL;
528 ctdb_db_reset_readonly(ctdb_db);
530 while (ctdb_db->revokechild_active != NULL) {
531 talloc_free(ctdb_db->revokechild_active);
535 ctdb_lockdb_unmark(ctdb_db);
539 ctdb_lockdb_unmark(ctdb_db);
/* Per-database state of an ongoing streamed push; created by
 * ctdb_control_db_push_start and handed to db_push_msg_handler. */
543 struct db_push_state {
544 struct ctdb_context *ctdb;
545 struct ctdb_db_context *ctdb_db;
/* running total of records received through db_push_msg_handler */
547 uint32_t num_records;
/*
 * Message handler registered by ctdb_control_db_push_start: stores one
 * marshalled batch of records into the database being pushed.
 *
 * indata is interpreted as a struct ctdb_marshall_buffer. Each record is
 * split into key and data, the data must begin with a
 * struct ctdb_ltdb_header, the read-only flags (CTDB_REC_RO_FLAGS) are
 * cleared from that header and the record is written to state->ctdb_db
 * with ctdb_ltdb_store(). The number of records in the batch is added to
 * state->num_records. state->failed is set on the error path.
 */
551 static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
554 struct db_push_state *state = talloc_get_type(
555 private_data, struct db_push_state);
556 struct ctdb_marshall_buffer *recs;
557 struct ctdb_rec_data_old *rec;
565 recs = (struct ctdb_marshall_buffer *)indata.dptr;
566 rec = (struct ctdb_rec_data_old *)&recs->data[0];
568 DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
569 recs->count, recs->db_id));
571 for (i=0; i<recs->count; i++) {
573 struct ctdb_ltdb_header *hdr;
575 key.dptr = &rec->data[0];
576 key.dsize = rec->keylen;
577 data.dptr = &rec->data[key.dsize];
578 data.dsize = rec->datalen;
580 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
581 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
585 hdr = (struct ctdb_ltdb_header *)data.dptr;
586 /* Strip off any read only record flags.
587 * All readonly records are revoked implicitely by a recovery.
589 hdr->flags &= ~CTDB_REC_RO_FLAGS;
591 data.dptr += sizeof(*hdr);
592 data.dsize -= sizeof(*hdr);
594 ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
597 (__location__ " Unable to store record\n"));
601 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
604 DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
605 recs->count, recs->db_id));
607 state->num_records += recs->count;
611 state->failed = true;
/*
 * Control handler: begin a streamed push into a local database.
 *
 * indata carries a struct ctdb_pulldb_ext with the database id and the
 * srvid on which record batches will arrive. The database must exist and
 * be frozen. If a push is already marked as started, the previous state's
 * srvid handler is de-registered first.
 *
 * A new struct db_push_state is allocated under ctdb_db,
 * db_push_msg_handler is registered for the srvid, the database lock mark
 * is taken (the handler is de-registered again if that fails), and the
 * state is recorded in ctdb_db->push_state with push_started set.
 */
614 int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
616 struct ctdb_pulldb_ext *pulldb_ext;
617 struct ctdb_db_context *ctdb_db;
618 struct db_push_state *state;
621 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
623 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
624 if (ctdb_db == NULL) {
626 (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
630 if (!ctdb_db_frozen(ctdb_db)) {
632 ("rejecting ctdb_control_db_push_start when not frozen\n"));
636 if (ctdb_db->push_started) {
638 (__location__ " DB push already started for %s\n",
641 /* De-register old state */
642 state = (struct db_push_state *)ctdb_db->push_state;
644 srvid_deregister(ctdb->srv, state->srvid, state);
646 ctdb_db->push_state = NULL;
650 state = talloc_zero(ctdb_db, struct db_push_state);
652 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
657 state->ctdb_db = ctdb_db;
658 state->srvid = pulldb_ext->srvid;
659 state->failed = false;
/* incoming batches on this srvid are handled by db_push_msg_handler */
661 ret = srvid_register(ctdb->srv, state, state->srvid,
662 db_push_msg_handler, state);
665 (__location__ " Failed to register srvid for db push\n"));
670 if (ctdb_lockdb_mark(ctdb_db) != 0) {
672 (__location__ " Failed to get lock on entire db - failing\n"));
673 srvid_deregister(ctdb->srv, state->srvid, state);
678 ctdb_db->push_started = true;
679 ctdb_db->push_state = state;
/*
 * Control handler: finish a streamed push started with
 * ctdb_control_db_push_start.
 *
 * indata carries the database id as a uint32_t. The database must exist,
 * be frozen and have a push in progress. For databases with read-only
 * support the tracking database (rottdb) is wiped; if wiping fails it is
 * closed and read-only support is reset. All entries on
 * revokechild_active are freed. The database lock mark is then dropped
 * and the srvid handler de-registered.
 *
 * The reply payload is a single uint32_t: state->num_records, the number
 * of records received during the push. push_started and push_state are
 * cleared at the end.
 */
684 int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
685 TDB_DATA indata, TDB_DATA *outdata)
688 struct ctdb_db_context *ctdb_db;
689 struct db_push_state *state;
691 db_id = *(uint32_t *)indata.dptr;
693 ctdb_db = find_ctdb_db(ctdb, db_id);
694 if (ctdb_db == NULL) {
695 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
699 if (!ctdb_db_frozen(ctdb_db)) {
701 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
705 if (!ctdb_db->push_started) {
706 DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
710 if (ctdb_db_readonly(ctdb_db)) {
712 ("Clearing the tracking database for dbid 0x%x\n",
714 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
716 ("Failed to wipe tracking database for 0x%x."
717 " Dropping read-only delegation support\n",
719 tdb_close(ctdb_db->rottdb);
720 ctdb_db->rottdb = NULL;
721 ctdb_db_reset_readonly(ctdb_db);
724 while (ctdb_db->revokechild_active != NULL) {
725 talloc_free(ctdb_db->revokechild_active);
729 ctdb_lockdb_unmark(ctdb_db);
731 state = (struct db_push_state *)ctdb_db->push_state;
733 DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
737 srvid_deregister(ctdb->srv, state->srvid, state);
/* reply with the number of records received during this push */
739 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
740 if (outdata->dptr == NULL) {
741 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
743 ctdb_db->push_state = NULL;
747 memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
748 outdata->dsize = sizeof(uint32_t);
751 ctdb_db->push_started = false;
752 ctdb_db->push_state = NULL;
/* State carried from ctdb_control_set_recmode to set_recmode_handler. */
757 struct set_recmode_state {
758 struct ctdb_context *ctdb;
/* the control request that set_recmode_handler replies to */
759 struct ctdb_req_control_old *c;
/*
 * Completion handler for the recovery lock test that
 * ctdb_control_set_recmode starts with ctdb_cluster_mutex().
 *
 * The outcomes visible here are:
 *  - the daemon was able to take the recovery lock during recovery: this
 *    is reported back as an error;
 *  - the lock check was OK: recovery mode becomes CTDB_RECOVERY_NORMAL and
 *    deferred attaches are processed;
 *  - the attempt timed out: treated like success, see the comment below;
 *  - anything else: reported back as an unexpected error.
 *
 * In every case the pending control state->c is answered with status s
 * and error string err.
 */
762 static void set_recmode_handler(char status,
766 struct set_recmode_state *state = talloc_get_type_abort(
767 private_data, struct set_recmode_state);
769 const char *err = NULL;
775 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
776 state->ctdb->recovery_lock));
778 err = "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem";
783 DEBUG(DEBUG_DEBUG, (__location__ " Recovery lock check OK\n"));
784 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
785 ctdb_process_deferred_attach(state->ctdb);
789 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock",
790 reclock.ctdbd, latency);
794 /* Timeout. Consider this a success, not a failure,
795 * as we failed to set the recovery lock which is what
796 * we wanted. This can be caused by the cluster
797 * filesystem being very slow to arbitrate locks
798 * immediately after a node failure. */
801 "Time out getting recovery lock, allowing recmode set anyway\n"));
802 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
803 ctdb_process_deferred_attach(state->ctdb);
810 ("Unexpected error when testing recovery lock\n"));
812 err = "Unexpected error when testing recovery lock";
815 ctdb_request_control_reply(state->ctdb, state->c, NULL, s, err);
/*
 * Timer callback armed by ctdb_deferred_drop_all_ips: fires when the node
 * has been in recovery mode for too long. Frees and clears
 * ctdb->release_ips_ctx, then releases all public IP addresses.
 * private_data is the struct ctdb_context.
 */
820 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
821 struct timeval t, void *private_data)
823 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
825 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
826 talloc_free(ctdb->release_ips_ctx);
827 ctdb->release_ips_ctx = NULL;
829 ctdb_release_all_ips(ctdb);
833 * Set up an event to drop all public ips if we remain in recovery for too long
/*
 * (Re)arm the timer that drops all public IPs.
 *
 * Any previous ctdb->release_ips_ctx is freed and a fresh talloc context
 * is created. A timer is then added with that context as its talloc
 * parent, set to fire recovery_drop_all_ips seconds from now and call
 * ctdb_drop_all_ips_event with ctdb as private data.
 */
836 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
838 if (ctdb->release_ips_ctx != NULL) {
839 talloc_free(ctdb->release_ips_ctx);
841 ctdb->release_ips_ctx = talloc_new(ctdb);
842 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
844 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
845 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
846 ctdb_drop_all_ips_event, ctdb);
851 set the recovery mode
/*
 * Control handler: switch this node between CTDB_RECOVERY_NORMAL and
 * CTDB_RECOVERY_ACTIVE.
 *
 * indata carries the requested mode as a uint32_t. A request for the mode
 * that is already set is only logged. Entering ACTIVE arms the deferred
 * "drop all IPs" timer and sets the mode.
 *
 * Returning to NORMAL cancels that timer, compares every database's
 * generation with the VNN map generation (a mismatch is logged as an
 * error), and thaws the databases if all are frozen. Without a configured
 * recovery lock the mode is then set directly and deferred attaches are
 * processed. With a recovery lock, a lock test is started through
 * ctdb_cluster_mutex() and the final decision and reply are made in
 * set_recmode_handler; the control request c is moved under the state
 * object for that purpose.
 */
853 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
854 struct ctdb_req_control_old *c,
855 TDB_DATA indata, bool *async_reply,
856 const char **errormsg)
858 uint32_t recmode = *(uint32_t *)indata.dptr;
859 struct ctdb_db_context *ctdb_db;
860 struct set_recmode_state *state;
861 struct ctdb_cluster_mutex_handle *h;
863 if (recmode == ctdb->recovery_mode) {
864 D_INFO("Recovery mode already set to %s\n",
865 recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "ACTIVE");
869 D_NOTICE("Recovery mode set to %s\n",
870 recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "ACTIVE");
872 /* if we enter recovery but stay in recovery for too long
873 we will eventually drop all our ip addresses
875 if (recmode == CTDB_RECOVERY_ACTIVE) {
876 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
877 D_ERR("Failed to set up deferred drop all ips\n");
880 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
884 /* From this point: recmode == CTDB_RECOVERY_NORMAL
886 * Therefore, what follows is special handling when setting
887 * recovery mode back to normal */
/* freeing the context cancels the pending "drop all IPs" timer */
889 TALLOC_FREE(ctdb->release_ips_ctx);
891 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
892 if (ctdb_db->generation != ctdb->vnn_map->generation) {
894 ("Inconsistent DB generation %u for %s\n",
895 ctdb_db->generation, ctdb_db->db_name));
896 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
901 /* force the databases to thaw */
902 if (ctdb_db_all_frozen(ctdb)) {
903 ctdb_control_thaw(ctdb, false);
906 if (ctdb->recovery_lock == NULL) {
907 /* Not using recovery lock file */
908 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
909 ctdb_process_deferred_attach(ctdb);
913 state = talloc_zero(ctdb, struct set_recmode_state);
915 DEBUG(DEBUG_ERR, (__location__ " out of memory\n"));
/* NOTE(review): the literal 5 is presumably a timeout in seconds - confirm
 * against ctdb_cluster_mutex() */
921 h = ctdb_cluster_mutex(state, ctdb, ctdb->recovery_lock, 5,
922 set_recmode_handler, state, NULL, NULL);
/* keep the request alive until set_recmode_handler replies to it */
928 state->c = talloc_steal(state, c);
936 delete a record as part of the vacuum process
937 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
938 use non-blocking locks
940 return 0 if the record was successfully deleted (i.e. it does not exist
941 when the function returns)
942 or !0 if the record still exists in the tdb after returning.
/*
 * Try to delete one record from the local tdb on behalf of vacuuming.
 *
 * rec is the marshalled record sent by the caller; its data part must be
 * exactly one struct ctdb_ltdb_header. Only non-blocking locks are used.
 * The record is left alone when:
 *  - this node is the lmaster for the key,
 *  - the chain lock or the freelist lock cannot be taken immediately,
 *  - the local copy has a higher rsn than the one supplied,
 *  - read-only flags are set on the supplied or on the local header,
 *  - this node is the dmaster of the local copy.
 * A local copy that is too small to hold a header is treated as corrupt
 * and deleted if the freelist lock can be taken.
 */
944 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
946 TDB_DATA key, data, data2;
947 struct ctdb_ltdb_header *hdr, *hdr2;
949 /* these are really internal tdb functions - but we need them here for
950 non-blocking lock of the freelist */
951 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
952 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
/* the key comes first in rec->data, the data follows directly after it */
955 key.dsize = rec->keylen;
956 key.dptr = &rec->data[0];
957 data.dsize = rec->datalen;
958 data.dptr = &rec->data[rec->keylen];
960 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
961 DBG_INFO("Called delete on record where we are lmaster\n");
/* the supplied record must be a bare header with no payload */
965 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
966 DBG_ERR("Bad record size\n");
970 hdr = (struct ctdb_ltdb_header *)data.dptr;
972 /* use a non-blocking lock */
973 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
974 DBG_INFO("Failed to get non-blocking chain lock\n");
/* fetch the local copy of the record for comparison */
978 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
979 if (data2.dptr == NULL) {
980 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* local copy too short to contain a header: handle it as corrupt */
984 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
985 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
986 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
987 DBG_ERR("Failed to delete corrupt record\n");
989 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
990 DBG_ERR("Deleted corrupt record\n");
992 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
997 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* the local copy is newer than the one the caller knows about */
999 if (hdr2->rsn > hdr->rsn) {
1000 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1001 DBG_INFO("Skipping record with rsn=%llu - called with rsn=%llu\n",
1002 (unsigned long long)hdr2->rsn,
1003 (unsigned long long)hdr->rsn);
1008 /* do not allow deleting record that have readonly flags set. */
1009 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1010 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1011 DBG_INFO("Skipping record with readonly flags set\n");
1015 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1016 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1017 DBG_INFO("Skipping record with readonly flags set locally\n");
1022 if (hdr2->dmaster == ctdb->pnn) {
1023 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1024 DBG_INFO("Attempted delete record where we are the dmaster\n");
/* list -1 is the freelist (see the prototype comment above) */
1029 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1030 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1031 DBG_INFO("Failed to get non-blocking freelist lock\n");
1036 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1037 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1038 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1039 DBG_INFO("Failed to delete record\n");
/* deleted: release the locks in reverse order of acquisition */
1044 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1045 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* State for the asynchronous start/end recovery controls. */
1052 struct recovery_callback_state {
/* the control request that is answered once the event has run */
1053 struct ctdb_req_control_old *c;
1058 called when the 'recovered' event script has finished
/*
 * Completion callback for the "recovered" event script started by
 * ctdb_control_end_recovery.
 *
 * Counts the recovery in the statistics, logs a script failure (and bans
 * this node when the status is -ETIMEDOUT), answers the pending control
 * with the script status, records the time the recovery finished and
 * moves the daemon from CTDB_RUNSTATE_FIRST_RECOVERY to
 * CTDB_RUNSTATE_STARTUP. p is the struct recovery_callback_state.
 */
1060 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1062 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1064 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
1067 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1068 if (status == -ETIMEDOUT) {
1069 ctdb_ban_self(ctdb);
1073 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1076 gettimeofday(&ctdb->last_recovery_finished, NULL);
1078 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1079 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
1084 recovery has finished
/*
 * Control handler: the recovery has finished.
 *
 * Finishes outstanding persistent trans3 commits, then runs the
 * CTDB_EVENT_RECOVERED event script asynchronously with
 * ctdb_end_recovery_callback as completion handler. The control request c
 * is moved under the callback state and *async_reply is set, so the reply
 * is sent from the callback.
 */
1086 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
1087 struct ctdb_req_control_old *c,
1091 struct recovery_callback_state *state;
1093 DEBUG(DEBUG_ERR,("Recovery has finished\n"));
1095 ctdb_persistent_finish_trans3_commits(ctdb);
1097 state = talloc(ctdb, struct recovery_callback_state);
1098 CTDB_NO_MEMORY(ctdb, state);
1102 ret = ctdb_event_script_callback(ctdb, state,
1103 ctdb_end_recovery_callback,
1105 CTDB_EVENT_RECOVERED, "%s", "");
1108 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1113 /* tell the control that we will reply asynchronously */
1114 state->c = talloc_steal(state, c);
1115 *async_reply = true;
1120 called when the 'startrecovery' event script has finished
/*
 * Completion callback for the "startrecovery" event script: logs a script
 * failure and answers the pending control with the script status.
 * p is the struct recovery_callback_state.
 */
1122 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1124 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1127 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1130 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
/*
 * Run the CTDB_EVENT_START_RECOVERY event script asynchronously, with
 * ctdb_start_recovery_callback as completion handler. If the event cannot
 * be run, the pending control in state->c is answered with -1.
 */
1134 static void run_start_recovery_event(struct ctdb_context *ctdb,
1135 struct recovery_callback_state *state)
1139 ret = ctdb_event_script_callback(ctdb, state,
1140 ctdb_start_recovery_callback,
1142 CTDB_EVENT_START_RECOVERY,
1146 DEBUG(DEBUG_ERR,("Unable to run startrecovery event\n"));
1147 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
/*
 * Compare two recovery lock settings, either of which may be NULL.
 *
 * Two NULL pointers compare equal, a NULL and a non-NULL pointer compare
 * unequal, and two non-NULL strings are compared with strcmp().
 */
static bool reclock_strings_equal(const char *a, const char *b)
{
	if (a == NULL || b == NULL) {
		/* with at least one side unset, equal only if both are */
		return a == b;
	}

	return strcmp(a, b) == 0;
}
/*
 * Reply callback for the CTDB_CONTROL_GET_RECLOCK_FILE request that
 * ctdb_control_start_recovery sends to the recovery master.
 *
 * If the request failed, the pending start-recovery control is answered
 * with an error. Otherwise the recovery lock setting in the reply (forced
 * to be NUL-terminated; an empty reply means "not set") is compared with
 * this node's own setting. On a mismatch the control is answered with -1
 * and this node shuts down; on a match the "startrecovery" event is run.
 */
1161 static void start_recovery_reclock_callback(struct ctdb_context *ctdb,
1164 const char *errormsg,
1167 struct recovery_callback_state *state = talloc_get_type_abort(
1168 private_data, struct recovery_callback_state);
1169 const char *local = ctdb->recovery_lock;
1170 const char *remote = NULL;
1173 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1174 ctdb_request_control_reply(ctdb, state->c, NULL,
1180 /* Check reclock consistency */
1181 if (data.dsize > 0) {
1182 /* Ensure NUL-termination */
1183 data.dptr[data.dsize-1] = '\0';
1184 remote = (const char *)data.dptr;
1186 if (! reclock_strings_equal(local, remote)) {
1188 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
1190 ("Recovery lock configuration inconsistent: "
1191 "recmaster has %s, this node has %s, shutting down\n",
1192 remote == NULL ? "NULL" : remote,
1193 local == NULL ? "NULL" : local));
1195 ctdb_shutdown_sequence(ctdb, 1);
1198 ("Recovery lock consistency check successful\n"));
1200 run_start_recovery_event(ctdb, state);
1203 /* Check recovery lock consistency and run eventscripts for the
1204 * "startrecovery" event.
 *
 * The sender of the control (c->hdr.srcnode) is taken to be the recovery
 * master. The time the recovery started is recorded. If this node still
 * flags the recovery master as disconnected, the event is run straight
 * away; otherwise the recovery master is asked for its recovery lock
 * setting and start_recovery_reclock_callback continues from there.
 * The control request c is moved under the callback state and
 * *async_reply is set, so the reply is sent later. */
1205 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
1206 struct ctdb_req_control_old *c,
1210 struct recovery_callback_state *state;
1211 uint32_t recmaster = c->hdr.srcnode;
1213 DEBUG(DEBUG_ERR, ("Recovery has started\n"));
1214 gettimeofday(&ctdb->last_recovery_started, NULL);
1216 state = talloc(ctdb, struct recovery_callback_state);
1217 CTDB_NO_MEMORY(ctdb, state);
1221 /* Although the recovery master sent this node a start
1222 * recovery control, this node might still think the recovery
1223 * master is disconnected. In this case defer the recovery
1224 * lock consistency check. */
1225 if (ctdb->nodes[recmaster]->flags & NODE_FLAGS_DISCONNECTED) {
1226 run_start_recovery_event(ctdb, state);
1228 /* Ask the recovery master about its reclock setting */
1229 ret = ctdb_daemon_send_control(ctdb,
1232 CTDB_CONTROL_GET_RECLOCK_FILE,
1235 start_recovery_reclock_callback,
1239 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1245 /* tell the control that we will reply asynchronously */
1246 state->c = talloc_steal(state, c);
1247 *async_reply = true;
1253 try to delete all these records as part of the vacuuming process
1254 and return the records we failed to delete
/*
 * Control handler: try to delete a batch of records for vacuuming and
 * report back those that could not be deleted.
 *
 * indata is a struct ctdb_marshall_buffer; it must be at least as large as
 * the buffer header and name a known database. Every record's data must
 * hold at least a struct ctdb_ltdb_header. Each record is passed to
 * delete_tdb_record(); a record for which that returns non-zero is
 * appended unchanged to the reply buffer.
 *
 * outdata receives the finished reply buffer, which carries the same
 * db_id and is allocated as a talloc child of outdata.
 */
1256 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1258 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1259 struct ctdb_db_context *ctdb_db;
1261 struct ctdb_rec_data_old *rec;
1262 struct ctdb_marshall_buffer *records;
1264 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1265 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1269 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1271 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1276 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1277 reply->count, reply->db_id));
1280 /* create a blob to send back the records we could not delete */
1281 records = (struct ctdb_marshall_buffer *)
1282 talloc_zero_size(outdata,
1283 offsetof(struct ctdb_marshall_buffer, data));
1284 if (records == NULL) {
1285 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1288 records->db_id = ctdb_db->db_id;
/* walk the incoming records; each one is rec->length bytes long */
1291 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1292 for (i=0;i<reply->count;i++) {
1295 key.dptr = &rec->data[0];
1296 key.dsize = rec->keylen;
1297 data.dptr = &rec->data[key.dsize];
1298 data.dsize = rec->datalen;
1300 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1301 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1302 talloc_free(records);
1306 /* If we cant delete the record we must add it to the reply
1307 so the lmaster knows it may not purge this record
1309 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1311 struct ctdb_ltdb_header *hdr;
1313 hdr = (struct ctdb_ltdb_header *)data.dptr;
1314 data.dptr += sizeof(*hdr);
1315 data.dsize -= sizeof(*hdr);
1317 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1319 old_size = talloc_get_size(records);
1320 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1321 if (records == NULL) {
1322 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1326 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1329 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1333 *outdata = ctdb_marshall_finish(records);
/*
 * Control handler: return this node's capability bits.
 * The reply is a single uint32_t copy of ctdb->capabilities, allocated as
 * a talloc child of outdata.
 */
1341 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1343 uint32_t *capabilities = NULL;
1345 capabilities = talloc(outdata, uint32_t);
1346 CTDB_NO_MEMORY(ctdb, capabilities);
1347 *capabilities = ctdb->capabilities;
1349 outdata->dsize = sizeof(uint32_t);
1350 outdata->dptr = (uint8_t *)capabilities;
1355 /* The recovery daemon will ping us at regular intervals.
1356 If we haven't been pinged for a while we assume the recovery
1357 daemon is inoperable and we restart it. */
/*
 * Timer callback: the recovery daemon has not pinged within
 * recd_ping_timeout seconds.
 *
 * While the timeout count in ctdb->recd_ping_count is below the
 * recd_ping_failcount tunable, the timer is re-armed. After the final
 * timeout the recovery daemon is stopped and started again.
 * p is the struct ctdb_context.
 */
1359 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1360 struct tevent_timer *te,
1361 struct timeval t, void *p)
1363 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1364 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1366 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1368 if (*count < ctdb->tunable.recd_ping_failcount) {
/* the timer's talloc parent is the counter, so freeing the counter (done
 * by ctdb_control_recd_ping) also frees this timer */
1370 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1371 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1372 ctdb_recd_ping_timeout, ctdb);
1376 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1378 ctdb_stop_recoverd(ctdb);
1379 ctdb_start_recoverd(ctdb);
/*
 * Control handler: the recovery daemon has pinged us.
 *
 * The previous timeout counter is freed and replaced by a fresh
 * zero-initialised one. When the recd_ping_timeout tunable is non-zero, a
 * new timeout timer is armed with the counter as its talloc parent.
 */
1382 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1384 talloc_free(ctdb->recd_ping_count);
1386 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1387 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1389 if (ctdb->tunable.recd_ping_timeout != 0) {
1390 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1391 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1392 ctdb_recd_ping_timeout, ctdb);
/*
 * Control handler: record which node is the recovery master.
 *
 * indata must be exactly one uint32_t, the PNN of the new recovery
 * master. A message is logged when this node loses or gains the role,
 * then ctdb->recovery_master is updated.
 */
1400 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1402 uint32_t new_recmaster;
1404 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1405 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
/* this node was the recovery master and another node takes over */
1407 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1409 ("Remote node (%u) is now the recovery master\n",
/* this node becomes the recovery master */
1413 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1415 ("This node (%u) is now the recovery master\n",
1419 ctdb->recovery_master = new_recmaster;
/*
 * Put this node into the inactive state.
 *
 * Sets the VNN map generation and every database's generation to
 * INVALID_GENERATION, switches recovery mode to CTDB_RECOVERY_ACTIVE if it
 * is not already, sends a CTDB_CONTROL_FREEZE control with
 * CTDB_CTRL_FLAG_NOREPLY, and releases all public IP addresses.
 */
1423 void ctdb_node_become_inactive(struct ctdb_context *ctdb)
1425 struct ctdb_db_context *ctdb_db;
1427 D_WARNING("Making node INACTIVE\n");
1430 * Do not service database calls - reset generation to invalid
1431 * so this node ignores any REQ/REPLY CALL/DMASTER
1433 ctdb->vnn_map->generation = INVALID_GENERATION;
1434 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
1435 ctdb_db->generation = INVALID_GENERATION;
1439 * Although this bypasses the control, the only thing missing
1440 * is the deferred drop of all public IPs, which isn't
1441 * necessary because they are dropped below
1443 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
1444 D_NOTICE("Recovery mode set to ACTIVE\n");
1445 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1449 * Initiate database freeze - this will be scheduled for
1450 * immediate execution and will be in progress long before the
1451 * calling control returns
1453 ctdb_daemon_send_control(ctdb,
1456 CTDB_CONTROL_FREEZE,
1458 CTDB_CTRL_FLAG_NOREPLY,
1463 D_NOTICE("Dropping all public IP addresses\n");
1464 ctdb_release_all_ips(ctdb);
/*
 * Control handler: mark this node as stopped by setting
 * NODE_FLAGS_STOPPED in its own entry of ctdb->nodes.
 */
1467 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1469 DEBUG(DEBUG_ERR, ("Stopping node\n"));
1470 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
/*
 * Control handler: clear NODE_FLAGS_STOPPED in this node's own entry of
 * ctdb->nodes, undoing ctdb_control_stop_node.
 */
1475 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1477 DEBUG(DEBUG_ERR, ("Continue node\n"));
1478 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;