4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/time.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/system.h"
40 #include "common/common.h"
41 #include "common/logging.h"
43 #include "ctdb_cluster_mutex.h"
/*
 * GETVNNMAP control: marshal the current VNN map of this daemon into outdata.
 *
 * No request payload is expected (CHECK_CONTROL_DATA_SIZE(0)).  The reply is
 * a struct ctdb_vnn_map_wire allocated on the outdata talloc context: the
 * fixed part up to map[] followed by vnn_map->size uint32_t entries.
 */
46 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
48 struct ctdb_vnn_map_wire *map;
51 CHECK_CONTROL_DATA_SIZE(0);
/* wire size = offset of map[] plus one uint32_t per map entry */
53 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
54 map = talloc_size(outdata, len);
55 CTDB_NO_MEMORY(ctdb, map);
/* copy the generation, the entry count and then the entries themselves */
57 map->generation = ctdb->vnn_map->generation;
58 map->size = ctdb->vnn_map->size;
59 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
/* hand the marshalled buffer to the caller through outdata */
62 outdata->dptr = (uint8_t *)map;
/*
 * SETVNNMAP control: replace ctdb->vnn_map with the map supplied in indata.
 *
 * The attempt is logged as an error unless the daemon is in
 * CTDB_RECOVERY_ACTIVE mode.  The previous map is freed, a new
 * struct ctdb_vnn_map is allocated on ctdb, and its entry array is
 * allocated as a talloc child of the new map.
 *
 * NOTE(review): map->size comes from the request payload and drives both the
 * array allocation and the memcpy; no comparison against indata.dsize is
 * visible in this listing - confirm the payload length is validated.
 */
68 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
/* the request payload is interpreted in place as the wire format */
70 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
72 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
73 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
/* drop the old map (and anything allocated under it) before rebuilding */
77 talloc_free(ctdb->vnn_map);
79 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
80 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
82 ctdb->vnn_map->generation = map->generation;
83 ctdb->vnn_map->size = map->size;
84 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
85 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
87 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
 * GETDBMAP control: reply with the db_id and flags of every database on
 * ctdb->db_list.
 *
 * No request payload is expected.  The reply is a zero-initialised
 * struct ctdb_dbid_map_old allocated on outdata, sized for len entries.
 */
93 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
96 struct ctdb_db_context *ctdb_db;
97 struct ctdb_dbid_map_old *dbid_map;
99 CHECK_CONTROL_DATA_SIZE(0);
/*
 * first pass over the database list; presumably counts the databases
 * into len (the loop body is not visible in this listing)
 */
