4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
32 lock all databases - mark only
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb)
36 struct ctdb_db_context *ctdb_db;
37 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
38 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
41 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
42 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
50 lock all databases - unmark only
52 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb)
54 struct ctdb_db_context *ctdb_db;
55 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
56 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
59 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
60 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
69 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
71 CHECK_CONTROL_DATA_SIZE(0);
72 struct ctdb_vnn_map_wire *map;
75 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
76 map = talloc_size(outdata, len);
77 CTDB_NO_MEMORY(ctdb, map);
79 map->generation = ctdb->vnn_map->generation;
80 map->size = ctdb->vnn_map->size;
81 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
84 outdata->dptr = (uint8_t *)map;
90 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
92 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
94 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
95 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
99 talloc_free(ctdb->vnn_map);
101 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
102 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
104 ctdb->vnn_map->generation = map->generation;
105 ctdb->vnn_map->size = map->size;
106 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
107 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
109 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
115 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
118 struct ctdb_db_context *ctdb_db;
119 struct ctdb_dbid_map *dbid_map;
121 CHECK_CONTROL_DATA_SIZE(0);
124 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
129 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
130 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
131 if (!outdata->dptr) {
132 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
136 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
138 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
139 dbid_map->dbs[i].dbid = ctdb_db->db_id;
140 dbid_map->dbs[i].persistent = ctdb_db->persistent;
147 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
149 uint32_t i, num_nodes;
150 struct ctdb_node_map *node_map;
152 CHECK_CONTROL_DATA_SIZE(0);
154 num_nodes = ctdb->num_nodes;
156 outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
157 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
158 if (!outdata->dptr) {
159 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
163 node_map = (struct ctdb_node_map *)outdata->dptr;
164 node_map->num = num_nodes;
165 for (i=0; i<num_nodes; i++) {
166 inet_aton(ctdb->nodes[i]->address.address, &node_map->nodes[i].sin.sin_addr);
167 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
168 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
175 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
176 struct timeval t, void *private_data)
179 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
180 int ctdb_tcp_init(struct ctdb_context *);
182 /* shut down the transport */
183 if (ctdb->methods != NULL) {
184 ctdb->methods->shutdown(ctdb);
187 /* start the transport again */
188 ctdb_load_nodes_file(ctdb);
189 ret = ctdb_tcp_init(ctdb);
191 DEBUG(DEBUG_CRIT, (__location__ " Failed to init TCP\n"));
195 if (ctdb->methods == NULL) {
196 DEBUG(DEBUG_ALERT,(__location__ " Can not restart transport. ctdb->methods==NULL\n"));
197 ctdb_fatal(ctdb, "can not reinitialize transport.");
199 ctdb->methods->initialise(ctdb);
200 ctdb->methods->start(ctdb);
206 reload the nodes file after a short delay (so that we can send the response
210 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
212 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
218 a traverse function for pulling all relevent records from pulldb
221 struct ctdb_context *ctdb;
222 struct ctdb_control_pulldb_reply *pulldata;
227 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
229 struct pulldb_data *params = (struct pulldb_data *)p;
230 struct ctdb_rec_data *rec;
232 /* add the record to the blob */
233 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
235 params->failed = true;
238 params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
239 if (params->pulldata == NULL) {
240 DEBUG(DEBUG_ERR,(__location__ " Failed to expand pulldb_data to %u (%u records)\n",
241 rec->length + params->len, params->pulldata->count));
242 params->failed = true;
245 params->pulldata->count++;
246 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
247 params->len += rec->length;
254 pul a bunch of records from a ltdb, filtering by lmaster
256 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
258 struct ctdb_control_pulldb *pull;
259 struct ctdb_db_context *ctdb_db;
260 struct pulldb_data params;
261 struct ctdb_control_pulldb_reply *reply;
263 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
264 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
268 pull = (struct ctdb_control_pulldb *)indata.dptr;
270 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
272 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
276 reply = talloc_zero(outdata, struct ctdb_control_pulldb_reply);
277 CTDB_NO_MEMORY(ctdb, reply);
279 reply->db_id = pull->db_id;
282 params.pulldata = reply;
283 params.len = offsetof(struct ctdb_control_pulldb_reply, data);
284 params.failed = false;
286 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
287 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
291 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
292 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
293 ctdb_lock_all_databases_unmark(ctdb);
294 talloc_free(params.pulldata);
298 ctdb_lock_all_databases_unmark(ctdb);
300 outdata->dptr = (uint8_t *)params.pulldata;
301 outdata->dsize = params.len;
307 push a bunch of records into a ltdb, filtering by rsn
309 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
311 struct ctdb_control_pulldb_reply *reply = (struct ctdb_control_pulldb_reply *)indata.dptr;
312 struct ctdb_db_context *ctdb_db;
314 struct ctdb_rec_data *rec;
316 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
317 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
321 if (indata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
322 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
326 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
328 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
332 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
333 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
337 rec = (struct ctdb_rec_data *)&reply->data[0];
339 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
340 reply->count, reply->db_id));
342 for (i=0;i<reply->count;i++) {
344 struct ctdb_ltdb_header *hdr;
346 key.dptr = &rec->data[0];
347 key.dsize = rec->keylen;
348 data.dptr = &rec->data[key.dsize];
349 data.dsize = rec->datalen;
351 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
352 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
355 hdr = (struct ctdb_ltdb_header *)data.dptr;
356 data.dptr += sizeof(*hdr);
357 data.dsize -= sizeof(*hdr);
359 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
361 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
365 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
368 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
369 reply->count, reply->db_id));
371 ctdb_lock_all_databases_unmark(ctdb);
375 ctdb_lock_all_databases_unmark(ctdb);
380 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
382 uint32_t *dmaster = (uint32_t *)p;
383 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
386 /* skip if already correct */
387 if (header->dmaster == *dmaster) {
391 header->dmaster = *dmaster;
393 ret = tdb_store(tdb, key, data, TDB_REPLACE);
395 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back ret:%d\n",ret));
399 /* TODO: add error checking here */
404 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
406 struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
407 struct ctdb_db_context *ctdb_db;
409 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
410 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
414 ctdb_db = find_ctdb_db(ctdb, p->db_id);
416 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
420 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
421 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
425 tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
427 ctdb_lock_all_databases_unmark(ctdb);
432 struct ctdb_set_recmode_state {
433 struct ctdb_context *ctdb;
434 struct ctdb_req_control *c;
437 struct timed_event *te;
438 struct fd_event *fde;
443 called if our set_recmode child times out. this would happen if
444 ctdb_recovery_lock() would block.
446 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
447 struct timeval t, void *private_data)
449 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
450 struct ctdb_set_recmode_state);
452 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "timeout in ctdb_set_recmode");
457 /* when we free the recmode state we must kill any child process.
459 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
461 kill(state->child, SIGKILL);
462 waitpid(state->child, NULL, 0);
466 /* this is called when the client process has completed ctdb_recovery_lock()
467 and has written data back to us through the pipe.
469 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
470 uint16_t flags, void *private_data)
472 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
473 struct ctdb_set_recmode_state);
477 /* we got a response from our child process so we can abort the
480 talloc_free(state->te);
484 /* read the childs status when trying to lock the reclock file.
485 child wrote 0 if everything is fine and 1 if it did manage
486 to lock the file, which would be a problem since that means
487 we got a request to exit from recovery but we could still lock
488 the file which at this time SHOULD be locked by the recovery
489 daemon on the recmaster
491 ret = read(state->fd[0], &c, 1);
492 if (ret != 1 || c != 0) {
493 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
498 state->ctdb->recovery_mode = state->recmode;
500 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
506 set the recovery mode
508 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
509 struct ctdb_req_control *c,
510 TDB_DATA indata, bool *async_reply,
511 const char **errormsg)
513 uint32_t recmode = *(uint32_t *)indata.dptr;
515 struct ctdb_set_recmode_state *state;
516 pid_t parent = getpid();
518 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
519 DEBUG(DEBUG_ERR,("Attempt to change recovery mode to %u when not frozen\n",
521 (*errormsg) = "Cannot change recovery mode while not frozen";
525 if (recmode != ctdb->recovery_mode) {
526 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
527 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
530 if (recmode != CTDB_RECOVERY_NORMAL ||
531 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
532 ctdb->recovery_mode = recmode;
536 /* some special handling when ending recovery mode */
538 /* force the databased to thaw */
539 if (ctdb->freeze_handle) {
540 ctdb_control_thaw(ctdb);
543 state = talloc(ctdb, struct ctdb_set_recmode_state);
544 CTDB_NO_MEMORY(ctdb, state);
546 /* For the rest of what needs to be done, we need to do this in
547 a child process since
548 1, the call to ctdb_recovery_lock() can block if the cluster
549 filesystem is in the process of recovery.
550 2, running of the script may take a while.
552 ret = pipe(state->fd);
555 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
559 state->child = fork();
560 if (state->child == (pid_t)-1) {
567 if (state->child == 0) {
571 /* we should not be able to get the lock on the nodes list,
572 as it should be held by the recovery master
574 if (ctdb_recovery_lock(ctdb, false)) {
575 DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
579 write(state->fd[1], &cc, 1);
580 /* make sure we die when our parent dies */
581 while (kill(parent, 0) == 0 || errno != ESRCH) {
588 talloc_set_destructor(state, set_recmode_destructor);
590 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(3, 0),
591 ctdb_set_recmode_timeout, state);
593 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
594 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
597 if (state->fde == NULL) {
603 state->recmode = recmode;
604 state->c = talloc_steal(state, c);
613 try and get the recovery lock in shared storage - should only work
614 on the recovery master recovery daemon. Anywhere else is a bug
616 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
620 if (ctdb->recovery_lock_fd != -1) {
621 close(ctdb->recovery_lock_fd);
623 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
624 if (ctdb->recovery_lock_fd == -1) {
625 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n",
626 ctdb->recovery_lock_file, strerror(errno)));
630 set_close_on_exec(ctdb->recovery_lock_fd);
632 lock.l_type = F_WRLCK;
633 lock.l_whence = SEEK_SET;
638 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
639 close(ctdb->recovery_lock_fd);
640 ctdb->recovery_lock_fd = -1;
642 DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
648 close(ctdb->recovery_lock_fd);
649 ctdb->recovery_lock_fd = -1;
652 DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
658 delete a record as part of the vacuum process
659 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
660 use non-blocking locks
662 return 0 if the record was successfully deleted (i.e. it does not exist
663 when the function returns)
664 or !0 is the record still exists in the tdb after returning.
666 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
669 struct ctdb_ltdb_header *hdr, *hdr2;
671 /* these are really internal tdb functions - but we need them here for
672 non-blocking lock of the freelist */
673 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
674 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
677 key.dsize = rec->keylen;
678 key.dptr = &rec->data[0];
679 data.dsize = rec->datalen;
680 data.dptr = &rec->data[rec->keylen];
682 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
683 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
687 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
688 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
692 hdr = (struct ctdb_ltdb_header *)data.dptr;
694 /* use a non-blocking lock */
695 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
699 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
700 if (data.dptr == NULL) {
701 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
705 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
706 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
707 tdb_delete(ctdb_db->ltdb->tdb, key);
708 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
709 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
711 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
716 hdr2 = (struct ctdb_ltdb_header *)data.dptr;
718 if (hdr2->rsn > hdr->rsn) {
719 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
720 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
721 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
726 if (hdr2->dmaster == ctdb->pnn) {
727 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
728 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
733 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
734 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
739 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
740 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
741 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
742 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
747 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
748 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
755 struct recovery_callback_state {
756 struct ctdb_req_control *c;
761 called when the 'recovered' event script has finished
763 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
765 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
767 ctdb_enable_monitoring(ctdb);
770 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
773 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
776 gettimeofday(&ctdb->last_recovery_finished, NULL);
780 recovery has finished
782 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
783 struct ctdb_req_control *c,
787 struct recovery_callback_state *state;
789 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
791 state = talloc(ctdb, struct recovery_callback_state);
792 CTDB_NO_MEMORY(ctdb, state);
794 state->c = talloc_steal(state, c);
796 ctdb_disable_monitoring(ctdb);
798 ret = ctdb_event_script_callback(ctdb,
799 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
801 ctdb_end_recovery_callback,
805 ctdb_enable_monitoring(ctdb);
807 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
812 /* tell the control that we will be reply asynchronously */
818 called when the 'startrecovery' event script has finished
820 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
822 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
825 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
828 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
833 run the startrecovery eventscript
835 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
836 struct ctdb_req_control *c,
840 struct recovery_callback_state *state;
842 DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
843 gettimeofday(&ctdb->last_recovery_started, NULL);
845 state = talloc(ctdb, struct recovery_callback_state);
846 CTDB_NO_MEMORY(ctdb, state);
848 state->c = talloc_steal(state, c);
850 ctdb_disable_monitoring(ctdb);
852 ret = ctdb_event_script_callback(ctdb,
853 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
855 ctdb_start_recovery_callback,
856 state, "startrecovery");
859 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
864 /* tell the control that we will be reply asynchronously */
870 report the location for the reclock file
872 int32_t ctdb_control_get_reclock_file(struct ctdb_context *ctdb, TDB_DATA *outdata)
874 char *reclock = NULL;
876 reclock = talloc_strdup(outdata, ctdb->recovery_lock_file);
877 CTDB_NO_MEMORY(ctdb, reclock);
879 outdata->dsize = strlen(reclock)+1;
880 outdata->dptr = (uint8_t *)reclock;
886 try to delete all these records as part of the vacuuming process
887 and return the records we failed to delete
889 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
891 struct ctdb_control_pulldb_reply *reply = (struct ctdb_control_pulldb_reply *)indata.dptr;
892 struct ctdb_db_context *ctdb_db;
894 struct ctdb_rec_data *rec;
895 struct ctdb_control_pulldb_reply *records;
897 if (indata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
898 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
902 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
904 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
909 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
910 reply->count, reply->db_id));
913 /* create a blob to send back the records we couldnt delete */
914 records = (struct ctdb_control_pulldb_reply *)
915 talloc_zero_size(outdata,
916 offsetof(struct ctdb_control_pulldb_reply, data));
917 if (records == NULL) {
918 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
921 records->db_id = ctdb_db->db_id;
924 rec = (struct ctdb_rec_data *)&reply->data[0];
925 for (i=0;i<reply->count;i++) {
928 key.dptr = &rec->data[0];
929 key.dsize = rec->keylen;
930 data.dptr = &rec->data[key.dsize];
931 data.dsize = rec->datalen;
933 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
934 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
938 /* If we cant delete the record we must add it to the reply
939 so the lmaster knows it may not purge this record
941 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
943 struct ctdb_ltdb_header *hdr;
945 hdr = (struct ctdb_ltdb_header *)data.dptr;
946 data.dptr += sizeof(*hdr);
947 data.dsize -= sizeof(*hdr);
949 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
951 old_size = talloc_get_size(records);
952 records = talloc_realloc_size(outdata, records, old_size + rec->length);
953 if (records == NULL) {
954 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
958 memcpy(old_size+(uint8_t *)records, rec, rec->length);
961 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
965 outdata->dptr = (uint8_t *)records;
966 outdata->dsize = talloc_get_size(records);
974 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
976 uint32_t *capabilities = NULL;
978 capabilities = talloc(outdata, uint32_t);
979 CTDB_NO_MEMORY(ctdb, capabilities);
980 *capabilities = ctdb->capabilities;
982 outdata->dsize = sizeof(uint32_t);
983 outdata->dptr = (uint8_t *)capabilities;