4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
43 #include "ctdb_cluster_mutex.h"
// Control handler that returns the current VNN map in wire format.
// The reply is a struct ctdb_vnn_map_wire allocated as a talloc child of
// outdata and handed back through outdata->dptr.
// NOTE(review): this listing omits some short lines (declarations, braces,
// returns); only the statements visible here are described.
46 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
48 struct ctdb_vnn_map_wire *map;
// Presumably rejects any non-empty input payload; the macro is defined elsewhere.
51 CHECK_CONTROL_DATA_SIZE(0);
// Reply size: fixed header up to the map[] member plus one uint32_t per entry.
53 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
54 map = talloc_size(outdata, len);
55 CTDB_NO_MEMORY(ctdb, map);
// Copy the generation, the entry count and then the entries themselves.
57 map->generation = ctdb->vnn_map->generation;
58 map->size = ctdb->vnn_map->size;
59 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
62 outdata->dptr = (uint8_t *)map;
// Control handler that replaces ctdb->vnn_map with the map carried in indata.
// indata.dptr is interpreted directly as a struct ctdb_vnn_map_wire.
// NOTE(review): no check of indata.dsize against map->size is visible in this
// listing - confirm that the payload length is validated before the memcpy.
68 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
70 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
// An error is logged when the node is not in active recovery; the rest of
// this branch is not visible in this listing.
72 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
73 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
// Discard the old map and allocate a fresh one owned by ctdb.
77 talloc_free(ctdb->vnn_map);
79 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
80 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
// The entry array is a talloc child of the new map, sized from the wire data.
82 ctdb->vnn_map->generation = map->generation;
83 ctdb->vnn_map->size = map->size;
84 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
85 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
87 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
// Control handler that returns one (db_id, flags) entry per attached database
// as a struct ctdb_dbid_map_old placed in outdata.
93 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
96 struct ctdb_db_context *ctdb_db;
97 struct ctdb_dbid_map_old *dbid_map;
99 CHECK_CONTROL_DATA_SIZE(0);
// First pass over ctdb->db_list. The loop body is not visible in this
// listing; presumably it counts the databases into len - confirm.
102 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
// Reply size: header up to dbs[] plus len entries. The buffer is zero-filled
// and allocated as a talloc child of outdata.
107 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
108 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
109 if (!outdata->dptr) {
110 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
114 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
// Second pass: fill one dbs[] slot per database, in list order.
116 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
117 dbid_map->dbs[i].db_id = ctdb_db->db_id;
118 dbid_map->dbs[i].flags = ctdb_db->db_flags;
// Control handler that returns the node map built by ctdb_node_list_to_map()
// from ctdb->nodes. The remaining arguments of that call are not visible in
// this listing.
125 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
127 CHECK_CONTROL_DATA_SIZE(0);
129 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
132 if (outdata->dptr == NULL) {
// The reply length is taken from the talloc size of the returned buffer.
136 outdata->dsize = talloc_get_size(outdata->dptr);
142 reload the nodes file
// Reload the nodes file and reconcile the new node list with the previous one.
// Entries whose index and address are unchanged keep their existing node
// object; the other entries are passed to the transport methods add_node and
// connect_node. A CTDB_SRVID_RELOAD_NODES message is then sent to this node.
// NOTE(review): some short lines (for example the assignment of the local
// nodes pointer) are not visible in this listing.
145 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
149 struct ctdb_node **nodes;
// Temporary talloc context; everything parented here is released at the end.
151 tmp_ctx = talloc_new(ctdb);
153 /* steal the old nodes file for a while */
154 talloc_steal(tmp_ctx, ctdb->nodes);
// Remember the old node count for the index comparison in the loop below.
157 num_nodes = ctdb->num_nodes;
160 /* load the new nodes file */
161 ctdb_load_nodes_file(ctdb);
163 for (i=0; i<ctdb->num_nodes; i++) {
164 /* keep any identical pre-existing nodes and connections */
// Same index and same address: drop the freshly loaded node and move the old
// node object back under the new ctdb->nodes array.
165 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
166 talloc_free(ctdb->nodes[i]);
167 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
// The body of this deleted-node check is not visible in this listing.
171 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
175 /* any new or different nodes must be added */
// A failure of either transport call is treated as fatal.
176 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
177 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
178 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
180 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
181 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
182 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
186 /* tell the recovery daemon to reload the nodes file too */
187 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
// Frees the old node array together with any old nodes not stolen back above.
189 talloc_free(tmp_ctx);
195 a traverse function for pulling all relevant records from pulldb
// Members of struct pulldb_data, the state passed to traverse_pulldb() through
// the traverse private pointer.
// NOTE(review): the struct header and some short members (traverse_pulldb
// also uses len and failed) are not visible in this listing.
198 struct ctdb_context *ctdb;
199 struct ctdb_db_context *ctdb_db;
// Marshall buffer that grows as records are appended.
200 struct ctdb_marshall_buffer *pulldata;
// Number of bytes currently allocated for pulldata.
202 uint32_t allocated_len;
// tdb traverse callback: marshal one key/data pair and append it to
// params->pulldata, growing the buffer when needed.
// p is the struct pulldb_data prepared by the caller.
206 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
208 struct pulldb_data *params = (struct pulldb_data *)p;
209 struct ctdb_rec_data_old *rec;
210 struct ctdb_context *ctdb = params->ctdb;
211 struct ctdb_db_context *ctdb_db = params->ctdb_db;
213 /* add the record to the blob */
214 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
// NOTE(review): the condition guarding this assignment is not visible here;
// presumably it is the failure (rec == NULL) case - confirm.
216 params->failed = true;
// Grow the buffer when the record would not fit. The new size adds the
// pulldb_preallocation_size tunable on top of what is needed, presumably to
// reduce the number of reallocations.
219 if (params->len + rec->length >= params->allocated_len) {
220 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
221 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
// Running out of memory here is treated as fatal.
223 if (params->pulldata == NULL) {
224 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
225 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
// Append the record at the current end of the buffer and account for it.
227 params->pulldata->count++;
228 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
229 params->len += rec->length;
// Optional warning for large records; a tunable value of 0 disables it.
231 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
232 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
241 pull a bunch of records from a ltdb, filtering by lmaster
// Control handler: marshal all records of a frozen database into outdata.
// indata.dptr is read as a struct ctdb_pulldb that names the database.
// NOTE(review): several short lines (returns, some assignments, a DEBUG
// opener) are missing from this listing; only visible statements are
// described.
243 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
245 struct ctdb_pulldb *pull;
246 struct ctdb_db_context *ctdb_db;
247 struct pulldb_data params;
248 struct ctdb_marshall_buffer *reply;
250 pull = (struct ctdb_pulldb *)indata.dptr;
// Look up the target database; an unknown id is logged.
252 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
254 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
// The database has to be frozen before it can be pulled.
258 if (!ctdb_db_frozen(ctdb_db)) {
260 ("rejecting ctdb_control_pull_db when not frozen\n"));
// Start the reply as an empty, zeroed marshall buffer parented to outdata.
264 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
265 CTDB_NO_MEMORY(ctdb, reply);
267 reply->db_id = pull->db_id;
// Traverse state: the buffer initially holds only the header, so len and
// allocated_len both start at the offset of data[].
270 params.ctdb_db = ctdb_db;
271 params.pulldata = reply;
272 params.len = offsetof(struct ctdb_marshall_buffer, data);
273 params.allocated_len = params.len;
274 params.failed = false;
276 if (ctdb_db->unhealthy_reason) {
277 /* this is just a warning, as the tdb should be empty anyway */
278 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
279 ctdb_db->db_name, ctdb_db->unhealthy_reason));
282 /* If the records are invalid, we are done */
283 if (ctdb_db->invalid_records) {
// The whole-database lock is marked around the traverse and unmarked on both
// the failure and the success path below.
287 if (ctdb_lockdb_mark(ctdb_db) != 0) {
288 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
// NOTE(review): the third argument on the next line starts with a pilcrow
// character; this looks like a mis-decoded address-of params expression
// (ampersand followed by params) - confirm against the upstream file.
292 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
293 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
294 ctdb_lockdb_unmark(ctdb_db);
295 talloc_free(params.pulldata);
299 ctdb_lockdb_unmark(ctdb_db);
// Publish the buffer (it may have been reallocated during the traverse) and
// the number of bytes actually used.
302 outdata->dptr = (uint8_t *)params.pulldata;
303 outdata->dsize = params.len;
// Optional warnings for large databases; a tunable value of 0 disables each.
305 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
306 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
308 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
309 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
// Traverse state shared between ctdb_control_db_pull() and traverse_db_pull().
// NOTE(review): some short members (pnn and srvid are used by both functions)
// and the closing of the struct are not visible in this listing.
316 struct db_pull_state {
317 struct ctdb_context *ctdb;
318 struct ctdb_db_context *ctdb_db;
// Batch of marshalled records not yet sent.
319 struct ctdb_marshall_buffer *recs;
// Running total of records sent so far.
322 uint32_t num_records;
// tdb traverse callback used by ctdb_control_db_pull(): add the record to the
// pending batch and, once the batch reaches the rec_buffer_size_limit tunable,
// send it as a message to state->pnn on state->srvid and start a new batch.
// NOTE(review): the conditions and returns around the TALLOC_FREE calls are
// not visible in this listing.
325 static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
326 TDB_DATA data, void *private_data)
328 struct db_pull_state *state = (struct db_pull_state *)private_data;
329 struct ctdb_marshall_buffer *recs;
331 recs = ctdb_marshall_add(state->ctdb, state->recs,
332 state->ctdb_db->db_id, 0, key, NULL, data);
// Presumably the failure path of ctdb_marshall_add - confirm.
334 TALLOC_FREE(state->recs);
// Flush threshold is measured on the talloc size of the batch buffer.
339 if (talloc_get_size(state->recs) >=
340 state->ctdb->tunable.rec_buffer_size_limit) {
344 buffer = ctdb_marshall_finish(state->recs);
345 ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
346 state->srvid, buffer);
// Presumably the send-failure path - confirm.
348 TALLOC_FREE(state->recs);
// After a flush the batch is counted and released.
352 state->num_records += state->recs->count;
353 TALLOC_FREE(state->recs);
// Control handler: stream all records of a frozen database to the node that
// sent this control, as messages on the srvid given in the request, and reply
// with the record count accumulated in state.num_records.
// indata.dptr is read as a struct ctdb_pulldb_ext.
// NOTE(review): several short lines (returns, DEBUG openers, some
// assignments) are missing from this listing.
359 int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
360 struct ctdb_req_control_old *c,
361 TDB_DATA indata, TDB_DATA *outdata)
363 struct ctdb_pulldb_ext *pulldb_ext;
364 struct ctdb_db_context *ctdb_db;
365 struct db_pull_state state;
368 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
// Resolve the database id carried in the request.
370 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
371 if (ctdb_db == NULL) {
372 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
// The database has to be frozen before it can be pulled.
377 if (!ctdb_db_frozen(ctdb_db)) {
379 ("rejecting ctdb_control_pull_db when not frozen\n"));
383 if (ctdb_db->unhealthy_reason) {
384 /* this is just a warning, as the tdb should be empty anyway */
386 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
387 ctdb_db->db_name, ctdb_db->unhealthy_reason));
// Traverse state: messages go back to the source node of this control.
391 state.ctdb_db = ctdb_db;
393 state.pnn = c->hdr.srcnode;
394 state.srvid = pulldb_ext->srvid;
395 state.num_records = 0;
397 /* If the records are invalid, we are done */
398 if (ctdb_db->invalid_records) {
// The whole-database lock is marked around the traverse.
402 if (ctdb_lockdb_mark(ctdb_db) != 0) {
404 (__location__ " Failed to get lock on entire db - failing\n"));
408 ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
411 (__location__ " Failed to get traverse db '%s'\n",
413 ctdb_lockdb_unmark(ctdb_db);
417 /* Last few records */
// A batch below the flush threshold is still pending after the traverse.
418 if (state.recs != NULL) {
421 buffer = ctdb_marshall_finish(state.recs);
422 ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
423 state.srvid, buffer);
// Presumably the send-failure path: release the batch and unmark - confirm.
425 TALLOC_FREE(state.recs);
426 ctdb_lockdb_unmark(ctdb_db);
430 state.num_records += state.recs->count;
431 TALLOC_FREE(state.recs);
434 ctdb_lockdb_unmark(ctdb_db);
// Reply payload: the record count copied as a raw uint32_t.
437 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
438 if (outdata->dptr == NULL) {
439 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
443 memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
444 outdata->dsize = sizeof(uint32_t);
450 push a bunch of records into a ltdb, filtering by rsn
// Control handler: store a marshalled batch of records into a frozen local
// database. indata is read as a struct ctdb_marshall_buffer holding
// reply->count records laid out back to back.
// NOTE(review): apart from the buffer-header size check and the per-record
// ltdb-header size check, no bounds check of rec->length or rec->keylen
// against indata.dsize is visible here - confirm whether callers are trusted.
// NOTE(review): several short lines (returns, gotos, labels) are missing from
// this listing.
452 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
454 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
455 struct ctdb_db_context *ctdb_db;
457 struct ctdb_rec_data_old *rec;
// The payload must at least contain the marshall buffer header.
459 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
460 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
464 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
466 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
// The database has to be frozen before records may be pushed into it.
470 if (!ctdb_db_frozen(ctdb_db)) {
472 ("rejecting ctdb_control_push_db when not frozen\n"));
476 if (ctdb_lockdb_mark(ctdb_db) != 0) {
477 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
// rec walks the packed records; it is advanced by rec->length per iteration.
481 rec = (struct ctdb_rec_data_old *)&reply->data[0];
483 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
484 reply->count, reply->db_id));
486 for (i=0;i<reply->count;i++) {
488 struct ctdb_ltdb_header *hdr;
// Record layout: key bytes first, then the data, which starts with a
// struct ctdb_ltdb_header.
490 key.dptr = &rec->data[0];
491 key.dsize = rec->keylen;
492 data.dptr = &rec->data[key.dsize];
493 data.dsize = rec->datalen;
495 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
496 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
499 hdr = (struct ctdb_ltdb_header *)data.dptr;
500 /* strip off any read only record flags. All readonly records
501 are revoked implicitely by a recovery
503 hdr->flags &= ~CTDB_REC_RO_FLAGS;
505 data.dptr += sizeof(*hdr);
506 data.dsize -= sizeof(*hdr);
508 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
510 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
514 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
517 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
518 reply->count, reply->db_id));
520 if (ctdb_db_readonly(ctdb_db)) {
521 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
523 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
524 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
525 tdb_close(ctdb_db->rottdb);
526 ctdb_db->rottdb = NULL;
527 ctdb_db_reset_readonly(ctdb_db);
529 while (ctdb_db->revokechild_active != NULL) {
530 talloc_free(ctdb_db->revokechild_active);
534 ctdb_lockdb_unmark(ctdb_db);
538 ctdb_lockdb_unmark(ctdb_db);
// Per-database state for a message-based push; used by db_push_msg_handler()
// and the db_push_start / db_push_confirm controls.
// NOTE(review): some short members (srvid and failed are used elsewhere) and
// the closing of the struct are not visible in this listing.
542 struct db_push_state {
543 struct ctdb_context *ctdb;
544 struct ctdb_db_context *ctdb_db;
// Running total of records stored by the message handler.
546 uint32_t num_records;
// Message handler registered by ctdb_control_db_push_start(): stores every
// record of the received marshall buffer into state->ctdb_db and adds the
// batch size to state->num_records.
// NOTE(review): no bounds check of the record lengths against indata.dsize is
// visible here, and the control flow leading to the final failed = true
// assignment is not visible in this listing.
550 static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
553 struct db_push_state *state = talloc_get_type(
554 private_data, struct db_push_state);
555 struct ctdb_marshall_buffer *recs;
556 struct ctdb_rec_data_old *rec;
// rec walks the packed records; it is advanced by rec->length per iteration.
563 recs = (struct ctdb_marshall_buffer *)indata.dptr;
564 rec = (struct ctdb_rec_data_old *)&recs->data[0];
566 DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
567 recs->count, recs->db_id));
569 for (i=0; i<recs->count; i++) {
571 struct ctdb_ltdb_header *hdr;
// Record layout: key bytes first, then the data, which starts with a
// struct ctdb_ltdb_header.
573 key.dptr = &rec->data[0];
574 key.dsize = rec->keylen;
575 data.dptr = &rec->data[key.dsize];
576 data.dsize = rec->datalen;
578 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
579 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
583 hdr = (struct ctdb_ltdb_header *)data.dptr;
584 /* Strip off any read only record flags.
585 * All readonly records are revoked implicitely by a recovery.
587 hdr->flags &= ~CTDB_REC_RO_FLAGS;
589 data.dptr += sizeof(*hdr);
590 data.dsize -= sizeof(*hdr);
592 ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
595 (__location__ " Unable to store record\n"));
599 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
602 DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
603 recs->count, recs->db_id));
605 state->num_records += recs->count;
609 state->failed = true;
// Control handler that begins a message-based push into a frozen database:
// it registers db_push_msg_handler() for the srvid given in the request,
// marks the whole-database lock and records the push state on the database.
// indata.dptr is read as a struct ctdb_pulldb_ext.
// NOTE(review): several short lines (returns, DEBUG openers, frees) are
// missing from this listing.
612 int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
614 struct ctdb_pulldb_ext *pulldb_ext;
615 struct ctdb_db_context *ctdb_db;
616 struct db_push_state *state;
619 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
621 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
622 if (ctdb_db == NULL) {
624 (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
628 if (!ctdb_db_frozen(ctdb_db)) {
630 ("rejecting ctdb_control_db_push_start when not frozen\n"));
// A push that was started earlier is torn down first: its handler is
// deregistered and the stored state pointer is cleared.
634 if (ctdb_db->push_started) {
636 (__location__ " DB push already started for %s\n",
639 /* De-register old state */
640 state = (struct db_push_state *)ctdb_db->push_state;
642 srvid_deregister(ctdb->srv, state->srvid, state);
644 ctdb_db->push_state = NULL;
// Fresh zeroed state, owned by the database context.
648 state = talloc_zero(ctdb_db, struct db_push_state);
650 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
655 state->ctdb_db = ctdb_db;
656 state->srvid = pulldb_ext->srvid;
657 state->failed = false;
// The state object serves both as the talloc parent of the registration and
// as the handler private data.
659 ret = srvid_register(ctdb->srv, state, state->srvid,
660 db_push_msg_handler, state);
663 (__location__ " Failed to register srvid for db push\n"));
// If the lock cannot be marked the registration is undone.
668 if (ctdb_lockdb_mark(ctdb_db) != 0) {
670 (__location__ " Failed to get lock on entire db - failing\n"));
671 srvid_deregister(ctdb->srv, state->srvid, state);
676 ctdb_db->push_started = true;
677 ctdb_db->push_state = state;
// Control handler that finishes a push started by ctdb_control_db_push_start():
// it clears read-only tracking state, unmarks the database lock, deregisters
// the message handler and replies with the number of records stored.
// indata.dptr is read as a single uint32_t database id.
// NOTE(review): several short lines (returns, DEBUG openers, frees) are
// missing from this listing.
682 int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
683 TDB_DATA indata, TDB_DATA *outdata)
686 struct ctdb_db_context *ctdb_db;
687 struct db_push_state *state;
689 db_id = *(uint32_t *)indata.dptr;
691 ctdb_db = find_ctdb_db(ctdb, db_id);
692 if (ctdb_db == NULL) {
693 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
697 if (!ctdb_db_frozen(ctdb_db)) {
699 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
703 if (!ctdb_db->push_started) {
704 DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
// For read-only enabled databases the tracking tdb is wiped; if the wipe
// fails it is closed and read-only support is reset for this database.
708 if (ctdb_db_readonly(ctdb_db)) {
710 ("Clearing the tracking database for dbid 0x%x\n",
712 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
714 ("Failed to wipe tracking database for 0x%x."
715 " Dropping read-only delegation support\n",
717 tdb_close(ctdb_db->rottdb);
718 ctdb_db->rottdb = NULL;
719 ctdb_db_reset_readonly(ctdb_db);
// Free active revoke children one at a time until none remain. Presumably
// freeing an entry unlinks it from the list - confirm.
722 while (ctdb_db->revokechild_active != NULL) {
723 talloc_free(ctdb_db->revokechild_active);
727 ctdb_lockdb_unmark(ctdb_db);
729 state = (struct db_push_state *)ctdb_db->push_state;
731 DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
735 srvid_deregister(ctdb->srv, state->srvid, state);
// Reply payload: the stored record count copied as a raw uint32_t.
737 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
738 if (outdata->dptr == NULL) {
739 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
741 ctdb_db->push_state = NULL;
745 memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
746 outdata->dsize = sizeof(uint32_t);
// The push is over: clear the per-database push markers.
749 ctdb_db->push_started = false;
750 ctdb_db->push_state = NULL;
// State carried from ctdb_control_set_recmode() to set_recmode_handler().
// The closing of the struct is not visible in this listing.
755 struct set_recmode_state {
756 struct ctdb_context *ctdb;
// The control request that is answered from the handler.
757 struct ctdb_req_control_old *c;
// Completion callback for the cluster mutex attempt started by
// ctdb_control_set_recmode(). From the visible statements: being able to take
// the recovery lock is reported as an error; the lock-check-OK path and the
// timeout path both switch recovery mode to NORMAL and process deferred
// attaches; any other outcome is reported as an unexpected error. The control
// reply is sent at the end with the status s and the message err.
// NOTE(review): the status values selecting each path and the remaining
// parameters are not visible in this listing.
760 static void set_recmode_handler(char status,
764 struct set_recmode_state *state = talloc_get_type_abort(
765 private_data, struct set_recmode_state);
767 const char *err = NULL;
773 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
774 state->ctdb->recovery_lock));
776 err = "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem";
781 DEBUG(DEBUG_DEBUG, (__location__ " Recovery lock check OK\n"));
782 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
783 ctdb_process_deferred_attach(state->ctdb);
787 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock",
788 reclock.ctdbd, latency);
792 /* Timeout. Consider this a success, not a failure,
793 * as we failed to set the recovery lock which is what
794 * we wanted. This can be caused by the cluster
795 * filesystem being very slow to arbitrate locks
796 * immediately after a node failure. */
799 "Time out getting recovery lock, allowing recmode set anyway\n"));
800 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
801 ctdb_process_deferred_attach(state->ctdb);
808 ("Unexpected error when testing recovery lock\n"));
810 err = "Unexpected error when testing recovery lock";
813 ctdb_request_control_reply(state->ctdb, state->c, NULL, s, err);
// Timer callback armed by ctdb_deferred_drop_all_ips(): logs that recovery has
// lasted too long, discards the timer context and releases all IPs.
// private_data is the struct ctdb_context.
818 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
819 struct timeval t, void *private_data)
821 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
823 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
// The context is freed and cleared before the IPs are released.
824 talloc_free(ctdb->release_ips_ctx);
825 ctdb->release_ips_ctx = NULL;
827 ctdb_release_all_ips(ctdb);
831 * Set up an event to drop all public ips if we remain in recovery for too long
// Arm (or re-arm) a one-shot timer that calls ctdb_drop_all_ips_event() after
// the recovery_drop_all_ips tunable, given in seconds.
// NOTE(review): the return value of tevent_add_timer is not checked in the
// visible code.
834 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
// Freeing the previous context also frees any timer allocated under it.
836 if (ctdb->release_ips_ctx != NULL) {
837 talloc_free(ctdb->release_ips_ctx);
839 ctdb->release_ips_ctx = talloc_new(ctdb);
840 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
// The timer is parented to release_ips_ctx.
842 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
843 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
844 ctdb_drop_all_ips_event, ctdb);
849 set the recovery mode
// Control handler that switches the node between CTDB_RECOVERY_ACTIVE and
// CTDB_RECOVERY_NORMAL. indata.dptr is read as a single uint32_t.
// Entering ACTIVE arms the deferred drop-all-IPs timer. Returning to NORMAL
// checks the database generations, thaws the databases if all are frozen
// and, when a recovery lock is configured, starts a cluster mutex attempt
// whose result is handled in set_recmode_handler().
// NOTE(review): several short lines (returns, NULL checks, the async_reply
// assignment) are missing from this listing.
851 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
852 struct ctdb_req_control_old *c,
853 TDB_DATA indata, bool *async_reply,
854 const char **errormsg)
856 uint32_t recmode = *(uint32_t *)indata.dptr;
857 struct ctdb_db_context *ctdb_db;
858 struct set_recmode_state *state;
859 struct ctdb_cluster_mutex_handle *h;
// Nothing to change when the requested mode is already in effect.
861 if (recmode == ctdb->recovery_mode) {
862 D_INFO("Recovery mode already set to %s\n",
863 recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "ACTIVE");
867 D_NOTICE("Recovery mode set to %s\n",
868 recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "ACTIVE");
870 /* if we enter recovery but stay in recovery for too long
871 we will eventually drop all our ip addresses
873 if (recmode == CTDB_RECOVERY_ACTIVE) {
874 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
875 D_ERR("Failed to set up deferred drop all ips\n");
878 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
882 /* From this point: recmode == CTDB_RECOVERY_NORMAL
884 * Therefore, what follows is special handling when setting
885 * recovery mode back to normal */
// Leaving recovery: cancel the pending drop-all-IPs timer.
887 TALLOC_FREE(ctdb->release_ips_ctx);
// Every database generation must match the VNN map generation.
889 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
890 if (ctdb_db->generation != ctdb->vnn_map->generation) {
892 ("Inconsistent DB generation %u for %s\n",
893 ctdb_db->generation, ctdb_db->db_name));
894 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
899 /* force the databases to thaw */
900 if (ctdb_db_all_frozen(ctdb)) {
901 ctdb_control_thaw(ctdb, false);
// Without a recovery lock the mode can be switched immediately.
904 if (ctdb->recovery_lock == NULL) {
905 /* Not using recovery lock file */
906 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
907 ctdb_process_deferred_attach(ctdb);
911 state = talloc_zero(ctdb, struct set_recmode_state);
913 DEBUG(DEBUG_ERR, (__location__ " out of memory\n"));
// The literal 5 is the fourth argument; presumably a timeout in seconds -
// confirm against ctdb_cluster_mutex().
919 h = ctdb_cluster_mutex(state, ctdb, ctdb->recovery_lock, 5,
920 set_recmode_handler, state, NULL, NULL);
// The request is reparented to state so it can be answered from the handler.
926 state->c = talloc_steal(state, c);
934 delete a record as part of the vacuum process
935 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
936 use non-blocking locks
938 return 0 if the record was successfully deleted (i.e. it does not exist
939 when the function returns)
940 or !0 if the record still exists in the tdb after returning.
// Try to delete one record from the local tdb on behalf of vacuuming, using
// only non-blocking locks. rec carries the key followed by a bare
// struct ctdb_ltdb_header from the caller. The visible checks skip the record
// when this node is lmaster or dmaster, when the stored rsn is greater than
// the supplied one, or when read-only flags are set on either header.
// NOTE(review): the return statements are not visible in this listing, so the
// result of each path cannot be confirmed from here.
942 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
944 TDB_DATA key, data, data2;
945 struct ctdb_ltdb_header *hdr, *hdr2;
947 /* these are really internal tdb functions - but we need them here for
948 non-blocking lock of the freelist */
949 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
950 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
// Record layout: key bytes first, the data follows directly after the key.
953 key.dsize = rec->keylen;
954 key.dptr = &rec->data[0];
955 data.dsize = rec->datalen;
956 data.dptr = &rec->data[rec->keylen];
958 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
959 DBG_INFO("Called delete on record where we are lmaster\n");
// The supplied data must be exactly one ltdb header and nothing more.
963 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
964 DBG_ERR("Bad record size\n");
968 hdr = (struct ctdb_ltdb_header *)data.dptr;
970 /* use a non-blocking lock */
971 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
972 DBG_INFO("Failed to get non-blocking chain lock\n");
// Fetch the locally stored copy; a NULL dptr is handled by unlocking.
976 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
977 if (data2.dptr == NULL) {
978 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
// A stored record shorter than an ltdb header is treated as corrupt and
// deleted if the lock on list -1 (the freelist, per the comment above) can
// be taken without blocking.
982 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
983 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
984 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
985 DBG_ERR("Failed to delete corrupt record\n");
987 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
988 DBG_ERR("Deleted corrupt record\n");
990 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
995 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
// hdr2 is the stored header, hdr the one supplied by the caller.
997 if (hdr2->rsn > hdr->rsn) {
998 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
999 DBG_INFO("Skipping record with rsn=%llu - called with rsn=%llu\n",
1000 (unsigned long long)hdr2->rsn,
1001 (unsigned long long)hdr->rsn);
1006 /* do not allow deleting record that have readonly flags set. */
1007 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1008 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1009 DBG_INFO("Skipping record with readonly flags set\n");
1013 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1014 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1015 DBG_INFO("Skipping record with readonly flags set locally\n");
1020 if (hdr2->dmaster == ctdb->pnn) {
1021 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1022 DBG_INFO("Attempted delete record where we are the dmaster\n");
// Lock order for the delete itself: chain lock first (already held), then
// the list -1 lock; both are released in reverse order afterwards.
1027 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1028 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1029 DBG_INFO("Failed to get non-blocking freelist lock\n");
1034 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1035 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1036 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1037 DBG_INFO("Failed to delete record\n");
1042 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1043 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
// State shared by the start-recovery and end-recovery controls and their
// event script callbacks. The closing of the struct is not visible in this
// listing.
1050 struct recovery_callback_state {
// The control request that is answered once the event script has finished.
1051 struct ctdb_req_control_old *c;
1056 called when the 'recovered' event script has finished
// Completion callback for the recovered event script: bumps the recovery
// counter, replies to the pending control with the script status, records
// the finish time and moves the run state from FIRST_RECOVERY to STARTUP.
// NOTE(review): the condition guarding the failure log is not visible in
// this listing.
1058 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1060 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1062 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
// A script timeout leads to this node banning itself.
1065 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1066 if (status == -ETIMEDOUT) {
1067 ctdb_ban_self(ctdb);
1071 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1074 gettimeofday(&ctdb->last_recovery_finished, NULL);
1076 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1077 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
1082 recovery has finished
// Control handler for the end of a recovery: finishes pending persistent
// transaction commits and runs the CTDB_EVENT_RECOVERED event script. The
// control is answered asynchronously from ctdb_end_recovery_callback().
// NOTE(review): the remaining parameters and some short lines are not visible
// in this listing.
1084 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
1085 struct ctdb_req_control_old *c,
1089 struct recovery_callback_state *state;
1091 DEBUG(DEBUG_ERR,("Recovery has finished\n"));
1093 ctdb_persistent_finish_trans3_commits(ctdb);
// Callback state is owned by ctdb.
1095 state = talloc(ctdb, struct recovery_callback_state);
1096 CTDB_NO_MEMORY(ctdb, state);
1100 ret = ctdb_event_script_callback(ctdb, state,
1101 ctdb_end_recovery_callback,
1103 CTDB_EVENT_RECOVERED, "%s", "");
1106 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1111 /* tell the control that we will reply asynchronously */
// The request is reparented to state so the callback can answer it.
1112 state->c = talloc_steal(state, c);
1113 *async_reply = true;
1118 called when the 'startrecovery' event script has finished
// Completion callback for the startrecovery event script: replies to the
// pending control with the script status.
// NOTE(review): the condition guarding the failure log is not visible in
// this listing.
1120 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1122 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1125 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1128 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
// Run the CTDB_EVENT_START_RECOVERY event script, with
// ctdb_start_recovery_callback() as its completion callback. If the script
// cannot be started the pending control is answered with -1.
// NOTE(review): the condition on ret is not visible in this listing.
1132 static void run_start_recovery_event(struct ctdb_context *ctdb,
1133 struct recovery_callback_state *state)
1137 ret = ctdb_event_script_callback(ctdb, state,
1138 ctdb_start_recovery_callback,
1140 CTDB_EVENT_START_RECOVERY,
1144 DEBUG(DEBUG_ERR,("Unable to run startrecovery event\n"));
1145 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
// Compare two recovery lock settings, either of which may be NULL.
// Returns true when both are NULL or when both are non-NULL and strcmp
// reports them equal; a NULL and a non-NULL value never match.
1153 static bool reclock_strings_equal(const char *a, const char *b)
1155 return (a == NULL && b == NULL) ||
1156 (a != NULL && b != NULL && strcmp(a, b) == 0);
// Reply callback for the GET_RECLOCK_FILE control sent by
// ctdb_control_start_recovery(). Compares the recovery lock setting returned
// by the recovery master with the local one; on mismatch the pending control
// is answered with -1 and the daemon shuts down, otherwise the startrecovery
// event is run.
// NOTE(review): the remaining parameters, the condition guarding the
// GET_RECLOCK failure path and some DEBUG openers are not visible in this
// listing.
1159 static void start_recovery_reclock_callback(struct ctdb_context *ctdb,
1162 const char *errormsg,
1165 struct recovery_callback_state *state = talloc_get_type_abort(
1166 private_data, struct recovery_callback_state);
1167 const char *local = ctdb->recovery_lock;
1168 const char *remote = NULL;
1171 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1172 ctdb_request_control_reply(ctdb, state->c, NULL,
1178 /* Check reclock consistency */
// An empty reply leaves remote as NULL, meaning no recovery lock configured.
1179 if (data.dsize > 0) {
1180 /* Ensure NUL-termination */
// Overwrites the last byte of the reply buffer in place.
1181 data.dptr[data.dsize-1] = '\0';
1182 remote = (const char *)data.dptr;
1184 if (! reclock_strings_equal(local, remote)) {
1186 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
1188 ("Recovery lock configuration inconsistent: "
1189 "recmaster has %s, this node has %s, shutting down\n",
1190 remote == NULL ? "NULL" : remote,
1191 local == NULL ? "NULL" : local));
1193 ctdb_shutdown_sequence(ctdb, 1);
1196 ("Recovery lock consistency check successful\n"));
1198 run_start_recovery_event(ctdb, state);
1201 /* Check recovery lock consistency and run eventscripts for the
1202 * "startrecovery" event */
// The sender of this control (c->hdr.srcnode) is treated as the recovery
// master. The control is answered asynchronously.
// NOTE(review): recmaster indexes ctdb->nodes with no bounds check visible in
// this listing - confirm that srcnode is validated earlier.
// NOTE(review): the remaining parameters, the else keyword and several call
// arguments are not visible in this listing.
1203 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
1204 struct ctdb_req_control_old *c,
1208 struct recovery_callback_state *state;
1209 uint32_t recmaster = c->hdr.srcnode;
1211 DEBUG(DEBUG_ERR, ("Recovery has started\n"));
1212 gettimeofday(&ctdb->last_recovery_started, NULL);
// Callback state is owned by ctdb.
1214 state = talloc(ctdb, struct recovery_callback_state);
1215 CTDB_NO_MEMORY(ctdb, state);
1219 /* Although the recovery master sent this node a start
1220 * recovery control, this node might still think the recovery
1221 * master is disconnected. In this case defer the recovery
1222 * lock consistency check. */
1223 if (ctdb->nodes[recmaster]->flags & NODE_FLAGS_DISCONNECTED) {
1224 run_start_recovery_event(ctdb, state);
1226 /* Ask the recovery master about its reclock setting */
// The answer is processed in start_recovery_reclock_callback().
1227 ret = ctdb_daemon_send_control(ctdb,
1230 CTDB_CONTROL_GET_RECLOCK_FILE,
1233 start_recovery_reclock_callback,
1237 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1243 /* tell the control that we will reply asynchronously */
// The request is reparented to state so the callbacks can answer it.
1244 state->c = talloc_steal(state, c);
1245 *async_reply = true;
1251 try to delete all these records as part of the vacuuming process
1252 and return the records we failed to delete
// Control handler used by vacuuming: try to delete every record listed in
// indata and return, in outdata, the records that could not be deleted.
// indata is read as a struct ctdb_marshall_buffer; the reply uses the same
// layout and is built by appending the failed records unchanged.
// NOTE(review): apart from the header-size and per-record ltdb-header checks,
// no bounds check of rec->length or rec->keylen against indata.dsize is
// visible here. Several short lines (returns, the reply count update) are
// missing from this listing.
1254 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1256 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1257 struct ctdb_db_context *ctdb_db;
1259 struct ctdb_rec_data_old *rec;
1260 struct ctdb_marshall_buffer *records;
// The payload must at least contain the marshall buffer header.
1262 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1263 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1267 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1269 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1274 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1275 reply->count, reply->db_id));
1278 /* create a blob to send back the records we could not delete */
// The reply starts as a zeroed header only and is parented to outdata.
1279 records = (struct ctdb_marshall_buffer *)
1280 talloc_zero_size(outdata,
1281 offsetof(struct ctdb_marshall_buffer, data));
1282 if (records == NULL) {
1283 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1286 records->db_id = ctdb_db->db_id;
// rec walks the packed input records; it is advanced by rec->length.
1289 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1290 for (i=0;i<reply->count;i++) {
1293 key.dptr = &rec->data[0];
1294 key.dsize = rec->keylen;
1295 data.dptr = &rec->data[key.dsize];
1296 data.dsize = rec->datalen;
1298 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1299 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1300 talloc_free(records);
1304 /* If we cant delete the record we must add it to the reply
1305 so the lmaster knows it may not purge this record
1307 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1309 struct ctdb_ltdb_header *hdr;
1311 hdr = (struct ctdb_ltdb_header *)data.dptr;
1312 data.dptr += sizeof(*hdr);
1313 data.dsize -= sizeof(*hdr);
1315 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1317 old_size = talloc_get_size(records);
1318 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1319 if (records == NULL) {
1320 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1324 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1327 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1331 *outdata = ctdb_marshall_finish(records);
// Control handler that returns ctdb->capabilities as a single uint32_t
// allocated as a talloc child of outdata.
1339 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1341 uint32_t *capabilities = NULL;
1343 capabilities = talloc(outdata, uint32_t);
1344 CTDB_NO_MEMORY(ctdb, capabilities);
1345 *capabilities = ctdb->capabilities;
1347 outdata->dsize = sizeof(uint32_t);
1348 outdata->dptr = (uint8_t *)capabilities;
1353 /* The recovery daemon will ping us at regular intervals.
1354 If we haven't been pinged for a while we assume the recovery
1355 daemon is inoperable and we restart.
// Timer callback armed by ctdb_control_recd_ping(). While the miss counter is
// below the recd_ping_failcount tunable the timer is re-armed; after that the
// recovery daemon is stopped and started again.
// p is the struct ctdb_context.
// NOTE(review): the counter increment and the return after re-arming are not
// visible in this listing.
1357 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1358 struct tevent_timer *te,
1359 struct timeval t, void *p)
1361 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1362 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1364 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
// The re-armed timer is parented to the counter object.
1366 if (*count < ctdb->tunable.recd_ping_failcount) {
1368 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1369 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1370 ctdb_recd_ping_timeout, ctdb);
1374 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1376 ctdb_stop_recoverd(ctdb);
1377 ctdb_start_recoverd(ctdb);
// Control handler for a ping from the recovery daemon: resets the miss
// counter and re-arms the ping timeout timer.
1380 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
// Freeing the old counter also frees any timer allocated under it.
1382 talloc_free(ctdb->recd_ping_count);
1384 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1385 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
// A recd_ping_timeout tunable of 0 disables the timeout check.
1387 if (ctdb->tunable.recd_ping_timeout != 0) {
1388 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1389 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1390 ctdb_recd_ping_timeout, ctdb);
// Control handler that records which node is the recovery master.
// The payload is a single uint32_t node number. A message is logged when this
// node loses or gains the role; the DEBUG openers are not visible in this
// listing.
1398 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1400 uint32_t new_recmaster;
1402 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1403 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
// This node was recovery master and another node is taking over.
1405 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1407 ("Remote node (%u) is now the recovery master\n",
// This node is becoming recovery master.
1411 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1413 ("This node (%u) is now the recovery master\n",
1417 ctdb->recovery_master = new_recmaster;
// Control handler that sets NODE_FLAGS_STOPPED on this node's own entry in
// ctdb->nodes.
1422 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1424 DEBUG(DEBUG_ERR, ("Stopping node\n"));
1425 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
// Control handler that clears NODE_FLAGS_STOPPED on this node's own entry in
// ctdb->nodes.
1430 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1432 DEBUG(DEBUG_ERR, ("Continue node\n"));
1433 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;