102 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
/* reply size = offset of dbs[] plus len entries */
107 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
108 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
109 if (!outdata->dptr) {
110 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
114 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
/* second pass: fill one entry per database, in list order */
116 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
117 dbid_map->dbs[i].db_id = ctdb_db->db_id;
118 dbid_map->dbs[i].flags = ctdb_db->db_flags;
/*
 * GETNODEMAP control: reply with the node map built from ctdb->nodes by
 * ctdb_node_list_to_map().
 *
 * No request payload is expected.  The reply size is taken from the talloc
 * size of the returned buffer, so the helper must return a talloc chunk.
 */
125 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
127 CHECK_CONTROL_DATA_SIZE(0);
129 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
132 if (outdata->dptr == NULL) {
136 outdata->dsize = talloc_get_size(outdata->dptr);
142 reload the nodes file
/*
 * RELOAD_NODES_FILE control: re-read the nodes file while keeping existing
 * node objects (and their connections) for entries whose address did not
 * change.
 *
 * The old node array is parked on a temporary talloc context while the new
 * file is loaded; unchanged nodes are stolen back, everything left on the
 * temporary context is released at the end.  New or changed nodes are added
 * and connected through ctdb->methods; a failure there is fatal.
 */
145 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
/* NOTE(review): presumably set to the old ctdb->nodes array; the assignment is not visible here */
149 struct ctdb_node **nodes;
151 tmp_ctx = talloc_new(ctdb);
153 /* steal the old nodes file for a while */
154 talloc_steal(tmp_ctx, ctdb->nodes);
/* remember the old node count so indexes past it are treated as new */
157 num_nodes = ctdb->num_nodes;
160 /* load the new nodes file */
161 ctdb_load_nodes_file(ctdb);
163 for (i=0; i<ctdb->num_nodes; i++) {
164 /* keep any identical pre-existing nodes and connections */
165 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
/* discard the freshly loaded entry and re-parent the old node under the new array */
166 talloc_free(ctdb->nodes[i]);
167 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
/* deleted nodes get special handling; the branch body is not visible in this listing */
171 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
175 /* any new or different nodes must be added */
176 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
177 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
178 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
180 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
181 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
182 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
186 /* tell the recovery daemon to reaload the nodes file too */
187 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
/* releases the old array and every old node that was not stolen back */
189 talloc_free(tmp_ctx);
195 a traverse function for pulling all relevant records from pulldb
/*
 * Members of the traverse state used by traverse_pulldb and
 * ctdb_control_pull_db (struct pulldb_data).  The len and failed members
 * used by those functions are not visible in this listing.
 */
198 struct ctdb_context *ctdb;
/* database being traversed */
199 struct ctdb_db_context *ctdb_db;
/* marshall buffer that accumulates the records; grown during the traverse */
200 struct ctdb_marshall_buffer *pulldata;
/* number of bytes currently allocated for pulldata */
202 uint32_t allocated_len;
/*
 * tdb traverse callback for ctdb_control_pull_db: marshal one record and
 * append it to the buffer in the struct pulldb_data passed as p.
 *
 * The buffer is grown with extra headroom taken from the
 * pulldb_preallocation_size tunable so that it is not reallocated for
 * every record.  Failure to grow the buffer is fatal for the daemon.
 */
206 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
208 struct pulldb_data *params = (struct pulldb_data *)p;
209 struct ctdb_rec_data_old *rec;
210 struct ctdb_context *ctdb = params->ctdb;
211 struct ctdb_db_context *ctdb_db = params->ctdb_db;
213 /* add the record to the blob */
214 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
/* presumably reached when marshalling returned NULL; the guard is not visible here */
216 params->failed = true;
/* grow when the record would reach the current allocation */
219 if (params->len + rec->length >= params->allocated_len) {
220 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
221 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
223 if (params->pulldata == NULL) {
224 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
225 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
/* append the marshalled record at the current end of the buffer */
227 params->pulldata->count++;
228 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
229 params->len += rec->length;
/* optional warning for oversized records; a tunable value of 0 disables it */
231 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
232 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
241 pull a bunch of records from a ltdb, filtering by lmaster
/*
 * PULL_DB control: return every record of one local database as a single
 * marshall buffer in outdata.
 *
 * indata carries a struct ctdb_pulldb naming the database.  The request is
 * rejected when the database is unknown or not frozen.  The whole database
 * is marked as locked for the duration of the read-only traverse.
 */
243 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
245 struct ctdb_pulldb *pull;
246 struct ctdb_db_context *ctdb_db;
247 struct pulldb_data params;
248 struct ctdb_marshall_buffer *reply;
250 pull = (struct ctdb_pulldb *)indata.dptr;
252 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
254 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
258 if (!ctdb_db_frozen(ctdb_db)) {
260 ("rejecting ctdb_control_pull_db when not frozen\n"));
/* start with an empty, zeroed marshall header owned by outdata */
264 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
265 CTDB_NO_MEMORY(ctdb, reply);
267 reply->db_id = pull->db_id;
/* traverse state: used and allocated length both start at the header size */
270 params.ctdb_db = ctdb_db;
271 params.pulldata = reply;
272 params.len = offsetof(struct ctdb_marshall_buffer, data);
273 params.allocated_len = params.len;
274 params.failed = false;
276 if (ctdb_db->unhealthy_reason) {
277 /* this is just a warning, as the tdb should be empty anyway */
278 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
279 ctdb_db->db_name, ctdb_db->unhealthy_reason));
282 if (ctdb_lockdb_mark(ctdb_db) != 0) {
283 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
/* on traverse failure the lock mark is dropped and the partial buffer freed */
287 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
288 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
289 ctdb_lockdb_unmark(ctdb_db);
290 talloc_free(params.pulldata);
294 ctdb_lockdb_unmark(ctdb_db);
/* params.pulldata may differ from reply because the traverse reallocates it */
296 outdata->dptr = (uint8_t *)params.pulldata;
297 outdata->dsize = params.len;
/* optional size warnings; a tunable value of 0 disables each check */
299 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
300 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
302 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
303 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
/*
 * Traverse state for the DB_PULL control (traverse_db_pull and
 * ctdb_control_db_pull).  The pnn and srvid members used by those
 * functions are not visible in this listing.
 */
310 struct db_pull_state {
311 struct ctdb_context *ctdb;
/* database being traversed */
312 struct ctdb_db_context *ctdb_db;
/* records marshalled so far and not yet sent; NULL when empty */
313 struct ctdb_marshall_buffer *recs;
/* running total of records already sent to the requester */
316 uint32_t num_records;
/*
 * tdb traverse callback for ctdb_control_db_pull: add one record to the
 * pending marshall buffer and, once the buffer has reached the
 * rec_buffer_size_limit tunable, send it as a message to state->pnn on
 * state->srvid and start a fresh buffer.
 */
319 static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
320 TDB_DATA data, void *private_data)
322 struct db_pull_state *state = (struct db_pull_state *)private_data;
323 struct ctdb_marshall_buffer *recs;
325 recs = ctdb_marshall_add(state->ctdb, state->recs,
326 state->ctdb_db->db_id, 0, key, NULL, data);
/* presumably the marshalling failure path (guard not visible): drop the pending buffer */
328 TALLOC_FREE(state->recs);
/* flush once the pending buffer has grown to the configured limit */
333 if (talloc_get_size(state->recs) >=
334 state->ctdb->tunable.rec_buffer_size_limit) {
338 buffer = ctdb_marshall_finish(state->recs);
339 ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
340 state->srvid, buffer);
/* presumably the send failure path (guard not visible) */
342 TALLOC_FREE(state->recs);
/* account for the records just sent, then release the buffer */
346 state->num_records += state->recs->count;
347 TALLOC_FREE(state->recs);
/*
 * DB_PULL control: stream every record of one local database to the
 * requesting node as messages and reply with the number of records sent.
 *
 * indata carries a struct ctdb_pulldb_ext with the database id and the
 * srvid the messages are sent to; the destination node is the source node
 * of the control (c->hdr.srcnode).  The request is rejected when the
 * database is unknown or not frozen.  The reply payload is one uint32_t.
 */
353 int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
354 struct ctdb_req_control_old *c,
355 TDB_DATA indata, TDB_DATA *outdata)
357 struct ctdb_pulldb_ext *pulldb_ext;
358 struct ctdb_db_context *ctdb_db;
359 struct db_pull_state state;
362 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
364 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
365 if (ctdb_db == NULL) {
366 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
371 if (!ctdb_db_frozen(ctdb_db)) {
373 ("rejecting ctdb_control_pull_db when not frozen\n"));
377 if (ctdb_db->unhealthy_reason) {
378 /* this is just a warning, as the tdb should be empty anyway */
380 ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
381 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* traverse state: records go back to the node that issued the control */
385 state.ctdb_db = ctdb_db;
387 state.pnn = c->hdr.srcnode;
388 state.srvid = pulldb_ext->srvid;
389 state.num_records = 0;
391 if (ctdb_lockdb_mark(ctdb_db) != 0) {
393 (__location__ " Failed to get lock on entire db - failing\n"));
397 ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
400 (__location__ " Failed to get traverse db '%s'\n",
402 ctdb_lockdb_unmark(ctdb_db);
406 /* Last few records */
/* anything still pending after the traverse is below the flush limit; send it now */
407 if (state.recs != NULL) {
410 buffer = ctdb_marshall_finish(state.recs);
411 ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
412 state.srvid, buffer);
414 TALLOC_FREE(state.recs);
415 ctdb_lockdb_unmark(ctdb_db);
419 state.num_records += state.recs->count;
420 TALLOC_FREE(state.recs);
423 ctdb_lockdb_unmark(ctdb_db);
/* reply payload: the total record count as a raw uint32_t */
425 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
426 if (outdata->dptr == NULL) {
427 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
431 memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
432 outdata->dsize = sizeof(uint32_t);
438 push a bunch of records into a ltdb, filtering by rsn
/*
 * PUSH_DB control: store every record of the marshall buffer in indata into
 * the named local database.
 *
 * The request is rejected when the payload is shorter than the marshall
 * header, the database is unknown, or the database is not frozen.  Each
 * record must carry at least a struct ctdb_ltdb_header in its data part;
 * read-only flags are cleared from that header before the record is stored.
 * For databases with read-only support the tracking database is wiped
 * afterwards.
 *
 * NOTE(review): only the header size is checked against indata.dsize in the
 * visible lines; reply->count, rec->keylen, rec->datalen and rec->length are
 * used as received - confirm the sender is trusted or validated elsewhere.
 */
440 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
442 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
443 struct ctdb_db_context *ctdb_db;
445 struct ctdb_rec_data_old *rec;
447 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
448 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
452 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
454 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
458 if (!ctdb_db_frozen(ctdb_db)) {
460 ("rejecting ctdb_control_push_db when not frozen\n"));
464 if (ctdb_lockdb_mark(ctdb_db) != 0) {
465 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
/* records are packed back to back starting at reply->data */
469 rec = (struct ctdb_rec_data_old *)&reply->data[0];
471 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
472 reply->count, reply->db_id));
474 for (i=0;i<reply->count;i++) {
476 struct ctdb_ltdb_header *hdr;
/* rec->data holds the key bytes immediately followed by the data bytes */
478 key.dptr = &rec->data[0];
479 key.dsize = rec->keylen;
480 data.dptr = &rec->data[key.dsize];
481 data.dsize = rec->datalen;
483 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
484 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
487 hdr = (struct ctdb_ltdb_header *)data.dptr;
488 /* strip off any read only record flags. All readonly records
489 are revoked implicitely by a recovery
491 hdr->flags &= ~CTDB_REC_RO_FLAGS;
// skip past the ltdb header so that data covers only the record payload
493 data.dptr += sizeof(*hdr);
494 data.dsize -= sizeof(*hdr);
496 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
498 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
// advance to the next packed record using the length stored in this one
502 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
505 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
506 reply->count, reply->db_id));
// read-only delegations do not survive a recovery: wipe the tracking db,
// and if that fails close it and turn read-only support off for this db
508 if (ctdb_db_readonly(ctdb_db)) {
509 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
511 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
512 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
513 tdb_close(ctdb_db->rottdb);
514 ctdb_db->rottdb = NULL;
515 ctdb_db_reset_readonly(ctdb_db);
// NOTE(review): the loop ends only if freeing an entry also unlinks it from
// revokechild_active - presumably done by a talloc destructor; confirm
517 while (ctdb_db->revokechild_active != NULL) {
518 talloc_free(ctdb_db->revokechild_active);
522 ctdb_lockdb_unmark(ctdb_db);
526 ctdb_lockdb_unmark(ctdb_db);
// State of an in-progress DB_PUSH for one database, shared between
// ctdb_control_db_push_start, db_push_msg_handler and
// ctdb_control_db_push_confirm.  The srvid and failed members used by
// those functions are not visible in this listing.
530 struct db_push_state {
531 struct ctdb_context *ctdb;
// database that receives the pushed records
532 struct ctdb_db_context *ctdb_db;
// number of records stored so far; reported by the confirm control
534 uint32_t num_records;
// srvid message handler registered by ctdb_control_db_push_start.
//
// Each message carries a marshall buffer of records.  Every record must
// hold at least a struct ctdb_ltdb_header in its data part; read-only flags
// are cleared from the header and the record is stored into state->ctdb_db.
// After the loop the record count of the message is added to
// state->num_records.  state->failed is set on the error path.
538 static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
541 struct db_push_state *state = talloc_get_type(
542 private_data, struct db_push_state);
543 struct ctdb_marshall_buffer *recs;
544 struct ctdb_rec_data_old *rec;
// records are packed back to back starting at recs->data
551 recs = (struct ctdb_marshall_buffer *)indata.dptr;
552 rec = (struct ctdb_rec_data_old *)&recs->data[0];
554 DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
555 recs->count, recs->db_id));
557 for (i=0; i<recs->count; i++) {
559 struct ctdb_ltdb_header *hdr;
// rec->data holds the key bytes immediately followed by the data bytes
561 key.dptr = &rec->data[0];
562 key.dsize = rec->keylen;
563 data.dptr = &rec->data[key.dsize];
564 data.dsize = rec->datalen;
566 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
567 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
571 hdr = (struct ctdb_ltdb_header *)data.dptr;
572 /* Strip off any read only record flags.
573 * All readonly records are revoked implicitely by a recovery.
575 hdr->flags &= ~CTDB_REC_RO_FLAGS;
// skip past the ltdb header so that data covers only the record payload
577 data.dptr += sizeof(*hdr);
578 data.dsize -= sizeof(*hdr);
580 ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
583 (__location__ " Unable to store record\n"));
// advance to the next packed record using the length stored in this one
587 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
590 DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
591 recs->count, recs->db_id));
593 state->num_records += recs->count;
// presumably reached only from the error paths above (label not visible here)
597 state->failed = true;
// DB_PUSH_START control: prepare one frozen database to receive records
// through db_push_msg_handler.
//
// indata carries a struct ctdb_pulldb_ext with the database id and the
// srvid on which the records will arrive.  A new struct db_push_state is
// allocated on the database context, the handler is registered for the
// srvid and the whole database is marked as locked.  If a push was already
// started for this database the old state is de-registered first.
600 int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
602 struct ctdb_pulldb_ext *pulldb_ext;
603 struct ctdb_db_context *ctdb_db;
604 struct db_push_state *state;
607 pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
609 ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
610 if (ctdb_db == NULL) {
612 (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
616 if (!ctdb_db_frozen(ctdb_db)) {
618 ("rejecting ctdb_control_db_push_start when not frozen\n"));
// a leftover push from an earlier attempt is torn down, not treated as fatal
622 if (ctdb_db->push_started) {
624 (__location__ " DB push already started for %s\n",
627 /* De-register old state */
628 state = (struct db_push_state *)ctdb_db->push_state;
630 srvid_deregister(ctdb->srv, state->srvid, state);
632 ctdb_db->push_state = NULL;
636 state = talloc_zero(ctdb_db, struct db_push_state);
638 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
643 state->ctdb_db = ctdb_db;
644 state->srvid = pulldb_ext->srvid;
645 state->failed = false;
// state is passed both as the talloc context and as the handler private data
647 ret = srvid_register(ctdb->srv, state, state->srvid,
648 db_push_msg_handler, state);
651 (__location__ " Failed to register srvid for db push\n"));
// undo the registration when the lock mark cannot be taken
656 if (ctdb_lockdb_mark(ctdb_db) != 0) {
658 (__location__ " Failed to get lock on entire db - failing\n"));
659 srvid_deregister(ctdb->srv, state->srvid, state);
664 ctdb_db->push_started = true;
665 ctdb_db->push_state = state;
/*
 * DB_PUSH_CONFIRM control: finish a push started with DB_PUSH_START and
 * reply with the number of records that were stored.
 *
 * indata carries the database id as a uint32_t.  The request is rejected
 * when the database is unknown, not frozen, or has no push in progress.
 * For databases with read-only support the tracking database is wiped.
 * The lock mark taken by the start control is dropped, the srvid handler is
 * de-registered and the push state on the database is reset.
 */
670 int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
671 TDB_DATA indata, TDB_DATA *outdata)
674 struct ctdb_db_context *ctdb_db;
675 struct db_push_state *state;
677 db_id = *(uint32_t *)indata.dptr;
679 ctdb_db = find_ctdb_db(ctdb, db_id);
680 if (ctdb_db == NULL) {
681 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
685 if (!ctdb_db_frozen(ctdb_db)) {
687 ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
691 if (!ctdb_db->push_started) {
692 DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
/*
 * wipe the read-only tracking database; if that fails, close it and
 * turn read-only support off for this database
 */
696 if (ctdb_db_readonly(ctdb_db)) {
698 ("Clearing the tracking database for dbid 0x%x\n",
700 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
702 ("Failed to wipe tracking database for 0x%x."
703 " Dropping read-only delegation support\n",
705 tdb_close(ctdb_db->rottdb);
706 ctdb_db->rottdb = NULL;
707 ctdb_db_reset_readonly(ctdb_db);
/*
 * NOTE(review): the loop ends only if freeing an entry also unlinks it
 * from revokechild_active - presumably done by a talloc destructor; confirm
 */
710 while (ctdb_db->revokechild_active != NULL) {
711 talloc_free(ctdb_db->revokechild_active);
715 ctdb_lockdb_unmark(ctdb_db);
717 state = (struct db_push_state *)ctdb_db->push_state;
719 DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
/* no more record messages are accepted from here on */
723 srvid_deregister(ctdb->srv, state->srvid, state);
/* reply payload: the stored record count as a raw uint32_t */
725 outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
726 if (outdata->dptr == NULL) {
727 DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
729 ctdb_db->push_state = NULL;
733 memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
734 outdata->dsize = sizeof(uint32_t);
737 ctdb_db->push_started = false;
738 ctdb_db->push_state = NULL;
/*
 * State carried from ctdb_control_set_recmode to set_recmode_handler while
 * the recovery lock test is outstanding.
 */
743 struct set_recmode_state {
744 struct ctdb_context *ctdb;
/* the pending control; stolen onto this state so it can be answered later */
745 struct ctdb_req_control_old *c;
/*
 * Completion handler for the recovery lock test started by
 * ctdb_control_set_recmode.
 *
 * The daemon is expected NOT to be able to take the recovery lock.  Being
 * able to take it is reported as an error to the caller; failing to take it
 * and timing out both switch the daemon to CTDB_RECOVERY_NORMAL and process
 * deferred attaches.  Any other outcome is reported as an error.  The
 * pending control is answered at the end in every case.
 *
 * NOTE(review): the status values that select each branch and the value
 * assigned to s are not visible in this listing.
 */
748 static void set_recmode_handler(char status,
752 struct set_recmode_state *state = talloc_get_type_abort(
753 private_data, struct set_recmode_state);
755 const char *err = NULL;
/* lock was obtained: this should be impossible while the lock is held elsewhere */
761 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
762 state->ctdb->recovery_lock));
764 err = "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem";
/* lock could not be obtained: the expected outcome */
769 DEBUG(DEBUG_DEBUG, (__location__ " Recovery lock check OK\n"));
770 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
771 ctdb_process_deferred_attach(state->ctdb);
775 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock",
776 reclock.ctdbd, latency);
780 /* Timeout. Consider this a success, not a failure,
781 * as we failed to set the recovery lock which is what
782 * we wanted. This can be caused by the cluster
783 * filesystem being very slow to arbitrate locks
784 * immediately after a node failure. */
787 "Time out getting recovery lock, allowing recmode set anyway\n"));
788 state->ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
789 ctdb_process_deferred_attach(state->ctdb);
796 ("Unexpected error when testing recovery lock\n"));
798 err = "Unexpected error when testing recovery lock";
/* answer the control that was deferred by ctdb_control_set_recmode */
801 ctdb_request_control_reply(state->ctdb, state->c, NULL, s, err);
/*
 * Timer callback armed by ctdb_deferred_drop_all_ips: the daemon has stayed
 * in recovery mode for the whole timeout, so release all public IPs.
 * The timer context is freed and cleared first so it is not left dangling.
 */
806 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
807 struct timeval t, void *private_data)
809 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
811 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
812 talloc_free(ctdb->release_ips_ctx);
813 ctdb->release_ips_ctx = NULL;
815 ctdb_release_all_ips(ctdb);
819 * Set up an event to drop all public ips if we remain in recovery for too
/*
 * Arm (or re-arm) the timer that calls ctdb_drop_all_ips_event after
 * tunable.recovery_drop_all_ips seconds.
 *
 * The timer is allocated on ctdb->release_ips_ctx, so freeing that context
 * cancels it; an existing context is freed first, which replaces any timer
 * that was already pending.
 *
 * NOTE(review): the result of tevent_add_timer is not checked in the
 * visible lines.
 */
822 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
824 if (ctdb->release_ips_ctx != NULL) {
825 talloc_free(ctdb->release_ips_ctx);
827 ctdb->release_ips_ctx = talloc_new(ctdb);
828 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
830 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
831 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
832 ctdb_drop_all_ips_event, ctdb);
837 set the recovery mode
/*
 * SET_RECMODE control: switch the daemon between CTDB_RECOVERY_NORMAL and
 * CTDB_RECOVERY_ACTIVE.
 *
 * indata carries the requested mode as a uint32_t.  Requesting the current
 * mode is only logged.  Entering ACTIVE arms the deferred drop of all public
 * IPs.  Returning to NORMAL cancels that timer, checks that every database
 * has the generation of the current VNN map, thaws the databases if they are
 * all frozen and, when a recovery lock is configured, starts an asynchronous
 * test of that lock whose result is handled by set_recmode_handler.
 */
839 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
840 struct ctdb_req_control_old *c,
841 TDB_DATA indata, bool *async_reply,
842 const char **errormsg)
844 uint32_t recmode = *(uint32_t *)indata.dptr;
845 struct ctdb_db_context *ctdb_db;
846 struct set_recmode_state *state;
847 struct ctdb_cluster_mutex_handle *h;
/* nothing to change when the requested mode is already in effect */
849 if (recmode == ctdb->recovery_mode) {
850 D_INFO("Recovery mode already set to %s\n",
851 recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "ACTIVE");
855 D_NOTICE("Recovery mode set to %s\n",
856 recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "ACTIVE");
858 /* if we enter recovery but stay in recovery for too long
859 we will eventually drop all our ip addresses
861 if (recmode == CTDB_RECOVERY_ACTIVE) {
862 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
863 D_ERR("Failed to set up deferred drop all ips\n");
866 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
870 /* From this point: recmode == CTDB_RECOVERY_NORMAL
872 * Therefore, what follows is special handling when setting
873 * recovery mode back to normal */
/* leaving recovery: cancel the pending drop of all public IPs */
875 TALLOC_FREE(ctdb->release_ips_ctx);
/* every database must carry the generation of the current VNN map */
877 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
878 if (ctdb_db->generation != ctdb->vnn_map->generation) {
880 ("Inconsistent DB generation %u for %s\n",
881 ctdb_db->generation, ctdb_db->db_name));
882 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
887 /* force the databases to thaw */
888 if (ctdb_db_all_frozen(ctdb)) {
889 ctdb_control_thaw(ctdb, false);
892 if (ctdb->recovery_lock == NULL) {
893 /* Not using recovery lock file */
894 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
895 ctdb_process_deferred_attach(ctdb);
899 state = talloc_zero(ctdb, struct set_recmode_state);
901 DEBUG(DEBUG_ERR, (__location__ " out of memory\n"));
/*
 * try to take the recovery lock; the outcome is delivered to
 * set_recmode_handler.  NOTE(review): the argument 5 is presumably a
 * timeout in seconds - confirm against ctdb_cluster_mutex
 */
907 h = ctdb_cluster_mutex(state, ctdb, ctdb->recovery_lock, 5,
908 set_recmode_handler, state, NULL, NULL);
/* keep the control alive until the handler answers it */
914 state->c = talloc_steal(state, c);
922 delete a record as part of the vacuum process
923 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
924 use non-blocking locks
926 return 0 if the record was successfully deleted (i.e. it does not exist
927 when the function returns)
928 or !0 if the record still exists in the tdb after returning.
/*
 * Try to delete one record from the local tdb on behalf of vacuuming.
 *
 * rec is a marshalled record whose data part must be exactly one
 * struct ctdb_ltdb_header.  The record is left alone when this node is its
 * lmaster, when the chain or freelist lock cannot be taken without
 * blocking, when the stored copy has a higher rsn than the supplied one,
 * when either copy has read-only flags set, or when this node is the
 * dmaster of the stored copy.  A stored copy too small to hold a header is
 * treated as corrupt and deleted.
 */
930 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
932 TDB_DATA key, data, data2;
933 struct ctdb_ltdb_header *hdr, *hdr2;
935 /* these are really internal tdb functions - but we need them here for
936 non-blocking lock of the freelist */
937 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
938 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
/* rec->data holds the key bytes immediately followed by the data bytes */
941 key.dsize = rec->keylen;
942 key.dptr = &rec->data[0];
943 data.dsize = rec->datalen;
944 data.dptr = &rec->data[rec->keylen];
946 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
947 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
/* the supplied record must be a bare header with no payload */
951 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
952 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
956 hdr = (struct ctdb_ltdb_header *)data.dptr;
958 /* use a non-blocking lock */
959 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
/* fetch the stored copy; a missing record needs no delete */
963 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
964 if (data2.dptr == NULL) {
965 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* stored copy cannot even hold a header: remove it under the freelist lock (list -1) */
969 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
970 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
971 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
972 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
974 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
975 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
977 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
982 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* the stored copy is newer than the one the caller wants deleted */
984 if (hdr2->rsn > hdr->rsn) {
985 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
986 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
987 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
992 /* do not allow deleting record that have readonly flags set. */
/* checked on the supplied header first, then on the stored header */
993 if (hdr->flags & CTDB_REC_RO_FLAGS) {
994 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
995 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
999 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1000 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1001 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
1006 if (hdr2->dmaster == ctdb->pnn) {
1007 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1008 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
/* take the freelist lock without blocking before the real delete */
1013 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1014 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1019 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1020 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1021 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1022 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
/* success: release the locks in the reverse order of acquisition */
1027 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1028 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
 * State handed to the recovery event callbacks so they can answer the
 * control that triggered the event once it has finished.
 */
1035 struct recovery_callback_state {
/* the pending control; stolen onto this state by the control handlers */
1036 struct ctdb_req_control_old *c;
1041 called when the 'recovered' event script has finished
/*
 * Completion callback for the event run by ctdb_control_end_recovery.
 *
 * Re-enables monitoring, bumps the recovery counter, answers the pending
 * END_RECOVERY control with the event status, records the finish time and
 * moves a daemon that was in its first recovery on to the STARTUP runstate.
 * A script failure is logged; a timeout (-ETIME) makes the node ban itself.
 */
1043 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1045 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1047 ctdb_enable_monitoring(ctdb);
1048 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
/* presumably guarded by a check for a non-zero status (guard not visible here) */
1051 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1052 if (status == -ETIME) {
1053 ctdb_ban_self(ctdb);
1057 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1060 gettimeofday(&ctdb->last_recovery_finished, NULL);
1062 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
1063 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
1068 recovery has finished
/*
 * END_RECOVERY control: finish outstanding persistent transaction commits
 * and run the CTDB_EVENT_RECOVERED event.
 *
 * Monitoring is disabled while the event runs.  The control is answered
 * asynchronously from ctdb_end_recovery_callback, so on success the control
 * is stolen onto the callback state and *async_reply is set.  If the event
 * cannot be started, monitoring is re-enabled and the failure is logged.
 */
1070 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
1071 struct ctdb_req_control_old *c,
1075 struct recovery_callback_state *state;
1077 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
1079 ctdb_persistent_finish_trans3_commits(ctdb);
1081 state = talloc(ctdb, struct recovery_callback_state);
1082 CTDB_NO_MEMORY(ctdb, state);
1086 ctdb_disable_monitoring(ctdb);
1088 ret = ctdb_event_script_callback(ctdb, state,
1089 ctdb_end_recovery_callback,
1091 CTDB_EVENT_RECOVERED, "%s", "");
/* failure path: undo the monitoring change made above */
1094 ctdb_enable_monitoring(ctdb);
1096 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1101 /* tell the control that we will be reply asynchronously */
1102 state->c = talloc_steal(state, c);
1103 *async_reply = true;
1108 called when the 'startrecovery' event script has finished
/*
 * Completion callback for the event started by run_start_recovery_event:
 * logs a script failure and answers the pending START_RECOVERY control with
 * the event status.
 */
1110 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1112 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
/* presumably guarded by a check for a non-zero status (guard not visible here) */
1115 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1118 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
/*
 * Disable monitoring and run the CTDB_EVENT_START_RECOVERY event, with
 * ctdb_start_recovery_callback as the completion callback.  If the event
 * cannot be started the pending control in state->c is answered with -1
 * straight away.
 */
1122 static void run_start_recovery_event(struct ctdb_context *ctdb,
1123 struct recovery_callback_state *state)
1127 ctdb_disable_monitoring(ctdb);
1129 ret = ctdb_event_script_callback(ctdb, state,
1130 ctdb_start_recovery_callback,
1132 CTDB_EVENT_START_RECOVERY,
1136 DEBUG(DEBUG_ERR,("Unable to run startrecovery event\n"));
1137 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
/*
 * Compare two recovery lock settings, either of which may be NULL.
 *
 * Returns true when both are NULL, or when both are non-NULL and strcmp()
 * reports them identical.  A NULL setting never matches a non-NULL one,
 * not even the empty string.
 */
static bool reclock_strings_equal(const char *a, const char *b)
{
	return (a == NULL && b == NULL) ||
		(a != NULL && b != NULL && strcmp(a, b) == 0);
}
/*
 * Reply callback for the GET_RECLOCK_FILE control sent to the recovery
 * master by ctdb_control_start_recovery.
 *
 * Compares the recovery lock setting reported by the recovery master with
 * the local one.  A failed control is logged and the pending
 * START_RECOVERY control is answered.  A mismatch answers the control with
 * -1, logs both settings and shuts this daemon down.  On a match the
 * start recovery event is run.
 */
1151 static void start_recovery_reclock_callback(struct ctdb_context *ctdb,
1154 const char *errormsg,
1157 struct recovery_callback_state *state = talloc_get_type_abort(
1158 private_data, struct recovery_callback_state);
1159 const char *local = ctdb->recovery_lock;
1160 const char *remote = NULL;
1163 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1164 ctdb_request_control_reply(ctdb, state->c, NULL,
1170 /* Check reclock consistency */
/* an empty reply leaves remote as NULL, meaning no recovery lock is configured there */
1171 if (data.dsize > 0) {
1172 /* Ensure NUL-termination */
1173 data.dptr[data.dsize-1] = '\0';
1174 remote = (const char *)data.dptr;
1176 if (! reclock_strings_equal(local, remote)) {
/* answer the control before shutting down so the sender is not left waiting */
1178 ctdb_request_control_reply(ctdb, state->c, NULL, -1, NULL);
1180 ("Recovery lock configuration inconsistent: "
1181 "recmaster has %s, this node has %s, shutting down\n",
1182 remote == NULL ? "NULL" : remote,
1183 local == NULL ? "NULL" : local));
1185 ctdb_shutdown_sequence(ctdb, 1);
1188 ("Recovery lock consistency check successful\n"));
1190 run_start_recovery_event(ctdb, state);
1193 /* Check recovery lock consistency and run eventscripts for the
1194 * "startrecovery" event */
/*
 * The sender of the control (c->hdr.srcnode) is treated as the recovery
 * master.  The start time is recorded, then either the event is run
 * directly or the recovery master is first asked for its recovery lock
 * setting, in which case start_recovery_reclock_callback continues.  The
 * control is answered asynchronously: it is stolen onto the callback state
 * and *async_reply is set.
 *
 * NOTE(review): recmaster is used as an index into ctdb->nodes; no range
 * check against the node count is visible in this listing.
 */
1195 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
1196 struct ctdb_req_control_old *c,
1200 struct recovery_callback_state *state;
1201 uint32_t recmaster = c->hdr.srcnode;
1203 DEBUG(DEBUG_NOTICE, ("Recovery has started\n"));
1204 gettimeofday(&ctdb->last_recovery_started, NULL);
1206 state = talloc(ctdb, struct recovery_callback_state);
1207 CTDB_NO_MEMORY(ctdb, state);
1211 /* Although the recovery master sent this node a start
1212 * recovery control, this node might still think the recovery
1213 * master is disconnected. In this case defer the recovery
1214 * lock consistency check. */
1215 if (ctdb->nodes[recmaster]->flags & NODE_FLAGS_DISCONNECTED) {
1216 run_start_recovery_event(ctdb, state);
1218 /* Ask the recovery master about its reclock setting */
1219 ret = ctdb_daemon_send_control(ctdb,
1222 CTDB_CONTROL_GET_RECLOCK_FILE,
1225 start_recovery_reclock_callback,
1229 DEBUG(DEBUG_ERR, (__location__ " GET_RECLOCK failed\n"));
1235 /* tell the control that we will be reply asynchronously */
1236 state->c = talloc_steal(state, c);
1237 *async_reply = true;
1243 try to delete all these records as part of the vacuuming process
1244 and return the records we failed to delete
/*
 * TRY_DELETE_RECORDS control: attempt to delete every record listed in the
 * marshall buffer in indata and reply with a marshall buffer holding the
 * records that could not be deleted.
 *
 * The request is rejected when the payload is shorter than the marshall
 * header or the database is unknown.  Each record must carry at least a
 * struct ctdb_ltdb_header in its data part.
 *
 * NOTE(review): only the header size is checked against indata.dsize in the
 * visible lines; reply->count and the per-record lengths are used as
 * received - confirm the sender is trusted or validated elsewhere.
 */
1246 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1248 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1249 struct ctdb_db_context *ctdb_db;
1251 struct ctdb_rec_data_old *rec;
1252 struct ctdb_marshall_buffer *records;
1254 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1255 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1259 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1261 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1266 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1267 reply->count, reply->db_id));
1270 /* create a blob to send back the records we couldnt delete */
/* starts as a zeroed header only; grown by one record per failed delete */
1271 records = (struct ctdb_marshall_buffer *)
1272 talloc_zero_size(outdata,
1273 offsetof(struct ctdb_marshall_buffer, data));
1274 if (records == NULL) {
1275 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1278 records->db_id = ctdb_db->db_id;
/* records are packed back to back starting at reply->data */
1281 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1282 for (i=0;i<reply->count;i++) {
1285 key.dptr = &rec->data[0];
1286 key.dsize = rec->keylen;
1287 data.dptr = &rec->data[key.dsize];
1288 data.dsize = rec->datalen;
1290 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1291 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1292 talloc_free(records);
1296 /* If we cant delete the record we must add it to the reply
1297 so the lmaster knows it may not purge this record
1299 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1301 struct ctdb_ltdb_header *hdr;
1303 hdr = (struct ctdb_ltdb_header *)data.dptr;
1304 data.dptr += sizeof(*hdr);
1305 data.dsize -= sizeof(*hdr);
1307 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
// append the whole marshalled record to the end of the reply buffer;
// the talloc size of the buffer is used as the current write offset
1309 old_size = talloc_get_size(records);
1310 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1311 if (records == NULL) {
1312 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1316 memcpy(old_size+(uint8_t *)records, rec, rec->length);
// advance to the next packed record using the length stored in this one
1319 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1323 *outdata = ctdb_marshall_finish(records);
1329 * Store a record as part of the vacuum process:
1330 * This is called from the RECEIVE_RECORD control which
1331 * the lmaster uses to send the current empty copy
1332 * to all nodes for storing, before it lets the other
1333 * nodes delete the records in the second phase with
1334 * the TRY_DELETE_RECORDS control.
1336 * Only store if we are not lmaster or dmaster, and our
1337 * rsn is <= the provided rsn. Use non-blocking locks.
1339 * return 0 if the record was successfully stored.
1340 * return !0 if the record still exists in the tdb after returning.
1342 static int store_tdb_record(struct ctdb_context *ctdb,
1343 struct ctdb_db_context *ctdb_db,
1344 struct ctdb_rec_data_old *rec)
1346 TDB_DATA key, data, data2;
1347 struct ctdb_ltdb_header *hdr, *hdr2;
1350 key.dsize = rec->keylen;
1351 key.dptr = &rec->data[0];
1352 data.dsize = rec->datalen;
1353 data.dptr = &rec->data[rec->keylen];
1355 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1356 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1357 "where we are lmaster\n"));
1361 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1362 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1366 hdr = (struct ctdb_ltdb_header *)data.dptr;
1368 /* use a non-blocking lock */
1369 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1370 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
1374 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1375 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1376 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1377 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1381 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1386 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1388 if (hdr2->rsn > hdr->rsn) {
1389 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1390 "rsn=%llu - called with rsn=%llu\n",
1391 (unsigned long long)hdr2->rsn,
1392 (unsigned long long)hdr->rsn));
1397 /* do not allow vacuuming of records that have readonly flags set. */
1398 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1399 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1404 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1405 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1411 if (hdr2->dmaster == ctdb->pnn) {
1412 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1413 "where we are the dmaster\n"));
1418 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1419 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1427 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1435 * Try to store all these records as part of the vacuuming process
1436 * and return the records we failed to store.
1438 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1439 TDB_DATA indata, TDB_DATA *outdata)
1441 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1442 struct ctdb_db_context *ctdb_db;
1444 struct ctdb_rec_data_old *rec;
1445 struct ctdb_marshall_buffer *records;
1447 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1449 (__location__ " invalid data in receive_records\n"));
1453 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1455 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1460 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1461 "dbid 0x%x\n", reply->count, reply->db_id));
1463 /* create a blob to send back the records we could not store */
1464 records = (struct ctdb_marshall_buffer *)
1465 talloc_zero_size(outdata,
1466 offsetof(struct ctdb_marshall_buffer, data));
1467 if (records == NULL) {
1468 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1471 records->db_id = ctdb_db->db_id;
1473 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1474 for (i=0; i<reply->count; i++) {
1477 key.dptr = &rec->data[0];
1478 key.dsize = rec->keylen;
1479 data.dptr = &rec->data[key.dsize];
1480 data.dsize = rec->datalen;
1482 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1483 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1485 talloc_free(records);
1490 * If we can not store the record we must add it to the reply
1491 * so the lmaster knows it may not purge this record.
1493 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1495 struct ctdb_ltdb_header *hdr;
1497 hdr = (struct ctdb_ltdb_header *)data.dptr;
1498 data.dptr += sizeof(*hdr);
1499 data.dsize -= sizeof(*hdr);
1501 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1502 "record with hash 0x%08x in vacuum "
1503 "via RECEIVE_RECORDS\n",
1506 old_size = talloc_get_size(records);
1507 records = talloc_realloc_size(outdata, records,
1508 old_size + rec->length);
1509 if (records == NULL) {
1510 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1515 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1518 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1521 *outdata = ctdb_marshall_finish(records);
1530 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1532 uint32_t *capabilities = NULL;
1534 capabilities = talloc(outdata, uint32_t);
1535 CTDB_NO_MEMORY(ctdb, capabilities);
1536 *capabilities = ctdb->capabilities;
1538 outdata->dsize = sizeof(uint32_t);
1539 outdata->dptr = (uint8_t *)capabilities;
1544 /* The recovery daemon will ping us at regular intervals.
1545 If we havent been pinged for a while we assume the recovery
1546 daemon is inoperable and we restart.
1548 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1549 struct tevent_timer *te,
1550 struct timeval t, void *p)
1552 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1553 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1555 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1557 if (*count < ctdb->tunable.recd_ping_failcount) {
1559 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1560 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1561 ctdb_recd_ping_timeout, ctdb);
1565 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1567 ctdb_stop_recoverd(ctdb);
1568 ctdb_start_recoverd(ctdb);
1571 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1573 talloc_free(ctdb->recd_ping_count);
1575 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1576 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1578 if (ctdb->tunable.recd_ping_timeout != 0) {
1579 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1580 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1581 ctdb_recd_ping_timeout, ctdb);
1589 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1591 uint32_t new_recmaster;
1593 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1594 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1596 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1598 ("Remote node (%u) is now the recovery master\n",
1602 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1604 ("This node (%u) is now the recovery master\n",
1608 ctdb->recovery_master = new_recmaster;
1613 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1615 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1616 ctdb_disable_monitoring(ctdb);
1617 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1622 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1624 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1625 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;