4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/wait.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
35 #include "ctdb_private.h"
36 #include "ctdb_client.h"
37 #include "ctdb_logging.h"
39 #include "common/system.h"
40 #include "common/common.h"
/*
 * CTDB_CONTROL_GET_VNNMAP handler: marshall a wire-format copy of the
 * current vnn map (generation, size, and the uint32_t map entries) into
 * outdata.  Takes no input data.
 * NOTE(review): this listing elides lines (return type, braces, the
 * declaration of 'len' and the outdata->dsize assignment are not visible).
 */
43 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
45 struct ctdb_vnn_map_wire *map;
48 CHECK_CONTROL_DATA_SIZE(0);
/* allocate the wire buffer: fixed header plus one uint32_t per map slot */
50 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
51 map = talloc_size(outdata, len);
52 CTDB_NO_MEMORY(ctdb, map);
54 map->generation = ctdb->vnn_map->generation;
55 map->size = ctdb->vnn_map->size;
56 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
59 outdata->dptr = (uint8_t *)map;
/*
 * CTDB_CONTROL_SETVNNMAP handler: replace the local vnn map with the
 * wire-format map supplied in indata.  Rejected unless the node is in
 * recovery mode (the recovery daemon sets the map during recovery).
 */
65 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
67 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
69 if (ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
70 DEBUG(DEBUG_ERR, ("Attempt to set vnnmap when not in recovery\n"));
/* discard the old map and rebuild it from the wire representation */
74 talloc_free(ctdb->vnn_map);
76 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
77 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
79 ctdb->vnn_map->generation = map->generation;
80 ctdb->vnn_map->size = map->size;
81 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
82 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
84 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
/*
 * CTDB_CONTROL_GET_DBMAP handler: return the list of attached databases
 * (db_id plus PERSISTENT/READONLY/STICKY flags) in outdata.
 * NOTE(review): the first loop (counting db entries into 'len') has its
 * body elided in this listing.
 */
90 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
93 struct ctdb_db_context *ctdb_db;
94 struct ctdb_dbid_map_old *dbid_map;
96 CHECK_CONTROL_DATA_SIZE(0);
/* first pass: count the attached databases */
99 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
/* zero-initialised, so every dbs[i].flags starts at 0 before |= below */
104 outdata->dsize = offsetof(struct ctdb_dbid_map_old, dbs) + sizeof(dbid_map->dbs[0])*len;
105 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
106 if (!outdata->dptr) {
107 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
111 dbid_map = (struct ctdb_dbid_map_old *)outdata->dptr;
/* second pass: fill in one entry per database */
113 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
114 dbid_map->dbs[i].db_id = ctdb_db->db_id;
115 if (ctdb_db->persistent != 0) {
116 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
118 if (ctdb_db->readonly != 0) {
119 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
121 if (ctdb_db->sticky != 0) {
122 dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
/*
 * CTDB_CONTROL_GET_NODEMAP handler: convert the in-memory node list to a
 * wire node map; outdata takes talloc ownership of the result.
 */
130 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
132 CHECK_CONTROL_DATA_SIZE(0);
134 outdata->dptr = (unsigned char *)ctdb_node_list_to_map(ctdb->nodes,
137 if (outdata->dptr == NULL) {
/* size comes from the talloc allocation made by ctdb_node_list_to_map() */
141 outdata->dsize = talloc_get_size(outdata->dptr);
147 reload the nodes file
150 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
154 struct ctdb_node **nodes;
156 tmp_ctx = talloc_new(ctdb);
158 /* steal the old nodes file for a while */
159 talloc_steal(tmp_ctx, ctdb->nodes);
/* NOTE(review): the assignment of 'nodes' (the stolen old node array) is
 * elided in this listing; 'nodes[i]' below refers to the pre-reload list. */
162 num_nodes = ctdb->num_nodes;
165 /* load the new nodes file */
166 ctdb_load_nodes_file(ctdb);
168 for (i=0; i<ctdb->num_nodes; i++) {
169 /* keep any identical pre-existing nodes and connections */
170 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
171 talloc_free(ctdb->nodes[i]);
172 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
176 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
180 /* any new or different nodes must be added */
181 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
182 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
183 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
185 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
186 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
187 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
191 /* tell the recovery daemon to reload the nodes file too */
192 ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
194 talloc_free(tmp_ctx);
200 a traverse function for pulling all relevant records from pulldb
/* Context passed to traverse_pulldb() during a PULL_DB traverse.
 * NOTE(review): the struct declaration line and the 'len'/'failed'
 * members are elided in this listing. */
203 struct ctdb_context *ctdb;
204 struct ctdb_db_context *ctdb_db;
205 struct ctdb_marshall_buffer *pulldata;  /* growing marshall blob */
207 uint32_t allocated_len;                 /* bytes currently allocated for pulldata */
/*
 * tdb_traverse_read() callback for PULL_DB: marshall each record and
 * append it to the growing blob in params->pulldata, reallocating with
 * pulldb_preallocation_size headroom when the blob runs out of space.
 */
211 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
213 struct pulldb_data *params = (struct pulldb_data *)p;
214 struct ctdb_rec_data_old *rec;
215 struct ctdb_context *ctdb = params->ctdb;
216 struct ctdb_db_context *ctdb_db = params->ctdb_db;
218 /* add the record to the blob */
219 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
221 params->failed = true;
/* grow the blob with headroom so we do not realloc per record */
224 if (params->len + rec->length >= params->allocated_len) {
225 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
226 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
228 if (params->pulldata == NULL) {
229 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
230 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
232 params->pulldata->count++;
/* append the marshalled record at the current end of the blob */
233 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
234 params->len += rec->length;
/* warn about unusually large records if the tunable is enabled */
236 if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
237 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
246 pull a bunch of records from a ltdb, filtering by lmaster
/*
 * CTDB_CONTROL_PULL_DB handler: traverse the local tdb for the requested
 * db_id and marshall all records into outdata.  The database must be
 * frozen, and the whole db is lock-marked for the duration of the
 * traverse.  Warns (via tunables) when the result is unusually large.
 *
 * Fixes applied: "¶ms" was a mis-encoded "&params" (mojibake of
 * "&para" + "ms"), and "unhealty" -> "unhealthy" in the warning message.
 */
248 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
250 struct ctdb_pulldb *pull;
251 struct ctdb_db_context *ctdb_db;
252 struct pulldb_data params;
253 struct ctdb_marshall_buffer *reply;
255 pull = (struct ctdb_pulldb *)indata.dptr;
257 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
259 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
263 if (!ctdb_db_frozen(ctdb_db)) {
265 ("rejecting ctdb_control_pull_db when not frozen\n"));
269 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
270 CTDB_NO_MEMORY(ctdb, reply);
272 reply->db_id = pull->db_id;
275 params.ctdb_db = ctdb_db;
276 params.pulldata = reply;
277 params.len = offsetof(struct ctdb_marshall_buffer, data);
278 params.allocated_len = params.len;
279 params.failed = false;
281 if (ctdb_db->unhealthy_reason) {
282 /* this is just a warning, as the tdb should be empty anyway */
283 DEBUG(DEBUG_WARNING,("db(%s) unhealthy in ctdb_control_pull_db: %s\n",
284 ctdb_db->db_name, ctdb_db->unhealthy_reason));
287 if (ctdb_lockdb_mark(ctdb_db) != 0) {
288 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
292 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
293 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
294 ctdb_lockdb_unmark(ctdb_db);
295 talloc_free(params.pulldata);
299 ctdb_lockdb_unmark(ctdb_db);
301 outdata->dptr = (uint8_t *)params.pulldata;
302 outdata->dsize = params.len;
/* size warnings controlled by tunables; 0 disables the check */
304 if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
305 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
307 if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
308 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
316 push a bunch of records into a ltdb, filtering by rsn
/*
 * CTDB_CONTROL_PUSH_DB handler: store each marshalled record from indata
 * into the local tdb.  The database must be frozen; the whole db is
 * lock-marked while storing.  Read-only flags are stripped from every
 * record header.  For read-only capable dbs, the tracking db is wiped
 * and any in-flight revoke children are cancelled afterwards.
 */
318 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
320 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
321 struct ctdb_db_context *ctdb_db;
323 struct ctdb_rec_data_old *rec;
/* reject buffers too small to hold even the marshall header */
325 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
326 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
330 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
332 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
336 if (!ctdb_db_frozen(ctdb_db)) {
338 ("rejecting ctdb_control_push_db when not frozen\n"));
342 if (ctdb_lockdb_mark(ctdb_db) != 0) {
343 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entire db - failing\n"));
347 rec = (struct ctdb_rec_data_old *)&reply->data[0];
349 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
350 reply->count, reply->db_id));
352 for (i=0;i<reply->count;i++) {
354 struct ctdb_ltdb_header *hdr;
/* each record is packed as key bytes followed by data bytes */
356 key.dptr = &rec->data[0];
357 key.dsize = rec->keylen;
358 data.dptr = &rec->data[key.dsize];
359 data.dsize = rec->datalen;
361 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
362 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
365 hdr = (struct ctdb_ltdb_header *)data.dptr;
366 /* strip off any read only record flags. All readonly records
367 are revoked implicitly by a recovery
369 hdr->flags &= ~CTDB_REC_RO_FLAGS;
/* skip past the ltdb header; the remainder is the user data */
371 data.dptr += sizeof(*hdr);
372 data.dsize -= sizeof(*hdr);
374 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
376 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
/* advance to the next marshalled record (records are contiguous) */
380 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
383 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
384 reply->count, reply->db_id));
386 if (ctdb_db->readonly) {
387 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
389 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
/* if wiping fails, drop read-only delegation support entirely */
390 DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
391 ctdb_db->readonly = false;
392 tdb_close(ctdb_db->rottdb);
393 ctdb_db->rottdb = NULL;
394 ctdb_db->readonly = false;
/* freeing a revokechild removes it from the list via its destructor */
396 while (ctdb_db->revokechild_active != NULL) {
397 talloc_free(ctdb_db->revokechild_active);
401 ctdb_lockdb_unmark(ctdb_db);
405 ctdb_lockdb_unmark(ctdb_db);
/* Async state for a SET_RECMODE control that needs a child process to
 * probe the recovery lock.  NOTE(review): the 'recmode', 'fd[2]' and
 * 'child' members referenced elsewhere are elided in this listing. */
409 struct ctdb_set_recmode_state {
410 struct ctdb_context *ctdb;
411 struct ctdb_req_control_old *c;   /* deferred control to reply to */
414 struct tevent_timer *te;          /* child timeout */
415 struct tevent_fd *fde;            /* pipe read event from the child */
417 struct timeval start_time;        /* for reclock latency accounting */
421 called if our set_recmode child times out. this would happen if
422 ctdb_recovery_lock() would block.
424 static void ctdb_set_recmode_timeout(struct tevent_context *ev,
425 struct tevent_timer *te,
426 struct timeval t, void *private_data)
428 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
429 struct ctdb_set_recmode_state);
431 /* we consider this a success, not a failure, as we failed to
432 set the recovery lock which is what we wanted. This can be
433 caused by the cluster filesystem being very slow to
434 arbitrate locks immediately after a node failure.
436 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
/* apply the requested mode and send the deferred reply */
437 state->ctdb->recovery_mode = state->recmode;
438 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
443 /* when we free the recmode state we must kill any child process.
/* talloc destructor: record reclock latency, close the pipe fds and
 * SIGKILL the probe child so it cannot outlive the request. */
445 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
447 double l = timeval_elapsed(&state->start_time);
449 CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
451 if (state->fd[0] != -1) {
454 if (state->fd[1] != -1) {
457 ctdb_kill(state->ctdb, state->child, SIGKILL);
461 /* this is called when the client process has completed ctdb_recovery_lock()
462 and has written data back to us through the pipe.
464 static void set_recmode_handler(struct tevent_context *ev,
465 struct tevent_fd *fde,
466 uint16_t flags, void *private_data)
468 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
469 struct ctdb_set_recmode_state);
473 /* we got a response from our child process so we can abort the
/* cancel the 5 second child timeout now that the child answered */
476 talloc_free(state->te);
480 /* If, as expected, the child was unable to take the recovery
481 * lock then it will have written 0 into the pipe, so
482 * continue. However, any other value (e.g. 1) indicates that
483 * it was able to take the recovery lock when it should have
484 * been held by the recovery daemon on the recovery master.
486 ret = sys_read(state->fd[0], &c, 1);
487 if (ret != 1 || c != 0) {
488 ctdb_request_control_reply(
489 state->ctdb, state->c, NULL, -1,
490 "Took recovery lock from daemon during recovery - probably a cluster filesystem lock coherence problem");
/* child confirmed the lock is held elsewhere: apply the new mode */
495 state->ctdb->recovery_mode = state->recmode;
497 /* release any deferred attach calls from clients */
498 if (state->recmode == CTDB_RECOVERY_NORMAL) {
499 ctdb_process_deferred_attach(state->ctdb);
502 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
/*
 * Timer callback: fires when the node has stayed in recovery longer than
 * the recovery_drop_all_ips tunable allows; releases all public IPs.
 */
508 ctdb_drop_all_ips_event(struct tevent_context *ev, struct tevent_timer *te,
509 struct timeval t, void *private_data)
511 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
513 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
/* freeing the context also removes this (now-fired) timer */
514 talloc_free(ctdb->release_ips_ctx);
515 ctdb->release_ips_ctx = NULL;
517 ctdb_release_all_ips(ctdb);
521 * Set up an event to drop all public ips if we remain in recovery for too
/* Arm (or re-arm) the drop-all-IPs timer using the
 * recovery_drop_all_ips tunable as the deadline. */
524 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
/* replace any previously armed timer */
526 if (ctdb->release_ips_ctx != NULL) {
527 talloc_free(ctdb->release_ips_ctx);
529 ctdb->release_ips_ctx = talloc_new(ctdb);
530 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
532 tevent_add_timer(ctdb->ev, ctdb->release_ips_ctx,
533 timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0),
534 ctdb_drop_all_ips_event, ctdb);
539 set the recovery mode
/*
 * CTDB_CONTROL_SET_RECMODE handler.  Entering recovery arms the deferred
 * drop-all-IPs timer; leaving recovery verifies DB generations, thaws
 * frozen priorities, and (when a recovery lock file is configured) forks
 * a child to verify the daemon CANNOT take the recovery lock before the
 * mode change is applied.  In the forked path the reply is deferred
 * (*async_reply) and delivered from set_recmode_handler/timeout.
 */
541 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
542 struct ctdb_req_control_old *c,
543 TDB_DATA indata, bool *async_reply,
544 const char **errormsg)
546 uint32_t recmode = *(uint32_t *)indata.dptr;
548 struct ctdb_set_recmode_state *state;
549 pid_t parent = getpid();
550 struct ctdb_db_context *ctdb_db;
552 /* if we enter recovery but stay in recovery for too long
553 we will eventually drop all our ip addresses
555 if (recmode == CTDB_RECOVERY_NORMAL) {
556 talloc_free(ctdb->release_ips_ctx);
557 ctdb->release_ips_ctx = NULL;
559 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
560 DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
564 if (recmode != ctdb->recovery_mode) {
565 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
566 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
/* fast path: anything other than an ACTIVE->NORMAL transition just
 * records the new mode with no extra checks */
569 if (recmode != CTDB_RECOVERY_NORMAL ||
570 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
571 ctdb->recovery_mode = recmode;
575 /* some special handling when ending recovery mode */
/* ending recovery: all dbs must carry the current vnnmap generation */
577 for (ctdb_db = ctdb->db_list; ctdb_db != NULL; ctdb_db = ctdb_db->next) {
578 if (ctdb_db->generation != ctdb->vnn_map->generation) {
580 ("Inconsistent DB generation %u for %s\n",
581 ctdb_db->generation, ctdb_db->db_name));
582 DEBUG(DEBUG_ERR, ("Recovery mode set to ACTIVE\n"));
587 /* force the databases to thaw */
588 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
589 if (ctdb_db_prio_frozen(ctdb, i)) {
590 ctdb_control_thaw(ctdb, i, false);
594 state = talloc(ctdb, struct ctdb_set_recmode_state);
595 CTDB_NO_MEMORY(ctdb, state);
597 state->start_time = timeval_current();
601 /* release any deferred attach calls from clients */
602 if (recmode == CTDB_RECOVERY_NORMAL) {
603 ctdb_process_deferred_attach(ctdb);
606 if (ctdb->recovery_lock_file == NULL) {
607 /* Not using recovery lock file */
608 ctdb->recovery_mode = recmode;
612 /* For the rest of what needs to be done, we need to do this in
613 a child process since
614 1, the call to ctdb_recovery_lock() can block if the cluster
615 filesystem is in the process of recovery.
617 ret = pipe(state->fd);
620 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
624 state->child = ctdb_fork(ctdb);
625 if (state->child == (pid_t)-1) {
/* child: probe the recovery lock and report the result via the pipe */
632 if (state->child == 0) {
636 ctdb_set_process_name("ctdb_recmode");
637 debug_extra = talloc_asprintf(NULL, "set_recmode:");
638 /* Daemon should not be able to get the recover lock,
639 * as it should be held by the recovery master */
640 if (ctdb_recovery_lock(ctdb)) {
642 ("ERROR: Daemon able to take recovery lock on \"%s\" during recovery\n",
643 ctdb->recovery_lock_file));
644 ctdb_recovery_unlock(ctdb);
648 sys_write(state->fd[1], &cc, 1);
649 /* make sure we die when our parent dies */
650 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
652 sys_write(state->fd[1], &cc, 1);
/* parent: wait for the child's answer on the pipe, with a timeout */
657 set_close_on_exec(state->fd[0]);
661 talloc_set_destructor(state, set_recmode_destructor);
663 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
665 state->te = tevent_add_timer(ctdb->ev, state, timeval_current_ofs(5, 0),
666 ctdb_set_recmode_timeout, state);
668 state->fde = tevent_add_fd(ctdb->ev, state, state->fd[0], TEVENT_FD_READ,
669 set_recmode_handler, (void *)state);
671 if (state->fde == NULL) {
675 tevent_fd_set_auto_close(state->fde);
/* defer the reply until the child reports back */
678 state->recmode = recmode;
679 state->c = talloc_steal(state, c);
/* True when this process currently holds the recovery lock (open fd). */
687 bool ctdb_recovery_have_lock(struct ctdb_context *ctdb)
689 return ctdb->recovery_lock_fd != -1;
693 try and get the recovery lock in shared storage - should only work
694 on the recovery master recovery daemon. Anywhere else is a bug
696 bool ctdb_recovery_lock(struct ctdb_context *ctdb)
700 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file,
701 O_RDWR|O_CREAT, 0600);
702 if (ctdb->recovery_lock_fd == -1) {
704 ("ctdb_recovery_lock: Unable to open %s - (%s)\n",
705 ctdb->recovery_lock_file, strerror(errno)));
/* do not leak the lock fd to exec'd children */
709 set_close_on_exec(ctdb->recovery_lock_fd);
/* whole-file exclusive byte-range lock via fcntl */
711 lock.l_type = F_WRLCK;
712 lock.l_whence = SEEK_SET;
717 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
/* preserve errno across close(), which may overwrite it */
718 int saved_errno = errno;
719 close(ctdb->recovery_lock_fd);
720 ctdb->recovery_lock_fd = -1;
721 /* Fail silently on these errors, since they indicate
722 * lock contention, but log an error for any other
724 if (saved_errno != EACCES &&
725 saved_errno != EAGAIN) {
726 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Failed to get "
727 "recovery lock on '%s' - (%s)\n",
728 ctdb->recovery_lock_file,
729 strerror(saved_errno)));
/* Release the recovery lock by closing its fd (the fcntl lock is
 * dropped automatically on close). No-op when the lock is not held. */
737 void ctdb_recovery_unlock(struct ctdb_context *ctdb)
739 if (ctdb->recovery_lock_fd != -1) {
740 DEBUG(DEBUG_NOTICE, ("Releasing recovery lock\n"));
741 close(ctdb->recovery_lock_fd);
742 ctdb->recovery_lock_fd = -1;
747 delete a record as part of the vacuum process
748 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
749 use non-blocking locks
751 return 0 if the record was successfully deleted (i.e. it does not exist
752 when the function returns)
753 or !0 if the record still exists in the tdb after returning.
755 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data_old *rec)
757 TDB_DATA key, data, data2;
758 struct ctdb_ltdb_header *hdr, *hdr2;
760 /* these are really internal tdb functions - but we need them here for
761 non-blocking lock of the freelist */
762 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
763 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
/* unpack the marshalled record: key bytes followed by data bytes */
766 key.dsize = rec->keylen;
767 key.dptr = &rec->data[0];
768 data.dsize = rec->datalen;
769 data.dptr = &rec->data[rec->keylen];
/* never delete on the lmaster - it is the authoritative copy */
771 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
772 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
/* vacuum candidates carry only an ltdb header as payload */
776 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
777 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
781 hdr = (struct ctdb_ltdb_header *)data.dptr;
783 /* use a non-blocking lock */
784 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
788 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
789 if (data2.dptr == NULL) {
790 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* a record smaller than an ltdb header is corrupt - purge it */
794 if (data2.dsize < sizeof(struct ctdb_ltdb_header)) {
795 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
796 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
797 DEBUG(DEBUG_CRIT,(__location__ " Failed to delete corrupt record\n"));
799 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
800 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
802 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
807 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* the local copy is newer than the caller's view - keep it */
809 if (hdr2->rsn > hdr->rsn) {
810 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
811 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
812 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
817 /* do not allow deleting record that have readonly flags set. */
818 if (hdr->flags & CTDB_REC_RO_FLAGS) {
819 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
820 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
824 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
825 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
826 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
/* never delete while we are the dmaster of the record */
831 if (hdr2->dmaster == ctdb->pnn) {
832 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
833 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
/* freelist lock (-1) is needed for the actual delete; non-blocking */
838 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
839 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
844 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
845 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
846 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
847 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
852 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
853 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* State kept while a recovery event script runs: the deferred control
 * to reply to once the script callback fires. */
860 struct recovery_callback_state {
861 struct ctdb_req_control_old *c;
866 called when the 'recovered' event script has finished
868 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
870 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
/* re-enable monitoring (disabled while the script ran) and count it */
872 ctdb_enable_monitoring(ctdb);
873 CTDB_INCREMENT_STAT(ctdb, num_recoveries);
876 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
877 if (status == -ETIME) {
/* deliver the reply deferred by ctdb_control_end_recovery() */
882 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
885 gettimeofday(&ctdb->last_recovery_finished, NULL);
/* first recovery completed: advance the runstate to STARTUP */
887 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
888 ctdb_set_runstate(ctdb, CTDB_RUNSTATE_STARTUP);
893 recovery has finished
/*
 * CTDB_CONTROL_END_RECOVERY handler: finish pending trans3 commits and
 * run the 'recovered' event script; the reply is deferred until
 * ctdb_end_recovery_callback() fires.
 */
895 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
896 struct ctdb_req_control_old *c,
900 struct recovery_callback_state *state;
902 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
904 ctdb_persistent_finish_trans3_commits(ctdb);
906 state = talloc(ctdb, struct recovery_callback_state);
907 CTDB_NO_MEMORY(ctdb, state);
/* monitoring stays off while the event script is running */
911 ctdb_disable_monitoring(ctdb);
913 ret = ctdb_event_script_callback(ctdb, state,
914 ctdb_end_recovery_callback,
916 CTDB_EVENT_RECOVERED, "%s", "");
919 ctdb_enable_monitoring(ctdb);
921 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
926 /* tell the control that we will reply asynchronously */
927 state->c = talloc_steal(state, c);
933 called when the 'startrecovery' event script has finished
935 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
937 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
940 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
/* deliver the reply deferred by ctdb_control_start_recovery() */
943 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
948 run the startrecovery eventscript
/*
 * CTDB_CONTROL_START_RECOVERY handler: record the recovery start time and
 * run the 'startrecovery' event script; the reply is deferred until
 * ctdb_start_recovery_callback() fires.
 */
950 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
951 struct ctdb_req_control_old *c,
955 struct recovery_callback_state *state;
957 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
958 gettimeofday(&ctdb->last_recovery_started, NULL);
960 state = talloc(ctdb, struct recovery_callback_state);
961 CTDB_NO_MEMORY(ctdb, state);
963 state->c = talloc_steal(state, c);
/* monitoring stays off while the event script is running */
965 ctdb_disable_monitoring(ctdb);
967 ret = ctdb_event_script_callback(ctdb, state,
968 ctdb_start_recovery_callback,
970 CTDB_EVENT_START_RECOVERY,
974 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
979 /* tell the control that we will reply asynchronously */
985 try to delete all these records as part of the vacuuming process
986 and return the records we failed to delete
/*
 * CTDB_CONTROL_TRY_DELETE_RECORDS handler: attempt to vacuum-delete each
 * marshalled record via delete_tdb_record(); records that could not be
 * deleted are marshalled back into outdata so the lmaster keeps them.
 */
988 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
990 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
991 struct ctdb_db_context *ctdb_db;
993 struct ctdb_rec_data_old *rec;
994 struct ctdb_marshall_buffer *records;
/* reject buffers too small to hold even the marshall header */
996 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
997 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1001 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1003 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1008 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1009 reply->count, reply->db_id));
1012 /* create a blob to send back the records we couldn't delete */
1013 records = (struct ctdb_marshall_buffer *)
1014 talloc_zero_size(outdata,
1015 offsetof(struct ctdb_marshall_buffer, data));
1016 if (records == NULL) {
1017 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1020 records->db_id = ctdb_db->db_id;
1023 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1024 for (i=0;i<reply->count;i++) {
/* unpack the marshalled record: key bytes followed by data bytes */
1027 key.dptr = &rec->data[0];
1028 key.dsize = rec->keylen;
1029 data.dptr = &rec->data[key.dsize];
1030 data.dsize = rec->datalen;
1032 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1033 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1037 /* If we cant delete the record we must add it to the reply
1038 so the lmaster knows it may not purge this record
1040 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1042 struct ctdb_ltdb_header *hdr;
1044 hdr = (struct ctdb_ltdb_header *)data.dptr;
1045 data.dptr += sizeof(*hdr);
1046 data.dsize -= sizeof(*hdr);
1048 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
/* grow the reply blob and append the undeletable record */
1050 old_size = talloc_get_size(records);
1051 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1052 if (records == NULL) {
1053 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1057 memcpy(old_size+(uint8_t *)records, rec, rec->length);
/* advance to the next marshalled record */
1060 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1064 *outdata = ctdb_marshall_finish(records);
1070 * Store a record as part of the vacuum process:
1071 * This is called from the RECEIVE_RECORD control which
1072 * the lmaster uses to send the current empty copy
1073 * to all nodes for storing, before it lets the other
1074 * nodes delete the records in the second phase with
1075 * the TRY_DELETE_RECORDS control.
1077 * Only store if we are not lmaster or dmaster, and our
1078 * rsn is <= the provided rsn. Use non-blocking locks.
1080 * return 0 if the record was successfully stored.
1081 * return !0 if the record still exists in the tdb after returning.
1083 static int store_tdb_record(struct ctdb_context *ctdb,
1084 struct ctdb_db_context *ctdb_db,
1085 struct ctdb_rec_data_old *rec)
1087 TDB_DATA key, data, data2;
1088 struct ctdb_ltdb_header *hdr, *hdr2;
/* unpack the marshalled record: key bytes followed by data bytes */
1091 key.dsize = rec->keylen;
1092 key.dptr = &rec->data[0];
1093 data.dsize = rec->datalen;
1094 data.dptr = &rec->data[rec->keylen];
1096 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
1097 DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
1098 "where we are lmaster\n"));
/* the lmaster sends a header-only (empty) copy for storing */
1102 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
1103 DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
1107 hdr = (struct ctdb_ltdb_header *)data.dptr;
1109 /* use a non-blocking lock */
1110 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
1111 DEBUG(DEBUG_INFO, (__location__ " Failed to lock chain in non-blocking mode\n"));
/* no local copy (or a corrupt one): store the empty copy directly */
1115 data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
1116 if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
1117 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) == -1) {
1118 DEBUG(DEBUG_ERR, (__location__ "Failed to store record\n"));
1122 DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
1127 hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
/* the local copy is newer than the caller's view - keep it */
1129 if (hdr2->rsn > hdr->rsn) {
1130 DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
1131 "rsn=%llu - called with rsn=%llu\n",
1132 (unsigned long long)hdr2->rsn,
1133 (unsigned long long)hdr->rsn));
1138 /* do not allow vacuuming of records that have readonly flags set. */
1139 if (hdr->flags & CTDB_REC_RO_FLAGS) {
1140 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
1145 if (hdr2->flags & CTDB_REC_RO_FLAGS) {
1146 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
/* never overwrite while we are the dmaster of the record */
1152 if (hdr2->dmaster == ctdb->pnn) {
1153 DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
1154 "where we are the dmaster\n"));
1159 if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
1160 DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
1168 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1176 * Try to store all these records as part of the vacuuming process
1177 * and return the records we failed to store.
/*
 * CTDB_CONTROL_RECEIVE_RECORDS handler: first vacuum phase - store each
 * marshalled (empty) record via store_tdb_record(); records that could
 * not be stored are marshalled back into outdata for the lmaster.
 */
1179 int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
1180 TDB_DATA indata, TDB_DATA *outdata)
1182 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1183 struct ctdb_db_context *ctdb_db;
1185 struct ctdb_rec_data_old *rec;
1186 struct ctdb_marshall_buffer *records;
/* reject buffers too small to hold even the marshall header */
1188 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1190 (__location__ " invalid data in receive_records\n"));
1194 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1196 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1201 DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
1202 "dbid 0x%x\n", reply->count, reply->db_id));
1204 /* create a blob to send back the records we could not store */
1205 records = (struct ctdb_marshall_buffer *)
1206 talloc_zero_size(outdata,
1207 offsetof(struct ctdb_marshall_buffer, data));
1208 if (records == NULL) {
1209 DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
1212 records->db_id = ctdb_db->db_id;
1214 rec = (struct ctdb_rec_data_old *)&reply->data[0];
1215 for (i=0; i<reply->count; i++) {
/* unpack the marshalled record: key bytes followed by data bytes */
1218 key.dptr = &rec->data[0];
1219 key.dsize = rec->keylen;
1220 data.dptr = &rec->data[key.dsize];
1221 data.dsize = rec->datalen;
1223 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1224 DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
1230 * If we can not store the record we must add it to the reply
1231 * so the lmaster knows it may not purge this record.
1233 if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
1235 struct ctdb_ltdb_header *hdr;
1237 hdr = (struct ctdb_ltdb_header *)data.dptr;
1238 data.dptr += sizeof(*hdr);
1239 data.dsize -= sizeof(*hdr);
1241 DEBUG(DEBUG_INFO, (__location__ " Failed to store "
1242 "record with hash 0x%08x in vacuum "
1243 "via RECEIVE_RECORDS\n",
/* grow the reply blob and append the unstorable record */
1246 old_size = talloc_get_size(records);
1247 records = talloc_realloc_size(outdata, records,
1248 old_size + rec->length);
1249 if (records == NULL) {
1250 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1255 memcpy(old_size+(uint8_t *)records, rec, rec->length);
/* advance to the next marshalled record */
1258 rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1261 *outdata = ctdb_marshall_finish(records);
/* CTDB_CONTROL_GET_CAPABILITIES handler: return this node's capability
 * bitmask as a single uint32_t in outdata. */
1270 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1272 uint32_t *capabilities = NULL;
1274 capabilities = talloc(outdata, uint32_t);
1275 CTDB_NO_MEMORY(ctdb, capabilities);
1276 *capabilities = ctdb->capabilities;
1278 outdata->dsize = sizeof(uint32_t);
1279 outdata->dptr = (uint8_t *)capabilities;
1284 /* The recovery daemon will ping us at regular intervals.
1285 If we haven't been pinged for a while we assume the recovery
1286 daemon is inoperable and we restart.
1288 static void ctdb_recd_ping_timeout(struct tevent_context *ev,
1289 struct tevent_timer *te,
1290 struct timeval t, void *p)
1292 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1293 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1295 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
/* not yet at the failure threshold: re-arm the timer and wait again */
1297 if (*count < ctdb->tunable.recd_ping_failcount) {
1299 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1300 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1301 ctdb_recd_ping_timeout, ctdb);
1305 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
/* threshold reached: restart the recovery daemon */
1307 ctdb_stop_recoverd(ctdb);
1308 ctdb_start_recoverd(ctdb);
/* CTDB_CONTROL_RECD_PING handler: reset the ping-miss counter and re-arm
 * the watchdog timer (the counter is the timer's talloc parent, so
 * freeing it also cancels any pending timeout). */
1311 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1313 talloc_free(ctdb->recd_ping_count);
1315 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1316 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
/* recd_ping_timeout == 0 disables the watchdog entirely */
1318 if (ctdb->tunable.recd_ping_timeout != 0) {
1319 tevent_add_timer(ctdb->ev, ctdb->recd_ping_count,
1320 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1321 ctdb_recd_ping_timeout, ctdb);
/* CTDB_CONTROL_SET_RECMASTER handler: record the new recovery master
 * pnn, logging when this node gains or loses the role. */
1329 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1331 uint32_t new_recmaster;
1333 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1334 new_recmaster = ((uint32_t *)(&indata.dptr[0]))[0];
/* log role transitions affecting this node */
1336 if (ctdb->pnn != new_recmaster && ctdb->recovery_master == ctdb->pnn) {
1338 ("This node (%u) is no longer the recovery master\n", ctdb->pnn));
1341 if (ctdb->pnn == new_recmaster && ctdb->recovery_master != new_recmaster) {
1343 ("This node (%u) is now the recovery master\n", ctdb->pnn));
1346 ctdb->recovery_master = new_recmaster;
/* CTDB_CONTROL_STOP_NODE handler: disable monitoring and mark this
 * node's flags as STOPPED. */
1351 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb)
1353 DEBUG(DEBUG_NOTICE, ("Stopping node\n"));
1354 ctdb_disable_monitoring(ctdb);
1355 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1360 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1362 DEBUG(DEBUG_NOTICE, ("Continue node\n"));
1363 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;