vacuum: reduce indentation in add_record_to_delete_tree()
[garming/samba-autobuild/.git] / ctdb / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/tevent/tevent.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32
/*
 * Deadline handed to the ctdb_ctrl_*() calls in this file.
 * NOTE(review): presumably "now + 10 seconds" - confirm against
 * timeval_current_ofs().
 */
33 #define TIMELIMIT() timeval_current_ofs(10, 0)
/*
 * Base file name of the vacuum tuning database; the path builders in this
 * file prepend a directory and append ".<pnn>".
 */
34 #define TUNINGDBNAME "vactune.tdb"
35
/* Values stored in ctdb_vacuum_child_context.status. */
36 enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
37
/*
 * Bookkeeping for one vacuuming child.
 * Contexts are chained through next/prev; vacuum_child_destructor()
 * unlinks them from ctdb->vacuumers.
 */
38 struct ctdb_vacuum_child_context {
39         struct ctdb_vacuum_child_context *next, *prev;
           /* handle (and through it the database) this child belongs to */
40         struct ctdb_vacuum_handle *vacuum_handle;
41         /* fd child writes status to */
42         int fd[2];
           /* the destructor sends SIGKILL to this pid unless it is -1 */
43         pid_t child_pid;
44         enum vacuum_child_status status;
           /* used by the destructor to report how long vacuuming took */
45         struct timeval start_time;
46 };
47
/*
 * Links a database to a vacuuming child context.
 * NOTE(review): child_ctx is presumably the currently running child for
 * this database - confirm against the code that sets it.
 */
48 struct ctdb_vacuum_handle {
49         struct ctdb_db_context *ctdb_db;
50         struct ctdb_vacuum_child_context *child_ctx;
51 };
52
53
54 /*
    * State of one vacuum/repack run: the records that may be deleted, the
    * records to hand to their lmaster, and the counters of the repack.
    */
55 struct vacuum_data {
           /* thresholds copied from the tunables of the same name */
56         uint32_t vacuum_limit;
57         uint32_t repack_limit;
58         struct ctdb_context *ctdb;
59         struct ctdb_db_context *ctdb_db;
           /* database repack_traverse() stores the surviving records into */
60         struct tdb_context *dest_db;
           /* delete_record_data entries, indexed by the 32-bit key hash */
61         trbt_tree_t *delete_tree;
           /* number of entries that were added to delete_tree */
62         uint32_t delete_count;
           /* one marshall buffer per node, indexed by the record's lmaster */
63         struct ctdb_marshall_buffer **list;
           /* time the run started; stored in the tuning record */
64         struct timeval start;
           /* set by the traverse callbacks when they abort on an error */
65         bool traverse_error;
           /* when true, repack_traverse() drops records found in delete_tree */
66         bool vacuum;
           /* number of records queued in list[] */
67         uint32_t total;
           /* number of records dropped during the repack */
68         uint32_t vacuumed;
           /* number of records copied by the last repack pass */
69         uint32_t copied;
70 };
71
72 /*
    * tuning information stored for every db
    *
    * The structure is written verbatim as the value of the tuning tdb record
    * (keyed by database name) and readers reject records whose size differs
    * from sizeof(struct vacuum_tuning_data), so changing the layout
    * invalidates existing records.
    */
73 struct vacuum_tuning_data {
74         uint32_t last_num_repack;
75         uint32_t last_num_empty;
           /* interval that was in effect for the last run */
76         uint32_t last_interval;
           /* interval get_vacuum_interval() hands out (after clamping) */
77         uint32_t new_interval;
78         struct timeval last_start;
           /* elapsed time of the last run as returned by timeval_elapsed() */
79         double   last_duration;
80 };
81
82 /*
    * this structure contains the information for one record to be deleted
    *
    * Instances live in vacuum_data.delete_tree; key.dptr is a private copy
    * of the record key allocated as a talloc child of the structure.
    */
83 struct delete_record_data {
84         struct ctdb_context *ctdb;
85         struct ctdb_db_context *ctdb_db;
           /* ltdb header of the record as seen when it was queued */
86         struct ctdb_ltdb_header hdr;
87         TDB_DATA key;
88 };
89
/*
 * Accumulator passed to delete_traverse(): the marshall buffer grows by one
 * marshalled record per entry of the delete tree.
 */
