8fa5d79ac0ba5be8face143b05ea79c5dc5e3a83
[vlendec/samba-autobuild/.git] / ctdb / tools / ctdb_vacuum.c
1 /* 
2    ctdb control tool - database vacuum 
3
4    Copyright (C) Andrew Tridgell  2008
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/network.h"
24 #include "../include/ctdb.h"
25 #include "../include/ctdb_private.h"
26 #include "db_wrap.h"
27
28 /* should be tunable */
29 #define TIMELIMIT() timeval_current_ofs(10, 0)
30
31 struct async_data {
32         uint32_t count;
33         uint32_t fail_count;
34 };
35
36 static void async_callback(struct ctdb_client_control_state *state)
37 {
38         struct async_data *data = talloc_get_type(state->async.private_data, struct async_data);
39         int ret;
40         int32_t res;
41
42         /* one more node has responded with recmode data */
43         data->count--;
44
45         /* if we failed to push the db, then return an error and let
46            the main loop try again.
47         */
48         if (state->state != CTDB_CONTROL_DONE) {
49                 data->fail_count++;
50                 return;
51         }
52         
53         state->async.fn = NULL;
54
55         ret = ctdb_control_recv(state->ctdb, state, data, NULL, &res, NULL);
56         if ((ret != 0) || (res != 0)) {
57                 data->fail_count++;
58         }
59 }
60
61 static void async_add(struct async_data *data, struct ctdb_client_control_state *state)
62 {
63         /* set up the callback functions */
64         state->async.fn = async_callback;
65         state->async.private_data = data;
66         
67         /* one more control to wait for to complete */
68         data->count++;
69 }
70
71
72 /* wait for up to the maximum number of seconds allowed
73    or until all nodes we expect a response from has replied
74 */
75 static int async_wait(struct ctdb_context *ctdb, struct async_data *data)
76 {
77         while (data->count > 0) {
78                 event_loop_once(ctdb->ev);
79         }
80         if (data->fail_count != 0) {
81                 DEBUG(0,("Async wait failed - fail_count=%u\n", data->fail_count));
82                 return -1;
83         }
84         return 0;
85 }
86
87 /* 
88    perform a simple control on nodes in the vnn map except ourselves.
89    The control cannot return data
90  */
91 static int async_control_on_vnnmap(struct ctdb_context *ctdb, enum ctdb_controls opcode,
92                                    TDB_DATA data)
93 {
94         struct async_data *async_data;
95         struct ctdb_client_control_state *state;
96         int j;
97         struct timeval timeout = TIMELIMIT();
98         
99         async_data = talloc_zero(ctdb, struct async_data);
100         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
101
102         /* loop over all active nodes and send an async control to each of them */
103         for (j=0; j<ctdb->vnn_map->size; j++) {
104                 uint32_t pnn = ctdb->vnn_map->map[j];
105                 if (pnn == ctdb->pnn) {
106                         continue;
107                 }
108                 state = ctdb_control_send(ctdb, pnn, 0, opcode, 
109                                           0, data, async_data, NULL, &timeout, NULL);
110                 if (state == NULL) {
111                         DEBUG(0,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
112                         talloc_free(async_data);
113                         return -1;
114                 }
115                 
116                 async_add(async_data, state);
117         }
118
119         if (async_wait(ctdb, async_data) != 0) {
120                 talloc_free(async_data);
121                 return -1;
122         }
123
124         talloc_free(async_data);
125         return 0;
126 }
127
128
129 /*
130   vacuum one record
131  */
132 static int ctdb_vacuum_one(struct ctdb_context *ctdb, TDB_DATA key, struct ctdb_db_context *ctdb_db)
133 {
134         TDB_DATA data;
135         struct ctdb_ltdb_header *hdr;
136         struct ctdb_rec_data *rec;
137         uint64_t rsn;
138
139         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
140                 /* the chain is busy - come back later */
141                 return 0;
142         }
143
144         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
145         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
146         if (data.dptr == NULL) {
147                 return 0;
148         }
149         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
150                 free(data.dptr);
151                 return 0;
152         }
153
154
155         hdr = (struct ctdb_ltdb_header *)data.dptr;
156         rsn = hdr->rsn;
157
158         /* if we are not the lmaster and the dmaster then skip the record */
159         if (hdr->dmaster != ctdb->pnn ||
160             ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
161                 free(data.dptr);
162                 return 0;
163         }
164
165         rec = ctdb_marshall_record(ctdb, ctdb_db->db_id, key, hdr, tdb_null);
166         free(data.dptr);
167         if (rec == NULL) {
168                 /* try it again later */
169                 return 0;
170         }
171
172         data.dptr = (void *)rec;
173         data.dsize = rec->length;
174
175         if (async_control_on_vnnmap(ctdb, CTDB_CONTROL_DELETE_RECORD, data) != 0) {
176                 /* one or more nodes failed to delete a record - no problem! */
177                 talloc_free(rec);
178                 return 0;
179         }
180
181         talloc_free(rec);
182
183         /* its deleted on all other nodes - refetch, check and delete */
184         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
185                 /* the chain is busy - come back later */
186                 return 0;
187         }
188
189         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
190         if (data.dptr == NULL) {
191                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
192                 return 0;
193         }
194         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
195                 free(data.dptr);
196                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
197                 return 0;
198         }
199
200         hdr = (struct ctdb_ltdb_header *)data.dptr;
201
202         /* if we are not the lmaster and the dmaster then skip the record */
203         if (hdr->dmaster != ctdb->pnn ||
204             ctdb_lmaster(ctdb, &key) != ctdb->pnn ||
205             rsn != hdr->rsn) {
206                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
207                 free(data.dptr);
208                 return 0;
209         }
210
211         tdb_delete(ctdb_db->ltdb->tdb, key);
212         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
213         free(data.dptr);
214
215         return 0;
216 }
217
218
219 /*
220   vacuum records for which we are the lmaster 
221  */
222 static int ctdb_vacuum_local(struct ctdb_context *ctdb, struct ctdb_control_pulldb_reply *list, 
223                              struct ctdb_db_context *ctdb_db)
224 {
225         struct ctdb_rec_data *r;
226         int i;
227
228         r = (struct ctdb_rec_data *)&list->data[0];
229         
230         for (i=0;
231              i<list->count;
232              r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r), i++) {
233                 TDB_DATA key;
234                 key.dptr = &r->data[0];
235                 key.dsize = r->keylen;
236                 if (ctdb_vacuum_one(ctdb, key, ctdb_db) != 0) {
237                         return -1;
238                 }
239         }
240
241         return 0;       
242 }
243
244 /* 
245    a list of records to possibly delete
246  */
247 struct vacuum_data {
248         uint32_t vacuum_limit;
249         struct ctdb_context *ctdb;
250         struct ctdb_control_pulldb_reply **list;
251         bool traverse_error;
252         uint32_t total;
253 };
254
255 /*
256   traverse function for vacuuming
257  */
258 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
259 {
260         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
261         uint32_t lmaster;
262         struct ctdb_ltdb_header *hdr;
263         struct ctdb_rec_data *rec;
264         size_t old_size;
265                
266         lmaster = ctdb_lmaster(vdata->ctdb, &key);
267         if (lmaster >= vdata->ctdb->vnn_map->size) {
268                 return 0;
269         }
270
271         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
272                 /* its not a deleted record */
273                 return 0;
274         }
275
276         hdr = (struct ctdb_ltdb_header *)data.dptr;
277
278         if (hdr->dmaster != vdata->ctdb->pnn) {
279                 return 0;
280         }
281
282
283         /* add the record to the blob ready to send to the nodes */
284         rec = ctdb_marshall_record(vdata->list[lmaster], vdata->ctdb->pnn, key, NULL, tdb_null);
285         if (rec == NULL) {
286                 DEBUG(0,(__location__ " Out of memory\n"));
287                 vdata->traverse_error = true;
288                 return -1;
289         }
290         old_size = talloc_get_size(vdata->list[lmaster]);
291         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
292                                                    old_size + rec->length);
293         if (vdata->list[lmaster] == NULL) {
294                 DEBUG(0,(__location__ " Failed to expand\n"));
295                 vdata->traverse_error = true;
296                 return -1;
297         }
298         vdata->list[lmaster]->count++;
299         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
300         talloc_free(rec);
301
302         vdata->total++;
303
304         /* don't gather too many records */
305         if (vdata->vacuum_limit != 0 &&
306             vdata->total == vdata->vacuum_limit) {
307                 return -1;
308         }
309
310         return 0;
311 }
312
313
314 /* vacuum one database */
315 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
316                           bool persistent, uint32_t vacuum_limit)
317 {
318         struct ctdb_db_context *ctdb_db;
319         const char *name;
320         struct vacuum_data *vdata;
321         int i;
322
323         vdata = talloc_zero(ctdb, struct vacuum_data);
324         if (vdata == NULL) {
325                 DEBUG(0,(__location__ " Out of memory\n"));
326                 return -1;
327         }
328
329         vdata->ctdb = ctdb;
330         vdata->vacuum_limit = vacuum_limit;
331
332         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
333                 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
334                 talloc_free(vdata);
335                 return -1;
336         }
337
338         ctdb_db = ctdb_attach(ctdb, name, persistent);
339         if (ctdb_db == NULL) {
340                 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
341                 talloc_free(vdata);
342                 return -1;
343         }
344
345         /* the list needs to be of length num_nodes */
346         vdata->list = talloc_array(vdata, struct ctdb_control_pulldb_reply *, ctdb->vnn_map->size);
347         if (vdata->list == NULL) {
348                 DEBUG(0,(__location__ " Out of memory\n"));
349                 talloc_free(vdata);
350                 return -1;
351         }
352         for (i=0;i<ctdb->vnn_map->size;i++) {
353                 vdata->list[i] = (struct ctdb_control_pulldb_reply *)
354                         talloc_zero_size(vdata->list, 
355                                     offsetof(struct ctdb_control_pulldb_reply, data));
356                 if (vdata->list[i] == NULL) {
357                         DEBUG(0,(__location__ " Out of memory\n"));
358                         talloc_free(vdata);
359                         return -1;
360                 }
361                 vdata->list[i]->db_id = db_id;
362         }
363
364         /* traverse, looking for records that might be able to be vacuumed */
365         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
366             vdata->traverse_error) {
367                 DEBUG(0,(__location__ " Traverse error in vacuuming '%s'\n", name));
368                 talloc_free(vdata);
369                 return -1;              
370         }
371
372
373         for (i=0;i<ctdb->vnn_map->size;i++) {
374                 if (vdata->list[i]->count == 0) {
375                         continue;
376                 }
377
378                 printf("Found %u records for lmaster %u\n", vdata->list[i]->count, i);          
379
380                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
381                 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
382                         TDB_DATA data;
383                         data.dsize = talloc_get_size(vdata->list[i]);
384                         data.dptr  = (void *)vdata->list[i];
385                         if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
386                                 DEBUG(0,(__location__ " Failed to send vacuum fetch message to %u\n",
387                                          ctdb->vnn_map->map[i]));
388                                 talloc_free(vdata);
389                                 return -1;              
390                         }
391                         continue;
392                 }
393
394                 /* for records where we are the lmaster, we can try to delete them */
395                 if (ctdb_vacuum_local(ctdb, vdata->list[i], ctdb_db) != 0) {
396                         DEBUG(0,(__location__ " Deletion error in vacuuming '%s'\n", name));
397                         talloc_free(vdata);
398                         return -1;                                      
399                 }
400         }       
401
402         /* this ensures we run our event queue */
403         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
404
405         talloc_free(vdata);
406
407         return 0;
408 }
409
410
411 /*
412   vacuum all our databases
413  */
414 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
415 {
416         struct ctdb_dbid_map *dbmap=NULL;
417         struct ctdb_node_map *nodemap=NULL;
418         int ret, i, pnn;
419         uint32_t vacuum_limit = 0;
420
421         if (argc > 0) {
422                 vacuum_limit = atoi(argv[0]);
423         }
424
425         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
426         if (ret != 0) {
427                 DEBUG(0, ("Unable to get dbids from local node\n"));
428                 return ret;
429         }
430
431         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
432         if (ret != 0) {
433                 DEBUG(0, ("Unable to get nodemap from local node\n"));
434                 return ret;
435         }
436
437         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
438         if (ret != 0) {
439                 DEBUG(0, ("Unable to get vnnmap from local node\n"));
440                 return ret;
441         }
442
443         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
444         if (pnn == -1) {
445                 DEBUG(0, ("Unable to get pnn from local node\n"));
446                 return -1;
447         }
448         ctdb->pnn = pnn;
449
450         for (i=0;i<dbmap->num;i++) {
451                 if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap, 
452                                    dbmap->dbs[i].persistent, vacuum_limit) != 0) {
453                         DEBUG(0,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
454                         return -1;
455                 }
456         }
457
458         return 0;
459 }
460
461 struct traverse_state {
462         bool error;
463         struct tdb_context *dest_db;
464 };
465
466 /*
467   traverse function for repacking
468  */
469 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
470 {
471         struct traverse_state *state = (struct traverse_state *)private;
472         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
473                 state->error = true;
474                 return -1;
475         }
476         return 0;
477 }
478
479 /*
480   repack a tdb
481  */
482 static int ctdb_repack_tdb(struct tdb_context *tdb)
483 {
484         struct tdb_context *tmp_db;
485         struct traverse_state state;
486
487         if (tdb_transaction_start(tdb) != 0) {
488                 DEBUG(0,(__location__ " Failed to start transaction\n"));
489                 return -1;
490         }
491
492         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
493         if (tmp_db == NULL) {
494                 DEBUG(0,(__location__ " Failed to create tmp_db\n"));
495                 tdb_transaction_cancel(tdb);
496                 return -1;
497         }
498
499         state.error = false;
500         state.dest_db = tmp_db;
501
502         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
503                 DEBUG(0,(__location__ " Failed to traverse copying out\n"));
504                 tdb_transaction_cancel(tdb);
505                 tdb_close(tmp_db);
506                 return -1;              
507         }
508
509         if (state.error) {
510                 DEBUG(0,(__location__ " Error during traversal\n"));
511                 tdb_transaction_cancel(tdb);
512                 tdb_close(tmp_db);
513                 return -1;
514         }
515
516         if (tdb_wipe_all(tdb) != 0) {
517                 DEBUG(0,(__location__ " Failed to wipe database\n"));
518                 tdb_transaction_cancel(tdb);
519                 tdb_close(tmp_db);
520                 return -1;
521         }
522
523         state.error = false;
524         state.dest_db = tdb;
525
526         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
527                 DEBUG(0,(__location__ " Failed to traverse copying back\n"));
528                 tdb_transaction_cancel(tdb);
529                 tdb_close(tmp_db);
530                 return -1;              
531         }
532
533         if (state.error) {
534                 DEBUG(0,(__location__ " Error during second traversal\n"));
535                 tdb_transaction_cancel(tdb);
536                 tdb_close(tmp_db);
537                 return -1;
538         }
539
540         tdb_close(tmp_db);
541
542         if (tdb_transaction_commit(tdb) != 0) {
543                 DEBUG(0,(__location__ " Failed to commit\n"));
544                 return -1;
545         }
546
547         return 0;
548 }
549
550
551 /* repack one database */
552 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id, 
553                           bool persistent, uint32_t repack_limit)
554 {
555         struct ctdb_db_context *ctdb_db;
556         const char *name;
557         int size;
558
559         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
560                 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
561                 return -1;
562         }
563
564         ctdb_db = ctdb_attach(ctdb, name, persistent);
565         if (ctdb_db == NULL) {
566                 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
567                 return -1;
568         }
569
570         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
571         if (size == -1) {
572                 DEBUG(0,(__location__ " Failed to get freelist size for '%s'\n", name));
573                 return -1;
574         }
575
576         if (size <= repack_limit) {
577                 return 0;
578         }
579
580         DEBUG(0,("Repacking %s with %u freelist entries\n", name, size));
581
582         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
583                 DEBUG(0,(__location__ " Failed to repack '%s'\n", name));
584                 return -1;
585         }
586
587         return 0;
588 }
589
590
591 /*
592   repack all our databases
593  */
594 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
595 {
596         struct ctdb_dbid_map *dbmap=NULL;
597         int ret, i;
598         uint32_t repack_limit = 100;
599
600         if (argc > 0) {
601                 repack_limit = atoi(argv[0]);
602         }
603
604         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
605         if (ret != 0) {
606                 DEBUG(0, ("Unable to get dbids from local node\n"));
607                 return ret;
608         }
609
610         for (i=0;i<dbmap->num;i++) {
611                 if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid, 
612                                    dbmap->dbs[i].persistent, repack_limit) != 0) {
613                         DEBUG(0,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));
614                         return -1;
615                 }
616         }
617
618         return 0;
619 }