4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tdb/include/tdb.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/wait.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
/*
  CTDB_CONTROL_GET_VNNMAP handler: marshal the node's current vnn map
  (generation number plus the pnn array) into a wire blob owned by
  outdata for the requesting client.
  NOTE(review): several original lines (braces, 'len' declaration,
  outdata->dsize/return) are elided in this extract.
*/
32 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
/* this control takes no input payload */
34 CHECK_CONTROL_DATA_SIZE(0);
35 struct ctdb_vnn_map_wire *map;
/* wire format = fixed header + one uint32_t per vnn map slot */
38 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
/* allocate on outdata so the reply path owns (and frees) the blob */
39 map = talloc_size(outdata, len);
40 CTDB_NO_MEMORY(ctdb, map);
42 map->generation = ctdb->vnn_map->generation;
43 map->size = ctdb->vnn_map->size;
44 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
47 outdata->dptr = (uint8_t *)map;
/*
  CTDB_CONTROL_SET_VNNMAP handler: replace the node's vnn map with the
  one supplied in indata. Only legal while all database priorities are
  frozen (i.e. during recovery), since changing the vnn map while
  records are in flight would misroute them.
*/
53 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
55 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
/* priorities are 1-based, hence i=1 .. NUM_DB_PRIORITIES inclusive */
58 for(i=1; i<=NUM_DB_PRIORITIES; i++) {
59 if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
60 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
/* discard the old map; children allocated under it go with it */
65 talloc_free(ctdb->vnn_map);
67 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
68 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
70 ctdb->vnn_map->generation = map->generation;
71 ctdb->vnn_map->size = map->size;
/* deep-copy the pnn array out of the wire blob */
72 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
73 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
75 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
  CTDB_CONTROL_GET_DBMAP handler: build a ctdb_dbid_map listing every
  attached database's id plus its PERSISTENT/READONLY/STICKY flags.
*/
81 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
84 struct ctdb_db_context *ctdb_db;
85 struct ctdb_dbid_map *dbid_map;
87 CHECK_CONTROL_DATA_SIZE(0);
/* first pass: count the attached databases (len accumulated in elided code) */
90 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
/* zero-init so unset flag bits are 0 before the |= below */
95 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
96 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
98 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
102 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
/* second pass: fill in one entry per database */
104 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
105 dbid_map->dbs[i].dbid = ctdb_db->db_id;
106 if (ctdb_db->persistent != 0) {
107 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
109 if (ctdb_db->readonly != 0) {
110 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
112 if (ctdb_db->sticky != 0) {
113 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
/*
  CTDB_CONTROL_GET_NODEMAP handler: return pnn, flags and parsed
  socket address for every configured node.
*/
121 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
123 uint32_t i, num_nodes;
124 struct ctdb_node_map *node_map;
126 CHECK_CONTROL_DATA_SIZE(0);
128 num_nodes = ctdb->num_nodes;
130 outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
131 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
132 if (!outdata->dptr) {
133 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
137 node_map = (struct ctdb_node_map *)outdata->dptr;
138 node_map->num = num_nodes;
139 for (i=0; i<num_nodes; i++) {
/* parse_ip returning 0 means the textual address could not be parsed */
140 if (parse_ip(ctdb->nodes[i]->address.address,
141 NULL, /* TODO: pass in the correct interface here*/
143 &node_map->nodes[i].addr) == 0)
145 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
148 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
149 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
/*
  Legacy compatibility control: same as GET_NODEMAP but using the
  old IPv4-only wire structures (ctdb_node_mapv4 / sockaddr_in).
*/
156 get an old style ipv4-only nodemap
159 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
161 uint32_t i, num_nodes;
162 struct ctdb_node_mapv4 *node_map;
164 CHECK_CONTROL_DATA_SIZE(0);
166 num_nodes = ctdb->num_nodes;
168 outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
169 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
170 if (!outdata->dptr) {
171 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
175 node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
176 node_map->num = num_nodes;
177 for (i=0; i<num_nodes; i++) {
178 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
179 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
183 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
184 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
/*
  Deferred timed-event handler that re-reads the nodes file: keeps any
  node whose address is unchanged (preserving its live connection),
  registers/connects any new or changed node, then signals the recovery
  daemon to reload its copy as well.
*/
191 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
192 struct timeval t, void *private_data)
195 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
197 struct ctdb_node **nodes;
199 tmp_ctx = talloc_new(ctdb);
/* steal the old nodes file for a while */
202 talloc_steal(tmp_ctx, ctdb->nodes);
205 num_nodes = ctdb->num_nodes;
/* load the new nodes file */
209 ctdb_load_nodes_file(ctdb);
211 for (i=0; i<ctdb->num_nodes; i++) {
/* keep any identical pre-existing nodes and connections */
213 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
/* drop the freshly-parsed duplicate and reuse the old live node object */
214 talloc_free(ctdb->nodes[i]);
215 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
/* deleted nodes need no transport setup */
219 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
/* any new or different nodes must be added */
224 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
225 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
226 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
228 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
229 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
230 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
/* tell the recovery daemon to reload the nodes file too */
235 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
/* frees the stolen old nodes array (minus any reused entries) */
237 talloc_free(tmp_ctx);
/*
  Control entry point: schedule the actual reload one second from now
  so the control reply can be sent before node connections change.
*/
242 reload the nodes file after a short delay (so that we can send the response
246 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
248 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
/* state shared with the traverse_pulldb() callback below */
254 a traverse function for pulling all relevant records from pulldb
257 struct ctdb_context *ctdb;
258 struct ctdb_db_context *ctdb_db;
/* growing marshall blob the traverse appends records to */
259 struct ctdb_marshall_buffer *pulldata;
/* bytes currently allocated for pulldata (>= used length) */
261 uint32_t allocated_len;
/*
  tdb_traverse_read callback: marshal one record and append it to the
  pulldb_data blob, growing the blob in pre-sized chunks to avoid a
  realloc per record. Fatal on allocation failure (we are mid-recovery).
*/
265 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
267 struct pulldb_data *params = (struct pulldb_data *)p;
268 struct ctdb_rec_data *rec;
269 struct ctdb_context *ctdb = params->ctdb;
270 struct ctdb_db_context *ctdb_db = params->ctdb_db;
/* add the record to the blob */
273 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
275 params->failed = true;
/* grow in pulldb_preallocation_size chunks rather than per record */
278 if (params->len + rec->length >= params->allocated_len) {
279 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
280 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
282 if (params->pulldata == NULL) {
283 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
284 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
286 params->pulldata->count++;
/* append the marshalled record at the current end of the blob */
287 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
288 params->len += rec->length;
/* operator-tunable warning for unusually large records */
290 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
291 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
/*
  CTDB_CONTROL_PULL_DB handler: marshal every record of a (frozen)
  database into one blob for the recovery master. Requires the db's
  priority to be frozen and takes the all-chains mark lock around the
  read traverse.
*/
300 pull a bunch of records from a ltdb, filtering by lmaster
302 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
304 struct ctdb_control_pulldb *pull;
305 struct ctdb_db_context *ctdb_db;
306 struct pulldb_data params;
307 struct ctdb_marshall_buffer *reply;
309 pull = (struct ctdb_control_pulldb *)indata.dptr;
311 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
313 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
/* pulling from a live (unfrozen) db would race with normal traffic */
317 if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
318 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
322 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
323 CTDB_NO_MEMORY(ctdb, reply);
325 reply->db_id = pull->db_id;
328 params.ctdb_db = ctdb_db;
329 params.pulldata = reply;
/* blob starts as just the marshall header; records appended after it */
330 params.len = offsetof(struct ctdb_marshall_buffer, data);
331 params.allocated_len = params.len;
332 params.failed = false;
334 if (ctdb_db->unhealthy_reason) {
/* this is just a warning, as the tdb should be empty anyway */
336 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
337 ctdb_db->db_name, ctdb_db->unhealthy_reason));
340 if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
341 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
345 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
346 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
347 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
348 talloc_free(params.pulldata);
352 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
/* hand the (possibly realloc-moved) blob back to the reply path */
354 outdata->dptr = (uint8_t *)params.pulldata;
355 outdata->dsize = params.len;
/* operator-tunable warnings about oversized databases */
357 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
358 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
360 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
361 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
/*
  CTDB_CONTROL_PUSH_DB handler: unmarshal a blob of records (as built
  by pull_db) and store each into the local ltdb. Requires the db to
  be frozen. Read-only flags are stripped because recovery implicitly
  revokes all delegations; afterwards the tracking db is wiped.
*/
369 push a bunch of records into a ltdb, filtering by rsn
371 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
373 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
374 struct ctdb_db_context *ctdb_db;
376 struct ctdb_rec_data *rec;
/* reject blobs too small to even hold the marshall header */
378 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
379 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
383 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
385 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
389 if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
390 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
394 if (ctdb_lockall_mark_prio(ctdb, ctdb_db->priority) != 0) {
395 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
399 rec = (struct ctdb_rec_data *)&reply->data[0];
401 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
402 reply->count, reply->db_id));
404 for (i=0;i<reply->count;i++) {
406 struct ctdb_ltdb_header *hdr;
/* wire layout: key bytes immediately followed by data bytes */
408 key.dptr = &rec->data[0];
409 key.dsize = rec->keylen;
410 data.dptr = &rec->data[key.dsize];
411 data.dsize = rec->datalen;
/* every pushed record must carry at least an ltdb header */
413 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
414 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
417 hdr = (struct ctdb_ltdb_header *)data.dptr;
/* strip off any read only record flags. All readonly records
   are revoked implicitly by a recovery */
421 hdr->flags &= ~CTDB_REC_RO_FLAGS;
/* advance past the header so 'data' is just the payload */
423 data.dptr += sizeof(*hdr);
424 data.dsize -= sizeof(*hdr);
426 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
428 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
/* records are variable length; step to the next one by rec->length */
432 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
435 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
436 reply->count, reply->db_id));
438 if (ctdb_db->readonly) {
439 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
441 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
442 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
/* NOTE(review): readonly is set to false twice in this error path
   (here and again below) — harmless but redundant */
443 ctdb_db->readonly = false;
444 tdb_close(ctdb_db->rottdb);
445 ctdb_db->rottdb = NULL;
446 ctdb_db->readonly = false;
/* freeing a revokechild unlinks it, so loop until the list is empty */
448 while (ctdb_db->revokechild_active != NULL) {
449 talloc_free(ctdb_db->revokechild_active);
453 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
/* error-path unlock (elided control flow between these two unlocks) */
457 ctdb_lockall_unmark_prio(ctdb, ctdb_db->priority);
/* per-request state for the async set_recmode control: tracks the
   deferred control, the verifier child process, its pipe, and the
   timeout/fd events that race to complete the request */
461 struct ctdb_set_recmode_state {
462 struct ctdb_context *ctdb;
/* deferred control to reply to once the child reports (or times out) */
463 struct ctdb_req_control *c;
466 struct timed_event *te;
467 struct fd_event *fde;
/* when the attempt started, for reclock latency accounting */
469 struct timeval start_time;
/*
  called if our set_recmode child times out. this would happen if
  ctdb_recovery_lock() would block.
  A blocked child means the lock is (still) held elsewhere, which is
  the expected state — so treat timeout as success and set the mode.
*/
476 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
477 struct timeval t, void *private_data)
479 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
480 struct ctdb_set_recmode_state);
/* we consider this a success, not a failure, as we failed to
   set the recovery lock which is what we wanted. This can be
   caused by the cluster filesystem being very slow to
   arbitrate locks immediately after a node failure. */
487 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
488 state->ctdb->recovery_mode = state->recmode;
/* status 0 = success reply to the deferred control */
489 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
/* when we free the recmode state we must kill any child process.
   Also records how long the reclock check took and closes both
   pipe ends. */
496 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
498 double l = timeval_elapsed(&state->start_time);
500 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
/* -1 marks an fd already closed/never opened; skip those */
502 if (state->fd[0] != -1) {
505 if (state->fd[1] != -1) {
/* child may still be blocked in the lock attempt; reap it hard */
508 ctdb_kill(state->ctdb, state->child, SIGKILL);
/* this is called when the client process has completed ctdb_recovery_lock()
   and has written data back to us through the pipe. */
515 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
516 uint16_t flags, void *private_data)
518 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
519 struct ctdb_set_recmode_state);
/* we got a response from our child process so we can abort the
   timeout (the two completion paths race; only one may fire) */
526 talloc_free(state->te);
/* read the child's status when trying to lock the reclock file.
   child wrote 0 if everything is fine and 1 if it did manage
   to lock the file, which would be a problem since that means
   we got a request to exit from recovery but we could still lock
   the file which at this time SHOULD be locked by the recovery
   daemon on the recmaster */
537 ret = read(state->fd[0], &c, 1);
538 if (ret != 1 || c != 0) {
/* status -1 = refuse the recmode change */
539 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
544 state->ctdb->recovery_mode = state->recmode;
/* release any deferred attach calls from clients */
547 if (state->recmode == CTDB_RECOVERY_NORMAL) {
548 ctdb_process_deferred_attach(state->ctdb);
551 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
/*
  Timed-event handler fired when the node has stayed in recovery past
  the RecoveryDropAllIPs deadline: release every public IP so clients
  fail over to healthy nodes.
*/
557 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
558 struct timeval t, void *private_data)
560 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
562 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
/* drop the one-shot context so the event cannot fire twice */
563 talloc_free(ctdb->release_ips_ctx);
564 ctdb->release_ips_ctx = NULL;
566 ctdb_release_all_ips(ctdb);
/*
 * Set up an event to drop all public ips if we remain in recovery for too
 * long. Re-arming replaces any existing timer (freeing release_ips_ctx
 * also cancels the event allocated under it).
 */
573 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
575 if (ctdb->release_ips_ctx != NULL) {
576 talloc_free(ctdb->release_ips_ctx);
578 ctdb->release_ips_ctx = talloc_new(ctdb);
579 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
/* deadline comes from the RecoveryDropAllIPs tunable (seconds) */
581 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
/*
  CTDB_CONTROL_SET_RECMODE handler. Entering recovery arms the
  drop-all-IPs timer; leaving recovery thaws databases and (unless
  VerifyRecoveryLock is disabled) forks a child that verifies the
  reclock file is still held by the recovery master before the mode
  change is committed. In the fork path *async_reply is set and the
  reply is sent later from set_recmode_handler/timeout.
*/
586 set the recovery mode
588 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
589 struct ctdb_req_control *c,
590 TDB_DATA indata, bool *async_reply,
591 const char **errormsg)
593 uint32_t recmode = *(uint32_t *)indata.dptr;
595 struct ctdb_set_recmode_state *state;
/* captured now so the forked child can poll its parent's liveness */
596 pid_t parent = getpid();
/* if we enter recovery but stay in recovery for too long
   we will eventually drop all our ip addresses */
601 if (recmode == CTDB_RECOVERY_NORMAL) {
602 talloc_free(ctdb->release_ips_ctx);
603 ctdb->release_ips_ctx = NULL;
605 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
606 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
610 if (recmode != ctdb->recovery_mode) {
611 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
612 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
/* anything other than ACTIVE -> NORMAL needs no reclock check:
   just set the mode and return */
615 if (recmode != CTDB_RECOVERY_NORMAL ||
616 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
617 ctdb->recovery_mode = recmode;
/* some special handling when ending recovery mode */
/* force the databases to thaw */
624 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
625 if (ctdb->freeze_handles[i] != NULL) {
626 ctdb_control_thaw(ctdb, i);
630 state = talloc(ctdb, struct ctdb_set_recmode_state);
631 CTDB_NO_MEMORY(ctdb, state);
633 state->start_time = timeval_current();
/* release any deferred attach calls from clients */
638 if (recmode == CTDB_RECOVERY_NORMAL) {
639 ctdb_process_deferred_attach(ctdb);
642 if (ctdb->tunable.verify_recovery_lock == 0) {
/* dont need to verify the reclock file */
644 ctdb->recovery_mode = recmode;
/* For the rest of what needs to be done, we need to do this in
   a child process since
   1, the call to ctdb_recovery_lock() can block if the cluster
   filesystem is in the process of recovery. */
653 ret = pipe(state->fd);
656 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
660 state->child = ctdb_fork(ctdb);
661 if (state->child == (pid_t)-1) {
668 if (state->child == 0) {
/* ---- child process from here on ---- */
672 debug_extra = talloc_asprintf(NULL, "set_recmode:");
/* we should not be able to get the lock on the reclock file,
   as it should be held by the recovery master */
676 if (ctdb_recovery_lock(ctdb, false)) {
677 DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
/* NOTE(review): write() return values are ignored here and below —
   a failed/short write would leave the parent waiting for the timeout */
681 write(state->fd[1], &cc, 1);
/* make sure we die when our parent dies */
683 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
685 write(state->fd[1], &cc, 1);
/* ---- parent process ---- */
690 set_close_on_exec(state->fd[0]);
/* destructor kills the child and closes fds if state is freed early */
694 talloc_set_destructor(state, set_recmode_destructor);
696 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
/* 5s timeout races against the child's pipe write */
698 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
699 ctdb_set_recmode_timeout, state);
701 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
706 if (state->fde == NULL) {
710 tevent_fd_set_auto_close(state->fde);
713 state->recmode = recmode;
/* take ownership of the control so we can reply asynchronously */
714 state->c = talloc_steal(state, c);
/*
  try and get the recovery lock in shared storage - should only work
  on the recovery master recovery daemon. Anywhere else is a bug
  Returns true if the fcntl write-lock on the reclock file was taken;
  with keep==false the fd is closed again (probe only, elided here).
*/
726 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
731 DEBUG(DEBUG_ERR, ("Take the recovery lock\n"));
/* drop any stale fd from a previous attempt first */
733 if (ctdb->recovery_lock_fd != -1) {
734 close(ctdb->recovery_lock_fd);
735 ctdb->recovery_lock_fd = -1;
738 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
739 if (ctdb->recovery_lock_fd == -1) {
740 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n",
741 ctdb->recovery_lock_file, strerror(errno)));
/* don't leak the lock fd into exec'd children */
745 set_close_on_exec(ctdb->recovery_lock_fd);
/* exclusive write lock over the whole file, non-blocking (F_SETLK) */
747 lock.l_type = F_WRLCK;
748 lock.l_whence = SEEK_SET;
753 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
754 close(ctdb->recovery_lock_fd);
755 ctdb->recovery_lock_fd = -1;
757 DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
/* !keep path: lock was only a probe, release it (elided branch) */
763 close(ctdb->recovery_lock_fd);
764 ctdb->recovery_lock_fd = -1;
768 DEBUG(DEBUG_NOTICE, ("Recovery lock taken successfully\n"));
771 DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
/*
  delete a record as part of the vacuum process
  only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
  use non-blocking locks

  return 0 if the record was successfully deleted (i.e. it does not exist
  when the function returns)
  or !0 is the record still exists in the tdb after returning.
*/
785 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
788 struct ctdb_ltdb_header *hdr, *hdr2;
/* these are really internal tdb functions - but we need them here for
   non-blocking lock of the freelist */
792 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
793 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
/* unpack key/data from the marshalled record */
796 key.dsize = rec->keylen;
797 key.dptr = &rec->data[0];
798 data.dsize = rec->datalen;
799 data.dptr = &rec->data[rec->keylen];
/* the lmaster must never vacuum-delete its own authoritative copy */
801 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
802 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
/* caller sends just the ltdb header (no payload) as the rsn reference */
806 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
807 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
/* hdr = the reference header from the lmaster (points into rec) */
811 hdr = (struct ctdb_ltdb_header *)data.dptr;
/* use a non-blocking lock */
814 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
/* 'data' is reused for the local copy from here on */
818 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
819 if (data.dptr == NULL) {
820 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* record smaller than an ltdb header is corrupt: delete it outright */
824 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
825 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
826 tdb_delete(ctdb_db->ltdb->tdb, key);
827 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
828 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
830 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* hdr2 = header of our local copy of the record */
835 hdr2 = (struct ctdb_ltdb_header *)data.dptr;
/* local copy is newer than the lmaster's reference: keep it */
837 if (hdr2->rsn > hdr->rsn) {
838 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
839 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
840 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
/* do not allow deleting record that have readonly flags set. */
846 if (hdr->flags & CTDB_REC_RO_FLAGS) {
847 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
848 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
/* same check on the local copy's flags */
852 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
853 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
854 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
/* we are dmaster: record may have been modified locally, keep it */
859 if (hdr2->dmaster == ctdb->pnn) {
860 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
861 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
/* also need the freelist lock (list -1) to actually delete */
866 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
867 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
872 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
873 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
874 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
875 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
/* success path: release both locks */
880 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
881 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* holds the deferred control to reply to once an event script
   (startrecovery/recovered) completes */
888 struct recovery_callback_state {
889 struct ctdb_req_control *c;
/*
  called when the 'recovered' event script has finished
  Re-enables monitoring, bumps the recovery counter, replies to the
  deferred END_RECOVERY control with the script status and stamps
  last_recovery_finished.
*/
896 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
898 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
/* monitoring was disabled while the event script ran */
900 ctdb_enable_monitoring(ctdb);
901 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
904 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
/* -ETIME: the event script timed out (special-cased in elided code) */
905 if (status == -ETIME) {
910 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
913 gettimeofday(&ctdb->last_recovery_finished, NULL);
/*
  recovery has finished
  CTDB_CONTROL_END_RECOVERY handler: flush deferred trans3 commits,
  then run the 'recovered' event script asynchronously; the reply is
  sent from ctdb_end_recovery_callback.
*/
919 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
920 struct ctdb_req_control *c,
924 struct recovery_callback_state *state;
926 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
928 ctdb_persistent_finish_trans3_commits(ctdb);
930 state = talloc(ctdb, struct recovery_callback_state);
931 CTDB_NO_MEMORY(ctdb, state);
/* suppress monitoring while the event script runs */
935 ctdb_disable_monitoring(ctdb);
937 ret = ctdb_event_script_callback(ctdb, state,
938 ctdb_end_recovery_callback,
941 CTDB_EVENT_RECOVERED, "%s", "");
/* failure path: restore monitoring since the callback will never fire */
944 ctdb_enable_monitoring(ctdb);
946 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
/* tell the control that we will reply asynchronously */
952 state->c = talloc_steal(state, c);
/*
  called when the 'startrecovery' event script has finished
  Replies to the deferred START_RECOVERY control with the script status.
*/
960 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
962 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
965 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
968 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
/*
  run the startrecovery eventscript
  CTDB_CONTROL_START_RECOVERY handler: stamps last_recovery_started,
  disables monitoring and runs the 'startrecovery' event script
  asynchronously; the reply comes from ctdb_start_recovery_callback.
*/
975 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
976 struct ctdb_req_control *c,
980 struct recovery_callback_state *state;
982 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
983 gettimeofday(&ctdb->last_recovery_started, NULL);
985 state = talloc(ctdb, struct recovery_callback_state);
986 CTDB_NO_MEMORY(ctdb, state);
/* take ownership of the control for the async reply */
988 state->c = talloc_steal(state, c);
990 ctdb_disable_monitoring(ctdb);
992 ret = ctdb_event_script_callback(ctdb, state,
993 ctdb_start_recovery_callback,
995 CTDB_EVENT_START_RECOVERY,
999 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
/* tell the control that we will reply asynchronously */
1005 *async_reply = true;
/*
  try to delete all these records as part of the vacuuming process
  and return the records we failed to delete
  CTDB_CONTROL_TRY_DELETE_RECORDS handler: walks the marshalled record
  list, attempts delete_tdb_record() on each, and marshals every record
  that could NOT be deleted into outdata so the lmaster knows it must
  not purge them.
*/
1013 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1015 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1016 struct ctdb_db_context *ctdb_db;
1018 struct ctdb_rec_data *rec;
1019 struct ctdb_marshall_buffer *records;
/* reject blobs too small to hold the marshall header */
1021 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1022 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1026 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1028 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1033 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1034 reply->count, reply->db_id));
/* create a blob to send back the records we couldn't delete */
1038 records = (struct ctdb_marshall_buffer *)
1039 talloc_zero_size(outdata,
1040 offsetof(struct ctdb_marshall_buffer, data));
1041 if (records == NULL) {
1042 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1045 records->db_id = ctdb_db->db_id;
1048 rec = (struct ctdb_rec_data *)&reply->data[0];
1049 for (i=0;i<reply->count;i++) {
1052 key.dptr = &rec->data[0];
1053 key.dsize = rec->keylen;
1054 data.dptr = &rec->data[key.dsize];
1055 data.dsize = rec->datalen;
1057 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1058 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
/* If we cant delete the record we must add it to the reply
   so the lmaster knows it may not purge this record */
1065 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1067 struct ctdb_ltdb_header *hdr;
1069 hdr = (struct ctdb_ltdb_header *)data.dptr;
1070 data.dptr += sizeof(*hdr);
1071 data.dsize -= sizeof(*hdr);
1073 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
/* grow the reply blob and append the raw marshalled record */
1075 old_size = talloc_get_size(records);
1076 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1077 if (records == NULL) {
1078 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1082 memcpy(old_size+(uint8_t *)records, rec, rec->length);
/* advance to the next variable-length record */
1085 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1089 outdata->dptr = (uint8_t *)records;
1090 outdata->dsize = talloc_get_size(records);
/*
 * Store a record as part of the vacuum process:
 * This is called from the RECEIVE_RECORD control which
 * the lmaster uses to send the current empty copy
 * to all nodes for storing, before it lets the other
 * nodes delete the records in the second phase with
 * the TRY_DELETE_RECORDS control.

 * Only store if we are not lmaster or dmaster, and our
 * rsn is <= the provided rsn. Use non-blocking locks.

 * return 0 if the record was successfully stored.
 * return !0 if the record still exists in the tdb after returning.
 */
1109 static int store_tdb_record(struct ctdb_context *ctdb,
1110 struct ctdb_db_context *ctdb_db,
1111 struct ctdb_rec_data *rec)
1113 TDB_DATA key, data, data2;
1114 struct ctdb_ltdb_header *hdr, *hdr2;
/* unpack key/data (header-only payload) from the marshalled record */
1117 key.dsize = rec->keylen;
1118 key.dptr = &rec->data[0];
1119 data.dsize = rec->datalen;
1120 data.dptr = &rec->data[rec->keylen];
1122 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1123 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1124 "where we are lmaster\n"));
1128 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1129 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
/* hdr = the reference header sent by the lmaster */
1133 hdr = (struct ctdb_ltdb_header *)data.dptr;
/* use a non-blocking lock */
1136 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1137 DEBUG(DEBUG_ERR, (__location__ " Failed to lock chain\n"));
/* no/short local copy: just store the lmaster's empty copy */
1141 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1142 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1143 tdb_store(ctdb_db->ltdb->tdb, key, data, 0);
1144 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
/* NOTE(review): hdr2 is taken from data.dptr (the incoming record)
   rather than data2.dptr (the copy just fetched from the local tdb).
   As written, hdr2 aliases hdr, so the rsn/dmaster/flag comparisons
   below compare the incoming header against itself and never inspect
   the local record. Upstream Samba uses data2.dptr here — confirm and
   fix against the canonical ctdb_recover.c. */
1149 hdr2 = (struct ctdb_ltdb_header *)data.dptr;
/* local copy newer than the reference: do not overwrite it */
1151 if (hdr2->rsn > hdr->rsn) {
1152 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1153 "rsn=%llu - called with rsn=%llu\n",
1154 (unsigned long long)hdr2->rsn,
1155 (unsigned long long)hdr->rsn));
/* do not allow vacuuming of records that have readonly flags set. */
1161 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1162 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1167 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1168 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
/* we are dmaster of this record: keep our copy */
1174 if (hdr2->dmaster == ctdb->pnn) {
1175 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1176 "where we are the dmaster\n"));
1181 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1182 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
/* common exit: release the chain lock (goto target in elided code) */
1190 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
 * Try to store all these records as part of the vacuuming process
 * and return the records we failed to store.
 * CTDB_CONTROL_RECEIVE_RECORDS handler; the mirror image of
 * try_delete_records but using store_tdb_record() per record.
 */
1201 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1202 TDB_DATA indata, TDB_DATA *outdata)
1204 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1205 struct ctdb_db_context *ctdb_db;
1207 struct ctdb_rec_data *rec;
1208 struct ctdb_marshall_buffer *records;
/* reject blobs too small to hold the marshall header */
1210 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1212 (__location__ " invalid data in receive_records\n"));
1216 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1218 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1223 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1224 "dbid 0x%x\n", reply->count, reply->db_id));
/* create a blob to send back the records we could not store */
1227 records = (struct ctdb_marshall_buffer *)
1228 talloc_zero_size(outdata,
1229 offsetof(struct ctdb_marshall_buffer, data));
1230 if (records == NULL) {
1231 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1234 records->db_id = ctdb_db->db_id;
1236 rec = (struct ctdb_rec_data *)&reply->data[0];
1237 for (i=0; i<reply->count; i++) {
1240 key.dptr = &rec->data[0];
1241 key.dsize = rec->keylen;
1242 data.dptr = &rec->data[key.dsize];
1243 data.dsize = rec->datalen;
1245 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1246 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
/*
 * If we can not store the record we must add it to the reply
 * so the lmaster knows it may not purge this record.
 */
1255 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1257 struct ctdb_ltdb_header *hdr;
1259 hdr = (struct ctdb_ltdb_header *)data.dptr;
1260 data.dptr += sizeof(*hdr);
1261 data.dsize -= sizeof(*hdr);
1263 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1264 "record with hash 0x%08x in vacuum "
1265 "via RECEIVE_RECORDS\n",
/* grow the reply blob and append the raw marshalled record */
1268 old_size = talloc_get_size(records);
1269 records = talloc_realloc_size(outdata, records,
1270 old_size + rec->length);
1271 if (records == NULL) {
1272 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1277 memcpy(old_size+(uint8_t *)records, rec, rec->length);
/* advance to the next variable-length record */
1280 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1284 outdata->dptr = (uint8_t *)records;
1285 outdata->dsize = talloc_get_size(records);
/*
 * CTDB_CONTROL_GET_CAPABILITIES: return this node's capability mask
 * (ctdb->capabilities) as a single uint32_t in outdata.
 */
1294 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1296 uint32_t *capabilities = NULL;
/* allocate the reply off outdata so it is freed along with the reply */
1298 capabilities = talloc(outdata, uint32_t);
1299 CTDB_NO_MEMORY(ctdb, capabilities);
1300 *capabilities = ctdb->capabilities;
1302 outdata->dsize = sizeof(uint32_t);
1303 outdata->dptr = (uint8_t *)capabilities;
1308 /* The recovery daemon will ping us at regular intervals.
1309 If we havent been pinged for a while we assume the recovery
1310 daemon is inoperable and we restart.
1312 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1314 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1315 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1317 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
/* still below the failure threshold: re-arm the timer and keep waiting.
 * The timer is allocated as a child of recd_ping_count, so a ping
 * (which frees and reallocates that counter) cancels it implicitly. */
1319 if (*count < ctdb->tunable.recd_ping_failcount) {
1321 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1322 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1323 ctdb_recd_ping_timeout, ctdb);
/* too many missed pings: assume recoverd is wedged and restart it */
1327 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1329 ctdb_stop_recoverd(ctdb);
1330 ctdb_start_recoverd(ctdb);
/*
 * CTDB_CONTROL_RECD_PING: the recovery daemon pinged us.  Reset the
 * missed-ping counter and re-arm the ping-timeout watchdog.
 */
1333 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
/* freeing the old counter also destroys any pending timeout event,
 * since the timer is allocated as a child of recd_ping_count */
1335 talloc_free(ctdb->recd_ping_count);
1337 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1338 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
/* a timeout tunable of 0 disables the watchdog entirely */
1340 if (ctdb->tunable.recd_ping_timeout != 0) {
1341 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1342 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1343 ctdb_recd_ping_timeout, ctdb);
/*
 * CTDB_CONTROL_SET_RECMASTER: record which node is the recovery master.
 * indata carries a single uint32_t node number (pnn).
 */
1351 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1353 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1355 ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];
/* state carried across the asynchronous 'stopped' event script run */
1360 struct stop_node_callback_state {
/* the original STOP_NODE control request; replied to in the callback */
1361 struct ctdb_req_control *c;
1365 called when the 'stopped' event script has finished
1367 static void ctdb_stop_node_callback(struct ctdb_context *ctdb, int status, void *p)
1369 struct stop_node_callback_state *state = talloc_get_type(p, struct stop_node_callback_state);
/* script failed: undo the STOPPED flag set by ctdb_control_stop_node.
 * A -ETIME status means the event scripts hung, so ban ourselves. */
1372 DEBUG(DEBUG_ERR,(__location__ " stopped event script failed (status %d)\n", status));
1373 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1374 if (status == -ETIME) {
1375 ctdb_ban_self(ctdb);
/* send the deferred reply to the original STOP_NODE control */
1379 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
/*
 * CTDB_CONTROL_STOP_NODE: mark this node stopped and run the 'stopped'
 * event script.  The reply is deferred until the script completes
 * (see ctdb_stop_node_callback), hence *async_reply is set to true.
 */
1383 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb, struct ctdb_req_control *c, bool *async_reply)
1386 struct stop_node_callback_state *state;
1388 DEBUG(DEBUG_INFO,(__location__ " Stopping node\n"));
1390 state = talloc(ctdb, struct stop_node_callback_state);
1391 CTDB_NO_MEMORY(ctdb, state);
/* keep the control request alive until we reply in the callback */
1393 state->c = talloc_steal(state, c);
/* suppress monitoring while the node is stopping */
1395 ctdb_disable_monitoring(ctdb);
1397 ret = ctdb_event_script_callback(ctdb, state,
1398 ctdb_stop_node_callback,
1400 CTDB_EVENT_STOPPED, "%s", "");
/* error path: could not launch the script — re-enable monitoring */
1403 ctdb_enable_monitoring(ctdb);
1405 DEBUG(DEBUG_ERR,(__location__ " Failed to stop node\n"));
/* flag is cleared again in the callback if the script fails */
1410 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1412 *async_reply = true;
1417 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1419 DEBUG(DEBUG_INFO,(__location__ " Continue node\n"));
1420 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;