90 struct delete_records_list {
91         struct ctdb_marshall_buffer *records;
92 };
93
94
95 static int add_record_to_delete_tree(struct vacuum_data *vdata, TDB_DATA key,
96                                      struct ctdb_ltdb_header *hdr)
97 {
98         struct ctdb_context *ctdb = vdata->ctdb;
99         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
100         uint32_t hash;
101         struct delete_record_data *dd;
102
103         hash = ctdb_hash(&key);
104
105         if (trbt_lookup32(vdata->delete_tree, hash)) {
106                 DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
107                 return 0;
108         }
109
110         /* store key and header indexed by the key hash */
111         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
112         if (dd == NULL) {
113                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
114                 return -1;
115         }
116         dd->ctdb      = ctdb;
117         dd->ctdb_db   = ctdb_db;
118         dd->key.dsize = key.dsize;
119         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
120         if (dd->key.dptr == NULL) {
121                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
122                 return -1;
123         }
124
125         dd->hdr = *hdr;
126
127         trbt_insert32(vdata->delete_tree, hash, dd);
128
129         vdata->delete_count++;
130
131         return 0;
132 }
133
134 /**
135  * Add a record to the list of records to be sent
136  * to their lmaster with VACUUM_FETCH.
137  */
138 static int add_record_to_vacuum_fetch_list(struct vacuum_data *vdata,
139                                            TDB_DATA key)
140 {
141         struct ctdb_context *ctdb = vdata->ctdb;
142         struct ctdb_rec_data *rec;
143         uint32_t lmaster;
144         size_t old_size;
145
146         lmaster = ctdb_lmaster(ctdb, &key);
147
148         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
149         if (rec == NULL) {
150                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
151                 vdata->traverse_error = true;
152                 return -1;
153         }
154
155         old_size = talloc_get_size(vdata->list[lmaster]);
156         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster],
157                                                    old_size + rec->length);
158         if (vdata->list[lmaster] == NULL) {
159                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
160                 vdata->traverse_error = true;
161                 return -1;
162         }
163
164         vdata->list[lmaster]->count++;
165         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
166         talloc_free(rec);
167
168         vdata->total++;
169
170         return 0;
171 }
172
173
/*
 * Forward declaration of the timed-event callback; it is referenced before
 * its definition, which is not part of this section of the file.
 */
