4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
32 lock all databases - mark only
/*
  Mark every database on ctdb->db_list as locked by calling
  tdb_lockall_mark() on its local tdb.
  Logs an error when ctdb->freeze_mode is not CTDB_FREEZE_FROZEN.
  NOTE(review): the return statements are not visible in this listing;
  callers in this file treat a non-zero result as failure.
*/
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb)
36 struct ctdb_db_context *ctdb_db;
37 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
38 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
41 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
42 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
50 lock all databases - unmark only
/*
  Counterpart of ctdb_lock_all_databases_mark(): calls
  tdb_lockall_unmark() on the local tdb of every database on
  ctdb->db_list.
  Logs an error when ctdb->freeze_mode is not CTDB_FREEZE_FROZEN.
  NOTE(review): the return statements are not visible in this listing.
*/
52 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb)
54 struct ctdb_db_context *ctdb_db;
55 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
56 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
59 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
60 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
/*
  Control handler: return the current vnn map in wire format.

  A struct ctdb_vnn_map_wire large enough for ctdb->vnn_map->size
  entries is allocated as a talloc child of outdata, filled with the
  generation, size and entries of ctdb->vnn_map, and handed back via
  outdata->dptr.
  CHECK_CONTROL_DATA_SIZE(0) is applied to the request first
  (presumably rejecting a non-empty indata - confirm against the macro).
  NOTE(review): the assignment of outdata->dsize and the return
  statement are not visible in this listing.
*/
69 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
71 CHECK_CONTROL_DATA_SIZE(0);
72 struct ctdb_vnn_map_wire *map;
75 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
76 map = talloc_size(outdata, len);
77 CTDB_NO_MEMORY(ctdb, map);
79 map->generation = ctdb->vnn_map->generation;
80 map->size = ctdb->vnn_map->size;
81 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
84 outdata->dptr = (uint8_t *)map;
90 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
92 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
94 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
95 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
99 talloc_free(ctdb->vnn_map);
101 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
102 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
104 ctdb->vnn_map->generation = map->generation;
105 ctdb->vnn_map->size = map->size;
106 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
107 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
109 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
  Control handler: return the list of attached databases.

  Walks ctdb->db_list once to size the reply, allocates a zeroed
  struct ctdb_dbid_map with room for `len` entries as a talloc child of
  outdata, then walks the list again storing each database's db_id and
  persistent flag.
  NOTE(review): the body of the counting loop, the assignment of the
  entry count in the reply and the return statements are not visible in
  this listing.
