vacuum: refactor new add_record_to_vacuum_fetch_list() out of vacuum_traverse().
[garming/samba-autobuild/.git] / ctdb / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/tevent/tevent.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32
33 #define TIMELIMIT() timeval_current_ofs(10, 0)
34 #define TUNINGDBNAME "vactune.tdb"
35
/* Status values recorded for a vacuuming child (see ctdb_vacuum_child_context.status). */
enum vacuum_child_status {
	VACUUM_RUNNING,
	VACUUM_OK,
	VACUUM_ERROR,
	VACUUM_TIMEOUT
};
37
38 struct ctdb_vacuum_child_context {
39         struct ctdb_vacuum_child_context *next, *prev;
40         struct ctdb_vacuum_handle *vacuum_handle;
41         /* fd child writes status to */
42         int fd[2];
43         pid_t child_pid;
44         enum vacuum_child_status status;
45         struct timeval start_time;
46 };
47
/* Ties a database to the vacuuming child context working on it. */
struct ctdb_vacuum_handle {
	/* database being vacuumed */
	struct ctdb_db_context *ctdb_db;
	/* child context associated with this handle */
	struct ctdb_vacuum_child_context *child_ctx;
};
52
53
54 /*  a list of records to possibly delete */
55 struct vacuum_data {
56         uint32_t vacuum_limit;
57         uint32_t repack_limit;
58         struct ctdb_context *ctdb;
59         struct ctdb_db_context *ctdb_db;
60         struct tdb_context *dest_db;
61         trbt_tree_t *delete_tree;
62         uint32_t delete_count;
63         struct ctdb_marshall_buffer **list;
64         struct timeval start;
65         bool traverse_error;
66         bool vacuum;
67         uint32_t total;
68         uint32_t vacuumed;
69         uint32_t copied;
70 };
71
/*
 * Tuning information stored for every db.
 * The struct is written to and read from the tuning tdb as a raw blob,
 * so the member order and types must not change.
 */
