block alarm signals during critical sections of vacuum
[amitay/samba.git] / ctdb / tools / ctdb_vacuum.c
1 /* 
2    ctdb control tool - database vacuum 
3
4    Copyright (C) Andrew Tridgell  2008
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/network.h"
24 #include "../include/ctdb.h"
25 #include "../include/ctdb_private.h"
26 #include "db_wrap.h"
27
28 /* should be tunable */
29 #define TIMELIMIT() timeval_current_ofs(10, 0)
30
31 struct async_data {
32         uint32_t count;
33         uint32_t fail_count;
34 };
35
36 static void async_callback(struct ctdb_client_control_state *state)
37 {
38         struct async_data *data = talloc_get_type(state->async.private_data, struct async_data);
39         int ret;
40         int32_t res;
41
42         /* one more node has responded with recmode data */
43         data->count--;
44
45         /* if we failed to push the db, then return an error and let
46            the main loop try again.
47         */
48         if (state->state != CTDB_CONTROL_DONE) {
49                 data->fail_count++;
50                 return;
51         }
52         
53         state->async.fn = NULL;
54
55         ret = ctdb_control_recv(state->ctdb, state, data, NULL, &res, NULL);
56         if ((ret != 0) || (res != 0)) {
57                 data->fail_count++;
58         }
59 }
60
61 static void async_add(struct async_data *data, struct ctdb_client_control_state *state)
62 {
63         /* set up the callback functions */
64         state->async.fn = async_callback;
65         state->async.private_data = data;
66         
67         /* one more control to wait for to complete */
68         data->count++;
69 }
70
71
72 /* wait for up to the maximum number of seconds allowed
73    or until all nodes we expect a response from has replied
74 */
75 static int async_wait(struct ctdb_context *ctdb, struct async_data *data)
76 {
77         while (data->count > 0) {
78                 event_loop_once(ctdb->ev);
79         }
80         if (data->fail_count != 0) {
81                 return -1;
82         }
83         return 0;
84 }
85
86 /* 
87    perform a simple control on nodes in the vnn map except ourselves.
88    The control cannot return data
89  */
90 static int async_control_on_vnnmap(struct ctdb_context *ctdb, enum ctdb_controls opcode,
91                                    TDB_DATA data)
92 {
93         struct async_data *async_data;
94         struct ctdb_client_control_state *state;
95         int j;
96         struct timeval timeout = TIMELIMIT();
97         
98         async_data = talloc_zero(ctdb, struct async_data);
99         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
100
101         /* loop over all active nodes and send an async control to each of them */
102         for (j=0; j<ctdb->vnn_map->size; j++) {
103                 uint32_t pnn = ctdb->vnn_map->map[j];
104                 if (pnn == ctdb->pnn) {
105                         continue;
106                 }
107                 state = ctdb_control_send(ctdb, pnn, 0, opcode, 
108                                           0, data, async_data, NULL, &timeout, NULL);
109                 if (state == NULL) {
110                         DEBUG(0,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
111                         talloc_free(async_data);
112                         return -1;
113                 }
114                 
115                 async_add(async_data, state);
116         }
117
118         if (async_wait(ctdb, async_data) != 0) {
119                 talloc_free(async_data);
120                 return -1;
121         }
122
123         talloc_free(async_data);
124         return 0;
125 }
126
127
128 /*
129   vacuum one record
130  */
131 static int ctdb_vacuum_one(struct ctdb_context *ctdb, TDB_DATA key, 
132                            struct ctdb_db_context *ctdb_db, uint32_t *count)
133 {
134         TDB_DATA data;
135         struct ctdb_ltdb_header *hdr;
136         struct ctdb_rec_data *rec;
137         uint64_t rsn;
138
139         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
140                 /* the chain is busy - come back later */
141                 return 0;
142         }
143
144         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
145         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
146         if (data.dptr == NULL) {
147                 return 0;
148         }
149         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
150                 free(data.dptr);
151                 return 0;
152         }
153
154
155         hdr = (struct ctdb_ltdb_header *)data.dptr;
156         rsn = hdr->rsn;
157
158         /* if we are not the lmaster and the dmaster then skip the record */
159         if (hdr->dmaster != ctdb->pnn ||
160             ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
161                 free(data.dptr);
162                 return 0;
163         }
164
165         rec = ctdb_marshall_record(ctdb, ctdb_db->db_id, key, hdr, tdb_null);
166         free(data.dptr);
167         if (rec == NULL) {
168                 /* try it again later */
169                 return 0;
170         }
171
172         data.dptr = (void *)rec;
173         data.dsize = rec->length;
174
175         if (async_control_on_vnnmap(ctdb, CTDB_CONTROL_DELETE_RECORD, data) != 0) {
176                 /* one or more nodes failed to delete a record - no problem! */
177                 talloc_free(rec);
178                 return 0;
179         }
180
181         talloc_free(rec);
182
183         /* its deleted on all other nodes - refetch, check and delete */
184         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
185                 /* the chain is busy - come back later */
186                 return 0;
187         }
188
189         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
190         if (data.dptr == NULL) {
191                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
192                 return 0;
193         }
194         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
195                 free(data.dptr);
196                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
197                 return 0;
198         }
199
200         hdr = (struct ctdb_ltdb_header *)data.dptr;
201
202         /* if we are not the lmaster and the dmaster then skip the record */
203         if (hdr->dmaster != ctdb->pnn ||
204             ctdb_lmaster(ctdb, &key) != ctdb->pnn ||
205             rsn != hdr->rsn) {
206                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
207                 free(data.dptr);
208                 return 0;
209         }
210
211         ctdb_block_signal(SIGALRM);
212         tdb_delete(ctdb_db->ltdb->tdb, key);
213         ctdb_unblock_signal(SIGALRM);
214         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
215         free(data.dptr);
216
217         (*count)++;
218
219         return 0;
220 }
221
222
223 /*
224   vacuum records for which we are the lmaster 
225  */
226 static int ctdb_vacuum_local(struct ctdb_context *ctdb, struct ctdb_control_pulldb_reply *list, 
227                              struct ctdb_db_context *ctdb_db, uint32_t *count)
228 {
229         struct ctdb_rec_data *r;
230         int i;
231
232         r = (struct ctdb_rec_data *)&list->data[0];
233         
234         for (i=0;
235              i<list->count;
236              r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r), i++) {
237                 TDB_DATA key;
238                 key.dptr = &r->data[0];
239                 key.dsize = r->keylen;
240                 if (ctdb_vacuum_one(ctdb, key, ctdb_db, count) != 0) {
241                         return -1;
242                 }
243         }
244
245         return 0;       
246 }
247
248 /* 
249    a list of records to possibly delete
250  */
251 struct vacuum_data {
252         uint32_t vacuum_limit;
253         struct ctdb_context *ctdb;
254         struct ctdb_control_pulldb_reply **list;
255         bool traverse_error;
256         uint32_t total;
257 };
258
259 /*
260   traverse function for vacuuming
261  */
262 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
263 {
264         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
265         uint32_t lmaster;
266         struct ctdb_ltdb_header *hdr;
267         struct ctdb_rec_data *rec;
268         size_t old_size;
269                
270         lmaster = ctdb_lmaster(vdata->ctdb, &key);
271         if (lmaster >= vdata->ctdb->vnn_map->size) {
272                 return 0;
273         }
274
275         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
276                 /* its not a deleted record */
277                 return 0;
278         }
279
280         hdr = (struct ctdb_ltdb_header *)data.dptr;
281
282         if (hdr->dmaster != vdata->ctdb->pnn) {
283                 return 0;
284         }
285
286
287         /* add the record to the blob ready to send to the nodes */
288         rec = ctdb_marshall_record(vdata->list[lmaster], vdata->ctdb->pnn, key, NULL, tdb_null);
289         if (rec == NULL) {
290                 DEBUG(0,(__location__ " Out of memory\n"));
291                 vdata->traverse_error = true;
292                 return -1;
293         }
294         old_size = talloc_get_size(vdata->list[lmaster]);
295         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
296                                                    old_size + rec->length);
297         if (vdata->list[lmaster] == NULL) {
298                 DEBUG(0,(__location__ " Failed to expand\n"));
299                 vdata->traverse_error = true;
300                 return -1;
301         }
302         vdata->list[lmaster]->count++;
303         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
304         talloc_free(rec);
305
306         vdata->total++;
307
308         /* don't gather too many records */
309         if (vdata->vacuum_limit != 0 &&
310             vdata->total == vdata->vacuum_limit) {
311                 return -1;
312         }
313
314         return 0;
315 }
316
317
318 /* vacuum one database */
319 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
320                           bool persistent, uint32_t vacuum_limit)
321 {
322         struct ctdb_db_context *ctdb_db;
323         const char *name;
324         struct vacuum_data *vdata;
325         int i;
326
327         vdata = talloc_zero(ctdb, struct vacuum_data);
328         if (vdata == NULL) {
329                 DEBUG(0,(__location__ " Out of memory\n"));
330                 return -1;
331         }
332
333         vdata->ctdb = ctdb;
334         vdata->vacuum_limit = vacuum_limit;
335
336         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
337                 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
338                 talloc_free(vdata);
339                 return -1;
340         }
341
342         ctdb_db = ctdb_attach(ctdb, name, persistent);
343         if (ctdb_db == NULL) {
344                 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
345                 talloc_free(vdata);
346                 return -1;
347         }
348
349         /* the list needs to be of length num_nodes */
350         vdata->list = talloc_array(vdata, struct ctdb_control_pulldb_reply *, ctdb->vnn_map->size);
351         if (vdata->list == NULL) {
352                 DEBUG(0,(__location__ " Out of memory\n"));
353                 talloc_free(vdata);
354                 return -1;
355         }
356         for (i=0;i<ctdb->vnn_map->size;i++) {
357                 vdata->list[i] = (struct ctdb_control_pulldb_reply *)
358                         talloc_zero_size(vdata->list, 
359                                     offsetof(struct ctdb_control_pulldb_reply, data));
360                 if (vdata->list[i] == NULL) {
361                         DEBUG(0,(__location__ " Out of memory\n"));
362                         talloc_free(vdata);
363                         return -1;
364                 }
365                 vdata->list[i]->db_id = db_id;
366         }
367
368         /* traverse, looking for records that might be able to be vacuumed */
369         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
370             vdata->traverse_error) {
371                 DEBUG(0,(__location__ " Traverse error in vacuuming '%s'\n", name));
372                 talloc_free(vdata);
373                 return -1;              
374         }
375
376
377         for (i=0;i<ctdb->vnn_map->size;i++) {
378                 if (vdata->list[i]->count == 0) {
379                         continue;
380                 }
381
382                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
383                 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
384                         TDB_DATA data;
385                         printf("Found %u records for lmaster %u in '%s'\n", vdata->list[i]->count, i, name);
386
387                         data.dsize = talloc_get_size(vdata->list[i]);
388                         data.dptr  = (void *)vdata->list[i];
389                         if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
390                                 DEBUG(0,(__location__ " Failed to send vacuum fetch message to %u\n",
391                                          ctdb->vnn_map->map[i]));
392                                 talloc_free(vdata);
393                                 return -1;              
394                         }
395                         continue;
396                 }
397         }       
398
399         for (i=0;i<ctdb->vnn_map->size;i++) {
400                 uint32_t count = 0;
401
402                 if (vdata->list[i]->count == 0) {
403                         continue;
404                 }
405
406                 /* for records where we are the lmaster, we can try to delete them */
407                 if (ctdb_vacuum_local(ctdb, vdata->list[i], ctdb_db, &count) != 0) {
408                         DEBUG(0,(__location__ " Deletion error in vacuuming '%s'\n", name));
409                         talloc_free(vdata);
410                         return -1;                                      
411                 }
412                 if (count != 0) {
413                         printf("Deleted %u records on this node from '%s'\n", count, name);
414                 }
415         }       
416
417         /* this ensures we run our event queue */
418         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
419
420         talloc_free(vdata);
421
422         return 0;
423 }
424
425
426 /*
427   vacuum all our databases
428  */
429 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
430 {
431         struct ctdb_dbid_map *dbmap=NULL;
432         struct ctdb_node_map *nodemap=NULL;
433         int ret, i, pnn;
434         uint32_t vacuum_limit = 0;
435
436         if (argc > 0) {
437                 vacuum_limit = atoi(argv[0]);
438         }
439
440         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
441         if (ret != 0) {
442                 DEBUG(0, ("Unable to get dbids from local node\n"));
443                 return ret;
444         }
445
446         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
447         if (ret != 0) {
448                 DEBUG(0, ("Unable to get nodemap from local node\n"));
449                 return ret;
450         }
451
452         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
453         if (ret != 0) {
454                 DEBUG(0, ("Unable to get vnnmap from local node\n"));
455                 return ret;
456         }
457
458         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
459         if (pnn == -1) {
460                 DEBUG(0, ("Unable to get pnn from local node\n"));
461                 return -1;
462         }
463         ctdb->pnn = pnn;
464
465         for (i=0;i<dbmap->num;i++) {
466                 if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap, 
467                                    dbmap->dbs[i].persistent, vacuum_limit) != 0) {
468                         DEBUG(0,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
469                         return -1;
470                 }
471         }
472
473         return 0;
474 }
475
476 struct traverse_state {
477         bool error;
478         struct tdb_context *dest_db;
479 };
480
481 /*
482   traverse function for repacking
483  */
484 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
485 {
486         struct traverse_state *state = (struct traverse_state *)private;
487         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
488                 state->error = true;
489                 return -1;
490         }
491         return 0;
492 }
493
494 /*
495   repack a tdb
496  */
497 static int ctdb_repack_tdb(struct tdb_context *tdb)
498 {
499         struct tdb_context *tmp_db;
500         struct traverse_state state;
501
502         if (tdb_transaction_start(tdb) != 0) {
503                 DEBUG(0,(__location__ " Failed to start transaction\n"));
504                 return -1;
505         }
506
507         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
508         if (tmp_db == NULL) {
509                 DEBUG(0,(__location__ " Failed to create tmp_db\n"));
510                 tdb_transaction_cancel(tdb);
511                 return -1;
512         }
513
514         state.error = false;
515         state.dest_db = tmp_db;
516
517         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
518                 DEBUG(0,(__location__ " Failed to traverse copying out\n"));
519                 tdb_transaction_cancel(tdb);
520                 tdb_close(tmp_db);
521                 return -1;              
522         }
523
524         if (state.error) {
525                 DEBUG(0,(__location__ " Error during traversal\n"));
526                 tdb_transaction_cancel(tdb);
527                 tdb_close(tmp_db);
528                 return -1;
529         }
530
531         if (tdb_wipe_all(tdb) != 0) {
532                 DEBUG(0,(__location__ " Failed to wipe database\n"));
533                 tdb_transaction_cancel(tdb);
534                 tdb_close(tmp_db);
535                 return -1;
536         }
537
538         state.error = false;
539         state.dest_db = tdb;
540
541         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
542                 DEBUG(0,(__location__ " Failed to traverse copying back\n"));
543                 tdb_transaction_cancel(tdb);
544                 tdb_close(tmp_db);
545                 return -1;              
546         }
547
548         if (state.error) {
549                 DEBUG(0,(__location__ " Error during second traversal\n"));
550                 tdb_transaction_cancel(tdb);
551                 tdb_close(tmp_db);
552                 return -1;
553         }
554
555         tdb_close(tmp_db);
556
557         if (tdb_transaction_commit(tdb) != 0) {
558                 DEBUG(0,(__location__ " Failed to commit\n"));
559                 return -1;
560         }
561
562         return 0;
563 }
564
565
566 /* repack one database */
567 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id, 
568                           bool persistent, uint32_t repack_limit)
569 {
570         struct ctdb_db_context *ctdb_db;
571         const char *name;
572         int size;
573
574         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
575                 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
576                 return -1;
577         }
578
579         ctdb_db = ctdb_attach(ctdb, name, persistent);
580         if (ctdb_db == NULL) {
581                 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
582                 return -1;
583         }
584
585         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
586         if (size == -1) {
587                 DEBUG(0,(__location__ " Failed to get freelist size for '%s'\n", name));
588                 return -1;
589         }
590
591         if (size <= repack_limit) {
592                 return 0;
593         }
594
595         printf("Repacking %s with %u freelist entries\n", name, size);
596
597         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
598                 DEBUG(0,(__location__ " Failed to repack '%s'\n", name));
599                 return -1;
600         }
601
602         return 0;
603 }
604
605
606 /*
607   repack all our databases
608  */
609 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
610 {
611         struct ctdb_dbid_map *dbmap=NULL;
612         int ret, i;
613         /* a reasonable default limit to prevent us using too much memory */
614         uint32_t repack_limit = 10000; 
615
616         if (argc > 0) {
617                 repack_limit = atoi(argv[0]);
618         }
619
620         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
621         if (ret != 0) {
622                 DEBUG(0, ("Unable to get dbids from local node\n"));
623                 return ret;
624         }
625
626         for (i=0;i<dbmap->num;i++) {
627                 if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid, 
628                                    dbmap->dbs[i].persistent, repack_limit) != 0) {
629                         DEBUG(0,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));
630                         return -1;
631                 }
632         }
633
634         return 0;
635 }