*/
115 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
118 struct ctdb_db_context *ctdb_db;
119 struct ctdb_dbid_map *dbid_map;
121 CHECK_CONTROL_DATA_SIZE(0);
124 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
129 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
130 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
131 if (!outdata->dptr) {
132 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
136 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
138 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
139 dbid_map->dbs[i].dbid = ctdb_db->db_id;
140 dbid_map->dbs[i].persistent = ctdb_db->persistent;
/*
  Control handler: return the node map.

  Allocates a zeroed struct ctdb_node_map with ctdb->num_nodes entries
  as a talloc child of outdata.  For each node the textual address is
  converted with parse_ip() into nodes[i].addr (an error is logged when
  parse_ip() returns 0) and the node's pnn and flags are copied.
  NOTE(review): one parse_ip() argument and the return statements are
  not visible in this listing.
*/
147 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
149 uint32_t i, num_nodes;
150 struct ctdb_node_map *node_map;
152 CHECK_CONTROL_DATA_SIZE(0);
154 num_nodes = ctdb->num_nodes;
156 outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
157 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
158 if (!outdata->dptr) {
159 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
163 node_map = (struct ctdb_node_map *)outdata->dptr;
164 node_map->num = num_nodes;
165 for (i=0; i<num_nodes; i++) {
166 if (parse_ip(ctdb->nodes[i]->address.address,
167 NULL, /* TODO: pass in the correct interface here*/
169 &node_map->nodes[i].addr) == 0)
171 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
174 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
175 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
182 get an old style ipv4-only nodemap
/*
  Control handler: return the node map in the older IPv4-only layout.

  Same shape as ctdb_control_getnodemap() but the reply is a
  struct ctdb_node_mapv4 and each address is converted with
  parse_ipv4() into nodes[i].sin.  An error is logged when parse_ipv4()
  returns 0.
  NOTE(review): the return statements are not visible in this listing.
*/
185 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
187 uint32_t i, num_nodes;
188 struct ctdb_node_mapv4 *node_map;
190 CHECK_CONTROL_DATA_SIZE(0);
192 num_nodes = ctdb->num_nodes;
194 outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
195 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
196 if (!outdata->dptr) {
197 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
201 node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
202 node_map->num = num_nodes;
203 for (i=0; i<num_nodes; i++) {
204 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
205 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
209 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
210 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
/*
  Timed-event callback that re-reads the nodes file.

  The existing node array is parked on a temporary talloc context and
  the old node count is remembered, then ctdb_load_nodes_file() builds
  a fresh array.  For each new slot: if the same index existed before
  with the same address, the freshly loaded entry is discarded and the
  old node object is moved back so it is kept; otherwise the node is
  registered with the transport through methods->add_node() and
  methods->connect_node(), and a failure of either is fatal.  Finally
  the recovery daemon is told to reload as well
  (CTDB_SRVID_RELOAD_NODES) and the temporary context - with any old
  nodes that were not reused - is freed.
  NOTE(review): the lines that save the old array into `nodes` and the
  body of the NODE_FLAGS_DELETED test are not visible in this listing.
*/
217 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
218 struct timeval t, void *private_data)
221 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
223 struct ctdb_node **nodes;
225 tmp_ctx = talloc_new(ctdb);
227 /* steal the old nodes file for a while */
228 talloc_steal(tmp_ctx, ctdb->nodes);
231 num_nodes = ctdb->num_nodes;
234 /* load the new nodes file */
235 ctdb_load_nodes_file(ctdb);
237 for (i=0; i<ctdb->num_nodes; i++) {
238 /* keep any identical pre-existing nodes and connections */
239 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
240 talloc_free(ctdb->nodes[i]);
241 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
245 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
249 /* any new or different nodes must be added */
250 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
251 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
252 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
254 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
255 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
256 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
260 /* tell the recovery daemon to reload the nodes file too */
261 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
263 talloc_free(tmp_ctx);
268 reload the nodes file after a short delay (so that we can send the response
/*
  Control handler: schedule ctdb_reload_nodes_event() to run one second
  from now on the daemon's event context, rather than reloading the
  nodes file inline.
  NOTE(review): the return statement is not visible in this listing.
*/
272 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
274 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
280 a traverse function for pulling all relevant records from pulldb
/*
  Fields of the traverse state handed to traverse_pulldb(): the daemon
  context and the marshall buffer being grown record by record.
  NOTE(review): the struct header and further members (traverse_pulldb()
  also uses `len` and `failed`) are not visible in this listing.
*/
283 struct ctdb_context *ctdb;
284 struct ctdb_marshall_buffer *pulldata;
289 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
291 struct pulldb_data *params = (struct pulldb_data *)p;
292 struct ctdb_rec_data *rec;
294 /* add the record to the blob */
295 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
297 params->failed = true;
300 params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
301 if (params->pulldata == NULL) {
302 DEBUG(DEBUG_ERR,(__location__ " Failed to expand pulldb_data to %u (%u records)\n",
303 rec->length + params->len, params->pulldata->count));
304 params->failed = true;
307 params->pulldata->count++;
308 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
309 params->len += rec->length;
316 pull a bunch of records from a ltdb, filtering by lmaster
318 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
320 struct ctdb_control_pulldb *pull;
321 struct ctdb_db_context *ctdb_db;
322 struct pulldb_data params;
323 struct ctdb_marshall_buffer *reply;
325 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
326 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
330 pull = (struct ctdb_control_pulldb *)indata.dptr;
332 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
334 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
338 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
339 CTDB_NO_MEMORY(ctdb, reply);
341 reply->db_id = pull->db_id;
344 params.pulldata = reply;
345 params.len = offsetof(struct ctdb_marshall_buffer, data);
346 params.failed = false;
348 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
349 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
353 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
354 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
355 ctdb_lock_all_databases_unmark(ctdb);
356 talloc_free(params.pulldata);
360 ctdb_lock_all_databases_unmark(ctdb);
362 outdata->dptr = (uint8_t *)params.pulldata;
363 outdata->dsize = params.len;
369 push a bunch of records into a ltdb, filtering by rsn
/*
  Control handler: store a marshalled batch of records into a local
  database.

  indata is interpreted as a struct ctdb_marshall_buffer.  The request
  is only processed while the databases are frozen and when indata is
  at least as large as the fixed buffer header.  With the lock marks
  set, each of the reply->count records is split into key and data; the
  data must start with a struct ctdb_ltdb_header, which is peeled off
  and passed with the remaining payload to ctdb_ltdb_store().  The lock
  marks are removed again before returning.
  NOTE(review): rec->keylen, rec->datalen and rec->length are taken
  from the buffer without being checked against indata.dsize - a
  malformed buffer looks able to walk past the end of indata; confirm
  whether the sender is trusted.
  NOTE(review): the return statements and error-path jumps are not
  visible in this listing.
*/
371 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
373 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
374 struct ctdb_db_context *ctdb_db;
376 struct ctdb_rec_data *rec;
378 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
379 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
383 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
384 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
388 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
390 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
394 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
395 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
399 rec = (struct ctdb_rec_data *)&reply->data[0];
401 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
402 reply->count, reply->db_id));
404 for (i=0;i<reply->count;i++) {
406 struct ctdb_ltdb_header *hdr;
408 key.dptr = &rec->data[0];
409 key.dsize = rec->keylen;
410 data.dptr = &rec->data[key.dsize];
411 data.dsize = rec->datalen;
413 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
414 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
417 hdr = (struct ctdb_ltdb_header *)data.dptr;
418 data.dptr += sizeof(*hdr);
419 data.dsize -= sizeof(*hdr);
421 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
423 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
427 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
430 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
431 reply->count, reply->db_id));
433 ctdb_lock_all_databases_unmark(ctdb);
437 ctdb_lock_all_databases_unmark(ctdb);
/*
  tdb traverse callback used by ctdb_control_set_dmaster().

  p points at the new dmaster value.  The record data is treated as
  starting with a struct ctdb_ltdb_header; when its dmaster differs
  from the requested one it is overwritten in place and the record is
  written back with tdb_store(TDB_REPLACE).  A failed store is logged.
  NOTE(review): data.dsize is not checked before the header is read,
  and the return statements are not visible in this listing.
*/
442 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
444 uint32_t *dmaster = (uint32_t *)p;
445 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
448 /* skip if already correct */
449 if (header->dmaster == *dmaster) {
453 header->dmaster = *dmaster;
455 ret = tdb_store(tdb, key, data, TDB_REPLACE);
457 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back ret:%d\n",ret));
461 /* TODO: add error checking here */
/*
  Control handler: set the dmaster of every record in one database.

  indata is interpreted as a struct ctdb_control_set_dmaster giving the
  database id and the new dmaster.  The request is only processed while
  the databases are frozen.  With the lock marks set, the database is
  traversed with traverse_setdmaster(); the result of tdb_traverse() is
  not examined here.  The lock marks are removed afterwards.
  NOTE(review): the return statements are not visible in this listing.
*/
466 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
468 struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
469 struct ctdb_db_context *ctdb_db;
471 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
472 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
476 ctdb_db = find_ctdb_db(ctdb, p->db_id);
478 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
482 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
483 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
487 tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
489 ctdb_lock_all_databases_unmark(ctdb);
/*
  State of one asynchronous set_recmode request: the daemon context,
  the control request that will be answered later, the timeout and
  pipe events registered for the helper child, and the time the
  request started (used for the reclock latency statistic).
  NOTE(review): further members used elsewhere in this file (recmode,
  fd[], child) are not visible in this listing.
*/
494 struct ctdb_set_recmode_state {
495 struct ctdb_context *ctdb;
496 struct ctdb_req_control *c;
499 struct timed_event *te;
500 struct fd_event *fde;
502 struct timeval start_time;
506 called if our set_recmode child times out. this would happen if
507 ctdb_recovery_lock() would block.
/*
  Timed-event callback fired when the set_recmode helper child has not
  answered in time.  The timeout is logged, the requested recovery mode
  is applied anyway and the pending control is answered with status 0.
  NOTE(review): any cleanup of `state` after the reply is not visible
  in this listing.
*/
509 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
510 struct timeval t, void *private_data)
512 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
513 struct ctdb_set_recmode_state);
515 /* we consider this a success, not a failure, as we failed to
516 set the recovery lock which is what we wanted. This can be
517 caused by the cluster filesystem being very slow to
518 arbitrate locks immediately after a node failure.
520 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
521 state->ctdb->recovery_mode = state->recmode;
522 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
527 /* when we free the recmode state we must kill any child process.
/*
  talloc destructor for struct ctdb_set_recmode_state.  Records the
  time elapsed since state->start_time as "daemon reclock" latency,
  inspects both pipe descriptors (each is compared against -1) and
  sends SIGKILL to the helper child.
  NOTE(review): the statements guarded by the two fd tests and the
  return statement are not visible in this listing.
*/
529 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
531 double l = timeval_elapsed(&state->start_time);
533 ctdb_reclock_latency(state->ctdb, "daemon reclock", &state->ctdb->statistics.reclock.ctdbd, l);
535 if (state->fd[0] != -1) {
538 if (state->fd[1] != -1) {
541 kill(state->child, SIGKILL);
545 /* this is called when the client process has completed ctdb_recovery_lock()
546 and has written data back to us through the pipe.
/*
  fd-event callback run when the set_recmode helper child has written
  its result to the pipe.  The pending timeout event is freed and one
  status byte is read from state->fd[0].  Anything other than a single
  zero byte is answered as an error ("managed to lock reclock file from
  inside daemon"); otherwise the requested recovery mode is applied and
  the control is answered with status 0.
  NOTE(review): the early return after the error reply and any cleanup
  of `state` are not visible in this listing.
*/
548 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
549 uint16_t flags, void *private_data)
551 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
552 struct ctdb_set_recmode_state);
556 /* we got a response from our child process so we can abort the
559 talloc_free(state->te);
563 /* read the childs status when trying to lock the reclock file.
564 child wrote 0 if everything is fine and 1 if it did manage
565 to lock the file, which would be a problem since that means
566 we got a request to exit from recovery but we could still lock
567 the file which at this time SHOULD be locked by the recovery
568 daemon on the recmaster
570 ret = read(state->fd[0], &c, 1);
571 if (ret != 1 || c != 0) {
572 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
577 state->ctdb->recovery_mode = state->recmode;
579 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
/*
  Timed-event callback armed by ctdb_control_set_recmode(): logs that
  the node has been in recovery mode for too long, frees and clears
  ctdb->release_ips_ctx, and releases all IP addresses through
  ctdb_release_all_ips().
*/
585 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
586 struct timeval t, void *private_data)
588 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
590 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
591 talloc_free(ctdb->release_ips_ctx);
592 ctdb->release_ips_ctx = NULL;
594 ctdb_release_all_ips(ctdb);
598 set the recovery mode
/*
  Control handler: change the recovery mode.

  The requested mode is read as a uint32_t from indata.dptr.

  Drop-IPs timer: for CTDB_RECOVERY_NORMAL the release_ips_ctx is freed
  and cleared.  The visible alternative path recreates it and arms
  ctdb_drop_all_ips_event() after tunable.recovery_drop_all_ips seconds.

  The mode may only change while frozen; otherwise *errormsg is set.
  Unless the request is NORMAL while the current mode is ACTIVE, the
  mode is simply assigned.

  When leaving recovery: the databases are thawed if a freeze handle
  exists and a ctdb_set_recmode_state is allocated.  If
  tunable.verify_recovery_lock is 0 the mode is assigned directly.
  Otherwise a pipe is created and a child is forked.  The child tries
  ctdb_recovery_lock(ctdb, false), which is expected to fail because
  the recovery master should hold the lock, writes a status byte to the
  pipe and polls for the parent with kill(parent, 0).  The parent
  installs set_recmode_destructor(), a 5 second timeout
  (ctdb_set_recmode_timeout) and a read event on the pipe, records the
  requested mode and takes ownership of the request `c` so the reply
  can be sent later.

  NOTE(review): return statements, the *async_reply assignment, the
  child's exit path and several error branches are not visible in this
  listing.  indata.dsize is not checked in the visible lines.
*/
600 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
601 struct ctdb_req_control *c,
602 TDB_DATA indata, bool *async_reply,
603 const char **errormsg)
605 uint32_t recmode = *(uint32_t *)indata.dptr;
607 struct ctdb_set_recmode_state *state;
608 pid_t parent = getpid();
610 /* if we enter recovery but stay in recovery for too long
611 we will eventually drop all our ip addresses
613 if (recmode == CTDB_RECOVERY_NORMAL) {
614 talloc_free(ctdb->release_ips_ctx);
615 ctdb->release_ips_ctx = NULL;
617 talloc_free(ctdb->release_ips_ctx);
618 ctdb->release_ips_ctx = talloc_new(ctdb);
619 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
621 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
625 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
626 DEBUG(DEBUG_ERR,("Attempt to change recovery mode to %u when not frozen\n",
628 (*errormsg) = "Cannot change recovery mode while not frozen";
632 if (recmode != ctdb->recovery_mode) {
633 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
634 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
637 if (recmode != CTDB_RECOVERY_NORMAL ||
638 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
639 ctdb->recovery_mode = recmode;
643 /* some special handling when ending recovery mode */
645 /* force the databased to thaw */
646 if (ctdb->freeze_handle) {
647 ctdb_control_thaw(ctdb);
650 state = talloc(ctdb, struct ctdb_set_recmode_state);
651 CTDB_NO_MEMORY(ctdb, state);
653 state->start_time = timeval_current();
657 if (ctdb->tunable.verify_recovery_lock == 0) {
658 /* dont need to verify the reclock file */
659 ctdb->recovery_mode = recmode;
663 /* For the rest of what needs to be done, we need to do this in
664 a child process since
665 1, the call to ctdb_recovery_lock() can block if the cluster
666 filesystem is in the process of recovery.
668 ret = pipe(state->fd);
671 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
675 state->child = fork();
676 if (state->child == (pid_t)-1) {
683 if (state->child == 0) {
687 /* we should not be able to get the lock on the reclock file,
688 as it should be held by the recovery master
690 if (ctdb_recovery_lock(ctdb, false)) {
691 DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
695 write(state->fd[1], &cc, 1);
696 /* make sure we die when our parent dies */
697 while (kill(parent, 0) == 0 || errno != ESRCH) {
699 write(state->fd[1], &cc, 1);
706 talloc_set_destructor(state, set_recmode_destructor);
708 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
709 ctdb_set_recmode_timeout, state);
711 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
712 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
716 if (state->fde == NULL) {
722 state->recmode = recmode;
723 state->c = talloc_steal(state, c);
732 try and get the recovery lock in shared storage - should only work
733 on the recovery master recovery daemon. Anywhere else is a bug
/*
  Try to take the recovery lock without blocking.

  Any descriptor already held in ctdb->recovery_lock_fd is closed
  first.  ctdb->recovery_lock_file is then opened (created with mode
  0600 if necessary), marked close-on-exec, and a write lock is
  requested with fcntl(F_SETLK).  If the lock cannot be obtained the
  descriptor is closed, recovery_lock_fd is reset to -1 and the failure
  is logged.
  NOTE(review): the condition guarding the second close (presumably the
  `keep` flag), the remaining struct flock fields and the return
  statements are not visible in this listing.
*/
735 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
740 DEBUG(DEBUG_ERR, ("Take the recovery lock\n"));
742 if (ctdb->recovery_lock_fd != -1) {
743 close(ctdb->recovery_lock_fd);
744 ctdb->recovery_lock_fd = -1;
747 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
748 if (ctdb->recovery_lock_fd == -1) {
749 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n",
750 ctdb->recovery_lock_file, strerror(errno)));
754 set_close_on_exec(ctdb->recovery_lock_fd);
756 lock.l_type = F_WRLCK;
757 lock.l_whence = SEEK_SET;
762 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
763 close(ctdb->recovery_lock_fd);
764 ctdb->recovery_lock_fd = -1;
766 DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
772 close(ctdb->recovery_lock_fd);
773 ctdb->recovery_lock_fd = -1;
777 DEBUG(DEBUG_ERR, ("Recovery lock taken successfully\n"));
780 DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
786 delete a record as part of the vacuum process
787 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
788 use non-blocking locks
790 return 0 if the record was successfully deleted (i.e. it does not exist
791 when the function returns)
792 or !0 is the record still exists in the tdb after returning.
/*
  Vacuuming helper: try to delete one record from the local tdb using
  only non-blocking locks.

  `rec` carries the key and, as its data, exactly one
  struct ctdb_ltdb_header (any other size is logged as a bad record).
  The checks visible here, in order:
    - this node is the lmaster for the key;
    - the chain lock cannot be taken without blocking;
    - the record is already gone (tdb_fetch() returns no data);
    - the stored record is too short to hold a header, in which case it
      is deleted as corrupt when the -1 list lock (the freelist, per
      the comment below) can be taken;
    - the stored rsn is newer than the rsn supplied in `rec`;
    - this node is the dmaster of the stored record;
    - the -1 list lock cannot be taken without blocking.
  Only when none of these stop the operation is tdb_delete() called;
  all locks taken are released again.
  NOTE(review): the return statements and any release of the buffer
  returned by tdb_fetch() are not visible in this listing.
*/
794 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
797 struct ctdb_ltdb_header *hdr, *hdr2;
799 /* these are really internal tdb functions - but we need them here for
800 non-blocking lock of the freelist */
801 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
802 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
805 key.dsize = rec->keylen;
806 key.dptr = &rec->data[0];
807 data.dsize = rec->datalen;
808 data.dptr = &rec->data[rec->keylen];
810 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
811 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
815 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
816 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
820 hdr = (struct ctdb_ltdb_header *)data.dptr;
822 /* use a non-blocking lock */
823 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
827 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
828 if (data.dptr == NULL) {
829 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
833 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
834 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
835 tdb_delete(ctdb_db->ltdb->tdb, key);
836 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
837 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
839 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
844 hdr2 = (struct ctdb_ltdb_header *)data.dptr;
846 if (hdr2->rsn > hdr->rsn) {
847 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
848 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
849 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
854 if (hdr2->dmaster == ctdb->pnn) {
855 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
856 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
861 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
862 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
867 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
868 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
869 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
870 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
875 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
876 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
  State carried from ctdb_control_start_recovery() and
  ctdb_control_end_recovery() to their event-script completion
  callbacks: the control request that still has to be answered.
*/
883 struct recovery_callback_state {
884 struct ctdb_req_control *c;
889 called when the 'recovered' event script has finished
/*
  Completion callback for the event script started by
  ctdb_control_end_recovery().  Monitoring is re-enabled, the script's
  status is sent back as the reply to the pending control, and the
  current time is stored in ctdb->last_recovery_finished.
  NOTE(review): the condition guarding the error log and any cleanup of
  `state` are not visible in this listing.
*/
891 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
893 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
895 ctdb_enable_monitoring(ctdb);
898 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
901 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
904 gettimeofday(&ctdb->last_recovery_finished, NULL);
908 recovery has finished
/*
  Control handler: recovery has finished.

  Allocates a recovery_callback_state that takes ownership of the
  request `c`, disables monitoring and starts an event script through
  ctdb_event_script_callback() with tunable.script_timeout as its time
  limit and ctdb_end_recovery_callback() as completion handler.  On the
  visible failure path monitoring is re-enabled and the failure is
  logged.  The reply is sent asynchronously by the callback.
  NOTE(review): the remaining parameters, the event name passed to the
  script runner and the return statements are not visible in this
  listing.
*/
910 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
911 struct ctdb_req_control *c,
915 struct recovery_callback_state *state;
917 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
919 state = talloc(ctdb, struct recovery_callback_state);
920 CTDB_NO_MEMORY(ctdb, state);
922 state->c = talloc_steal(state, c);
924 ctdb_disable_monitoring(ctdb);
926 ret = ctdb_event_script_callback(ctdb,
927 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
929 ctdb_end_recovery_callback,
933 ctdb_enable_monitoring(ctdb);
935 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
940 /* tell the control that we will be reply asynchronously */
946 called when the 'startrecovery' event script has finished
/*
  Completion callback for the "startrecovery" event script: sends the
  script's status back as the reply to the pending control.
  NOTE(review): the condition guarding the error log and any cleanup of
  `state` are not visible in this listing.
*/
948 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
950 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
953 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
956 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
961 run the startrecovery eventscript
/*
  Control handler: run the "startrecovery" event script.

  Records the current time in ctdb->last_recovery_started, allocates a
  recovery_callback_state that takes ownership of the request `c`,
  disables monitoring and starts the "startrecovery" script through
  ctdb_event_script_callback() with tunable.script_timeout as its time
  limit and ctdb_start_recovery_callback() as completion handler.  The
  reply is sent asynchronously by the callback.
  NOTE(review): the remaining parameters, the failure test and the
  return statements are not visible in this listing.
*/
963 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
964 struct ctdb_req_control *c,
968 struct recovery_callback_state *state;
970 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
971 gettimeofday(&ctdb->last_recovery_started, NULL);
973 state = talloc(ctdb, struct recovery_callback_state);
974 CTDB_NO_MEMORY(ctdb, state);
976 state->c = talloc_steal(state, c);
978 ctdb_disable_monitoring(ctdb);
980 ret = ctdb_event_script_callback(ctdb,
981 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
983 ctdb_start_recovery_callback,
984 state, "startrecovery");
987 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
992 /* tell the control that we will be reply asynchronously */
998 try to delete all these records as part of the vacuuming process
999 and return the records we failed to delete
/*
  Control handler: attempt to delete a batch of records and report the
  ones that could not be deleted.

  indata is interpreted as a struct ctdb_marshall_buffer and must be at
  least as large as the fixed buffer header.  An empty marshall buffer
  for the same database is allocated under outdata.  Each of the
  reply->count incoming records must carry at least a
  struct ctdb_ltdb_header as data.  A record for which
  delete_tdb_record() returns non-zero is appended verbatim to the
  outgoing buffer, which is grown with talloc_realloc_size().  The
  finished buffer is returned through outdata->dptr/dsize.
  NOTE(review): rec->keylen, rec->datalen and rec->length are taken
  from the buffer without being checked against indata.dsize; confirm
  whether the sender is trusted.
  NOTE(review): the return statements and the update of the outgoing
  record count are not visible in this listing.
*/
1001 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1003 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1004 struct ctdb_db_context *ctdb_db;
1006 struct ctdb_rec_data *rec;
1007 struct ctdb_marshall_buffer *records;
1009 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1010 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1014 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1016 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1021 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1022 reply->count, reply->db_id));
1025 /* create a blob to send back the records we couldnt delete */
1026 records = (struct ctdb_marshall_buffer *)
1027 talloc_zero_size(outdata,
1028 offsetof(struct ctdb_marshall_buffer, data));
1029 if (records == NULL) {
1030 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1033 records->db_id = ctdb_db->db_id;
1036 rec = (struct ctdb_rec_data *)&reply->data[0];
1037 for (i=0;i<reply->count;i++) {
1040 key.dptr = &rec->data[0];
1041 key.dsize = rec->keylen;
1042 data.dptr = &rec->data[key.dsize];
1043 data.dsize = rec->datalen;
1045 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1046 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1050 /* If we cant delete the record we must add it to the reply
1051 so the lmaster knows it may not purge this record
1053 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1055 struct ctdb_ltdb_header *hdr;
1057 hdr = (struct ctdb_ltdb_header *)data.dptr;
1058 data.dptr += sizeof(*hdr);
1059 data.dsize -= sizeof(*hdr);
1061 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1063 old_size = talloc_get_size(records);
1064 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1065 if (records == NULL) {
1066 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1070 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1073 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1077 outdata->dptr = (uint8_t *)records;
1078 outdata->dsize = talloc_get_size(records);
/*
  Control handler: return this node's capability bits.  A uint32_t is
  allocated as a talloc child of outdata, filled from
  ctdb->capabilities and returned through outdata->dptr/dsize.
  NOTE(review): the return statement is not visible in this listing.
*/
1086 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1088 uint32_t *capabilities = NULL;
1090 capabilities = talloc(outdata, uint32_t);
1091 CTDB_NO_MEMORY(ctdb, capabilities);
1092 *capabilities = ctdb->capabilities;
1094 outdata->dsize = sizeof(uint32_t);
1095 outdata->dptr = (uint8_t *)capabilities;
/*
  Timed-event callback fired when the recovery daemon has not pinged
  within tunable.recd_ping_timeout seconds.

  While the counter in ctdb->recd_ping_count is below
  tunable.recd_ping_failcount the timer is simply re-armed.  Otherwise
  the daemon shuts itself down: the recovery daemon, keepalives and
  monitoring are stopped, all IPs are released, the transport's
  shutdown method is called if a transport is configured, and the
  "shutdown" event script is run.
  NOTE(review): the counter increment, the early return after re-arming
  and the final exit call are not visible in this listing.
*/
1100 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1102 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1103 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1105 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1107 if (*count < ctdb->tunable.recd_ping_failcount) {
1109 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1110 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1111 ctdb_recd_ping_timeout, ctdb);
1115 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Shutting down ctdb daemon. (This can be caused if the cluster filesystem has hung)\n"));
1117 ctdb_stop_recoverd(ctdb);
1118 ctdb_stop_keepalive(ctdb);
1119 ctdb_stop_monitoring(ctdb);
1120 ctdb_release_all_ips(ctdb);
1121 if (ctdb->methods != NULL) {
1122 ctdb->methods->shutdown(ctdb);
1124 ctdb_event_script(ctdb, "shutdown");
1125 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Daemon has been shut down.\n"));
1129 /* The recovery daemon will ping us at regular intervals.
1130 If we havent been pinged for a while we assume the recovery
1131 daemon is inoperable and we shut down.
/*
  Control handler: the recovery daemon is alive.

  Frees the previous ping counter and allocates a fresh zeroed one.
  The timeout event is registered with the counter as its talloc
  context, so freeing the old counter presumably also cancels the
  timeout that was pending on it - confirm against the event library.
  When tunable.recd_ping_timeout is non-zero a new
  ctdb_recd_ping_timeout() timer is armed.
  NOTE(review): the return statement is not visible in this listing.
*/
1133 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1135 talloc_free(ctdb->recd_ping_count);
1137 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1138 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1140 if (ctdb->tunable.recd_ping_timeout != 0) {
1141 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1142 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1143 ctdb_recd_ping_timeout, ctdb);
/*
  Control handler: record which node is the recovery master.  After
  CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t)) has been applied to the
  request, the first uint32_t of indata is stored in
  ctdb->recovery_master.
  NOTE(review): the return statement is not visible in this listing.
*/
1151 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1153 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1155 ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];
/*
  Control handler: mark this node as stopped by setting
  NODE_FLAGS_STOPPED in the flags of the local node entry
  (ctdb->nodes[ctdb->pnn]).
  NOTE(review): the return statement is not visible in this listing.
*/
1159 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1161 DEBUG(DEBUG_ERR,(__location__ " Stopping node\n"));
1162 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
/*
  Control handler: undo ctdb_control_stop_node() by clearing
  NODE_FLAGS_STOPPED in the flags of the local node entry
  (ctdb->nodes[ctdb->pnn]).
  NOTE(review): the return statement is not visible in this listing.
*/
1167 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1169 DEBUG(DEBUG_ERR,(__location__ " Continue node\n"));
1170 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;