4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
32 lock all databases - mark only
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb)
36 struct ctdb_db_context *ctdb_db;
37 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
38 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
41 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
42 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
50 lock all databases - unmark only
52 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb)
54 struct ctdb_db_context *ctdb_db;
55 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
56 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
59 for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
60 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
69 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
71 CHECK_CONTROL_DATA_SIZE(0);
72 struct ctdb_vnn_map_wire *map;
75 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
76 map = talloc_size(outdata, len);
77 CTDB_NO_MEMORY_VOID(ctdb, map);
79 map->generation = ctdb->vnn_map->generation;
80 map->size = ctdb->vnn_map->size;
81 memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
84 outdata->dptr = (uint8_t *)map;
90 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
92 struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
94 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
95 DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
99 talloc_free(ctdb->vnn_map);
101 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
102 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
104 ctdb->vnn_map->generation = map->generation;
105 ctdb->vnn_map->size = map->size;
106 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
107 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
109 memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
115 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
118 struct ctdb_db_context *ctdb_db;
119 struct ctdb_dbid_map *dbid_map;
121 CHECK_CONTROL_DATA_SIZE(0);
124 for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
129 outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
130 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
131 if (!outdata->dptr) {
132 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
136 dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
138 for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
139 dbid_map->dbs[i].dbid = ctdb_db->db_id;
140 dbid_map->dbs[i].persistent = ctdb_db->persistent;
147 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
149 uint32_t i, num_nodes;
150 struct ctdb_node_map *node_map;
152 CHECK_CONTROL_DATA_SIZE(0);
154 num_nodes = ctdb->num_nodes;
156 outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
157 outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
158 if (!outdata->dptr) {
159 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
163 node_map = (struct ctdb_node_map *)outdata->dptr;
164 node_map->num = num_nodes;
165 for (i=0; i<num_nodes; i++) {
166 inet_aton(ctdb->nodes[i]->address.address, &node_map->nodes[i].sin.sin_addr);
167 node_map->nodes[i].pnn = ctdb->nodes[i]->pnn;
168 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
175 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
176 struct timeval t, void *private_data)
179 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
180 int ctdb_tcp_init(struct ctdb_context *);
182 /* shut down the transport */
183 ctdb->methods->shutdown(ctdb);
185 /* start the transport again */
186 ctdb_load_nodes_file(ctdb);
187 ret = ctdb_tcp_init(ctdb);
189 DEBUG(DEBUG_CRIT, (__location__ " Failed to init TCP\n"));
192 ctdb->methods->initialise(ctdb);
193 ctdb->methods->start(ctdb);
199 reload the nodes file after a short delay (so that we can send the response
203 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
205 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
211 a traverse function for pulling all relevent records from pulldb
214 struct ctdb_context *ctdb;
215 struct ctdb_control_pulldb_reply *pulldata;
220 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
222 struct pulldb_data *params = (struct pulldb_data *)p;
223 struct ctdb_rec_data *rec;
225 /* add the record to the blob */
226 rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
228 params->failed = true;
231 params->pulldata = talloc_realloc_size(NULL, params->pulldata, rec->length + params->len);
232 if (params->pulldata == NULL) {
233 DEBUG(DEBUG_ERR,(__location__ " Failed to expand pulldb_data to %u (%u records)\n",
234 rec->length + params->len, params->pulldata->count));
235 params->failed = true;
238 params->pulldata->count++;
239 memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
240 params->len += rec->length;
247 pul a bunch of records from a ltdb, filtering by lmaster
249 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
251 struct ctdb_control_pulldb *pull;
252 struct ctdb_db_context *ctdb_db;
253 struct pulldb_data params;
254 struct ctdb_control_pulldb_reply *reply;
256 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
257 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
261 pull = (struct ctdb_control_pulldb *)indata.dptr;
263 ctdb_db = find_ctdb_db(ctdb, pull->db_id);
265 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
269 reply = talloc_zero(outdata, struct ctdb_control_pulldb_reply);
270 CTDB_NO_MEMORY(ctdb, reply);
272 reply->db_id = pull->db_id;
275 params.pulldata = reply;
276 params.len = offsetof(struct ctdb_control_pulldb_reply, data);
277 params.failed = false;
279 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
280 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
284 if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, ¶ms) == -1) {
285 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
286 ctdb_lock_all_databases_unmark(ctdb);
287 talloc_free(params.pulldata);
291 ctdb_lock_all_databases_unmark(ctdb);
293 outdata->dptr = (uint8_t *)params.pulldata;
294 outdata->dsize = params.len;
300 push a bunch of records into a ltdb, filtering by rsn
302 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
304 struct ctdb_control_pulldb_reply *reply = (struct ctdb_control_pulldb_reply *)indata.dptr;
305 struct ctdb_db_context *ctdb_db;
307 struct ctdb_rec_data *rec;
309 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
310 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
314 if (indata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
315 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
319 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
321 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
325 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
326 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
330 rec = (struct ctdb_rec_data *)&reply->data[0];
332 DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
333 reply->count, reply->db_id));
335 for (i=0;i<reply->count;i++) {
337 struct ctdb_ltdb_header *hdr;
339 key.dptr = &rec->data[0];
340 key.dsize = rec->keylen;
341 data.dptr = &rec->data[key.dsize];
342 data.dsize = rec->datalen;
344 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
345 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
348 hdr = (struct ctdb_ltdb_header *)data.dptr;
349 data.dptr += sizeof(*hdr);
350 data.dsize -= sizeof(*hdr);
352 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
354 DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
358 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
361 DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
362 reply->count, reply->db_id));
364 ctdb_lock_all_databases_unmark(ctdb);
368 ctdb_lock_all_databases_unmark(ctdb);
373 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
375 uint32_t *dmaster = (uint32_t *)p;
376 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
379 /* skip if already correct */
380 if (header->dmaster == *dmaster) {
384 header->dmaster = *dmaster;
386 ret = tdb_store(tdb, key, data, TDB_REPLACE);
388 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back ret:%d\n",ret));
392 /* TODO: add error checking here */
397 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
399 struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
400 struct ctdb_db_context *ctdb_db;
402 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
403 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
407 ctdb_db = find_ctdb_db(ctdb, p->db_id);
409 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
413 if (ctdb_lock_all_databases_mark(ctdb) != 0) {
414 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
418 tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
420 ctdb_lock_all_databases_unmark(ctdb);
425 struct ctdb_set_recmode_state {
426 struct ctdb_context *ctdb;
427 struct ctdb_req_control *c;
430 struct timed_event *te;
431 struct fd_event *fde;
436 called if our set_recmode child times out. this would happen if
437 ctdb_recovery_lock() would block.
439 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te,
440 struct timeval t, void *private_data)
442 struct ctdb_set_recmode_state *state = talloc_get_type(private_data,
443 struct ctdb_set_recmode_state);
445 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "timeout in ctdb_set_recmode");
450 /* when we free the recmode state we must kill any child process.
452 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
454 kill(state->child, SIGKILL);
455 waitpid(state->child, NULL, 0);
459 /* this is called when the client process has completed ctdb_recovery_lock()
460 and has written data back to us through the pipe.
462 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde,
463 uint16_t flags, void *private_data)
465 struct ctdb_set_recmode_state *state= talloc_get_type(private_data,
466 struct ctdb_set_recmode_state);
470 /* we got a response from our child process so we can abort the
473 talloc_free(state->te);
477 /* read the childs status when trying to lock the reclock file.
478 child wrote 0 if everything is fine and 1 if it did manage
479 to lock the file, which would be a problem since that means
480 we got a request to exit from recovery but we could still lock
481 the file which at this time SHOULD be locked by the recovery
482 daemon on the recmaster
484 ret = read(state->fd[0], &c, 1);
485 if (ret != 1 || c != 0) {
486 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
491 state->ctdb->recovery_mode = state->recmode;
493 ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
499 set the recovery mode
501 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
502 struct ctdb_req_control *c,
503 TDB_DATA indata, bool *async_reply,
504 const char **errormsg)
506 uint32_t recmode = *(uint32_t *)indata.dptr;
508 struct ctdb_set_recmode_state *state;
509 pid_t parent = getpid();
511 if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
512 DEBUG(DEBUG_ERR,("Attempt to change recovery mode to %u when not frozen\n",
514 (*errormsg) = "Cannot change recovery mode while not frozen";
518 if (recmode != ctdb->recovery_mode) {
519 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n",
520 recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
523 if (recmode != CTDB_RECOVERY_NORMAL ||
524 ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
525 ctdb->recovery_mode = recmode;
529 /* some special handling when ending recovery mode */
530 state = talloc(ctdb, struct ctdb_set_recmode_state);
531 CTDB_NO_MEMORY(ctdb, state);
533 /* For the rest of what needs to be done, we need to do this in
534 a child process since
535 1, the call to ctdb_recovery_lock() can block if the cluster
536 filesystem is in the process of recovery.
537 2, running of the script may take a while.
539 ret = pipe(state->fd);
542 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
546 state->child = fork();
547 if (state->child == (pid_t)-1) {
554 if (state->child == 0) {
558 /* we should not be able to get the lock on the nodes list,
559 as it should be held by the recovery master
561 if (ctdb_recovery_lock(ctdb, false)) {
562 DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
566 write(state->fd[1], &cc, 1);
567 /* make sure we die when our parent dies */
568 while (kill(parent, 0) == 0 || errno != ESRCH) {
575 talloc_set_destructor(state, set_recmode_destructor);
577 state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(3, 0),
578 ctdb_set_recmode_timeout, state);
580 state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
581 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
584 if (state->fde == NULL) {
590 state->recmode = recmode;
591 state->c = talloc_steal(state, c);
600 try and get the recovery lock in shared storage - should only work
601 on the recovery master recovery daemon. Anywhere else is a bug
603 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
607 if (ctdb->recovery_lock_fd != -1) {
608 close(ctdb->recovery_lock_fd);
610 ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
611 if (ctdb->recovery_lock_fd == -1) {
612 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n",
613 ctdb->recovery_lock_file, strerror(errno)));
617 set_close_on_exec(ctdb->recovery_lock_fd);
619 lock.l_type = F_WRLCK;
620 lock.l_whence = SEEK_SET;
625 if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
626 close(ctdb->recovery_lock_fd);
627 ctdb->recovery_lock_fd = -1;
629 DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
635 close(ctdb->recovery_lock_fd);
636 ctdb->recovery_lock_fd = -1;
639 DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
645 delete a record as part of the vacuum process
646 only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
647 use non-blocking locks
649 return 0 if the record was successfully deleted (i.e. it does not exist
650 when the function returns)
651 or !0 is the record still exists in the tdb after returning.
653 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
656 struct ctdb_ltdb_header *hdr, *hdr2;
658 /* these are really internal tdb functions - but we need them here for
659 non-blocking lock of the freelist */
660 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
661 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
664 key.dsize = rec->keylen;
665 key.dptr = &rec->data[0];
666 data.dsize = rec->datalen;
667 data.dptr = &rec->data[rec->keylen];
669 if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
670 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
674 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
675 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
679 hdr = (struct ctdb_ltdb_header *)data.dptr;
681 /* use a non-blocking lock */
682 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
686 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
687 if (data.dptr == NULL) {
688 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
692 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
693 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
694 tdb_delete(ctdb_db->ltdb->tdb, key);
695 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
696 DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
698 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
703 hdr2 = (struct ctdb_ltdb_header *)data.dptr;
705 if (hdr2->rsn > hdr->rsn) {
706 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
707 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
708 (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
713 if (hdr2->dmaster == ctdb->pnn) {
714 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
715 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
720 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
721 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
726 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
727 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
728 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
729 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
734 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
735 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
742 struct recovery_callback_state {
743 struct ctdb_req_control *c;
748 called when the 'recovered' event script has finished
750 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
752 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
754 ctdb_enable_monitoring(ctdb);
757 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
760 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
763 gettimeofday(&ctdb->last_recovery_time, NULL);
767 recovery has finished
769 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb,
770 struct ctdb_req_control *c,
774 struct recovery_callback_state *state;
776 DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
778 state = talloc(ctdb, struct recovery_callback_state);
779 CTDB_NO_MEMORY(ctdb, state);
781 state->c = talloc_steal(state, c);
783 ctdb_disable_monitoring(ctdb);
785 ret = ctdb_event_script_callback(ctdb,
786 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
788 ctdb_end_recovery_callback,
792 ctdb_enable_monitoring(ctdb);
794 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
799 /* tell the control that we will be reply asynchronously */
805 called when the 'startrecovery' event script has finished
807 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
809 struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
811 ctdb_enable_monitoring(ctdb);
814 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
817 ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
824 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb,
825 struct ctdb_req_control *c,
829 struct recovery_callback_state *state;
831 DEBUG(DEBUG_NOTICE,("Recovery has started\n"));
833 state = talloc(ctdb, struct recovery_callback_state);
834 CTDB_NO_MEMORY(ctdb, state);
836 state->c = talloc_steal(state, c);
838 ctdb_disable_monitoring(ctdb);
840 ret = ctdb_event_script_callback(ctdb,
841 timeval_current_ofs(ctdb->tunable.script_timeout, 0),
843 ctdb_start_recovery_callback,
844 state, "startrecovery");
847 ctdb_enable_monitoring(ctdb);
849 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
854 /* tell the control that we will be reply asynchronously */
860 report the location for the reclock file
862 int32_t ctdb_control_get_reclock_file(struct ctdb_context *ctdb, TDB_DATA *outdata)
864 char *reclock = NULL;
866 reclock = talloc_strdup(outdata, ctdb->recovery_lock_file);
867 CTDB_NO_MEMORY(ctdb, reclock);
869 outdata->dsize = strlen(reclock)+1;
870 outdata->dptr = (uint8_t *)reclock;
876 try to delete all these records as part of the vacuuming process
877 and return the records we failed to delete
879 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
881 struct ctdb_control_pulldb_reply *reply = (struct ctdb_control_pulldb_reply *)indata.dptr;
882 struct ctdb_db_context *ctdb_db;
884 struct ctdb_rec_data *rec;
885 struct ctdb_control_pulldb_reply *records;
887 if (indata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
888 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
892 ctdb_db = find_ctdb_db(ctdb, reply->db_id);
894 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
899 DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
900 reply->count, reply->db_id));
903 /* create a blob to send back the records we couldnt delete */
904 records = (struct ctdb_control_pulldb_reply *)
905 talloc_zero_size(outdata,
906 offsetof(struct ctdb_control_pulldb_reply, data));
907 if (records == NULL) {
908 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
911 records->db_id = ctdb_db->db_id;
914 rec = (struct ctdb_rec_data *)&reply->data[0];
915 for (i=0;i<reply->count;i++) {
918 key.dptr = &rec->data[0];
919 key.dsize = rec->keylen;
920 data.dptr = &rec->data[key.dsize];
921 data.dsize = rec->datalen;
923 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
924 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
928 /* If we cant delete the record we must add it to the reply
929 so the lmaster knows it may not purge this record
931 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
933 struct ctdb_ltdb_header *hdr;
935 hdr = (struct ctdb_ltdb_header *)data.dptr;
936 data.dptr += sizeof(*hdr);
937 data.dsize -= sizeof(*hdr);
939 DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
941 old_size = talloc_get_size(records);
942 records = talloc_realloc_size(outdata, records, old_size + rec->length);
943 if (records == NULL) {
944 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
948 memcpy(old_size+(uint8_t *)records, rec, rec->length);
951 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
955 outdata->dptr = (uint8_t *)records;
956 outdata->dsize = talloc_get_size(records);
964 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
966 uint32_t *capabilities = NULL;
968 capabilities = talloc(outdata, uint32_t);
969 CTDB_NO_MEMORY(ctdb, capabilities);
970 *capabilities = ctdb->capabilities;
972 outdata->dsize = sizeof(uint32_t);
973 outdata->dptr = (uint8_t *)capabilities;