4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/wait.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
29 #include "common/system.h"
32 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
34 struct ctdb_vnn_map_wire *map;
37 CHECK_CONTROL_DATA_SIZE(0);
39 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
40 map = talloc_size(outdata, len);
41 CTDB_NO_MEMORY(ctdb, map);
43 map->generation = ctdb->vnn_map->generation;
44 map->size = ctdb->vnn_map->size;
45 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
48 outdata->dptr = (uint8_t *)map;
54 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
56 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
58 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
59 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
63 talloc_free(ctdb->vnn_map);
65 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
66 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
68 ctdb->vnn_map->generation = map->generation;
69 ctdb->vnn_map->size = map->size;
70 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
71 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
73 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
79 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
82 struct ctdb_db_context *ctdb_db;
83 struct ctdb_dbid_map *dbid_map;
85 CHECK_CONTROL_DATA_SIZE(0);
88 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
93 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
94 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
96 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
100 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
102 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
103 dbid_map->dbs[i].dbid = ctdb_db->db_id;
104 if (ctdb_db->persistent != 0) {
105 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
107 if (ctdb_db->readonly != 0) {
108 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
110 if (ctdb_db->sticky != 0) {
111 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
119 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
121 CHECK_CONTROL_DATA_SIZE(0);
123 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
126 if (outdata->dptr == NULL) {
130 outdata->dsize = talloc_get_size(outdata->dptr);
136 reload the nodes file
139 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
143 struct ctdb_node **nodes;
145 tmp_ctx = talloc_new(ctdb);
147 /* steal the old nodes file for a while */
148 talloc_steal(tmp_ctx, ctdb->nodes);
151 num_nodes = ctdb->num_nodes;
154 /* load the new nodes file */
155 ctdb_load_nodes_file(ctdb);
157 for (i=0; i<ctdb->num_nodes; i++) {
158 /* keep any identical pre-existing nodes and connections */
159 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
160 talloc_free(ctdb->nodes[i]);
161 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
165 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
169 /* any new or different nodes must be added */
170 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
171 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
172 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
174 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
175 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
176 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
180 /* tell the recovery daemon to reaload the nodes file too */
181 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
183 talloc_free(tmp_ctx);
/*
  state for the traverse function used to pull all relevant records
  from a database during recovery (see traverse_pulldb below)
 */
struct pulldb_data {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	struct ctdb_marshall_buffer *pulldata;	/* growing wire blob of records */
	uint32_t len;				/* bytes used in pulldata */
	uint32_t allocated_len;			/* bytes allocated for pulldata */
	bool failed;				/* set if marshalling failed */
};
200 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
202 struct pulldb_data *params = (struct pulldb_data *)p;
203 struct ctdb_rec_data *rec;
204 struct ctdb_context *ctdb = params->ctdb;
205 struct ctdb_db_context *ctdb_db = params->ctdb_db;
207 /* add the record to the blob */
208 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
210 params->failed = true;
213 if (params->len + rec->length >= params->allocated_len) {
214 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
215 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
217 if (params->pulldata == NULL) {
218 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
219 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
221 params->pulldata->count++;
222 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
223 params->len += rec->length;
225 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
226 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
235 pull a bunch of records from a ltdb, filtering by lmaster
237 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
239 struct ctdb_control_pulldb *pull;
240 struct ctdb_db_context *ctdb_db;
241 struct pulldb_data params;
242 struct ctdb_marshall_buffer *reply;
244 pull = (struct ctdb_control_pulldb *)indata.dptr;
246 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
248 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
252 if (!ctdb_db_frozen(ctdb_db)) {
254 ("rejecting ctdb_control_pull_db when not frozen\n"));
258 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
259 CTDB_NO_MEMORY(ctdb, reply);
261 reply->db_id = pull->db_id;
264 params.ctdb_db = ctdb_db;
265 params.pulldata = reply;
266 params.len = offsetof(struct ctdb_marshall_buffer, data);
267 params.allocated_len = params.len;
268 params.failed = false;
270 if (ctdb_db->unhealthy_reason) {
271 /* this is just a warning, as the tdb should be empty anyway */
272 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
273 ctdb_db->db_name, ctdb_db->unhealthy_reason));
276 if (ctdb_lockdb_mark(ctdb_db) != 0) {
277 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
281 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
282 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
283 ctdb_lockdb_unmark(ctdb_db);
284 talloc_free(params.pulldata);
288 ctdb_lockdb_unmark(ctdb_db);
290 outdata->dptr = (uint8_t *)params.pulldata;
291 outdata->dsize = params.len;
293 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
294 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
296 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
297 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
305 push a bunch of records into a ltdb, filtering by rsn
307 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
309 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
310 struct ctdb_db_context *ctdb_db;
312 struct ctdb_rec_data *rec;
314 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
315 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
319 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
321 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
325 if (!ctdb_db_frozen(ctdb_db)) {
327 ("rejecting ctdb_control_push_db when not frozen\n"));
331 if (ctdb_lockdb_mark(ctdb_db) != 0) {
332 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
336 rec = (struct ctdb_rec_data *)&reply->data[0];
338 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
339 reply->count, reply->db_id));
341 for (i=0;i<reply->count;i++) {
343 struct ctdb_ltdb_header *hdr;
345 key.dptr = &rec->data[0];
346 key.dsize = rec->keylen;
347 data.dptr = &rec->data[key.dsize];
348 data.dsize = rec->datalen;
350 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
351 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
354 hdr = (struct ctdb_ltdb_header *)data.dptr;
355 /* strip off any read only record flags. All readonly records
356 are revoked implicitely by a recovery
358 hdr->flags &= ~CTDB_REC_RO_FLAGS;
360 data.dptr += sizeof(*hdr);
361 data.dsize -= sizeof(*hdr);
363 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
365 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
369 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
372 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
373 reply->count, reply->db_id));
375 if (ctdb_db->readonly) {
376 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
378 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
379 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
380 ctdb_db->readonly = false;
381 tdb_close(ctdb_db->rottdb);
382 ctdb_db->rottdb = NULL;
383 ctdb_db->readonly = false;
385 while (ctdb_db->revokechild_active != NULL) {
386 talloc_free(ctdb_db->revokechild_active);
390 ctdb_lockdb_unmark(ctdb_db);
394 ctdb_lockdb_unmark(ctdb_db);
/* state for the asynchronous set-recmode control: tracks the pending
   control, the helper child and the pipe/timeout events watching it */
struct ctdb_set_recmode_state {
	struct ctdb_context *ctdb;
	struct ctdb_req_control *c;	/* deferred control to reply to */
	uint32_t recmode;		/* target recovery mode */
	int fd[2];			/* pipe from the child */
	struct timed_event *te;		/* child timeout event */
	struct fd_event *fde;		/* pipe readability event */
	pid_t child;			/* pid of the reclock-test child */
	struct timeval start_time;	/* for reclock latency accounting */
};
410 called if our set_recmode child times out. this would happen if
411 ctdb_recovery_lock() would block.
413 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
414 struct timeval t, void *private_data)
416 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
417 struct ctdb_set_recmode_state);
419 /* we consider this a success, not a failure, as we failed to
420 set the recovery lock which is what we wanted. This can be
421 caused by the cluster filesystem being very slow to
422 arbitrate locks immediately after a node failure.
424 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
425 state->ctdb->recovery_mode = state->recmode;
426 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
431 /* when we free the recmode state we must kill any child process.
433 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
435 double l = timeval_elapsed(&state->start_time);
437 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
439 if (state->fd[0] != -1) {
442 if (state->fd[1] != -1) {
445 ctdb_kill(state->ctdb, state->child, SIGKILL);
449 /* this is called when the client process has completed ctdb_recovery_lock()
450 and has written data back to us through the pipe.
452 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
453 uint16_t flags, void *private_data)
455 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
456 struct ctdb_set_recmode_state);
460 /* we got a response from our child process so we can abort the
463 talloc_free(state->te);
467 /* If, as expected, the child was unable to take the recovery
468 * lock then it will have written 0 into the pipe, so
469 * continue. However, any other value (e.g. 1) indicates that
470 * it was able to take the recovery lock when it should have
471 * been held by the recovery daemon on the recovery master.
473 ret = sys_read(state->fd[0], &c, 1);
474 if (ret != 1 || c != 0) {
475 ctdb_request_control_reply(
476 state->ctdb, state->c, NULL, -1,
477 "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem");
482 state->ctdb->recovery_mode = state->recmode;
484 /* release any deferred attach calls from clients */
485 if (state->recmode == CTDB_RECOVERY_NORMAL) {
486 ctdb_process_deferred_attach(state->ctdb);
489 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
495 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
496 struct timeval t, void *private_data)
498 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
500 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
501 talloc_free(ctdb->release_ips_ctx);
502 ctdb->release_ips_ctx = NULL;
504 ctdb_release_all_ips(ctdb);
508 * Set up an event to drop all public ips if we remain in recovery for too
511 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
513 if (ctdb->release_ips_ctx != NULL) {
514 talloc_free(ctdb->release_ips_ctx);
516 ctdb->release_ips_ctx = talloc_new(ctdb);
517 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
519 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
524 set the recovery mode
526 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
527 struct ctdb_req_control *c,
528 TDB_DATA indata, bool *async_reply,
529 const char **errormsg)
531 uint32_t recmode = *(uint32_t *)indata.dptr;
533 struct ctdb_set_recmode_state *state;
534 pid_t parent = getpid();
535 struct ctdb_db_context *ctdb_db;
537 /* if we enter recovery but stay in recovery for too long
538 we will eventually drop all our ip addresses
540 if (recmode == CTDB_RECOVERY_NORMAL) {
541 talloc_free(ctdb->release_ips_ctx);
542 ctdb->release_ips_ctx = NULL;
544 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
545 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
549 if (recmode != ctdb->recovery_mode) {
550 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
551 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
554 if (recmode != CTDB_RECOVERY_NORMAL ||
555 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
556 ctdb->recovery_mode = recmode;
560 /* some special handling when ending recovery mode */
562 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
563 if (ctdb_db->generation != ctdb->vnn_map->generation) {
565 ("Inconsistent DB generation %u for %s\n",
566 ctdb_db->generation, ctdb_db->db_name));
567 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
572 /* force the databases to thaw */
573 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
574 if (ctdb_db_prio_frozen(ctdb, i)) {
575 ctdb_control_thaw(ctdb, i, false);
579 state = talloc(ctdb, struct ctdb_set_recmode_state);
580 CTDB_NO_MEMORY(ctdb, state);
582 state->start_time = timeval_current();
586 /* release any deferred attach calls from clients */
587 if (recmode == CTDB_RECOVERY_NORMAL) {
588 ctdb_process_deferred_attach(ctdb);
591 if (ctdb->recovery_lock_file == NULL) {
592 /* Not using recovery lock file */
593 ctdb->recovery_mode = recmode;
597 /* For the rest of what needs to be done, we need to do this in
598 a child process since
599 1, the call to ctdb_recovery_lock() can block if the cluster
600 filesystem is in the process of recovery.
602 ret = pipe(state->fd);
605 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
609 state->child = ctdb_fork(ctdb);
610 if (state->child == (pid_t)-1) {
617 if (state->child == 0) {
621 ctdb_set_process_name("ctdb_recmode");
622 debug_extra = talloc_asprintf(NULL, "set_recmode:");
623 /* Daemon should not be able to get the recover lock,
624 * as it should be held by the recovery master */
625 if (ctdb_recovery_lock(ctdb)) {
627 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
628 ctdb->recovery_lock_file));
629 ctdb_recovery_unlock(ctdb);
633 sys_write(state->fd[1], &cc, 1);
634 /* make sure we die when our parent dies */
635 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
637 sys_write(state->fd[1], &cc, 1);
642 set_close_on_exec(state->fd[0]);
646 talloc_set_destructor(state, set_recmode_destructor);
648 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
650 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
651 ctdb_set_recmode_timeout, state);
653 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
658 if (state->fde == NULL) {
662 tevent_fd_set_auto_close(state->fde);
665 state->recmode = recmode;
666 state->c = talloc_steal(state, c);
674 bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
676 return ctdb->recovery_lock_fd != -1;
680 try and get the recovery lock in shared storage - should only work
681 on the recovery master recovery daemon. Anywhere else is a bug
683 bool ctdb_recovery_lock(struct ctdb_context *ctdb)
687 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
688 O_RDWR|O_CREAT, 0600);
689 if (ctdb->recovery_lock_fd == -1) {
691 ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
692 ctdb->recovery_lock_file, strerror(errno)));
696 set_close_on_exec(ctdb->recovery_lock_fd);
698 lock.l_type = F_WRLCK;
699 lock.l_whence = SEEK_SET;
704 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
705 int saved_errno = errno;
706 close(ctdb->recovery_lock_fd);
707 ctdb->recovery_lock_fd = -1;
708 /* Fail silently on these errors, since they indicate
709 * lock contention, but log an error for any other
711 if (saved_errno != EACCES &&
712 saved_errno != EAGAIN) {
713 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
714 "recovery lock on '%s' - (%s)\n",
715 ctdb->recovery_lock_file,
716 strerror(saved_errno)));
724 void ctdb_recovery_unlock(struct ctdb_context *ctdb)
726 if (ctdb->recovery_lock_fd != -1) {
727 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
728 close(ctdb->recovery_lock_fd);
729 ctdb->recovery_lock_fd = -1;
734 delete a record as part of the vacuum process
735 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
736 use non-blocking locks
738 return 0 if the record was successfully deleted (i.e. it does not exist
739 when the function returns)
740 or !0 is the record still exists in the tdb after returning.
742 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
744 TDB_DATA key, data, data2;
745 struct ctdb_ltdb_header *hdr, *hdr2;
747 /* these are really internal tdb functions - but we need them here for
748 non-blocking lock of the freelist */
749 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
750 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
753 key.dsize = rec->keylen;
754 key.dptr = &rec->data[0];
755 data.dsize = rec->datalen;
756 data.dptr = &rec->data[rec->keylen];
758 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
759 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
763 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
764 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
768 hdr = (struct ctdb_ltdb_header *)data.dptr;
770 /* use a non-blocking lock */
771 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
775 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
776 if (data2.dptr == NULL) {
777 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
781 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
782 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
783 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
784 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
786 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
787 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
789 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
794 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
796 if (hdr2->rsn > hdr->rsn) {
797 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
798 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
799 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
804 /* do not allow deleting record that have readonly flags set. */
805 if (hdr->flags & CTDB_REC_RO_FLAGS) {
806 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
807 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
811 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
812 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
813 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
818 if (hdr2->dmaster == ctdb->pnn) {
819 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
820 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
825 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
826 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
831 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
832 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
833 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
834 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
839 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
840 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* holds the control to reply to once a recovery event script finishes */
struct recovery_callback_state {
	struct ctdb_req_control *c;
};
853 called when the 'recovered' event script has finished
855 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
857 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
859 ctdb_enable_monitoring(ctdb);
860 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
863 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
864 if (status == -ETIME) {
869 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
872 gettimeofday(&ctdb->last_recovery_finished, NULL);
874 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
875 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
880 recovery has finished
882 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
883 struct ctdb_req_control *c,
887 struct recovery_callback_state *state;
889 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
891 ctdb_persistent_finish_trans3_commits(ctdb);
893 state = talloc(ctdb, struct recovery_callback_state);
894 CTDB_NO_MEMORY(ctdb, state);
898 ctdb_disable_monitoring(ctdb);
900 ret = ctdb_event_script_callback(ctdb, state,
901 ctdb_end_recovery_callback,
903 CTDB_EVENT_RECOVERED, "%s", "");
906 ctdb_enable_monitoring(ctdb);
908 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
913 /* tell the control that we will be reply asynchronously */
914 state->c = talloc_steal(state, c);
920 called when the 'startrecovery' event script has finished
922 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
924 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
927 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
930 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
935 run the startrecovery eventscript
937 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
938 struct ctdb_req_control *c,
942 struct recovery_callback_state *state;
944 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
945 gettimeofday(&ctdb->last_recovery_started, NULL);
947 state = talloc(ctdb, struct recovery_callback_state);
948 CTDB_NO_MEMORY(ctdb, state);
950 state->c = talloc_steal(state, c);
952 ctdb_disable_monitoring(ctdb);
954 ret = ctdb_event_script_callback(ctdb, state,
955 ctdb_start_recovery_callback,
957 CTDB_EVENT_START_RECOVERY,
961 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
966 /* tell the control that we will be reply asynchronously */
972 try to delete all these records as part of the vacuuming process
973 and return the records we failed to delete
975 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
977 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
978 struct ctdb_db_context *ctdb_db;
980 struct ctdb_rec_data *rec;
981 struct ctdb_marshall_buffer *records;
983 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
984 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
988 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
990 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
995 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
996 reply->count, reply->db_id));
999 /* create a blob to send back the records we couldnt delete */
1000 records = (struct ctdb_marshall_buffer *)
1001 talloc_zero_size(outdata,
1002 offsetof(struct ctdb_marshall_buffer, data));
1003 if (records == NULL) {
1004 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1007 records->db_id = ctdb_db->db_id;
1010 rec = (struct ctdb_rec_data *)&reply->data[0];
1011 for (i=0;i<reply->count;i++) {
1014 key.dptr = &rec->data[0];
1015 key.dsize = rec->keylen;
1016 data.dptr = &rec->data[key.dsize];
1017 data.dsize = rec->datalen;
1019 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1020 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1024 /* If we cant delete the record we must add it to the reply
1025 so the lmaster knows it may not purge this record
1027 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1029 struct ctdb_ltdb_header *hdr;
1031 hdr = (struct ctdb_ltdb_header *)data.dptr;
1032 data.dptr += sizeof(*hdr);
1033 data.dsize -= sizeof(*hdr);
1035 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1037 old_size = talloc_get_size(records);
1038 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1039 if (records == NULL) {
1040 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1044 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1047 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1051 *outdata = ctdb_marshall_finish(records);
1057 * Store a record as part of the vacuum process:
1058 * This is called from the RECEIVE_RECORD control which
1059 * the lmaster uses to send the current empty copy
1060 * to all nodes for storing, before it lets the other
1061 * nodes delete the records in the second phase with
1062 * the TRY_DELETE_RECORDS control.
1064 * Only store if we are not lmaster or dmaster, and our
1065 * rsn is <= the provided rsn. Use non-blocking locks.
1067 * return 0 if the record was successfully stored.
1068 * return !0 if the record still exists in the tdb after returning.
1070 static int store_tdb_record(struct ctdb_context *ctdb,
1071 struct ctdb_db_context *ctdb_db,
1072 struct ctdb_rec_data *rec)
1074 TDB_DATA key, data, data2;
1075 struct ctdb_ltdb_header *hdr, *hdr2;
1078 key.dsize = rec->keylen;
1079 key.dptr = &rec->data[0];
1080 data.dsize = rec->datalen;
1081 data.dptr = &rec->data[rec->keylen];
1083 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1084 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1085 "where we are lmaster\n"));
1089 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1090 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1094 hdr = (struct ctdb_ltdb_header *)data.dptr;
1096 /* use a non-blocking lock */
1097 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1098 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
1102 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1103 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1104 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1105 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1109 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1114 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
1116 if (hdr2->rsn > hdr->rsn) {
1117 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1118 "rsn=%llu - called with rsn=%llu\n",
1119 (unsigned long long)hdr2->rsn,
1120 (unsigned long long)hdr->rsn));
1125 /* do not allow vacuuming of records that have readonly flags set. */
1126 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1127 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1132 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1133 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1139 if (hdr2->dmaster == ctdb->pnn) {
1140 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1141 "where we are the dmaster\n"));
1146 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1147 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1155 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1163 * Try to store all these records as part of the vacuuming process
1164 * and return the records we failed to store.
1166 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1167 TDB_DATA indata, TDB_DATA *outdata)
1169 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1170 struct ctdb_db_context *ctdb_db;
1172 struct ctdb_rec_data *rec;
1173 struct ctdb_marshall_buffer *records;
1175 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1177 (__location__ " invalid data in receive_records\n"));
1181 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1183 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1188 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1189 "dbid 0x%x\n", reply->count, reply->db_id));
1191 /* create a blob to send back the records we could not store */
1192 records = (struct ctdb_marshall_buffer *)
1193 talloc_zero_size(outdata,
1194 offsetof(struct ctdb_marshall_buffer, data));
1195 if (records == NULL) {
1196 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1199 records->db_id = ctdb_db->db_id;
1201 rec = (struct ctdb_rec_data *)&reply->data[0];
1202 for (i=0; i<reply->count; i++) {
1205 key.dptr = &rec->data[0];
1206 key.dsize = rec->keylen;
1207 data.dptr = &rec->data[key.dsize];
1208 data.dsize = rec->datalen;
1210 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1211 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1217 * If we can not store the record we must add it to the reply
1218 * so the lmaster knows it may not purge this record.
1220 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1222 struct ctdb_ltdb_header *hdr;
1224 hdr = (struct ctdb_ltdb_header *)data.dptr;
1225 data.dptr += sizeof(*hdr);
1226 data.dsize -= sizeof(*hdr);
1228 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1229 "record with hash 0x%08x in vacuum "
1230 "via RECEIVE_RECORDS\n",
1233 old_size = talloc_get_size(records);
1234 records = talloc_realloc_size(outdata, records,
1235 old_size + rec->length);
1236 if (records == NULL) {
1237 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1242 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1245 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1248 *outdata = ctdb_marshall_finish(records);
1257 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1259 uint32_t *capabilities = NULL;
1261 capabilities = talloc(outdata, uint32_t);
1262 CTDB_NO_MEMORY(ctdb, capabilities);
1263 *capabilities = ctdb->capabilities;
1265 outdata->dsize = sizeof(uint32_t);
1266 outdata->dptr = (uint8_t *)capabilities;
1271 /* The recovery daemon will ping us at regular intervals.
1272 If we havent been pinged for a while we assume the recovery
1273 daemon is inoperable and we restart.
1275 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1277 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1278 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1280 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1282 if (*count < ctdb->tunable.recd_ping_failcount) {
1284 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1285 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1286 ctdb_recd_ping_timeout, ctdb);
1290 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1292 ctdb_stop_recoverd(ctdb);
1293 ctdb_start_recoverd(ctdb);
1296 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1298 talloc_free(ctdb->recd_ping_count);
1300 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1301 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1303 if (ctdb->tunable.recd_ping_timeout != 0) {
1304 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1305 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1306 ctdb_recd_ping_timeout, ctdb);
1314 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1316 uint32_t new_recmaster;
1318 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1319 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
1321 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1323 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
1326 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1328 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1331 ctdb->recovery_master = new_recmaster;
1336 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1338 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1339 ctdb_disable_monitoring(ctdb);
1340 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1345 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1347 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1348 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;