Merge commit 'rusty/libctdb-new' into foo
[amitay/samba.git] / ctdb / tools / ctdb_vacuum.c
1 /* 
2    ctdb control tool - database vacuum 
3
4    Copyright (C) Andrew Tridgell  2008
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/network.h"
24 #include "../include/ctdb_client.h"
25 #include "../include/ctdb_private.h"
26 #include "../common/rb_tree.h"
27 #include "db_wrap.h"
28
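/* time limit handed to every control call made from this file;
   the 10 second value is hard-coded here but should be tunable */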
30 #define TIMELIMIT() timeval_current_ofs(10, 0)
31
32
33 /* 
34    a list of records to possibly delete
35  */
36 struct vacuum_data {
37         uint32_t vacuum_limit;
38         struct ctdb_context *ctdb;
39         struct ctdb_db_context *ctdb_db;
40         trbt_tree_t *delete_tree;
41         uint32_t delete_count;
42         struct ctdb_marshall_buffer **list;
43         bool traverse_error;
44         uint32_t total;
45 };
46
47 /* this structure contains the information for one record to be deleted */
48 struct delete_record_data {
49         struct ctdb_context *ctdb;
50         struct ctdb_db_context *ctdb_db;
51         struct ctdb_ltdb_header hdr;
52         TDB_DATA key;
53 };
54
55 /*
56   traverse function for vacuuming
57  */
58 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
59 {
60         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
61         struct ctdb_context *ctdb = vdata->ctdb;
62         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
63         uint32_t lmaster;
64         struct ctdb_ltdb_header *hdr;
65         struct ctdb_rec_data *rec;
66         size_t old_size;
67                
68         lmaster = ctdb_lmaster(ctdb, &key);
69         if (lmaster >= ctdb->vnn_map->size) {
70                 return 0;
71         }
72
73         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
74                 /* its not a deleted record */
75                 return 0;
76         }
77
78         hdr = (struct ctdb_ltdb_header *)data.dptr;
79
80         if (hdr->dmaster != ctdb->pnn) {
81                 return 0;
82         }
83
84
85         /* is this a records we could possibly delete? I.e.
86            if the record is empty and also we are both lmaster
87            and dmaster for the record we should be able to delete it
88         */
89         if ( (lmaster == ctdb->pnn)
90            &&( (vdata->delete_count < vdata->vacuum_limit)
91              ||(vdata->vacuum_limit == 0) ) ){
92                 uint32_t hash;
93
94                 hash = ctdb_hash(&key);
95                 if (trbt_lookup32(vdata->delete_tree, hash)) {
96                         DEBUG(DEBUG_INFO, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
97                 } else {
98                         struct delete_record_data *dd;
99
100                         /* store key and header indexed by the key hash */
101                         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
102                         if (dd == NULL) {
103                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
104                                 return -1;
105                         }
106                         dd->ctdb      = ctdb;
107                         dd->ctdb_db   = ctdb_db;
108                         dd->key.dsize = key.dsize;
109                         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
110                         if (dd->key.dptr == NULL) {
111                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
112                                 return -1;
113                         }
114
115                         dd->hdr = *hdr;
116
117         
118                         trbt_insert32(vdata->delete_tree, hash, dd);
119
120                         vdata->delete_count++;
121                 }
122         }
123
124
125         /* add the record to the blob ready to send to the nodes */
126         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
127         if (rec == NULL) {
128                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
129                 vdata->traverse_error = true;
130                 return -1;
131         }
132         old_size = talloc_get_size(vdata->list[lmaster]);
133         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
134                                                    old_size + rec->length);
135         if (vdata->list[lmaster] == NULL) {
136                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
137                 vdata->traverse_error = true;
138                 return -1;
139         }
140         vdata->list[lmaster]->count++;
141         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
142         talloc_free(rec);
143
144         vdata->total++;
145
146         /* don't gather too many records */
147         if (vdata->vacuum_limit != 0 &&
148             vdata->total == vdata->vacuum_limit) {
149                 return -1;
150         }
151
152         return 0;
153 }
154
/*
   wrapper around the marshall buffer that collects the records of the
   delete tree; passed as the callback parameter when the tree is
   traversed
 */
struct delete_records_list {
	/* grows by one marshalled record per tree entry */
	struct ctdb_marshall_buffer *records;
};
158
159 /*
160  traverse the tree of records to delete and marshall them into
161  a blob
162 */
163 static void
164 delete_traverse(void *param, void *data)
165 {
166         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
167         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
168         struct ctdb_rec_data *rec;
169         size_t old_size;
170
171         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
172         if (rec == NULL) {
173                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
174                 return;
175         }
176
177         old_size = talloc_get_size(recs->records);
178         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
179         if (recs->records == NULL) {
180                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
181                 return;
182         }
183         recs->records->count++;
184         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
185 }
186
187
188 static void delete_record(void *param, void *d)
189 {
190         struct delete_record_data *dd = talloc_get_type(d, struct delete_record_data);
191         struct ctdb_context *ctdb = dd->ctdb;
192         struct ctdb_db_context *ctdb_db = dd->ctdb_db;
193         uint32_t *count = (uint32_t *)param;
194         struct ctdb_ltdb_header *hdr;
195         TDB_DATA data;
196
197         /* its deleted on all other nodes - refetch, check and delete */
198         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key) != 0) {
199                 /* the chain is busy - come back later */
200                 return;
201         }
202
203         data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
204         if (data.dptr == NULL) {
205                 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
206                 return;
207         }
208         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
209                 free(data.dptr);
210                 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
211                 return;
212         }
213
214         hdr = (struct ctdb_ltdb_header *)data.dptr;
215
216         /* if we are not the lmaster and the dmaster then skip the record */
217         if (hdr->dmaster != ctdb->pnn ||
218             ctdb_lmaster(ctdb, &(dd->key)) != ctdb->pnn ||
219             dd->hdr.rsn != hdr->rsn) {
220                 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
221                 free(data.dptr);
222                 return;
223         }
224
225         ctdb_block_signal(SIGALRM);
226         tdb_delete(ctdb_db->ltdb->tdb, dd->key);
227         ctdb_unblock_signal(SIGALRM);
228         tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
229         free(data.dptr);
230
231         (*count)++;
232 }
233
234 /* vacuum one database */
235 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
236                           bool persistent, uint32_t vacuum_limit)
237 {
238         struct ctdb_db_context *ctdb_db;
239         const char *name;
240         struct vacuum_data *vdata;
241         int i;
242
243         vdata = talloc_zero(ctdb, struct vacuum_data);
244         if (vdata == NULL) {
245                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
246                 return -1;
247         }
248
249         vdata->ctdb = ctdb;
250         vdata->vacuum_limit = vacuum_limit;
251         vdata->delete_tree = trbt_create(vdata, 0);
252         if (vdata->delete_tree == NULL) {
253                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
254                 return -1;
255         }
256
257         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
258                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
259                 talloc_free(vdata);
260                 return -1;
261         }
262
263         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
264         if (ctdb_db == NULL) {
265                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
266                 talloc_free(vdata);
267                 return -1;
268         }
269         vdata->ctdb_db = ctdb_db;
270
271         /* the list needs to be of length num_nodes */
272         vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
273         if (vdata->list == NULL) {
274                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
275                 talloc_free(vdata);
276                 return -1;
277         }
278         for (i=0;i<ctdb->vnn_map->size;i++) {
279                 vdata->list[i] = (struct ctdb_marshall_buffer *)
280                         talloc_zero_size(vdata->list, 
281                                     offsetof(struct ctdb_marshall_buffer, data));
282                 if (vdata->list[i] == NULL) {
283                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
284                         talloc_free(vdata);
285                         return -1;
286                 }
287                 vdata->list[i]->db_id = db_id;
288         }
289
290         /* traverse, looking for records that might be able to be vacuumed */
291         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
292             vdata->traverse_error) {
293                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
294                 talloc_free(vdata);
295                 return -1;              
296         }
297
298
299         for (i=0;i<ctdb->vnn_map->size;i++) {
300                 if (vdata->list[i]->count == 0) {
301                         continue;
302                 }
303
304                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
305                 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
306                         TDB_DATA data;
307                         printf("Found %u records for lmaster %u in '%s'\n", vdata->list[i]->count, i, name);
308
309                         data.dsize = talloc_get_size(vdata->list[i]);
310                         data.dptr  = (void *)vdata->list[i];
311                         if (ctdb_client_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
312                                 DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
313                                          ctdb->vnn_map->map[i]));
314                                 talloc_free(vdata);
315                                 return -1;              
316                         }
317                         continue;
318                 }
319         }       
320
321
322         /* Process all records we can delete (if any) */
323         if (vdata->delete_count > 0) {
324                 struct delete_records_list *recs;
325                 TDB_DATA indata, outdata;
326                 int ret;
327                 int32_t res;
328                 uint32_t count;
329
330                 recs = talloc_zero(vdata, struct delete_records_list);
331                 if (recs == NULL) {
332                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
333                         return -1;
334                 }
335                 recs->records = (struct ctdb_marshall_buffer *)
336                         talloc_zero_size(vdata, 
337                                     offsetof(struct ctdb_marshall_buffer, data));
338                 if (recs->records == NULL) {
339                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
340                         return -1;
341                 }
342                 recs->records->db_id = db_id;
343
344                 /* traverse the tree of all records we want to delete and
345                    create a blob we can send to the other nodes.
346                 */
347                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
348
349                 indata.dsize = talloc_get_size(recs->records);
350                 indata.dptr  = (void *)recs->records;
351
352                 /* now tell all the other nodes to delete all these records
353                    (if possible)
354                  */
355                 for (i=0;i<ctdb->vnn_map->size;i++) {
356                         struct ctdb_marshall_buffer *records;
357                         struct ctdb_rec_data *rec;
358
359                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
360                                 /* we dont delete the records on the local node
361                                    just yet
362                                 */
363                                 continue;
364                         }
365
366                         ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
367                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
368                                         indata, recs, &outdata, &res,
369                                         NULL, NULL);
370                         if (ret != 0 || res != 0) {
371                                 DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
372                                 exit(10);
373                         }
374
375                         /* outdata countains the list of records coming back
376                            from the node which the node could not delete
377                         */
378                         records = (struct ctdb_marshall_buffer *)outdata.dptr;
379                         rec = (struct ctdb_rec_data *)&records->data[0];
380                         while (records->count-- > 1) {
381                                 TDB_DATA reckey, recdata;
382                                 struct ctdb_ltdb_header *rechdr;
383
384                                 reckey.dptr = &rec->data[0];
385                                 reckey.dsize = rec->keylen;
386                                 recdata.dptr = &rec->data[reckey.dsize];
387                                 recdata.dsize = rec->datalen;
388
389                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
390                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
391                                         exit(10);
392                                 }
393                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
394                                 recdata.dptr += sizeof(*rechdr);
395                                 recdata.dsize -= sizeof(*rechdr);
396
397                                 /* that other node couldnt delete the record
398                                    so we shouldnt delete it either.
399                                    remove it from the tree.
400                                 */
401                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
402
403                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
404                         }           
405                 }
406
407
408                 /* the only records remaining in the tree would be those
409                    records where all other nodes could successfully
410                    delete them, so we can now safely delete them on the
411                    lmaster as well.
412                 */
413                 count = 0;
414                 trbt_traversearray32(vdata->delete_tree, 1, delete_record, &count);
415                 if (vdata->delete_count != 0) {
416                         printf("Deleted %u records out of %u on this node from '%s'\n", count, vdata->delete_count, name);
417                 }
418         }
419
420         /* this ensures we run our event queue */
421         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
422
423         talloc_free(vdata);
424
425         return 0;
426 }
427
428
429 /*
430   vacuum all our databases
431  */
432 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
433 {
434         struct ctdb_dbid_map *dbmap=NULL;
435         struct ctdb_node_map *nodemap=NULL;
436         int ret, i, pnn;
437         uint32_t vacuum_limit = 0;
438
439         if (argc > 0) {
440                 vacuum_limit = atoi(argv[0]);
441         }
442
443         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
444         if (ret != 0) {
445                 DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
446                 return ret;
447         }
448
449         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
450         if (ret != 0) {
451                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
452                 return ret;
453         }
454
455         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
456         if (ret != 0) {
457                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
458                 return ret;
459         }
460
461         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
462         if (pnn == -1) {
463                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
464                 return -1;
465         }
466         ctdb->pnn = pnn;
467
468         for (i=0;i<dbmap->num;i++) {
469                 if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap, 
470                                    dbmap->dbs[i].persistent, vacuum_limit) != 0) {
471                         DEBUG(DEBUG_ERR,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
472                         return -1;
473                 }
474         }
475
476         return 0;
477 }
478
/*
   state handed to repack_traverse
 */
struct traverse_state {
	/* set when storing a record into dest_db failed */
	bool error;
	/* database the traversed records are copied into */
	struct tdb_context *dest_db;
};
483
484 /*
485   traverse function for repacking
486  */
487 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
488 {
489         struct traverse_state *state = (struct traverse_state *)private;
490         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
491                 state->error = true;
492                 return -1;
493         }
494         return 0;
495 }
496
497 /*
498   repack a tdb
499  */
500 static int ctdb_repack_tdb(struct tdb_context *tdb)
501 {
502         struct tdb_context *tmp_db;
503         struct traverse_state state;
504
505         if (tdb_transaction_start(tdb) != 0) {
506                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
507                 return -1;
508         }
509
510         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
511                           TDB_INTERNAL|TDB_DISALLOW_NESTING,
512                           O_RDWR|O_CREAT, 0);
513         if (tmp_db == NULL) {
514                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
515                 tdb_transaction_cancel(tdb);
516                 return -1;
517         }
518
519         state.error = false;
520         state.dest_db = tmp_db;
521
522         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
523                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
524                 tdb_transaction_cancel(tdb);
525                 tdb_close(tmp_db);
526                 return -1;              
527         }
528
529         if (state.error) {
530                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
531                 tdb_transaction_cancel(tdb);
532                 tdb_close(tmp_db);
533                 return -1;
534         }
535
536         if (tdb_wipe_all(tdb) != 0) {
537                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
538                 tdb_transaction_cancel(tdb);
539                 tdb_close(tmp_db);
540                 return -1;
541         }
542
543         state.error = false;
544         state.dest_db = tdb;
545
546         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
547                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
548                 tdb_transaction_cancel(tdb);
549                 tdb_close(tmp_db);
550                 return -1;              
551         }
552
553         if (state.error) {
554                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
555                 tdb_transaction_cancel(tdb);
556                 tdb_close(tmp_db);
557                 return -1;
558         }
559
560         tdb_close(tmp_db);
561
562         if (tdb_transaction_commit(tdb) != 0) {
563                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
564                 return -1;
565         }
566
567         return 0;
568 }
569
570
571 /* repack one database */
572 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id, 
573                           bool persistent, uint32_t repack_limit)
574 {
575         struct ctdb_db_context *ctdb_db;
576         const char *name;
577         int size;
578
579         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
580                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
581                 return -1;
582         }
583
584         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
585         if (ctdb_db == NULL) {
586                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
587                 return -1;
588         }
589
590         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
591         if (size == -1) {
592                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
593                 return -1;
594         }
595
596         if (size <= repack_limit) {
597                 return 0;
598         }
599
600         printf("Repacking %s with %u freelist entries\n", name, size);
601
602         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
603                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
604                 return -1;
605         }
606
607         return 0;
608 }
609
610
611 /*
612   repack all our databases
613  */
614 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
615 {
616         struct ctdb_dbid_map *dbmap=NULL;
617         int ret, i;
618         /* a reasonable default limit to prevent us using too much memory */
619         uint32_t repack_limit = 10000; 
620
621         if (argc > 0) {
622                 repack_limit = atoi(argv[0]);
623         }
624
625         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
626         if (ret != 0) {
627                 DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
628                 return ret;
629         }
630
631         for (i=0;i<dbmap->num;i++) {
632                 if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid, 
633                                    dbmap->dbs[i].persistent, repack_limit) != 0) {
634                         DEBUG(DEBUG_ERR,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));
635                         return -1;
636                 }
637         }
638
639         return 0;
640 }