2 ctdb control tool - database vacuum
4 Copyright (C) Andrew Tridgell 2008
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/network.h"
24 #include "../include/ctdb.h"
25 #include "../include/ctdb_private.h"
28 /* should be tunable */
/* Timeout passed to the controls issued throughout this file. Because the
   macro expands to a call it is re-evaluated at every use; presumably
   timeval_current_ofs(10, 0) means "10 seconds from now" - TODO confirm. */
29 #define TIMELIMIT() timeval_current_ofs(10, 0)
/*
  Completion callback for one outstanding async control (async_add installs
  it as state->async.fn).

  It recovers the shared struct async_data from state->async.private_data,
  checks that the control reached CTDB_CONTROL_DONE, clears state->async.fn
  and then collects the reply with ctdb_control_recv(), testing both the
  return code and the status stored in res.

  NOTE(review): the statements that update the async_data counters and the
  bodies of both error branches are not visible in this listing - presumably
  count is decremented and fail_count incremented; confirm against the full
  file.
*/
36 static void async_callback(struct ctdb_client_control_state *state)
38 struct async_data *data = talloc_get_type(state->async.private_data, struct async_data);
42 /* one more node has responded with recmode data */
45 /* if we failed to push the db, then return an error and let
46 the main loop try again.
48 if (state->state != CTDB_CONTROL_DONE) {
53 state->async.fn = NULL;
55 ret = ctdb_control_recv(state->ctdb, state, data, NULL, &res, NULL);
56 if ((ret != 0) || (res != 0)) {
/*
  Register a control that has just been sent with the tracker `data`:
  async_callback becomes its completion function and `data` its private
  pointer, which is how the callback finds the tracker again.

  NOTE(review): the trailing comment implies that a pending counter is
  bumped here; that statement is not visible in this listing.
*/
61 static void async_add(struct async_data *data, struct ctdb_client_control_state *state)
63 /* set up the callback functions */
64 state->async.fn = async_callback;
65 state->async.private_data = data;
67 /* one more control to wait for to complete */
72 /* wait for up to the maximum number of seconds allowed
73 or until all nodes we expect a response from have replied
/*
  Block until every tracked control has completed: one event loop iteration
  at a time is run on ctdb->ev while data->count is still positive, then
  data->fail_count is inspected and logged (debug level 0) if non-zero.

  NOTE(review): the return statements are not visible in this listing; the
  caller async_control_on_vnnmap treats a non-zero result as failure. The
  loop has no time limit of its own - presumably it relies on each control
  timing out so that count reaches zero; TODO confirm.
*/
75 static int async_wait(struct ctdb_context *ctdb, struct async_data *data)
77 while (data->count > 0) {
78 event_loop_once(ctdb->ev);
80 if (data->fail_count != 0) {
81 DEBUG(0,("Async wait failed - fail_count=%u\n", data->fail_count));
88 perform a simple control on nodes in the vnn map except ourselves.
89 The control cannot return data
/*
  Send `opcode` with payload `data` to the nodes listed in ctdb->vnn_map and
  wait for all of them to answer; the local node (ctdb->pnn) is tested for
  separately inside the loop.

  One struct async_data tracker, allocated on ctdb, follows the whole batch.
  The tracker is freed on every exit path visible here.

  NOTE(review): part of the parameter list and the return statements are
  missing from this listing; ctdb_vacuum_one passes a marshalled record
  (dptr/dsize pair) as the third argument and treats a non-zero result as
  failure.
*/
91 static int async_control_on_vnnmap(struct ctdb_context *ctdb, enum ctdb_controls opcode,
94 struct async_data *async_data;
95 struct ctdb_client_control_state *state;
/* evaluated once, so every control in the batch is given the same timeout value */
97 struct timeval timeout = TIMELIMIT();
99 async_data = talloc_zero(ctdb, struct async_data);
100 CTDB_NO_MEMORY_FATAL(ctdb, async_data);
102 /* loop over all active nodes and send an async control to each of them */
103 for (j=0; j<ctdb->vnn_map->size; j++) {
104 uint32_t pnn = ctdb->vnn_map->map[j];
/* the body of this test is not visible - presumably it skips the local node */
105 if (pnn == ctdb->pnn) {
/* async_data is also handed to ctdb_control_send - presumably as the talloc
   parent of the returned state; TODO confirm */
108 state = ctdb_control_send(ctdb, pnn, 0, opcode,
109 0, data, async_data, NULL, &timeout, NULL);
111 DEBUG(0,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
112 talloc_free(async_data);
116 async_add(async_data, state);
119 if (async_wait(ctdb, async_data) != 0) {
120 talloc_free(async_data);
124 talloc_free(async_data);
/*
  Try to vacuum the single record stored under `key` in ctdb_db.

  Phase 1 - under a non-blocking chain lock the record is fetched and the
  lock released again. The visible checks are that the stored value is
  exactly one struct ctdb_ltdb_header in size and that this node is both the
  dmaster recorded in that header and the lmaster computed by ctdb_lmaster().
  Phase 2 - key and header are marshalled with empty data (tdb_null) and sent
  as CTDB_CONTROL_DELETE_RECORD through async_control_on_vnnmap().
  Phase 3 - the chain is locked again, the record refetched and rechecked,
  and only then removed with tdb_delete() while the lock is still held.

  NOTE(review): the return statements, any cleanup of the fetched buffer and
  one of the phase-3 conditions (original line 205) are not visible in this
  listing.
*/
132 static int ctdb_vacuum_one(struct ctdb_context *ctdb, TDB_DATA key, struct ctdb_db_context *ctdb_db)
135 struct ctdb_ltdb_header *hdr;
136 struct ctdb_rec_data *rec;
/* never block on a busy hash chain */
139 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
140 /* the chain is busy - come back later */
144 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
/* the lock only covers the fetch; it is dropped before any control is sent */
145 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
146 if (data.dptr == NULL) {
/* only values that are exactly a bare header are candidates; vacuum_traverse
   labels anything else "not a deleted record" */
149 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
155 hdr = (struct ctdb_ltdb_header *)data.dptr;
158 /* if we are not the lmaster and the dmaster then skip the record */
159 if (hdr->dmaster != ctdb->pnn ||
160 ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
/* marshal key + header with no data payload */
165 rec = ctdb_marshall_record(ctdb, ctdb_db->db_id, key, hdr, tdb_null);
168 /* try it again later */
/* reuse `data` to describe the marshalled record that is sent out */
172 data.dptr = (void *)rec;
173 data.dsize = rec->length;
175 if (async_control_on_vnnmap(ctdb, CTDB_CONTROL_DELETE_RECORD, data) != 0) {
176 /* one or more nodes failed to delete a record - no problem! */
183 /* it's deleted on all other nodes - refetch, check and delete */
184 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
185 /* the chain is busy - come back later */
189 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
190 if (data.dptr == NULL) {
191 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
194 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
196 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
200 hdr = (struct ctdb_ltdb_header *)data.dptr;
202 /* if we are not the lmaster and the dmaster then skip the record */
203 if (hdr->dmaster != ctdb->pnn ||
204 ctdb_lmaster(ctdb, &key) != ctdb->pnn ||
206 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* the chain lock taken for the refetch is still held while deleting */
211 tdb_delete(ctdb_db->ltdb->tdb, key);
212 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
220 vacuum records for which we are the lmaster
/*
  Run ctdb_vacuum_one() over the records packed into `list`.

  The entries are struct ctdb_rec_data blobs laid out back to back starting
  at list->data; the walk advances by r->length bytes per record. Each key
  is described by r->keylen bytes starting at r->data.

  NOTE(review): the loop bound and the handling of a failing
  ctdb_vacuum_one() are not visible in this listing - presumably the loop
  runs list->count times; TODO confirm.
*/
222 static int ctdb_vacuum_local(struct ctdb_context *ctdb, struct ctdb_control_pulldb_reply *list,
223 struct ctdb_db_context *ctdb_db)
225 struct ctdb_rec_data *r;
228 r = (struct ctdb_rec_data *)&list->data[0];
232 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r), i++) {
234 key.dptr = &r->data[0];
235 key.dsize = r->keylen;
236 if (ctdb_vacuum_one(ctdb, key, ctdb_db) != 0) {
245 a list of records to possibly delete
/* Members of struct vacuum_data that are visible in this listing; the struct
   header and the remaining members (vacuum_traverse also uses total and
   traverse_error) are not shown. */
/* stop gathering once this many records were collected; 0 disables the
   limit (see the test at the end of vacuum_traverse) */
248 uint32_t vacuum_limit;
/* context used by vacuum_traverse for lmaster lookups and the local pnn */
249 struct ctdb_context *ctdb;
/* one record blob per vnn_map slot, indexed by lmaster; allocated in
   ctdb_vacuum_db */
250 struct ctdb_control_pulldb_reply **list;
256 traverse function for vacuuming
/*
  tdb traverse callback that gathers candidate records for vacuuming.
  `private` is expected to be a struct vacuum_data (ctdb_vacuum_db passes
  its vdata here).

  For each record the lmaster is computed and tested against the size of the
  vnn map. Records whose value is not exactly one struct ctdb_ltdb_header in
  size, or whose dmaster is not this node, are tested for and presumably
  skipped (branch bodies not visible). For the remaining records only the
  key is marshalled (no header, empty data) and the result is appended to
  the blob vdata->list[lmaster], whose count field is incremented.

  Allocation failures are logged and flagged through vdata->traverse_error.
  NOTE(review): the return values and the update of vdata->total are not
  visible in this listing.
*/
258 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
260 struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
262 struct ctdb_ltdb_header *hdr;
263 struct ctdb_rec_data *rec;
266 lmaster = ctdb_lmaster(vdata->ctdb, &key);
/* lmaster is used below as an index into vdata->list, so it must be in range */
267 if (lmaster >= vdata->ctdb->vnn_map->size) {
271 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
272 /* it's not a deleted record */
276 hdr = (struct ctdb_ltdb_header *)data.dptr;
278 if (hdr->dmaster != vdata->ctdb->pnn) {
283 /* add the record to the blob ready to send to the nodes */
/* NOTE(review): the second argument is the local pnn here, whereas
   ctdb_vacuum_one passes ctdb_db->db_id in that position - confirm what
   that parameter means */
284 rec = ctdb_marshall_record(vdata->list[lmaster], vdata->ctdb->pnn, key, NULL, tdb_null);
286 DEBUG(0,(__location__ " Out of memory\n"));
287 vdata->traverse_error = true;
/* grow the blob by rec->length bytes and copy the new record to its old end */
290 old_size = talloc_get_size(vdata->list[lmaster]);
291 vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster],
292 old_size + rec->length);
293 if (vdata->list[lmaster] == NULL) {
294 DEBUG(0,(__location__ " Failed to expand\n"));
295 vdata->traverse_error = true;
298 vdata->list[lmaster]->count++;
299 memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
304 /* don't gather too many records */
305 if (vdata->vacuum_limit != 0 &&
306 vdata->total == vdata->vacuum_limit) {
314 /* vacuum one database */
/*
  Steps visible in this listing:
   1. look up the name of db_id on the local node and attach to the database
   2. allocate one ctdb_control_pulldb_reply blob per vnn_map slot (only the
      fixed part, up to the data member, zero-filled) and stamp it with db_id
   3. read-traverse the local tdb with vacuum_traverse to fill the blobs
   4. for every non-empty blob: if the slot belongs to another node, send it
      that blob as a CTDB_SRVID_VACUUM_FETCH message; records for which we
      are the lmaster are vacuumed with ctdb_vacuum_local
   5. issue one ctdb_ctrl_getpnn() so that the event queue gets run

  vdata is allocated on ctdb and is the talloc parent of the list array.
  NOTE(review): `map` is not referenced in any visible line, and the return
  statements and any cleanup of vdata are missing from this listing.
*/
315 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
316 bool persistent, uint32_t vacuum_limit)
318 struct ctdb_db_context *ctdb_db;
320 struct vacuum_data *vdata;
323 vdata = talloc_zero(ctdb, struct vacuum_data);
325 DEBUG(0,(__location__ " Out of memory\n"));
330 vdata->vacuum_limit = vacuum_limit;
332 if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
333 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
338 ctdb_db = ctdb_attach(ctdb, name, persistent);
339 if (ctdb_db == NULL) {
340 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
345 /* the list needs to be of length num_nodes */
346 vdata->list = talloc_array(vdata, struct ctdb_control_pulldb_reply *, ctdb->vnn_map->size);
347 if (vdata->list == NULL) {
348 DEBUG(0,(__location__ " Out of memory\n"));
352 for (i=0;i<ctdb->vnn_map->size;i++) {
353 vdata->list[i] = (struct ctdb_control_pulldb_reply *)
354 talloc_zero_size(vdata->list,
355 offsetof(struct ctdb_control_pulldb_reply, data));
356 if (vdata->list[i] == NULL) {
357 DEBUG(0,(__location__ " Out of memory\n"));
361 vdata->list[i]->db_id = db_id;
364 /* traverse, looking for records that might be able to be vacuumed */
365 if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
366 vdata->traverse_error) {
367 DEBUG(0,(__location__ " Traverse error in vacuuming '%s'\n", name));
373 for (i=0;i<ctdb->vnn_map->size;i++) {
374 if (vdata->list[i]->count == 0) {
/* note: the second %u prints the vnn_map slot index i, not the pnn stored
   in that slot */
378 printf("Found %u records for lmaster %u\n", vdata->list[i]->count, i);
380 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
381 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
/* the whole talloc'ed blob (fixed part plus appended records) is the message */
383 data.dsize = talloc_get_size(vdata->list[i]);
384 data.dptr = (void *)vdata->list[i];
385 if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
386 DEBUG(0,(__location__ " Failed to send vacuum fetch message to %u\n",
387 ctdb->vnn_map->map[i]));
394 /* for records where we are the lmaster, we can try to delete them */
395 if (ctdb_vacuum_local(ctdb, vdata->list[i], ctdb_db) != 0) {
396 DEBUG(0,(__location__ " Deletion error in vacuuming '%s'\n", name));
402 /* this ensures we run our event queue */
403 ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
412 vacuum all our databases
/*
  Entry point for vacuuming every database reported by the local node.

  argv[0], when used, is parsed with atoi() into vacuum_limit (default 0,
  which vacuum_traverse treats as "no limit"). The db map, node map, vnn map
  (stored into ctdb->vnn_map) and the local pnn are then fetched from
  CTDB_CURRENT_NODE, and ctdb_vacuum_db() is called for each database.

  NOTE(review): atoi() cannot report malformed input and the result lands in
  a uint32_t, so a negative argument becomes a very large limit; strtoul()
  with error checks would be safer. The argc test, the conditions guarding
  the error messages and the return statements are not visible in this
  listing.
*/
414 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
416 struct ctdb_dbid_map *dbmap=NULL;
417 struct ctdb_node_map *nodemap=NULL;
419 uint32_t vacuum_limit = 0;
422 vacuum_limit = atoi(argv[0]);
425 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
427 DEBUG(0, ("Unable to get dbids from local node\n"));
431 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
433 DEBUG(0, ("Unable to get nodemap from local node\n"));
437 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
439 DEBUG(0, ("Unable to get vnnmap from local node\n"));
443 pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
445 DEBUG(0, ("Unable to get pnn from local node\n"));
450 for (i=0;i<dbmap->num;i++) {
451 if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap,
452 dbmap->dbs[i].persistent, vacuum_limit) != 0) {
453 DEBUG(0,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
/* State handed to repack_traverse through the traverse private pointer.
   NOTE(review): the rest of this struct is not visible in this listing. */
461 struct traverse_state {
/* database that repack_traverse stores each visited record into */
463 struct tdb_context *dest_db;
467 traverse function for repacking
/*
  tdb traverse callback used while repacking: stores the visited key/data
  pair into state->dest_db with the TDB_INSERT flag. `private` is cast
  directly to struct traverse_state.

  NOTE(review): what happens when tdb_store() fails, and the return values,
  are not visible in this listing.
*/
469 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
471 struct traverse_state *state = (struct traverse_state *)private;
472 if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
/*
  Repack `tdb` by copying its contents out and back in inside one
  transaction.

  Visible sequence: start a transaction on tdb; open a scratch database
  "tmpdb" with the same hash size and the TDB_INTERNAL flag; copy every
  record into it with repack_traverse; wipe tdb with tdb_wipe_all(); run
  repack_traverse over the scratch database; commit. Every visible failure
  between the start of the transaction and the commit logs a message and
  cancels the transaction.

  NOTE(review): the tests guarding the two "Error during ... traversal"
  messages, any re-targeting of state.dest_db before the second traverse,
  the closing of tmp_db and the return statements are not visible in this
  listing.
*/
482 static int ctdb_repack_tdb(struct tdb_context *tdb)
484 struct tdb_context *tmp_db;
485 struct traverse_state state;
487 if (tdb_transaction_start(tdb) != 0) {
488 DEBUG(0,(__location__ " Failed to start transaction\n"));
492 tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
493 if (tmp_db == NULL) {
494 DEBUG(0,(__location__ " Failed to create tmp_db\n"));
495 tdb_transaction_cancel(tdb);
/* first pass: records are copied from tdb into the scratch database */
500 state.dest_db = tmp_db;
502 if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
503 DEBUG(0,(__location__ " Failed to traverse copying out\n"));
504 tdb_transaction_cancel(tdb);
510 DEBUG(0,(__location__ " Error during traversal\n"));
511 tdb_transaction_cancel(tdb);
516 if (tdb_wipe_all(tdb) != 0) {
517 DEBUG(0,(__location__ " Failed to wipe database\n"));
518 tdb_transaction_cancel(tdb);
/* second pass: the scratch database is traversed to copy the records back */
526 if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
527 DEBUG(0,(__location__ " Failed to traverse copying back\n"));
528 tdb_transaction_cancel(tdb);
534 DEBUG(0,(__location__ " Error during second traversal\n"));
535 tdb_transaction_cancel(tdb);
542 if (tdb_transaction_commit(tdb) != 0) {
543 DEBUG(0,(__location__ " Failed to commit\n"));
551 /* repack one database */
/*
  Looks up the name of db_id on the local node, attaches to the database and
  reads the size of its freelist. The size is compared with repack_limit
  (the body of the "size <= repack_limit" branch is not visible, presumably
  an early return); otherwise the count is logged and the local tdb is
  handed to ctdb_repack_tdb().

  NOTE(review): the test guarding the "Failed to get freelist size" message
  and the return statements are not visible in this listing.
*/
552 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id,
553 bool persistent, uint32_t repack_limit)
555 struct ctdb_db_context *ctdb_db;
559 if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
560 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
564 ctdb_db = ctdb_attach(ctdb, name, persistent);
565 if (ctdb_db == NULL) {
566 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
570 size = tdb_freelist_size(ctdb_db->ltdb->tdb);
572 DEBUG(0,(__location__ " Failed to get freelist size for '%s'\n", name));
/* repack_limit is a threshold on the number of freelist entries */
576 if (size <= repack_limit) {
580 DEBUG(0,("Repacking %s with %u freelist entries\n", name, size));
582 if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
583 DEBUG(0,(__location__ " Failed to repack '%s'\n", name));
592 repack all our databases
/*
  Entry point for repacking every database reported by the local node.

  repack_limit defaults to 100 (compared against the freelist size in
  ctdb_repack_db) and can be replaced by atoi(argv[0]). The db map is fetched
  from CTDB_CURRENT_NODE and ctdb_repack_db() is run on each entry.

  NOTE(review): atoi() cannot report malformed input and the result lands in
  a uint32_t, so a negative argument becomes a very large limit; strtoul()
  with error checks would be safer. The argc test, the condition guarding
  the error message and the return statements are not visible in this
  listing.
*/
594 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
596 struct ctdb_dbid_map *dbmap=NULL;
598 uint32_t repack_limit = 100;
601 repack_limit = atoi(argv[0]);
604 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
606 DEBUG(0, ("Unable to get dbids from local node\n"));
610 for (i=0;i<dbmap->num;i++) {
611 if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid,
612 dbmap->dbs[i].persistent, repack_limit) != 0) {
613 DEBUG(0,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));