4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
32 lock all databases - mark only
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb)
36 struct ctdb_db_context *ctdb_db;
37 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
38 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
41 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
42 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
50 lock all databases - unmark only
52 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb)
54 struct ctdb_db_context *ctdb_db;
55 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
56 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
59 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
60 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
69 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
71 CHECK_CONTROL_DATA_SIZE(0);
72 struct ctdb_vnn_map_wire *map;
75 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
76 map = talloc_size(outdata, len);
77 CTDB_NO_MEMORY(ctdb, map);
79 map->generation = ctdb->vnn_map->generation;
80 map->size = ctdb->vnn_map->size;
81 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
84 outdata->dptr = (uint8_t *)map;
90 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
92 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
94 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
95 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
99 talloc_free(ctdb->vnn_map);
101 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
102 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
104 ctdb->vnn_map->generation = map->generation;
105 ctdb->vnn_map->size = map->size;
106 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
107 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
109 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
115 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
118 struct ctdb_db_context *ctdb_db;
119 struct ctdb_dbid_map *dbid_map;
121 CHECK_CONTROL_DATA_SIZE(0);
124 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
129 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
130 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
131 if (!outdata->dptr) {
132 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
136 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
138 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
139 dbid_map->dbs[i].dbid = ctdb_db->db_id;
140 dbid_map->dbs[i].persistent = ctdb_db->persistent;
147 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
149 uint32_t i, num_nodes;
150 struct ctdb_node_map *node_map;
152 CHECK_CONTROL_DATA_SIZE(0);
154 num_nodes = ctdb->num_nodes;
156 outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
157 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
158 if (!outdata->dptr) {
159 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
163 node_map = (struct ctdb_node_map *)outdata->dptr;
164 node_map->num = num_nodes;
165 for (i=0; i<num_nodes; i++) {
166 if (parse_ip(ctdb->nodes[i]->address.address,
167 NULL, /* TODO: pass in the correct interface here*/
169 &node_map->nodes[i].addr) == 0)
171 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
174 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
175 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
182 get an old style ipv4-only nodemap
185 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
187 uint32_t i, num_nodes;
188 struct ctdb_node_mapv4 *node_map;
190 CHECK_CONTROL_DATA_SIZE(0);
192 num_nodes = ctdb->num_nodes;
194 outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
195 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
196 if (!outdata->dptr) {
197 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
201 node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
202 node_map->num = num_nodes;
203 for (i=0; i<num_nodes; i++) {
204 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
205 DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
209 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
210 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
217 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
218 struct timeval t, void *private_data)
221 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
223 struct ctdb_node **nodes;
225 tmp_ctx = talloc_new(ctdb);
227 /* steal the old nodes file for a while */
228 talloc_steal(tmp_ctx, ctdb->nodes);
231 num_nodes = ctdb->num_nodes;
234 /* load the new nodes file */
235 ctdb_load_nodes_file(ctdb);
237 for (i=0; i<ctdb->num_nodes; i++) {
238 /* keep any identical pre-existing nodes and connections */
239 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
240 talloc_free(ctdb->nodes[i]);
241 ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
245 /* any new or different nodes must be added */
246 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
247 DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
248 ctdb_fatal(ctdb, "failed to add node. shutting down\n");
250 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
251 DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
252 ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
256 talloc_free(tmp_ctx);
261 reload the nodes file after a short delay (so that we can send the response
265 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
267 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
/*
  a traverse function for pulling all relevant records from pulldb

  State shared between the pull-db control and its traverse callback.

  NOTE(review): the "struct pulldb_data {" line and the "len" and
  "failed" members were dropped from the reviewed text.  They are
  restored from how the callback and the control use them (len is
  printed with %u, failed is assigned true/false) - confirm the exact
  types against upstream.
 */
struct pulldb_data {
	struct ctdb_context *ctdb;
	struct ctdb_marshall_buffer *pulldata;	/* growing reply blob */
	uint32_t len;				/* bytes used in pulldata */
	bool failed;				/* set when the traverse callback gave up */
};
282 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
284 struct pulldb_data *params = (struct pulldb_data *)p;
285 struct ctdb_rec_data *rec;
287 /* add the record to the blob */
288 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
290 params->failed = true;
293 params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
294 if (params->pulldata == NULL) {
295 DEBUG(DEBUG_ERR,(__location__ " Failed to expand pulldb_data to %u (%u records)\n",
296 rec->length + params->len, params->pulldata->count));
297 params->failed = true;
300 params->pulldata->count++;
301 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
302 params->len += rec->length;
309 pul a bunch of records from a ltdb, filtering by lmaster
311 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
313 struct ctdb_control_pulldb *pull;
314 struct ctdb_db_context *ctdb_db;
315 struct pulldb_data params;
316 struct ctdb_marshall_buffer *reply;
318 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
319 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
323 pull = (struct ctdb_control_pulldb *)indata.dptr;
325 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
327 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
331 reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
332 CTDB_NO_MEMORY(ctdb, reply);
334 reply->db_id = pull->db_id;
337 params.pulldata = reply;
338 params.len = offsetof(struct ctdb_marshall_buffer, data);
339 params.failed = false;
341 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
342 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
346 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
347 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
348 ctdb_lock_all_databases_unmark(ctdb);
349 talloc_free(params.pulldata);
353 ctdb_lock_all_databases_unmark(ctdb);
355 outdata->dptr = (uint8_t *)params.pulldata;
356 outdata->dsize = params.len;
362 push a bunch of records into a ltdb, filtering by rsn
364 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
366 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
367 struct ctdb_db_context *ctdb_db;
369 struct ctdb_rec_data *rec;
371 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
372 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
376 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
377 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
381 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
383 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
387 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
388 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
392 rec = (struct ctdb_rec_data *)&reply->data[0];
394 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
395 reply->count, reply->db_id));
397 for (i=0;i<reply->count;i++) {
399 struct ctdb_ltdb_header *hdr;
401 key.dptr = &rec->data[0];
402 key.dsize = rec->keylen;
403 data.dptr = &rec->data[key.dsize];
404 data.dsize = rec->datalen;
406 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
407 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
410 hdr = (struct ctdb_ltdb_header *)data.dptr;
411 data.dptr += sizeof(*hdr);
412 data.dsize -= sizeof(*hdr);
414 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
416 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
420 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
423 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
424 reply->count, reply->db_id));
426 ctdb_lock_all_databases_unmark(ctdb);
430 ctdb_lock_all_databases_unmark(ctdb);
435 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
437 uint32_t *dmaster = (uint32_t *)p;
438 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
441 /* skip if already correct */
442 if (header->dmaster == *dmaster) {
446 header->dmaster = *dmaster;
448 ret = tdb_store(tdb, key, data, TDB_REPLACE);
450 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back ret:%d\n",ret));
454 /* TODO: add error checking here */
459 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
461 struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
462 struct ctdb_db_context *ctdb_db;
464 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
465 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
469 ctdb_db = find_ctdb_db(ctdb, p->db_id);
471 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
475 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
476 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
480 tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
482 ctdb_lock_all_databases_unmark(ctdb);
/*
  State for an asynchronous set-recmode control while a child process
  probes the recovery lock.

  NOTE(review): the "recmode", "fd" and "child" members and the
  closing brace were dropped from the reviewed text.  They are
  restored from their uses in this file (recmode is copied from a
  uint32_t, fd is passed to pipe(), child holds the fork() result) -
  confirm member order and types against upstream.
 */
struct ctdb_set_recmode_state {
	struct ctdb_context *ctdb;
	struct ctdb_req_control *c;	/* request to reply to */
	uint32_t recmode;		/* mode to apply on success */
	int fd[2];			/* pipe: child writes, parent reads */
	struct timed_event *te;		/* child timeout */
	struct fd_event *fde;		/* read event on fd[0] */
	pid_t child;
};
498 called if our set_recmode child times out. this would happen if
499 ctdb_recovery_lock() would block.
501 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
502 struct timeval t, void *private_data)
504 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
505 struct ctdb_set_recmode_state);
507 /* we consider this a success, not a failure, as we failed to
508 set the recovery lock which is what we wanted. This can be
509 caused by the cluster filesystem being very slow to
510 arbitrate locks immediately after a node failure.
512 DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
513 state->ctdb->recovery_mode = state->recmode;
514 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
519 /* when we free the recmode state we must kill any child process.
521 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
523 kill(state->child, SIGKILL);
527 /* this is called when the client process has completed ctdb_recovery_lock()
528 and has written data back to us through the pipe.
530 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
531 uint16_t flags, void *private_data)
533 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
534 struct ctdb_set_recmode_state);
538 /* we got a response from our child process so we can abort the
541 talloc_free(state->te);
545 /* read the childs status when trying to lock the reclock file.
546 child wrote 0 if everything is fine and 1 if it did manage
547 to lock the file, which would be a problem since that means
548 we got a request to exit from recovery but we could still lock
549 the file which at this time SHOULD be locked by the recovery
550 daemon on the recmaster
552 ret = read(state->fd[0], &c, 1);
553 if (ret != 1 || c != 0) {
554 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
559 state->ctdb->recovery_mode = state->recmode;
561 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
567 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te,
568 struct timeval t, void *private_data)
570 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
572 DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
573 talloc_free(ctdb->release_ips_ctx);
574 ctdb->release_ips_ctx = NULL;
576 ctdb_release_all_ips(ctdb);
580 set the recovery mode
582 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
583 struct ctdb_req_control *c,
584 TDB_DATA indata, bool *async_reply,
585 const char **errormsg)
587 uint32_t recmode = *(uint32_t *)indata.dptr;
589 struct ctdb_set_recmode_state *state;
590 pid_t parent = getpid();
592 /* if we enter recovery but stay in recovery for too long
593 we will eventually drop all our ip addresses
595 if (recmode == CTDB_RECOVERY_NORMAL) {
596 talloc_free(ctdb->release_ips_ctx);
597 ctdb->release_ips_ctx = NULL;
599 talloc_free(ctdb->release_ips_ctx);
600 ctdb->release_ips_ctx = talloc_new(ctdb);
601 CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
603 event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
607 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
608 DEBUG(DEBUG_ERR,("Attempt to change recovery mode to %u when not frozen\n",
610 (*errormsg) = "Cannot change recovery mode while not frozen";
614 if (recmode != ctdb->recovery_mode) {
615 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
616 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
619 if (recmode != CTDB_RECOVERY_NORMAL ||
620 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
621 ctdb->recovery_mode = recmode;
625 /* some special handling when ending recovery mode */
627 /* force the databased to thaw */
628 if (ctdb->freeze_handle) {
629 ctdb_control_thaw(ctdb);
632 state = talloc(ctdb, struct ctdb_set_recmode_state);
633 CTDB_NO_MEMORY(ctdb, state);
636 if (ctdb->tunable.verify_recovery_lock == 0) {
637 /* dont need to verify the reclock file */
638 ctdb->recovery_mode = recmode;
642 /* For the rest of what needs to be done, we need to do this in
643 a child process since
644 1, the call to ctdb_recovery_lock() can block if the cluster
645 filesystem is in the process of recovery.
647 ret = pipe(state->fd);
650 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
654 state->child = fork();
655 if (state->child == (pid_t)-1) {
662 if (state->child == 0) {
666 /* we should not be able to get the lock on the reclock file,
667 as it should be held by the recovery master
669 if (ctdb_recovery_lock(ctdb, false)) {
670 DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
674 write(state->fd[1], &cc, 1);
675 /* make sure we die when our parent dies */
676 while (kill(parent, 0) == 0 || errno != ESRCH) {
678 write(state->fd[1], &cc, 1);
684 talloc_set_destructor(state, set_recmode_destructor);
686 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
687 ctdb_set_recmode_timeout, state);
689 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
690 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
693 if (state->fde == NULL) {
699 state->recmode = recmode;
700 state->c = talloc_steal(state, c);
709 try and get the recovery lock in shared storage - should only work
710 on the recovery master recovery daemon. Anywhere else is a bug
712 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
717 DEBUG(DEBUG_ERR, ("Take the recovery lock\n"));
719 if (ctdb->recovery_lock_fd != -1) {
720 close(ctdb->recovery_lock_fd);
722 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
723 if (ctdb->recovery_lock_fd == -1) {
724 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n",
725 ctdb->recovery_lock_file, strerror(errno)));
729 set_close_on_exec(ctdb->recovery_lock_fd);
731 lock.l_type = F_WRLCK;
732 lock.l_whence = SEEK_SET;
737 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
738 close(ctdb->recovery_lock_fd);
739 ctdb->recovery_lock_fd = -1;
741 DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
747 close(ctdb->recovery_lock_fd);
748 ctdb->recovery_lock_fd = -1;
752 DEBUG(DEBUG_ERR, ("Recovery lock taken successfully\n"));
755 DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
761 delete a record as part of the vacuum process
762 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
763 use non-blocking locks
765 return 0 if the record was successfully deleted (i.e. it does not exist
766 when the function returns)
767 or !0 is the record still exists in the tdb after returning.
769 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
772 struct ctdb_ltdb_header *hdr, *hdr2;
774 /* these are really internal tdb functions - but we need them here for
775 non-blocking lock of the freelist */
776 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
777 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
780 key.dsize = rec->keylen;
781 key.dptr = &rec->data[0];
782 data.dsize = rec->datalen;
783 data.dptr = &rec->data[rec->keylen];
785 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
786 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
790 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
791 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
795 hdr = (struct ctdb_ltdb_header *)data.dptr;
797 /* use a non-blocking lock */
798 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
802 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
803 if (data.dptr == NULL) {
804 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
808 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
809 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
810 tdb_delete(ctdb_db->ltdb->tdb, key);
811 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
812 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
814 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
819 hdr2 = (struct ctdb_ltdb_header *)data.dptr;
821 if (hdr2->rsn > hdr->rsn) {
822 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
823 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
824 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
829 if (hdr2->dmaster == ctdb->pnn) {
830 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
831 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
836 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
837 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
842 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
843 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
844 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
845 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
850 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
851 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/*
  State handed to the recovery event-script callbacks: the control
  request that is answered once the script has finished.

  NOTE(review): the closing brace was dropped from the reviewed text;
  no further members are used in this file.
 */
struct recovery_callback_state {
	struct ctdb_req_control *c;
};
864 called when the 'recovered' event script has finished
866 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
868 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
870 ctdb_enable_monitoring(ctdb);
873 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
876 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
879 gettimeofday(&ctdb->last_recovery_finished, NULL);
883 recovery has finished
885 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
886 struct ctdb_req_control *c,
890 struct recovery_callback_state *state;
892 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
894 state = talloc(ctdb, struct recovery_callback_state);
895 CTDB_NO_MEMORY(ctdb, state);
897 state->c = talloc_steal(state, c);
899 ctdb_disable_monitoring(ctdb);
901 ret = ctdb_event_script_callback(ctdb,
902 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
904 ctdb_end_recovery_callback,
908 ctdb_enable_monitoring(ctdb);
910 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
915 /* tell the control that we will be reply asynchronously */
921 called when the 'startrecovery' event script has finished
923 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
925 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
928 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
931 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
936 run the startrecovery eventscript
938 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
939 struct ctdb_req_control *c,
943 struct recovery_callback_state *state;
945 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
946 gettimeofday(&ctdb->last_recovery_started, NULL);
948 state = talloc(ctdb, struct recovery_callback_state);
949 CTDB_NO_MEMORY(ctdb, state);
951 state->c = talloc_steal(state, c);
953 ctdb_disable_monitoring(ctdb);
955 ret = ctdb_event_script_callback(ctdb,
956 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
958 ctdb_start_recovery_callback,
959 state, "startrecovery");
962 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
967 /* tell the control that we will be reply asynchronously */
973 try to delete all these records as part of the vacuuming process
974 and return the records we failed to delete
976 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
978 struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
979 struct ctdb_db_context *ctdb_db;
981 struct ctdb_rec_data *rec;
982 struct ctdb_marshall_buffer *records;
984 if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
985 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
989 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
991 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
996 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
997 reply->count, reply->db_id));
1000 /* create a blob to send back the records we couldnt delete */
1001 records = (struct ctdb_marshall_buffer *)
1002 talloc_zero_size(outdata,
1003 offsetof(struct ctdb_marshall_buffer, data));
1004 if (records == NULL) {
1005 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1008 records->db_id = ctdb_db->db_id;
1011 rec = (struct ctdb_rec_data *)&reply->data[0];
1012 for (i=0;i<reply->count;i++) {
1015 key.dptr = &rec->data[0];
1016 key.dsize = rec->keylen;
1017 data.dptr = &rec->data[key.dsize];
1018 data.dsize = rec->datalen;
1020 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1021 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1025 /* If we cant delete the record we must add it to the reply
1026 so the lmaster knows it may not purge this record
1028 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1030 struct ctdb_ltdb_header *hdr;
1032 hdr = (struct ctdb_ltdb_header *)data.dptr;
1033 data.dptr += sizeof(*hdr);
1034 data.dsize -= sizeof(*hdr);
1036 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1038 old_size = talloc_get_size(records);
1039 records = talloc_realloc_size(outdata, records, old_size + rec->length);
1040 if (records == NULL) {
1041 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1045 memcpy(old_size+(uint8_t *)records, rec, rec->length);
1048 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1052 outdata->dptr = (uint8_t *)records;
1053 outdata->dsize = talloc_get_size(records);
1061 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1063 uint32_t *capabilities = NULL;
1065 capabilities = talloc(outdata, uint32_t);
1066 CTDB_NO_MEMORY(ctdb, capabilities);
1067 *capabilities = ctdb->capabilities;
1069 outdata->dsize = sizeof(uint32_t);
1070 outdata->dptr = (uint8_t *)capabilities;
1075 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1077 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1078 uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1080 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1082 if (*count < ctdb->tunable.recd_ping_failcount) {
1084 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1085 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1086 ctdb_recd_ping_timeout, ctdb);
1090 DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Shutting down ctdb daemon. (This can be caused if the cluster filesystem has hung)\n"));
1092 ctdb_stop_recoverd(ctdb);
1093 ctdb_stop_keepalive(ctdb);
1094 ctdb_stop_monitoring(ctdb);
1095 ctdb_release_all_ips(ctdb);
1096 if (ctdb->methods != NULL) {
1097 ctdb->methods->shutdown(ctdb);
1099 ctdb_event_script(ctdb, "shutdown");
1100 DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Daemon has been shut down.\n"));
1104 /* The recovery daemon will ping us at regular intervals.
1105 If we havent been pinged for a while we assume the recovery
1106 daemon is inoperable and we shut down.
1108 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1110 talloc_free(ctdb->recd_ping_count);
1112 ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1113 CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1115 if (ctdb->tunable.recd_ping_timeout != 0) {
1116 event_add_timed(ctdb->ev, ctdb->recd_ping_count,
1117 timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1118 ctdb_recd_ping_timeout, ctdb);
1126 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1128 CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1130 ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];