struct vacuum_tuning_data {
	uint32_t last_num_repack;
	uint32_t last_num_empty;
	uint32_t last_interval;
	uint32_t new_interval;
	struct timeval last_start;
	double last_duration;
};
81
82 /* this structure contains the information for one record to be deleted */
83 struct delete_record_data {
84         struct ctdb_context *ctdb;
85         struct ctdb_db_context *ctdb_db;
86         struct ctdb_ltdb_header hdr;
87         TDB_DATA key;
88 };
89
/* Wrapper around the marshall buffer that delete_traverse() appends to. */
struct delete_records_list {
	struct ctdb_marshall_buffer *records;
};
93
94 /**
95  * Add a record to the list of records to be sent
96  * to their lmaster with VACUUM_FETCH.
97  */
98 static int add_record_to_vacuum_fetch_list(struct vacuum_data *vdata,
99                                            TDB_DATA key)
100 {
101         struct ctdb_context *ctdb = vdata->ctdb;
102         struct ctdb_rec_data *rec;
103         uint32_t lmaster;
104         size_t old_size;
105
106         lmaster = ctdb_lmaster(ctdb, &key);
107
108         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
109         if (rec == NULL) {
110                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
111                 vdata->traverse_error = true;
112                 return -1;
113         }
114
115         old_size = talloc_get_size(vdata->list[lmaster]);
116         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster],
117                                                    old_size + rec->length);
118         if (vdata->list[lmaster] == NULL) {
119                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
120                 vdata->traverse_error = true;
121                 return -1;
122         }
123
124         vdata->list[lmaster]->count++;
125         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
126         talloc_free(rec);
127
128         vdata->total++;
129
130         return 0;
131 }
132
133
134 static void ctdb_vacuum_event(struct event_context *ev, struct timed_event *te, 
135                                                           struct timeval t, void *private_data);
136
137
138 /*
139  * traverse function for gathering the records that can be deleted
140  */
141 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
142 {
143         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
144         struct ctdb_context *ctdb = vdata->ctdb;
145         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
146         uint32_t lmaster;
147         struct ctdb_ltdb_header *hdr;
148         int res;
149
150         lmaster = ctdb_lmaster(ctdb, &key);
151         if (lmaster >= ctdb->num_nodes) {
152                 DEBUG(DEBUG_CRIT, (__location__
153                                    " lmaster[%u] >= ctdb->num_nodes[%u] for key"
154                                    " with hash[%u]!\n",
155                                    (unsigned)lmaster,
156                                    (unsigned)ctdb->num_nodes,
157                                    (unsigned)ctdb_hash(&key)));
158                 return -1;
159         }
160
161         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
162                 /* its not a deleted record */
163                 return 0;
164         }
165
166         hdr = (struct ctdb_ltdb_header *)data.dptr;
167
168         if (hdr->dmaster != ctdb->pnn) {
169                 return 0;
170         }
171
172         /* Is this a record we could possibly delete? I.e.
173            if the record is empty and also we are both lmaster
174            and dmaster for the record we should be able to delete it
175         */
176         if (lmaster == ctdb->pnn) {
177                 uint32_t hash;
178
179                 hash = ctdb_hash(&key);
180                 if (trbt_lookup32(vdata->delete_tree, hash)) {
181                         DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
182                 } 
183                 else {
184                         struct delete_record_data *dd;
185
186                         /* store key and header indexed by the key hash */
187                         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
188                         if (dd == NULL) {
189                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
190                                 return -1;
191                         }
192                         dd->ctdb      = ctdb;
193                         dd->ctdb_db   = ctdb_db;
194                         dd->key.dsize = key.dsize;
195                         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
196                         if (dd->key.dptr == NULL) {
197                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
198                                 return -1;
199                         }
200
201                         dd->hdr = *hdr;
202         
203                         trbt_insert32(vdata->delete_tree, hash, dd);
204
205                         vdata->delete_count++;
206                 }
207         }
208
209         /* add the record to the blob ready to send to the nodes */
210         res = add_record_to_vacuum_fetch_list(vdata, key);
211
212         return res;
213 }
214
215 /*
216  * traverse the tree of records to delete and marshall them into
217  * a blob
218  */
219 static void delete_traverse(void *param, void *data)
220 {
221         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
222         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
223         struct ctdb_rec_data *rec;
224         size_t old_size;
225
226         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
227         if (rec == NULL) {
228                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
229                 return;
230         }
231
232         old_size = talloc_get_size(recs->records);
233         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
234         if (recs->records == NULL) {
235                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
236                 return;
237         }
238         recs->records->count++;
239         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
240 }
241
242 /* 
243  * read-only traverse the database in order to find
244  * records that can be deleted and try to delete these
245  * records on the other nodes
246  * this executes in the child context
247  */
248 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata)
249 {
250         struct ctdb_context *ctdb = ctdb_db->ctdb;
251         const char *name = ctdb_db->db_name;
252         int ret, i, pnn;
253
254         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
255         if (ret != 0) {
256                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
257                 return ret;
258         }
259
260         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
261         if (pnn == -1) {
262                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
263                 return -1;
264         }
265
266         ctdb->pnn = pnn;
267         /* the list needs to be of length num_nodes */
268         vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->num_nodes);
269         if (vdata->list == NULL) {
270                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
271                 return -1;
272         }
273         for (i = 0; i < ctdb->num_nodes; i++) {
274                 vdata->list[i] = (struct ctdb_marshall_buffer *)
275                         talloc_zero_size(vdata->list, 
276                                                          offsetof(struct ctdb_marshall_buffer, data));
277                 if (vdata->list[i] == NULL) {
278                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
279                         return -1;
280                 }
281                 vdata->list[i]->db_id = ctdb_db->db_id;
282         }
283
284         /* read-only traverse, looking for records that might be able to be vacuumed */
285         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
286             vdata->traverse_error) {
287                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
288                 return -1;              
289         }
290
291         /*
292          * For records where we are not the lmaster,
293          * tell the lmaster to fetch the record.
294          */
295         for (i = 0; i < ctdb->num_nodes; i++) {
296                 TDB_DATA data;
297
298                 if (ctdb->nodes[i]->pnn == ctdb->pnn) {
299                         continue;
300                 }
301
302                 if (vdata->list[i]->count == 0) {
303                         continue;
304                 }
305
306                 DEBUG(DEBUG_INFO, ("Found %u records for lmaster %u in '%s'\n",
307                                    vdata->list[i]->count, ctdb->nodes[i]->pnn,
308                                    name));
309
310                 data.dsize = talloc_get_size(vdata->list[i]);
311                 data.dptr  = (void *)vdata->list[i];
312                 if (ctdb_client_send_message(ctdb, ctdb->nodes[i]->pnn, CTDB_SRVID_VACUUM_FETCH, data) != 0) {
313                         DEBUG(DEBUG_ERR, (__location__ " Failed to send vacuum "
314                                           "fetch message to %u\n",
315                                           ctdb->nodes[i]->pnn));
316                         return -1;
317                 }
318         }       
319
320         /* Process all records we can delete (if any) */
321         if (vdata->delete_count > 0) {
322                 struct delete_records_list *recs;
323                 TDB_DATA indata, outdata;
324                 int32_t res;
325                 struct ctdb_node_map *nodemap;
326                 uint32_t *active_nodes;
327                 int num_active_nodes;
328
329                 recs = talloc_zero(vdata, struct delete_records_list);
330                 if (recs == NULL) {
331                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
332                         return -1;
333                 }
334                 recs->records = (struct ctdb_marshall_buffer *)
335                         talloc_zero_size(vdata, 
336                                     offsetof(struct ctdb_marshall_buffer, data));
337                 if (recs->records == NULL) {
338                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
339                         return -1;
340                 }
341                 recs->records->db_id = ctdb_db->db_id;
342
343                 /* 
344                  * traverse the tree of all records we want to delete and
345                  * create a blob we can send to the other nodes.
346                  */
347                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
348
349                 indata.dsize = talloc_get_size(recs->records);
350                 indata.dptr  = (void *)recs->records;
351
352                 /* 
353                  * now tell all the active nodes to delete all these records
354                  * (if possible)
355                  */
356
357                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
358                                            CTDB_CURRENT_NODE,
359                                            recs, /* talloc context */
360                                            &nodemap);
361                 if (ret != 0) {
362                         DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
363                         return -1;
364                 }
365
366                 active_nodes = list_of_active_nodes(ctdb, nodemap,
367                                                     nodemap, /* talloc context */
368                                                     false /* include self */);
369                 /* yuck! ;-) */
370                 num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
371
372                 for (i = 0; i < num_active_nodes; i++) {
373                         struct ctdb_marshall_buffer *records;
374                         struct ctdb_rec_data *rec;
375
376                         ret = ctdb_control(ctdb, active_nodes[i], 0,
377                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
378                                         indata, recs, &outdata, &res,
379                                         NULL, NULL);
380                         if (ret != 0 || res != 0) {
381                                 DEBUG(DEBUG_ERR, ("Failed to delete records on "
382                                                   "node %u: ret[%d] res[%d]\n",
383                                                   active_nodes[i], ret, res));
384                                 return -1;
385                         }
386
387                         /* 
388                          * outdata countains the list of records coming back
389                          * from the node which the node could not delete
390                          */
391                         records = (struct ctdb_marshall_buffer *)outdata.dptr;
392                         rec = (struct ctdb_rec_data *)&records->data[0];
393                         while (records->count-- > 1) {
394                                 TDB_DATA reckey, recdata;
395                                 struct ctdb_ltdb_header *rechdr;
396
397                                 reckey.dptr = &rec->data[0];
398                                 reckey.dsize = rec->keylen;
399                                 recdata.dptr = &rec->data[reckey.dsize];
400                                 recdata.dsize = rec->datalen;
401
402                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
403                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
404                                         return -1;
405                                 }
406                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
407                                 recdata.dptr += sizeof(*rechdr);
408                                 recdata.dsize -= sizeof(*rechdr);
409
410                                 /* 
411                                  * that other node couldnt delete the record
412                                  * so we should delete it and thereby remove it from the tree
413                                  */
414                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
415
416                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
417                         }           
418                 }
419
420                 /* free nodemap and active_nodes */
421                 talloc_free(nodemap);
422
423                 /* 
424                  * The only records remaining in the tree would be those
425                  * records where all other nodes could successfully
426                  * delete them, so we can safely delete them on the
427                  * lmaster as well. Deletion implictely happens while
428                  * we repack the database. The repack algorithm revisits 
429                  * the tree in order to find the records that don't need
430                  * to be copied / repacked.
431                  */
432         }
433
434         /* this ensures we run our event queue */
435         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
436
437         return 0;
438 }
439
440
441 /*
442  * traverse function for repacking
443  */
444 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
445 {
446         struct vacuum_data *vdata = (struct vacuum_data *)private;
447
448         if (vdata->vacuum) {
449                 uint32_t hash = ctdb_hash(&key);
450                 struct delete_record_data *kd;
451                 /*
452                  * check if we can ignore this record because it's in the delete_tree
453                  */
454                 kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_tree, hash);
455                 /*
456                  * there might be hash collisions so we have to compare the keys here to be sure
457                  */
458                 if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
459                         struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
460                         /*
461                          * we have to check if the record hasn't changed in the meantime in order to
462                          * savely remove it from the database
463                          */
464                         if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
465                                 hdr->dmaster == kd->ctdb->pnn &&
466                                 ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
467                                 kd->hdr.rsn == hdr->rsn) {
468                                 vdata->vacuumed++;
469                                 return 0;
470                         }
471                 }
472         }
473         if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
474                 vdata->traverse_error = true;
475                 return -1;
476         }
477         vdata->copied++;
478         return 0;
479 }
480
481 /*
482  * repack a tdb
483  */
484 static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
485 {
486         struct tdb_context *tmp_db;
487
488         if (tdb_transaction_start(tdb) != 0) {
489                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
490                 return -1;
491         }
492
493         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
494                           TDB_INTERNAL|TDB_DISALLOW_NESTING,
495                           O_RDWR|O_CREAT, 0);
496         if (tmp_db == NULL) {
497                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
498                 tdb_transaction_cancel(tdb);
499                 return -1;
500         }
501
502         vdata->traverse_error = false;
503         vdata->dest_db = tmp_db;
504         vdata->vacuum = true;
505         vdata->vacuumed = 0;
506         vdata->copied = 0;
507
508         /*
509          * repack and vacuum on-the-fly by not writing the records that are
510          * no longer needed
511          */
512         if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
513                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
514                 tdb_transaction_cancel(tdb);
515                 tdb_close(tmp_db);
516                 return -1;              
517         }
518
519         DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
520         
521         if (vdata->traverse_error) {
522                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
523                 tdb_transaction_cancel(tdb);
524                 tdb_close(tmp_db);
525                 return -1;
526         }
527
528         if (tdb_wipe_all(tdb) != 0) {
529                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
530                 tdb_transaction_cancel(tdb);
531                 tdb_close(tmp_db);
532                 return -1;
533         }
534
535         vdata->traverse_error = false;
536         vdata->dest_db = tdb;
537         vdata->vacuum = false;
538         vdata->copied = 0;
539
540         if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
541                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
542                 tdb_transaction_cancel(tdb);
543                 tdb_close(tmp_db);
544                 return -1;              
545         }
546
547         if (vdata->traverse_error) {
548                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
549                 tdb_transaction_cancel(tdb);
550                 tdb_close(tmp_db);
551                 return -1;
552         }
553
554         tdb_close(tmp_db);
555
556
557         if (tdb_transaction_commit(tdb) != 0) {
558                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
559                 return -1;
560         }
561         DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
562
563         return 0;
564 }
565
566 static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata, uint32_t freelist)
567 {
568         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
569         TDB_CONTEXT *tune_tdb;
570         TDB_DATA key, value;
571         struct vacuum_tuning_data tdata;
572         struct vacuum_tuning_data *tptr;
573         char *vac_dbname;
574         int flags;
575
576         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
577                                      ctdb_db->ctdb->db_directory_state,
578                                      TUNINGDBNAME, ctdb_db->ctdb->pnn);
579         if (vac_dbname == NULL) {
580                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
581                 talloc_free(tmp_ctx);
582                 return -1;
583         }
584
585         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
586         flags |= TDB_DISALLOW_NESTING;
587         tune_tdb = tdb_open(vac_dbname, 0,
588                             flags,
589                             O_RDWR|O_CREAT, 0600);
590         if (tune_tdb == NULL) {
591                 DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
592                 talloc_free(tmp_ctx);
593                 return -1;
594         }
595         
596         if (tdb_transaction_start(tune_tdb) != 0) {
597                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
598                 tdb_close(tune_tdb);
599                 return -1;
600         }
601         key.dptr = discard_const(ctdb_db->db_name);
602         key.dsize = strlen(ctdb_db->db_name);
603         value = tdb_fetch(tune_tdb, key);
604
605         if (value.dptr != NULL && value.dsize == sizeof(struct vacuum_tuning_data)) {
606                 tptr = (struct vacuum_tuning_data *)value.dptr;
607                 tdata = *tptr;
608
609                 /*
610                  * re-calc new vacuum interval:
611                  * in case no limit was reached we continously increase the interval
612                  * until vacuum_max_interval is reached
613                  * in case a limit was reached we divide the current interval by 2
614                  * unless vacuum_min_interval is reached
615                  */
616                 if (freelist < vdata->repack_limit &&
617                     vdata->delete_count < vdata->vacuum_limit) {
618                         if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
619                                 tdata.new_interval = tdata.last_interval * 110 / 100;
620                                 DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n", 
621                                         tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
622                         }
623                 } else {
624                         tdata.new_interval = tdata.last_interval / 2;
625                         if (tdata.new_interval < ctdb_db->ctdb->tunable.vacuum_min_interval ||
626                                 tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
627                                 tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
628                         }               
629                         DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n", 
630                                          tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
631                 }
632                 tdata.last_interval = tdata.new_interval;
633         } else {
634                 DEBUG(DEBUG_DEBUG,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
635                 tdata.last_num_repack = freelist;
636                 tdata.last_num_empty = vdata->delete_count;
637                 tdata.last_interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
638         }
639
640         if (value.dptr != NULL) {
641                 free(value.dptr);
642         }
643
644         tdata.last_start = vdata->start;
645         tdata.last_duration = timeval_elapsed(&vdata->start);
646
647         value.dptr = (unsigned char *)&tdata;
648         value.dsize = sizeof(tdata);
649
650         if (tdb_store(tune_tdb, key, value, 0) != 0) {
651                 DEBUG(DEBUG_ERR,(__location__ " Unable to store tundb record for %s\n", ctdb_db->db_name));
652                 tdb_transaction_cancel(tune_tdb);
653                 tdb_close(tune_tdb);
654                 talloc_free(tmp_ctx);
655                 return -1;
656         }
657         tdb_transaction_commit(tune_tdb);
658         tdb_close(tune_tdb);
659         talloc_free(tmp_ctx);
660
661         return 0;
662 }
663
664 /*
665  * repack and vaccum a db
666  * called from the child context
667  */
668 static int ctdb_vacuum_and_repack_db(struct ctdb_db_context *ctdb_db,
669                                      TALLOC_CTX *mem_ctx)
670 {
671         uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
672         uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
673         const char *name = ctdb_db->db_name;
674         int size;
675         struct vacuum_data *vdata;
676
677         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
678         if (size == -1) {
679                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
680                 return -1;
681         }
682
683         vdata = talloc_zero(mem_ctx, struct vacuum_data);
684         if (vdata == NULL) {
685                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
686                 return -1;
687         }
688
689         vdata->ctdb = ctdb_db->ctdb;
690         vdata->vacuum_limit = vacuum_limit;
691         vdata->repack_limit = repack_limit;
692         vdata->delete_tree = trbt_create(vdata, 0);
693         vdata->ctdb_db = ctdb_db;
694         if (vdata->delete_tree == NULL) {
695                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
696                 talloc_free(vdata);
697                 return -1;
698         }
699
700         vdata->start = timeval_current();
701  
702         /*
703          * gather all records that can be deleted in vdata
704          */
705         if (ctdb_vacuum_db(ctdb_db, vdata) != 0) {
706                 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
707         }
708
709         /*
710          * decide if a repack is necessary
711          */
712         if (size < repack_limit && vdata->delete_count < vacuum_limit) {
713                 update_tuning_db(ctdb_db, vdata, size);
714                 talloc_free(vdata);
715                 return 0;
716         }
717
718         DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
719                         name, size, vdata->delete_count));
720
721         /*
722          * repack and implicitely get rid of the records we can delete
723          */
724         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
725                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
726                 update_tuning_db(ctdb_db, vdata, size);
727                 talloc_free(vdata);
728                 return -1;
729         }
730         update_tuning_db(ctdb_db, vdata, size);
731         talloc_free(vdata);
732
733         return 0;
734 }
735
736 static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
737 {
738         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
739         TDB_CONTEXT *tdb;
740         TDB_DATA key, value;
741         char *vac_dbname;
742         uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
743         struct ctdb_context *ctdb = ctdb_db->ctdb;
744         int flags;
745
746         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
747         if (vac_dbname == NULL) {
748                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
749                 talloc_free(tmp_ctx);
750                 return interval;
751         }
752
753         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
754         flags |= TDB_DISALLOW_NESTING;
755         tdb = tdb_open(vac_dbname, 0,
756                        flags,
757                        O_RDWR|O_CREAT, 0600);
758         if (!tdb) {
759                 DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval. Errno : %s (%d)\n", vac_dbname, strerror(errno), errno));
760                 talloc_free(tmp_ctx);
761                 return interval;
762         }
763
764         key.dptr = discard_const(ctdb_db->db_name);
765         key.dsize = strlen(ctdb_db->db_name);
766
767         value = tdb_fetch(tdb, key);
768
769         if (value.dptr != NULL) {
770                 if (value.dsize == sizeof(struct vacuum_tuning_data)) {
771                         struct vacuum_tuning_data *tptr = (struct vacuum_tuning_data *)value.dptr;
772
773                         interval = tptr->new_interval;
774
775                         if (interval < ctdb->tunable.vacuum_min_interval) {
776                                 interval = ctdb->tunable.vacuum_min_interval;
777                         } 
778                         if (interval > ctdb->tunable.vacuum_max_interval) {
779                                 interval = ctdb->tunable.vacuum_max_interval;
780                         }
781                 }
782                 free(value.dptr);
783         }
784         tdb_close(tdb);
785
786         talloc_free(tmp_ctx);
787
788         return interval;
789 }
790
791 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
792 {
793         double l = timeval_elapsed(&child_ctx->start_time);
794         struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
795         struct ctdb_context *ctdb = ctdb_db->ctdb;
796
797         DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
798
799         if (child_ctx->child_pid != -1) {
800                 kill(child_ctx->child_pid, SIGKILL);
801         }
802
803         DLIST_REMOVE(ctdb->vacuumers, child_ctx);
804
805         event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
806                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
807                         ctdb_vacuum_event, child_ctx->vacuum_handle);
808
809         return 0;
810 }
811
812 /*
813  * this event is generated when a vacuum child process times out
814  */
815 static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
816                                          struct timeval t, void *private_data)
817 {
818         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
819
820         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
821
822         child_ctx->status = VACUUM_TIMEOUT;
823
824         talloc_free(child_ctx);
825 }
826
827
828 /*
829  * this event is generated when a vacuum child process has completed
830  */
831 static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
832                              uint16_t flags, void *private_data)
833 {
834         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
835         char c = 0;
836         int ret;
837
838         DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
839         child_ctx->child_pid = -1;
840
841         ret = read(child_ctx->fd[0], &c, 1);
842         if (ret != 1 || c != 0) {
843                 child_ctx->status = VACUUM_ERROR;
844                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
845         } else {
846                 child_ctx->status = VACUUM_OK;
847         }
848
849         talloc_free(child_ctx);
850 }
851
852 /*
853  * this event is called every time we need to start a new vacuum process
854  */
855 static void
856 ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
857                                struct timeval t, void *private_data)
858 {
859         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
860         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
861         struct ctdb_context *ctdb = ctdb_db->ctdb;
862         struct ctdb_vacuum_child_context *child_ctx;
863         struct tevent_fd *fde;
864         int ret;
865
866         /* we dont vacuum if we are in recovery mode, or db frozen */
867         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
868             ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_NONE) {
869                 DEBUG(DEBUG_INFO, ("Not vacuuming %s (%s)\n", ctdb_db->db_name,
870                                    ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ? "in recovery"
871                                    : ctdb->freeze_mode[ctdb_db->priority] == CTDB_FREEZE_PENDING
872                                    ? "freeze pending"
873                                    : "frozen"));
874                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
875                 return;
876         }
877
878         child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
879         if (child_ctx == NULL) {
880                 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
881                 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
882         }
883
884
885         ret = pipe(child_ctx->fd);
886         if (ret != 0) {
887                 talloc_free(child_ctx);
888                 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
889                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
890                 return;
891         }
892
893         child_ctx->child_pid = ctdb_fork(ctdb);
894         if (child_ctx->child_pid == (pid_t)-1) {
895                 close(child_ctx->fd[0]);
896                 close(child_ctx->fd[1]);
897                 talloc_free(child_ctx);
898                 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
899                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
900                 return;
901         }
902
903
904         if (child_ctx->child_pid == 0) {
905                 char cc = 0;
906                 close(child_ctx->fd[0]);
907
908                 DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
909         
910                 if (switch_from_server_to_client(ctdb, "vacuum-%s", ctdb_db->db_name) != 0) {
911                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
912                         _exit(1);
913                 }
914
915                 /* 
916                  * repack the db
917                  */
918                 cc = ctdb_vacuum_and_repack_db(ctdb_db, child_ctx);
919
920                 write(child_ctx->fd[1], &cc, 1);
921                 _exit(0);
922         }
923
924         set_close_on_exec(child_ctx->fd[0]);
925         close(child_ctx->fd[1]);
926
927         child_ctx->status = VACUUM_RUNNING;
928         child_ctx->start_time = timeval_current();
929
930         DLIST_ADD(ctdb->vacuumers, child_ctx);
931         talloc_set_destructor(child_ctx, vacuum_child_destructor);
932
933         /*
934          * Clear the fastpath vacuuming list in the parent.
935          */
936         talloc_free(ctdb_db->delete_queue);
937         ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
938         if (ctdb_db->delete_queue == NULL) {
939                 /* fatal here? ... */
940                 ctdb_fatal(ctdb, "Out of memory when re-creating vacuum tree "
941                                  "in parent context. Shutting down\n");
942         }
943
944         event_add_timed(ctdb->ev, child_ctx,
945                 timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
946                 vacuum_child_timeout, child_ctx);
947
948         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
949
950         fde = event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
951                            EVENT_FD_READ, vacuum_child_handler, child_ctx);
952         tevent_fd_set_auto_close(fde);
953
954         vacuum_handle->child_ctx = child_ctx;
955         child_ctx->vacuum_handle = vacuum_handle;
956 }
957
958 void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
959 {
960         /* Simply free them all. */
961         while (ctdb->vacuumers) {
962                 DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%i)\n",
963                            ctdb->vacuumers->vacuum_handle->ctdb_db->db_name,
964                            (int)ctdb->vacuumers->child_pid));
965                 /* vacuum_child_destructor kills it, removes from list */
966                 talloc_free(ctdb->vacuumers);
967         }
968 }
969
970 /* this function initializes the vacuuming context for a database
971  * starts the vacuuming events
972  */
973 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
974 {
975         if (ctdb_db->persistent != 0) {
976                 DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
977                 return 0;
978         }
979
980         ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
981         CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
982
983         ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
984
985         event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle, 
986                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
987                         ctdb_vacuum_event, ctdb_db->vacuum_handle);
988
989         return 0;
990 }