174 static void ctdb_vacuum_event(struct event_context *ev, struct timed_event *te, 
175                                                           struct timeval t, void *private_data);
176
177
178 /*
179  * traverse function for gathering the records that can be deleted
180  */
181 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
182 {
183         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
184         struct ctdb_context *ctdb = vdata->ctdb;
185         uint32_t lmaster;
186         struct ctdb_ltdb_header *hdr;
187         int res = 0;
188
189         lmaster = ctdb_lmaster(ctdb, &key);
190         if (lmaster >= ctdb->num_nodes) {
191                 DEBUG(DEBUG_CRIT, (__location__
192                                    " lmaster[%u] >= ctdb->num_nodes[%u] for key"
193                                    " with hash[%u]!\n",
194                                    (unsigned)lmaster,
195                                    (unsigned)ctdb->num_nodes,
196                                    (unsigned)ctdb_hash(&key)));
197                 return -1;
198         }
199
200         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
201                 /* its not a deleted record */
202                 return 0;
203         }
204
205         hdr = (struct ctdb_ltdb_header *)data.dptr;
206
207         if (hdr->dmaster != ctdb->pnn) {
208                 return 0;
209         }
210
211         if (lmaster == ctdb->pnn) {
212                 /*
213                  * We are both lmaster and dmaster, and the record * is empty.
214                  * So we should be able to delete it.
215                  */
216                 res = add_record_to_delete_tree(vdata, key, hdr);
217         } else {
218                 /*
219                  * We are not lmaster.
220                  * Add the record to the blob ready to send to the nodes.
221                  */
222                 res = add_record_to_vacuum_fetch_list(vdata, key);
223         }
224
225         return res;
226 }
227
228 /*
229  * traverse the tree of records to delete and marshall them into
230  * a blob
231  */
232 static void delete_traverse(void *param, void *data)
233 {
234         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
235         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
236         struct ctdb_rec_data *rec;
237         size_t old_size;
238
239         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
240         if (rec == NULL) {
241                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
242                 return;
243         }
244
245         old_size = talloc_get_size(recs->records);
246         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
247         if (recs->records == NULL) {
248                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
249                 return;
250         }
251         recs->records->count++;
252         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
253 }
254
255 /* 
256  * read-only traverse the database in order to find
257  * records that can be deleted and try to delete these
258  * records on the other nodes
259  * this executes in the child context
260  */
261 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata)
262 {
263         struct ctdb_context *ctdb = ctdb_db->ctdb;
264         const char *name = ctdb_db->db_name;
265         int ret, i, pnn;
266
267         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
268         if (ret != 0) {
269                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
270                 return ret;
271         }
272
273         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
274         if (pnn == -1) {
275                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
276                 return -1;
277         }
278
279         ctdb->pnn = pnn;
280         /* the list needs to be of length num_nodes */
281         vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->num_nodes);
282         if (vdata->list == NULL) {
283                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
284                 return -1;
285         }
286         for (i = 0; i < ctdb->num_nodes; i++) {
287                 vdata->list[i] = (struct ctdb_marshall_buffer *)
288                         talloc_zero_size(vdata->list, 
289                                                          offsetof(struct ctdb_marshall_buffer, data));
290                 if (vdata->list[i] == NULL) {
291                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
292                         return -1;
293                 }
294                 vdata->list[i]->db_id = ctdb_db->db_id;
295         }
296
297         /* read-only traverse, looking for records that might be able to be vacuumed */
298         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
299             vdata->traverse_error) {
300                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
301                 return -1;              
302         }
303
304         /*
305          * For records where we are not the lmaster,
306          * tell the lmaster to fetch the record.
307          */
308         for (i = 0; i < ctdb->num_nodes; i++) {
309                 TDB_DATA data;
310
311                 if (ctdb->nodes[i]->pnn == ctdb->pnn) {
312                         continue;
313                 }
314
315                 if (vdata->list[i]->count == 0) {
316                         continue;
317                 }
318
319                 DEBUG(DEBUG_INFO, ("Found %u records for lmaster %u in '%s'\n",
320                                    vdata->list[i]->count, ctdb->nodes[i]->pnn,
321                                    name));
322
323                 data.dsize = talloc_get_size(vdata->list[i]);
324                 data.dptr  = (void *)vdata->list[i];
325                 if (ctdb_client_send_message(ctdb, ctdb->nodes[i]->pnn, CTDB_SRVID_VACUUM_FETCH, data) != 0) {
326                         DEBUG(DEBUG_ERR, (__location__ " Failed to send vacuum "
327                                           "fetch message to %u\n",
328                                           ctdb->nodes[i]->pnn));
329                         return -1;
330                 }
331         }       
332
333         /* Process all records we can delete (if any) */
334         if (vdata->delete_count > 0) {
335                 struct delete_records_list *recs;
336                 TDB_DATA indata, outdata;
337                 int32_t res;
338                 struct ctdb_node_map *nodemap;
339                 uint32_t *active_nodes;
340                 int num_active_nodes;
341
342                 recs = talloc_zero(vdata, struct delete_records_list);
343                 if (recs == NULL) {
344                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
345                         return -1;
346                 }
347                 recs->records = (struct ctdb_marshall_buffer *)
348                         talloc_zero_size(vdata, 
349                                     offsetof(struct ctdb_marshall_buffer, data));
350                 if (recs->records == NULL) {
351                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
352                         return -1;
353                 }
354                 recs->records->db_id = ctdb_db->db_id;
355
356                 /* 
357                  * traverse the tree of all records we want to delete and
358                  * create a blob we can send to the other nodes.
359                  */
360                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
361
362                 indata.dsize = talloc_get_size(recs->records);
363                 indata.dptr  = (void *)recs->records;
364
365                 /* 
366                  * now tell all the active nodes to delete all these records
367                  * (if possible)
368                  */
369
370                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
371                                            CTDB_CURRENT_NODE,
372                                            recs, /* talloc context */
373                                            &nodemap);
374                 if (ret != 0) {
375                         DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
376                         return -1;
377                 }
378
379                 active_nodes = list_of_active_nodes(ctdb, nodemap,
380                                                     nodemap, /* talloc context */
381                                                     false /* include self */);
382                 /* yuck! ;-) */
383                 num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
384
385                 for (i = 0; i < num_active_nodes; i++) {
386                         struct ctdb_marshall_buffer *records;
387                         struct ctdb_rec_data *rec;
388
389                         ret = ctdb_control(ctdb, active_nodes[i], 0,
390                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
391                                         indata, recs, &outdata, &res,
392                                         NULL, NULL);
393                         if (ret != 0 || res != 0) {
394                                 DEBUG(DEBUG_ERR, ("Failed to delete records on "
395                                                   "node %u: ret[%d] res[%d]\n",
396                                                   active_nodes[i], ret, res));
397                                 return -1;
398                         }
399
400                         /* 
401                          * outdata countains the list of records coming back
402                          * from the node which the node could not delete
403                          */
404                         records = (struct ctdb_marshall_buffer *)outdata.dptr;
405                         rec = (struct ctdb_rec_data *)&records->data[0];
406                         while (records->count-- > 1) {
407                                 TDB_DATA reckey, recdata;
408                                 struct ctdb_ltdb_header *rechdr;
409
410                                 reckey.dptr = &rec->data[0];
411                                 reckey.dsize = rec->keylen;
412                                 recdata.dptr = &rec->data[reckey.dsize];
413                                 recdata.dsize = rec->datalen;
414
415                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
416                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
417                                         return -1;
418                                 }
419                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
420                                 recdata.dptr += sizeof(*rechdr);
421                                 recdata.dsize -= sizeof(*rechdr);
422
423                                 /* 
424                                  * that other node couldnt delete the record
425                                  * so we should delete it and thereby remove it from the tree
426                                  */
427                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
428
429                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
430                         }           
431                 }
432
433                 /* free nodemap and active_nodes */
434                 talloc_free(nodemap);
435
436                 /* 
437                  * The only records remaining in the tree would be those
438                  * records where all other nodes could successfully
439                  * delete them, so we can safely delete them on the
440                  * lmaster as well. Deletion implictely happens while
441                  * we repack the database. The repack algorithm revisits 
442                  * the tree in order to find the records that don't need
443                  * to be copied / repacked.
444                  */
445         }
446
447         /* this ensures we run our event queue */
448         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
449
450         return 0;
451 }
452
453
454 /*
455  * traverse function for repacking
456  */
457 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
458 {
459         struct vacuum_data *vdata = (struct vacuum_data *)private;
460
461         if (vdata->vacuum) {
462                 uint32_t hash = ctdb_hash(&key);
463                 struct delete_record_data *kd;
464                 /*
465                  * check if we can ignore this record because it's in the delete_tree
466                  */
467                 kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_tree, hash);
468                 /*
469                  * there might be hash collisions so we have to compare the keys here to be sure
470                  */
471                 if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
472                         struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
473                         /*
474                          * we have to check if the record hasn't changed in the meantime in order to
475                          * savely remove it from the database
476                          */
477                         if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
478                                 hdr->dmaster == kd->ctdb->pnn &&
479                                 ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
480                                 kd->hdr.rsn == hdr->rsn) {
481                                 vdata->vacuumed++;
482                                 return 0;
483                         }
484                 }
485         }
486         if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
487                 vdata->traverse_error = true;
488                 return -1;
489         }
490         vdata->copied++;
491         return 0;
492 }
493
494 /*
495  * repack a tdb
496  */
497 static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
498 {
499         struct tdb_context *tmp_db;
500
501         if (tdb_transaction_start(tdb) != 0) {
502                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
503                 return -1;
504         }
505
506         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
507                           TDB_INTERNAL|TDB_DISALLOW_NESTING,
508                           O_RDWR|O_CREAT, 0);
509         if (tmp_db == NULL) {
510                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
511                 tdb_transaction_cancel(tdb);
512                 return -1;
513         }
514
515         vdata->traverse_error = false;
516         vdata->dest_db = tmp_db;
517         vdata->vacuum = true;
518         vdata->vacuumed = 0;
519         vdata->copied = 0;
520
521         /*
522          * repack and vacuum on-the-fly by not writing the records that are
523          * no longer needed
524          */
525         if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
526                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
527                 tdb_transaction_cancel(tdb);
528                 tdb_close(tmp_db);
529                 return -1;              
530         }
531
532         DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
533         
534         if (vdata->traverse_error) {
535                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
536                 tdb_transaction_cancel(tdb);
537                 tdb_close(tmp_db);
538                 return -1;
539         }
540
541         if (tdb_wipe_all(tdb) != 0) {
542                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
543                 tdb_transaction_cancel(tdb);
544                 tdb_close(tmp_db);
545                 return -1;
546         }
547
548         vdata->traverse_error = false;
549         vdata->dest_db = tdb;
550         vdata->vacuum = false;
551         vdata->copied = 0;
552
553         if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
554                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
555                 tdb_transaction_cancel(tdb);
556                 tdb_close(tmp_db);
557                 return -1;              
558         }
559
560         if (vdata->traverse_error) {
561                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
562                 tdb_transaction_cancel(tdb);
563                 tdb_close(tmp_db);
564                 return -1;
565         }
566
567         tdb_close(tmp_db);
568
569
570         if (tdb_transaction_commit(tdb) != 0) {
571                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
572                 return -1;
573         }
574         DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
575
576         return 0;
577 }
578
579 static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata, uint32_t freelist)
580 {
581         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
582         TDB_CONTEXT *tune_tdb;
583         TDB_DATA key, value;
584         struct vacuum_tuning_data tdata;
585         struct vacuum_tuning_data *tptr;
586         char *vac_dbname;
587         int flags;
588
589         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
590                                      ctdb_db->ctdb->db_directory_state,
591                                      TUNINGDBNAME, ctdb_db->ctdb->pnn);
592         if (vac_dbname == NULL) {
593                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
594                 talloc_free(tmp_ctx);
595                 return -1;
596         }
597
598         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
599         flags |= TDB_DISALLOW_NESTING;
600         tune_tdb = tdb_open(vac_dbname, 0,
601                             flags,
602                             O_RDWR|O_CREAT, 0600);
603         if (tune_tdb == NULL) {
604                 DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
605                 talloc_free(tmp_ctx);
606                 return -1;
607         }
608         
609         if (tdb_transaction_start(tune_tdb) != 0) {
610                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
611                 tdb_close(tune_tdb);
612                 return -1;
613         }
614         key.dptr = discard_const(ctdb_db->db_name);
615         key.dsize = strlen(ctdb_db->db_name);
616         value = tdb_fetch(tune_tdb, key);
617
618         if (value.dptr != NULL && value.dsize == sizeof(struct vacuum_tuning_data)) {
619                 tptr = (struct vacuum_tuning_data *)value.dptr;
620                 tdata = *tptr;
621
622                 /*
623                  * re-calc new vacuum interval:
624                  * in case no limit was reached we continously increase the interval
625                  * until vacuum_max_interval is reached
626                  * in case a limit was reached we divide the current interval by 2
627                  * unless vacuum_min_interval is reached
628                  */
629                 if (freelist < vdata->repack_limit &&
630                     vdata->delete_count < vdata->vacuum_limit) {
631                         if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
632                                 tdata.new_interval = tdata.last_interval * 110 / 100;
633                                 DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n", 
634                                         tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
635                         }
636                 } else {
637                         tdata.new_interval = tdata.last_interval / 2;
638                         if (tdata.new_interval < ctdb_db->ctdb->tunable.vacuum_min_interval ||
639                                 tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
640                                 tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
641                         }               
642                         DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n", 
643                                          tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
644                 }
645                 tdata.last_interval = tdata.new_interval;
646         } else {
647                 DEBUG(DEBUG_DEBUG,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
648                 tdata.last_num_repack = freelist;
649                 tdata.last_num_empty = vdata->delete_count;
650                 tdata.last_interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
651         }
652
653         if (value.dptr != NULL) {
654                 free(value.dptr);
655         }
656
657         tdata.last_start = vdata->start;
658         tdata.last_duration = timeval_elapsed(&vdata->start);
659
660         value.dptr = (unsigned char *)&tdata;
661         value.dsize = sizeof(tdata);
662
663         if (tdb_store(tune_tdb, key, value, 0) != 0) {
664                 DEBUG(DEBUG_ERR,(__location__ " Unable to store tundb record for %s\n", ctdb_db->db_name));
665                 tdb_transaction_cancel(tune_tdb);
666                 tdb_close(tune_tdb);
667                 talloc_free(tmp_ctx);
668                 return -1;
669         }
670         tdb_transaction_commit(tune_tdb);
671         tdb_close(tune_tdb);
672         talloc_free(tmp_ctx);
673
674         return 0;
675 }
676
677 /*
678  * repack and vaccum a db
679  * called from the child context
680  */
681 static int ctdb_vacuum_and_repack_db(struct ctdb_db_context *ctdb_db,
682                                      TALLOC_CTX *mem_ctx)
683 {
684         uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
685         uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
686         const char *name = ctdb_db->db_name;
687         int size;
688         struct vacuum_data *vdata;
689
690         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
691         if (size == -1) {
692                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
693                 return -1;
694         }
695
696         vdata = talloc_zero(mem_ctx, struct vacuum_data);
697         if (vdata == NULL) {
698                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
699                 return -1;
700         }
701
702         vdata->ctdb = ctdb_db->ctdb;
703         vdata->vacuum_limit = vacuum_limit;
704         vdata->repack_limit = repack_limit;
705         vdata->delete_tree = trbt_create(vdata, 0);
706         vdata->ctdb_db = ctdb_db;
707         if (vdata->delete_tree == NULL) {
708                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
709                 talloc_free(vdata);
710                 return -1;
711         }
712
713         vdata->start = timeval_current();
714  
715         /*
716          * gather all records that can be deleted in vdata
717          */
718         if (ctdb_vacuum_db(ctdb_db, vdata) != 0) {
719                 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
720         }
721
722         /*
723          * decide if a repack is necessary
724          */
725         if (size < repack_limit && vdata->delete_count < vacuum_limit) {
726                 update_tuning_db(ctdb_db, vdata, size);
727                 talloc_free(vdata);
728                 return 0;
729         }
730
731         DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
732                         name, size, vdata->delete_count));
733
734         /*
735          * repack and implicitely get rid of the records we can delete
736          */
737         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
738                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
739                 update_tuning_db(ctdb_db, vdata, size);
740                 talloc_free(vdata);
741                 return -1;
742         }
743         update_tuning_db(ctdb_db, vdata, size);
744         talloc_free(vdata);
745
746         return 0;
747 }
748
749 static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
750 {
751         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
752         TDB_CONTEXT *tdb;
753         TDB_DATA key, value;
754         char *vac_dbname;
755         uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
756         struct ctdb_context *ctdb = ctdb_db->ctdb;
757         int flags;
758
759         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
760         if (vac_dbname == NULL) {
761                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
762                 talloc_free(tmp_ctx);
763                 return interval;
764         }
765
766         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
767         flags |= TDB_DISALLOW_NESTING;
768         tdb = tdb_open(vac_dbname, 0,
769                        flags,
770                        O_RDWR|O_CREAT, 0600);
771         if (!tdb) {
772                 DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval. Errno : %s (%d)\n", vac_dbname, strerror(errno), errno));
773                 talloc_free(tmp_ctx);
774                 return interval;
775         }
776
777         key.dptr = discard_const(ctdb_db->db_name);
778         key.dsize = strlen(ctdb_db->db_name);
779
780         value = tdb_fetch(tdb, key);
781
782         if (value.dptr != NULL) {
783                 if (value.dsize == sizeof(struct vacuum_tuning_data)) {
784                         struct vacuum_tuning_data *tptr = (struct vacuum_tuning_data *)value.dptr;
785
786                         interval = tptr->new_interval;
787
788                         if (interval < ctdb->tunable.vacuum_min_interval) {
789                                 interval = ctdb->tunable.vacuum_min_interval;
790                         } 
791                         if (interval > ctdb->tunable.vacuum_max_interval) {
792                                 interval = ctdb->tunable.vacuum_max_interval;
793                         }
794                 }
795                 free(value.dptr);
796         }
797         tdb_close(tdb);
798
799         talloc_free(tmp_ctx);
800
801         return interval;
802 }
803
804 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
805 {
806         double l = timeval_elapsed(&child_ctx->start_time);
807         struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
808         struct ctdb_context *ctdb = ctdb_db->ctdb;
809
810         DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
811
812         if (child_ctx->child_pid != -1) {
813                 kill(child_ctx->child_pid, SIGKILL);
814         }
815
816         DLIST_REMOVE(ctdb->vacuumers, child_ctx);
817
818         event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
819                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
820                         ctdb_vacuum_event, child_ctx->vacuum_handle);
821
822         return 0;
823 }
824
825 /*
826  * this event is generated when a vacuum child process times out
827  */
828 static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
829                                          struct timeval t, void *private_data)
830 {
831         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
832
833         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
834
835         child_ctx->status = VACUUM_TIMEOUT;
836
837         talloc_free(child_ctx);
838 }
839
840
841 /*
842  * this event is generated when a vacuum child process has completed
843  */
844 static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
845                              uint16_t flags, void *private_data)
846 {
847         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
848         char c = 0;
849         int ret;
850
851         DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
852         child_ctx->child_pid = -1;
853
854         ret = read(child_ctx->fd[0], &c, 1);
855         if (ret != 1 || c != 0) {
856                 child_ctx->status = VACUUM_ERROR;
857                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
858         } else {
859                 child_ctx->status = VACUUM_OK;
860         }
861
862         talloc_free(child_ctx);
863 }
864
865 /*
866  * this event is called every time we need to start a new vacuum process
867  */
868 static void
869 ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
870                                struct timeval t, void *private_data)
871 {
872         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
873         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
874         struct ctdb_context *ctdb = ctdb_db->ctdb;
875         struct ctdb_vacuum_child_context *child_ctx;
876         struct tevent_fd *fde;
877         int ret;
878
879         /* we dont vacuum if we are in recovery mode, or db frozen */
880         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
881             ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_NONE) {
882                 DEBUG(DEBUG_INFO, ("Not vacuuming %s (%s)\n", ctdb_db->db_name,
883                                    ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ? "in recovery"
884                                    : ctdb->freeze_mode[ctdb_db->priority] == CTDB_FREEZE_PENDING
885                                    ? "freeze pending"
886                                    : "frozen"));
887                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
888                 return;
889         }
890
891         child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
892         if (child_ctx == NULL) {
893                 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
894                 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
895         }
896
897
898         ret = pipe(child_ctx->fd);
899         if (ret != 0) {
900                 talloc_free(child_ctx);
901                 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
902                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
903                 return;
904         }
905
906         child_ctx->child_pid = ctdb_fork(ctdb);
907         if (child_ctx->child_pid == (pid_t)-1) {
908                 close(child_ctx->fd[0]);
909                 close(child_ctx->fd[1]);
910                 talloc_free(child_ctx);
911                 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
912                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
913                 return;
914         }
915
916
917         if (child_ctx->child_pid == 0) {
918                 char cc = 0;
919                 close(child_ctx->fd[0]);
920
921                 DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
922         
923                 if (switch_from_server_to_client(ctdb, "vacuum-%s", ctdb_db->db_name) != 0) {
924                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
925                         _exit(1);
926                 }
927
928                 /* 
929                  * repack the db
930                  */
931                 cc = ctdb_vacuum_and_repack_db(ctdb_db, child_ctx);
932
933                 write(child_ctx->fd[1], &cc, 1);
934                 _exit(0);
935         }
936
937         set_close_on_exec(child_ctx->fd[0]);
938         close(child_ctx->fd[1]);
939
940         child_ctx->status = VACUUM_RUNNING;
941         child_ctx->start_time = timeval_current();
942
943         DLIST_ADD(ctdb->vacuumers, child_ctx);
944         talloc_set_destructor(child_ctx, vacuum_child_destructor);
945
946         /*
947          * Clear the fastpath vacuuming list in the parent.
948          */
949         talloc_free(ctdb_db->delete_queue);
950         ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
951         if (ctdb_db->delete_queue == NULL) {
952                 /* fatal here? ... */
953                 ctdb_fatal(ctdb, "Out of memory when re-creating vacuum tree "
954                                  "in parent context. Shutting down\n");
955         }
956
957         event_add_timed(ctdb->ev, child_ctx,
958                 timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
959                 vacuum_child_timeout, child_ctx);
960
961         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
962
963         fde = event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
964                            EVENT_FD_READ, vacuum_child_handler, child_ctx);
965         tevent_fd_set_auto_close(fde);
966
967         vacuum_handle->child_ctx = child_ctx;
968         child_ctx->vacuum_handle = vacuum_handle;
969 }
970
971 void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
972 {
973         /* Simply free them all. */
974         while (ctdb->vacuumers) {
975                 DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%i)\n",
976                            ctdb->vacuumers->vacuum_handle->ctdb_db->db_name,
977                            (int)ctdb->vacuumers->child_pid));
978                 /* vacuum_child_destructor kills it, removes from list */
979                 talloc_free(ctdb->vacuumers);
980         }
981 }
982
983 /* this function initializes the vacuuming context for a database
984  * starts the vacuuming events
985  */
986 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
987 {
988         if (ctdb_db->persistent != 0) {
989                 DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
990                 return 0;
991         }
992
993         ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
994         CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
995
996         ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
997
998         event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle, 
999                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
1000                         ctdb_vacuum_event, ctdb_db->vacuum_handle);
1001
1002         return 0;
1003 }