2 ctdb control tool - database vacuum
4 Copyright (C) Andrew Tridgell 2008
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/network.h"
24 #include "../include/ctdb.h"
25 #include "../include/ctdb_private.h"
26 #include "../common/rb_tree.h"
29 /* should be tunable */
30 #define TIMELIMIT() timeval_current_ofs(10, 0)
34 a list of records to possibly delete
37 uint32_t vacuum_limit;
38 struct ctdb_context *ctdb;
39 struct ctdb_db_context *ctdb_db;
40 trbt_tree_t *delete_tree;
41 uint32_t delete_count;
42 struct ctdb_marshall_buffer **list;
47 /* this structure contains the information for one record to be deleted */
48 struct delete_record_data {
49 struct ctdb_context *ctdb;
50 struct ctdb_db_context *ctdb_db;
51 struct ctdb_ltdb_header hdr;
56 traverse function for vacuuming
58 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
60 struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
61 struct ctdb_context *ctdb = vdata->ctdb;
62 struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
64 struct ctdb_ltdb_header *hdr;
65 struct ctdb_rec_data *rec;
68 lmaster = ctdb_lmaster(ctdb, &key);
69 if (lmaster >= ctdb->vnn_map->size) {
73 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
74 /* its not a deleted record */
78 hdr = (struct ctdb_ltdb_header *)data.dptr;
80 if (hdr->dmaster != ctdb->pnn) {
85 /* is this a records we could possibly delete? I.e.
86 if the record is empty and also we are both lmaster
87 and dmaster for the record we should be able to delete it
89 if ( (lmaster == ctdb->pnn)
90 &&( (vdata->delete_count < vdata->vacuum_limit)
91 ||(vdata->vacuum_limit == 0) ) ){
94 hash = ctdb_hash(&key);
95 if (trbt_lookup32(vdata->delete_tree, hash)) {
96 DEBUG(DEBUG_INFO, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
98 struct delete_record_data *dd;
100 /* store key and header indexed by the key hash */
101 dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
103 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
107 dd->ctdb_db = ctdb_db;
108 dd->key.dsize = key.dsize;
109 dd->key.dptr = talloc_memdup(dd, key.dptr, key.dsize);
110 if (dd->key.dptr == NULL) {
111 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
118 trbt_insert32(vdata->delete_tree, hash, dd);
120 vdata->delete_count++;
125 /* add the record to the blob ready to send to the nodes */
126 rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
128 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
129 vdata->traverse_error = true;
132 old_size = talloc_get_size(vdata->list[lmaster]);
133 vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster],
134 old_size + rec->length);
135 if (vdata->list[lmaster] == NULL) {
136 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
137 vdata->traverse_error = true;
140 vdata->list[lmaster]->count++;
141 memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
146 /* don't gather too many records */
147 if (vdata->vacuum_limit != 0 &&
148 vdata->total == vdata->vacuum_limit) {
/* accumulator used when marshalling the delete tree into one blob */
struct delete_records_list {
	struct ctdb_marshall_buffer *records;
};
160 traverse the tree of records to delete and marshall them into
164 delete_traverse(void *param, void *data)
166 struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
167 struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
168 struct ctdb_rec_data *rec;
171 rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
173 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
177 old_size = talloc_get_size(recs->records);
178 recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
179 if (recs->records == NULL) {
180 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
183 recs->records->count++;
184 memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
188 static void delete_record(void *param, void *d)
190 struct delete_record_data *dd = talloc_get_type(d, struct delete_record_data);
191 struct ctdb_context *ctdb = dd->ctdb;
192 struct ctdb_db_context *ctdb_db = dd->ctdb_db;
193 uint32_t *count = (uint32_t *)param;
194 struct ctdb_ltdb_header *hdr;
197 /* its deleted on all other nodes - refetch, check and delete */
198 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key) != 0) {
199 /* the chain is busy - come back later */
203 data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
204 if (data.dptr == NULL) {
205 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
208 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
210 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
214 hdr = (struct ctdb_ltdb_header *)data.dptr;
216 /* if we are not the lmaster and the dmaster then skip the record */
217 if (hdr->dmaster != ctdb->pnn ||
218 ctdb_lmaster(ctdb, &(dd->key)) != ctdb->pnn ||
219 dd->hdr.rsn != hdr->rsn) {
220 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
225 ctdb_block_signal(SIGALRM);
226 tdb_delete(ctdb_db->ltdb->tdb, dd->key);
227 ctdb_unblock_signal(SIGALRM);
228 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
234 /* vacuum one database */
235 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
236 bool persistent, uint32_t vacuum_limit)
238 struct ctdb_db_context *ctdb_db;
240 struct vacuum_data *vdata;
243 vdata = talloc_zero(ctdb, struct vacuum_data);
245 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
250 vdata->vacuum_limit = vacuum_limit;
251 vdata->delete_tree = trbt_create(vdata, 0);
252 if (vdata->delete_tree == NULL) {
253 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
257 if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
258 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
263 ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
264 if (ctdb_db == NULL) {
265 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
269 vdata->ctdb_db = ctdb_db;
271 /* the list needs to be of length num_nodes */
272 vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
273 if (vdata->list == NULL) {
274 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
278 for (i=0;i<ctdb->vnn_map->size;i++) {
279 vdata->list[i] = (struct ctdb_marshall_buffer *)
280 talloc_zero_size(vdata->list,
281 offsetof(struct ctdb_marshall_buffer, data));
282 if (vdata->list[i] == NULL) {
283 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
287 vdata->list[i]->db_id = db_id;
290 /* traverse, looking for records that might be able to be vacuumed */
291 if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
292 vdata->traverse_error) {
293 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
299 for (i=0;i<ctdb->vnn_map->size;i++) {
300 if (vdata->list[i]->count == 0) {
304 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
305 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
307 printf("Found %u records for lmaster %u in '%s'\n", vdata->list[i]->count, i, name);
309 data.dsize = talloc_get_size(vdata->list[i]);
310 data.dptr = (void *)vdata->list[i];
311 if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
312 DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
313 ctdb->vnn_map->map[i]));
322 /* Process all records we can delete (if any) */
323 if (vdata->delete_count > 0) {
324 struct delete_records_list *recs;
325 TDB_DATA indata, outdata;
330 recs = talloc_zero(vdata, struct delete_records_list);
332 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
335 recs->records = (struct ctdb_marshall_buffer *)
336 talloc_zero_size(vdata,
337 offsetof(struct ctdb_marshall_buffer, data));
338 if (recs->records == NULL) {
339 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
342 recs->records->db_id = db_id;
344 /* traverse the tree of all records we want to delete and
345 create a blob we can send to the other nodes.
347 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
349 indata.dsize = talloc_get_size(recs->records);
350 indata.dptr = (void *)recs->records;
352 /* now tell all the other nodes to delete all these records
355 for (i=0;i<ctdb->vnn_map->size;i++) {
356 struct ctdb_marshall_buffer *records;
357 struct ctdb_rec_data *rec;
359 if (ctdb->vnn_map->map[i] == ctdb->pnn) {
360 /* we dont delete the records on the local node
366 ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
367 CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
368 indata, recs, &outdata, &res,
370 if (ret != 0 || res != 0) {
371 DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
375 /* outdata countains the list of records coming back
376 from the node which the node could not delete
378 records = (struct ctdb_marshall_buffer *)outdata.dptr;
379 rec = (struct ctdb_rec_data *)&records->data[0];
380 while (records->count-- > 1) {
381 TDB_DATA reckey, recdata;
382 struct ctdb_ltdb_header *rechdr;
384 reckey.dptr = &rec->data[0];
385 reckey.dsize = rec->keylen;
386 recdata.dptr = &rec->data[reckey.dsize];
387 recdata.dsize = rec->datalen;
389 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
390 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
393 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
394 recdata.dptr += sizeof(*rechdr);
395 recdata.dsize -= sizeof(*rechdr);
397 /* that other node couldnt delete the record
398 so we shouldnt delete it either.
399 remove it from the tree.
401 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
403 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
408 /* the only records remaining in the tree would be those
409 records where all other nodes could successfully
410 delete them, so we can now safely delete them on the
414 trbt_traversearray32(vdata->delete_tree, 1, delete_record, &count);
415 if (vdata->delete_count != 0) {
416 printf("Deleted %u records out of %u on this node from '%s'\n", count, vdata->delete_count, name);
420 /* this ensures we run our event queue */
421 ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
430 vacuum all our databases
432 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
434 struct ctdb_dbid_map *dbmap=NULL;
435 struct ctdb_node_map *nodemap=NULL;
437 uint32_t vacuum_limit = 0;
440 vacuum_limit = atoi(argv[0]);
443 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
445 DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
449 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
451 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
455 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
457 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
461 pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
463 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
468 for (i=0;i<dbmap->num;i++) {
469 if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap,
470 dbmap->dbs[i].persistent, vacuum_limit) != 0) {
471 DEBUG(DEBUG_ERR,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
/* state shared by the repack copy traverses */
struct traverse_state {
	bool error;			/* set by repack_traverse on a failed store */
	struct tdb_context *dest_db;	/* where records are copied to */
};
485 traverse function for repacking
487 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
489 struct traverse_state *state = (struct traverse_state *)private;
490 if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
500 static int ctdb_repack_tdb(struct tdb_context *tdb)
502 struct tdb_context *tmp_db;
503 struct traverse_state state;
505 if (tdb_transaction_start(tdb) != 0) {
506 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
510 tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
511 TDB_INTERNAL|TDB_DISALLOW_NESTING,
513 if (tmp_db == NULL) {
514 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
515 tdb_transaction_cancel(tdb);
520 state.dest_db = tmp_db;
522 if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
523 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
524 tdb_transaction_cancel(tdb);
530 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
531 tdb_transaction_cancel(tdb);
536 if (tdb_wipe_all(tdb) != 0) {
537 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
538 tdb_transaction_cancel(tdb);
546 if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
547 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
548 tdb_transaction_cancel(tdb);
554 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
555 tdb_transaction_cancel(tdb);
562 if (tdb_transaction_commit(tdb) != 0) {
563 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
571 /* repack one database */
572 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id,
573 bool persistent, uint32_t repack_limit)
575 struct ctdb_db_context *ctdb_db;
579 if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
580 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
584 ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
585 if (ctdb_db == NULL) {
586 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
590 size = tdb_freelist_size(ctdb_db->ltdb->tdb);
592 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
596 if (size <= repack_limit) {
600 printf("Repacking %s with %u freelist entries\n", name, size);
602 if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
603 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
612 repack all our databases
614 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
616 struct ctdb_dbid_map *dbmap=NULL;
618 /* a reasonable default limit to prevent us using too much memory */
619 uint32_t repack_limit = 10000;
622 repack_limit = atoi(argv[0]);
625 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
627 DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
631 for (i=0;i<dbmap->num;i++) {
632 if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid,
633 dbmap->dbs[i].persistent, repack_limit) != 0) {
634 DEBUG(DEBUG_ERR,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));