merge from tridge
[amitay/samba.git] / ctdb / tools / ctdb_vacuum.c
1 /* 
2    ctdb control tool - database vacuum 
3
4    Copyright (C) Andrew Tridgell  2008
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/network.h"
24 #include "../include/ctdb.h"
25 #include "../include/ctdb_private.h"
26 #include "db_wrap.h"
27
28 /* should be tunable */
29 #define TIMELIMIT() timeval_current_ofs(10, 0)
30
31 struct async_data {
32         uint32_t count;
33         uint32_t fail_count;
34 };
35
36 static void async_callback(struct ctdb_client_control_state *state)
37 {
38         struct async_data *data = talloc_get_type(state->async.private_data, struct async_data);
39         int ret;
40         int32_t res;
41
42         /* one more node has responded with recmode data */
43         data->count--;
44
45         /* if we failed to push the db, then return an error and let
46            the main loop try again.
47         */
48         if (state->state != CTDB_CONTROL_DONE) {
49                 data->fail_count++;
50                 return;
51         }
52         
53         state->async.fn = NULL;
54
55         ret = ctdb_control_recv(state->ctdb, state, data, NULL, &res, NULL);
56         if ((ret != 0) || (res != 0)) {
57                 data->fail_count++;
58         }
59 }
60
61 static void async_add(struct async_data *data, struct ctdb_client_control_state *state)
62 {
63         /* set up the callback functions */
64         state->async.fn = async_callback;
65         state->async.private_data = data;
66         
67         /* one more control to wait for to complete */
68         data->count++;
69 }
70
71
72 /* wait for up to the maximum number of seconds allowed
73    or until all nodes we expect a response from has replied
74 */
75 static int async_wait(struct ctdb_context *ctdb, struct async_data *data)
76 {
77         while (data->count > 0) {
78                 event_loop_once(ctdb->ev);
79         }
80         if (data->fail_count != 0) {
81                 return -1;
82         }
83         return 0;
84 }
85
86 /* 
87    perform a simple control on nodes in the vnn map except ourselves.
88    The control cannot return data
89  */
90 static int async_control_on_vnnmap(struct ctdb_context *ctdb, enum ctdb_controls opcode,
91                                    TDB_DATA data)
92 {
93         struct async_data *async_data;
94         struct ctdb_client_control_state *state;
95         int j;
96         struct timeval timeout = TIMELIMIT();
97         
98         async_data = talloc_zero(ctdb, struct async_data);
99         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
100
101         /* loop over all active nodes and send an async control to each of them */
102         for (j=0; j<ctdb->vnn_map->size; j++) {
103                 uint32_t pnn = ctdb->vnn_map->map[j];
104                 if (pnn == ctdb->pnn) {
105                         continue;
106                 }
107                 state = ctdb_control_send(ctdb, pnn, 0, opcode, 
108                                           0, data, async_data, NULL, &timeout, NULL);
109                 if (state == NULL) {
110                         DEBUG(0,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
111                         talloc_free(async_data);
112                         return -1;
113                 }
114                 
115                 async_add(async_data, state);
116         }
117
118         if (async_wait(ctdb, async_data) != 0) {
119                 talloc_free(async_data);
120                 return -1;
121         }
122
123         talloc_free(async_data);
124         return 0;
125 }
126
127
128 /*
129   vacuum one record
130  */
131 static int ctdb_vacuum_one(struct ctdb_context *ctdb, TDB_DATA key, 
132                            struct ctdb_db_context *ctdb_db, uint32_t *count)
133 {
134         TDB_DATA data;
135         struct ctdb_ltdb_header *hdr;
136         struct ctdb_rec_data *rec;
137         uint64_t rsn;
138
139         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
140                 /* the chain is busy - come back later */
141                 return 0;
142         }
143
144         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
145         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
146         if (data.dptr == NULL) {
147                 return 0;
148         }
149         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
150                 free(data.dptr);
151                 return 0;
152         }
153
154
155         hdr = (struct ctdb_ltdb_header *)data.dptr;
156         rsn = hdr->rsn;
157
158         /* if we are not the lmaster and the dmaster then skip the record */
159         if (hdr->dmaster != ctdb->pnn ||
160             ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
161                 free(data.dptr);
162                 return 0;
163         }
164
165         rec = ctdb_marshall_record(ctdb, ctdb_db->db_id, key, hdr, tdb_null);
166         free(data.dptr);
167         if (rec == NULL) {
168                 /* try it again later */
169                 return 0;
170         }
171
172         data.dptr = (void *)rec;
173         data.dsize = rec->length;
174
175         if (async_control_on_vnnmap(ctdb, CTDB_CONTROL_DELETE_RECORD, data) != 0) {
176                 /* one or more nodes failed to delete a record - no problem! */
177                 talloc_free(rec);
178                 return 0;
179         }
180
181         talloc_free(rec);
182
183         /* its deleted on all other nodes - refetch, check and delete */
184         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
185                 /* the chain is busy - come back later */
186                 return 0;
187         }
188
189         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
190         if (data.dptr == NULL) {
191                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
192                 return 0;
193         }
194         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
195                 free(data.dptr);
196                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
197                 return 0;
198         }
199
200         hdr = (struct ctdb_ltdb_header *)data.dptr;
201
202         /* if we are not the lmaster and the dmaster then skip the record */
203         if (hdr->dmaster != ctdb->pnn ||
204             ctdb_lmaster(ctdb, &key) != ctdb->pnn ||
205             rsn != hdr->rsn) {
206                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
207                 free(data.dptr);
208                 return 0;
209         }
210
211         tdb_delete(ctdb_db->ltdb->tdb, key);
212         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
213         free(data.dptr);
214
215         (*count)++;
216
217         return 0;
218 }
219
220
221 /*
222   vacuum records for which we are the lmaster 
223  */
224 static int ctdb_vacuum_local(struct ctdb_context *ctdb, struct ctdb_control_pulldb_reply *list, 
225                              struct ctdb_db_context *ctdb_db, uint32_t *count)
226 {
227         struct ctdb_rec_data *r;
228         int i;
229
230         r = (struct ctdb_rec_data *)&list->data[0];
231         
232         for (i=0;
233              i<list->count;
234              r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r), i++) {
235                 TDB_DATA key;
236                 key.dptr = &r->data[0];
237                 key.dsize = r->keylen;
238                 if (ctdb_vacuum_one(ctdb, key, ctdb_db, count) != 0) {
239                         return -1;
240                 }
241         }
242
243         return 0;       
244 }
245
246 /* 
247    a list of records to possibly delete
248  */
249 struct vacuum_data {
250         uint32_t vacuum_limit;
251         struct ctdb_context *ctdb;
252         struct ctdb_control_pulldb_reply **list;
253         bool traverse_error;
254         uint32_t total;
255 };
256
257 /*
258   traverse function for vacuuming
259  */
260 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
261 {
262         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
263         uint32_t lmaster;
264         struct ctdb_ltdb_header *hdr;
265         struct ctdb_rec_data *rec;
266         size_t old_size;
267                
268         lmaster = ctdb_lmaster(vdata->ctdb, &key);
269         if (lmaster >= vdata->ctdb->vnn_map->size) {
270                 return 0;
271         }
272
273         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
274                 /* its not a deleted record */
275                 return 0;
276         }
277
278         hdr = (struct ctdb_ltdb_header *)data.dptr;
279
280         if (hdr->dmaster != vdata->ctdb->pnn) {
281                 return 0;
282         }
283
284
285         /* add the record to the blob ready to send to the nodes */
286         rec = ctdb_marshall_record(vdata->list[lmaster], vdata->ctdb->pnn, key, NULL, tdb_null);
287         if (rec == NULL) {
288                 DEBUG(0,(__location__ " Out of memory\n"));
289                 vdata->traverse_error = true;
290                 return -1;
291         }
292         old_size = talloc_get_size(vdata->list[lmaster]);
293         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
294                                                    old_size + rec->length);
295         if (vdata->list[lmaster] == NULL) {
296                 DEBUG(0,(__location__ " Failed to expand\n"));
297                 vdata->traverse_error = true;
298                 return -1;
299         }
300         vdata->list[lmaster]->count++;
301         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
302         talloc_free(rec);
303
304         vdata->total++;
305
306         /* don't gather too many records */
307         if (vdata->vacuum_limit != 0 &&
308             vdata->total == vdata->vacuum_limit) {
309                 return -1;
310         }
311
312         return 0;
313 }
314
315
316 /* vacuum one database */
317 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
318                           bool persistent, uint32_t vacuum_limit)
319 {
320         struct ctdb_db_context *ctdb_db;
321         const char *name;
322         struct vacuum_data *vdata;
323         int i;
324
325         vdata = talloc_zero(ctdb, struct vacuum_data);
326         if (vdata == NULL) {
327                 DEBUG(0,(__location__ " Out of memory\n"));
328                 return -1;
329         }
330
331         vdata->ctdb = ctdb;
332         vdata->vacuum_limit = vacuum_limit;
333
334         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
335                 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
336                 talloc_free(vdata);
337                 return -1;
338         }
339
340         ctdb_db = ctdb_attach(ctdb, name, persistent);
341         if (ctdb_db == NULL) {
342                 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
343                 talloc_free(vdata);
344                 return -1;
345         }
346
347         /* the list needs to be of length num_nodes */
348         vdata->list = talloc_array(vdata, struct ctdb_control_pulldb_reply *, ctdb->vnn_map->size);
349         if (vdata->list == NULL) {
350                 DEBUG(0,(__location__ " Out of memory\n"));
351                 talloc_free(vdata);
352                 return -1;
353         }
354         for (i=0;i<ctdb->vnn_map->size;i++) {
355                 vdata->list[i] = (struct ctdb_control_pulldb_reply *)
356                         talloc_zero_size(vdata->list, 
357                                     offsetof(struct ctdb_control_pulldb_reply, data));
358                 if (vdata->list[i] == NULL) {
359                         DEBUG(0,(__location__ " Out of memory\n"));
360                         talloc_free(vdata);
361                         return -1;
362                 }
363                 vdata->list[i]->db_id = db_id;
364         }
365
366         /* traverse, looking for records that might be able to be vacuumed */
367         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
368             vdata->traverse_error) {
369                 DEBUG(0,(__location__ " Traverse error in vacuuming '%s'\n", name));
370                 talloc_free(vdata);
371                 return -1;              
372         }
373
374
375         for (i=0;i<ctdb->vnn_map->size;i++) {
376                 if (vdata->list[i]->count == 0) {
377                         continue;
378                 }
379
380                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
381                 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
382                         TDB_DATA data;
383                         printf("Found %u records for lmaster %u in '%s'\n", vdata->list[i]->count, i, name);
384
385                         data.dsize = talloc_get_size(vdata->list[i]);
386                         data.dptr  = (void *)vdata->list[i];
387                         if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
388                                 DEBUG(0,(__location__ " Failed to send vacuum fetch message to %u\n",
389                                          ctdb->vnn_map->map[i]));
390                                 talloc_free(vdata);
391                                 return -1;              
392                         }
393                         continue;
394                 }
395         }       
396
397         for (i=0;i<ctdb->vnn_map->size;i++) {
398                 uint32_t count = 0;
399
400                 if (vdata->list[i]->count == 0) {
401                         continue;
402                 }
403
404                 /* for records where we are the lmaster, we can try to delete them */
405                 if (ctdb_vacuum_local(ctdb, vdata->list[i], ctdb_db, &count) != 0) {
406                         DEBUG(0,(__location__ " Deletion error in vacuuming '%s'\n", name));
407                         talloc_free(vdata);
408                         return -1;                                      
409                 }
410                 if (count != 0) {
411                         printf("Deleted %u records on this node from '%s'\n", count, name);
412                 }
413         }       
414
415         /* this ensures we run our event queue */
416         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
417
418         talloc_free(vdata);
419
420         return 0;
421 }
422
423
424 /*
425   vacuum all our databases
426  */
427 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
428 {
429         struct ctdb_dbid_map *dbmap=NULL;
430         struct ctdb_node_map *nodemap=NULL;
431         int ret, i, pnn;
432         uint32_t vacuum_limit = 0;
433
434         if (argc > 0) {
435                 vacuum_limit = atoi(argv[0]);
436         }
437
438         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
439         if (ret != 0) {
440                 DEBUG(0, ("Unable to get dbids from local node\n"));
441                 return ret;
442         }
443
444         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
445         if (ret != 0) {
446                 DEBUG(0, ("Unable to get nodemap from local node\n"));
447                 return ret;
448         }
449
450         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
451         if (ret != 0) {
452                 DEBUG(0, ("Unable to get vnnmap from local node\n"));
453                 return ret;
454         }
455
456         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
457         if (pnn == -1) {
458                 DEBUG(0, ("Unable to get pnn from local node\n"));
459                 return -1;
460         }
461         ctdb->pnn = pnn;
462
463         for (i=0;i<dbmap->num;i++) {
464                 if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap, 
465                                    dbmap->dbs[i].persistent, vacuum_limit) != 0) {
466                         DEBUG(0,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
467                         return -1;
468                 }
469         }
470
471         return 0;
472 }
473
474 struct traverse_state {
475         bool error;
476         struct tdb_context *dest_db;
477 };
478
479 /*
480   traverse function for repacking
481  */
482 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
483 {
484         struct traverse_state *state = (struct traverse_state *)private;
485         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
486                 state->error = true;
487                 return -1;
488         }
489         return 0;
490 }
491
492 /*
493   repack a tdb
494  */
495 static int ctdb_repack_tdb(struct tdb_context *tdb)
496 {
497         struct tdb_context *tmp_db;
498         struct traverse_state state;
499
500         if (tdb_transaction_start(tdb) != 0) {
501                 DEBUG(0,(__location__ " Failed to start transaction\n"));
502                 return -1;
503         }
504
505         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
506         if (tmp_db == NULL) {
507                 DEBUG(0,(__location__ " Failed to create tmp_db\n"));
508                 tdb_transaction_cancel(tdb);
509                 return -1;
510         }
511
512         state.error = false;
513         state.dest_db = tmp_db;
514
515         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
516                 DEBUG(0,(__location__ " Failed to traverse copying out\n"));
517                 tdb_transaction_cancel(tdb);
518                 tdb_close(tmp_db);
519                 return -1;              
520         }
521
522         if (state.error) {
523                 DEBUG(0,(__location__ " Error during traversal\n"));
524                 tdb_transaction_cancel(tdb);
525                 tdb_close(tmp_db);
526                 return -1;
527         }
528
529         if (tdb_wipe_all(tdb) != 0) {
530                 DEBUG(0,(__location__ " Failed to wipe database\n"));
531                 tdb_transaction_cancel(tdb);
532                 tdb_close(tmp_db);
533                 return -1;
534         }
535
536         state.error = false;
537         state.dest_db = tdb;
538
539         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
540                 DEBUG(0,(__location__ " Failed to traverse copying back\n"));
541                 tdb_transaction_cancel(tdb);
542                 tdb_close(tmp_db);
543                 return -1;              
544         }
545
546         if (state.error) {
547                 DEBUG(0,(__location__ " Error during second traversal\n"));
548                 tdb_transaction_cancel(tdb);
549                 tdb_close(tmp_db);
550                 return -1;
551         }
552
553         tdb_close(tmp_db);
554
555         if (tdb_transaction_commit(tdb) != 0) {
556                 DEBUG(0,(__location__ " Failed to commit\n"));
557                 return -1;
558         }
559
560         return 0;
561 }
562
563
564 /* repack one database */
565 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id, 
566                           bool persistent, uint32_t repack_limit)
567 {
568         struct ctdb_db_context *ctdb_db;
569         const char *name;
570         int size;
571
572         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
573                 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
574                 return -1;
575         }
576
577         ctdb_db = ctdb_attach(ctdb, name, persistent);
578         if (ctdb_db == NULL) {
579                 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
580                 return -1;
581         }
582
583         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
584         if (size == -1) {
585                 DEBUG(0,(__location__ " Failed to get freelist size for '%s'\n", name));
586                 return -1;
587         }
588
589         if (size <= repack_limit) {
590                 return 0;
591         }
592
593         printf("Repacking %s with %u freelist entries\n", name, size);
594
595         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
596                 DEBUG(0,(__location__ " Failed to repack '%s'\n", name));
597                 return -1;
598         }
599
600         return 0;
601 }
602
603
604 /*
605   repack all our databases
606  */
607 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
608 {
609         struct ctdb_dbid_map *dbmap=NULL;
610         int ret, i;
611         /* a reasonable default limit to prevent us using too much memory */
612         uint32_t repack_limit = 10000; 
613
614         if (argc > 0) {
615                 repack_limit = atoi(argv[0]);
616         }
617
618         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
619         if (ret != 0) {
620                 DEBUG(0, ("Unable to get dbids from local node\n"));
621                 return ret;
622         }
623
624         for (i=0;i<dbmap->num;i++) {
625                 if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid, 
626                                    dbmap->dbs[i].persistent, repack_limit) != 0) {
627                         DEBUG(0,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));
628                         return -1;
629                 }
630         }
631
632         return 0;
633 }