pytdb: Add support for tdb_enable_seqnum, tdb_get_seqnum and tdb_increment_seqnum_non...
[metze/ctdb/wip.git] / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/tevent/tevent.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32
/* timeout handed to the controls issued while vacuuming: now + (10, 0) */
#define TIMELIMIT() timeval_current_ofs(10, 0)
/* base file name of the vacuum tuning database (the node's pnn is appended) */
#define TUNINGDBNAME "vactune.tdb"

/* state of a vacuuming child process as recorded by the parent */
enum vacuum_child_status {
        VACUUM_RUNNING,
        VACUUM_OK,
        VACUUM_ERROR,
        VACUUM_TIMEOUT
};
37
38 struct ctdb_vacuum_child_context {
39         struct ctdb_vacuum_child_context *next, *prev;
40         struct ctdb_vacuum_handle *vacuum_handle;
41         /* fd child writes status to */
42         int fd[2];
43         pid_t child_pid;
44         enum vacuum_child_status status;
45         struct timeval start_time;
46 };
47
/* per database vacuuming state; also passed as private data to the vacuum timer */
struct ctdb_vacuum_handle {
        /* the database that gets vacuumed */
        struct ctdb_db_context *ctdb_db;
        /* context of the vacuuming child process */
        struct ctdb_vacuum_child_context *child_ctx;
};
52
53
54 /*  a list of records to possibly delete */
55 struct vacuum_data {
56         uint32_t vacuum_limit;
57         uint32_t repack_limit;
58         struct ctdb_context *ctdb;
59         struct ctdb_db_context *ctdb_db;
60         struct tdb_context *dest_db;
61         trbt_tree_t *delete_tree;
62         uint32_t delete_count;
63         struct ctdb_marshall_buffer **list;
64         struct timeval start;
65         bool traverse_error;
66         bool vacuum;
67         uint32_t total;
68         uint32_t vacuumed;
69         uint32_t copied;
70 };
71
/*
 * tuning information stored for every db
 *
 * The structure is written to and read from the tuning tdb as a raw
 * blob whose size is compared against sizeof(struct vacuum_tuning_data),
 * so the member order and types must not change.
 */
struct vacuum_tuning_data {
        /* freelist size seen when the record was first created */
        uint32_t last_num_repack;
        /* delete_count seen when the record was first created */
        uint32_t last_num_empty;
        /* interval that was in effect for the last run */
        uint32_t last_interval;
        /* interval to use for scheduling the next run */
        uint32_t new_interval;
        /* start time of the last run */
        struct timeval last_start;
        /* seconds the last run took */
        double   last_duration;
};
81
82 /* this structure contains the information for one record to be deleted */
83 struct delete_record_data {
84         struct ctdb_context *ctdb;
85         struct ctdb_db_context *ctdb_db;
86         struct ctdb_ltdb_header hdr;
87         TDB_DATA key;
88 };
89
/* accumulator handed to delete_traverse() while walking the delete tree */
struct delete_records_list {
        /* marshalled records, grown by one entry per tree node */
        struct ctdb_marshall_buffer *records;
};
93
/* timer callback, defined further down; needed early for rescheduling */
static void ctdb_vacuum_event(struct event_context *ev,
                              struct timed_event *te,
                              struct timeval t,
                              void *private_data);
96
97
98 /*
99  * traverse function for gathering the records that can be deleted
100  */
101 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
102 {
103         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
104         struct ctdb_context *ctdb = vdata->ctdb;
105         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
106         uint32_t lmaster;
107         struct ctdb_ltdb_header *hdr;
108         struct ctdb_rec_data *rec;
109         size_t old_size;
110                
111         lmaster = ctdb_lmaster(ctdb, &key);
112         if (lmaster >= ctdb->vnn_map->size) {
113                 return 0;
114         }
115
116         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
117                 /* its not a deleted record */
118                 return 0;
119         }
120
121         hdr = (struct ctdb_ltdb_header *)data.dptr;
122
123         if (hdr->dmaster != ctdb->pnn) {
124                 return 0;
125         }
126
127         /* is this a records we could possibly delete? I.e.
128            if the record is empty and also we are both lmaster
129            and dmaster for the record we should be able to delete it
130         */
131         if (lmaster == ctdb->pnn) {
132                 uint32_t hash;
133
134                 hash = ctdb_hash(&key);
135                 if (trbt_lookup32(vdata->delete_tree, hash)) {
136                         DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
137                 } 
138                 else {
139                         struct delete_record_data *dd;
140
141                         /* store key and header indexed by the key hash */
142                         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
143                         if (dd == NULL) {
144                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
145                                 return -1;
146                         }
147                         dd->ctdb      = ctdb;
148                         dd->ctdb_db   = ctdb_db;
149                         dd->key.dsize = key.dsize;
150                         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
151                         if (dd->key.dptr == NULL) {
152                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
153                                 return -1;
154                         }
155
156                         dd->hdr = *hdr;
157         
158                         trbt_insert32(vdata->delete_tree, hash, dd);
159
160                         vdata->delete_count++;
161                 }
162         }
163
164         /* add the record to the blob ready to send to the nodes */
165         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
166         if (rec == NULL) {
167                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
168                 vdata->traverse_error = true;
169                 return -1;
170         }
171         old_size = talloc_get_size(vdata->list[lmaster]);
172         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
173                                                    old_size + rec->length);
174         if (vdata->list[lmaster] == NULL) {
175                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
176                 vdata->traverse_error = true;
177                 return -1;
178         }
179         vdata->list[lmaster]->count++;
180         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
181         talloc_free(rec);
182
183         vdata->total++;
184
185         return 0;
186 }
187
188 /*
189  * traverse the tree of records to delete and marshall them into
190  * a blob
191  */
192 static void delete_traverse(void *param, void *data)
193 {
194         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
195         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
196         struct ctdb_rec_data *rec;
197         size_t old_size;
198
199         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
200         if (rec == NULL) {
201                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
202                 return;
203         }
204
205         old_size = talloc_get_size(recs->records);
206         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
207         if (recs->records == NULL) {
208                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
209                 return;
210         }
211         recs->records->count++;
212         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
213 }
214
215 /* 
216  * read-only traverse the database in order to find
217  * records that can be deleted and try to delete these
218  * records on the other nodes
219  * this executes in the child context
220  */
221 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata)
222 {
223         struct ctdb_context *ctdb = ctdb_db->ctdb;
224         const char *name = ctdb_db->db_name;
225         int ret, i, pnn;
226
227         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
228         if (ret != 0) {
229                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
230                 return ret;
231         }
232
233         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
234         if (pnn == -1) {
235                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
236                 return -1;
237         }
238
239         ctdb->pnn = pnn;
240         /* the list needs to be of length num_nodes */
241         vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
242         if (vdata->list == NULL) {
243                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
244                 return -1;
245         }
246         for (i = 0; i < ctdb->vnn_map->size; i++) {
247                 vdata->list[i] = (struct ctdb_marshall_buffer *)
248                         talloc_zero_size(vdata->list, 
249                                                          offsetof(struct ctdb_marshall_buffer, data));
250                 if (vdata->list[i] == NULL) {
251                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
252                         return -1;
253                 }
254                 vdata->list[i]->db_id = ctdb_db->db_id;
255         }
256
257         /* read-only traverse, looking for records that might be able to be vacuumed */
258         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
259             vdata->traverse_error) {
260                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
261                 return -1;              
262         }
263
264         for ( i = 0; i < ctdb->vnn_map->size; i++) {
265                 if (vdata->list[i]->count == 0) {
266                         continue;
267                 }
268
269                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
270                 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
271                         TDB_DATA data;
272                         DEBUG(DEBUG_INFO,("Found %u records for lmaster %u in '%s'\n", 
273                                                                 vdata->list[i]->count, i, name));
274
275                         data.dsize = talloc_get_size(vdata->list[i]);
276                         data.dptr  = (void *)vdata->list[i];
277                         if (ctdb_client_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
278                                 DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
279                                          ctdb->vnn_map->map[i]));
280                                 return -1;              
281                         }
282                         continue;
283                 }
284         }       
285
286         /* Process all records we can delete (if any) */
287         if (vdata->delete_count > 0) {
288                 struct delete_records_list *recs;
289                 TDB_DATA indata, outdata;
290                 int32_t res;
291
292                 recs = talloc_zero(vdata, struct delete_records_list);
293                 if (recs == NULL) {
294                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
295                         return -1;
296                 }
297                 recs->records = (struct ctdb_marshall_buffer *)
298                         talloc_zero_size(vdata, 
299                                     offsetof(struct ctdb_marshall_buffer, data));
300                 if (recs->records == NULL) {
301                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
302                         return -1;
303                 }
304                 recs->records->db_id = ctdb_db->db_id;
305
306                 /* 
307                  * traverse the tree of all records we want to delete and
308                  * create a blob we can send to the other nodes.
309                  */
310                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
311
312                 indata.dsize = talloc_get_size(recs->records);
313                 indata.dptr  = (void *)recs->records;
314
315                 /* 
316                  * now tell all the other nodes to delete all these records
317                  * (if possible)
318                  */
319                 for (i = 0; i < ctdb->vnn_map->size; i++) {
320                         struct ctdb_marshall_buffer *records;
321                         struct ctdb_rec_data *rec;
322
323                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
324                                 /* we dont delete the records on the local node just yet */
325                                 continue;
326                         }
327
328                         ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
329                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
330                                         indata, recs, &outdata, &res,
331                                         NULL, NULL);
332                         if (ret != 0 || res != 0) {
333                                 DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
334                                 return -1;
335                         }
336
337                         /* 
338                          * outdata countains the list of records coming back
339                          * from the node which the node could not delete
340                          */
341                         records = (struct ctdb_marshall_buffer *)outdata.dptr;
342                         rec = (struct ctdb_rec_data *)&records->data[0];
343                         while (records->count-- > 1) {
344                                 TDB_DATA reckey, recdata;
345                                 struct ctdb_ltdb_header *rechdr;
346
347                                 reckey.dptr = &rec->data[0];
348                                 reckey.dsize = rec->keylen;
349                                 recdata.dptr = &rec->data[reckey.dsize];
350                                 recdata.dsize = rec->datalen;
351
352                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
353                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
354                                         return -1;
355                                 }
356                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
357                                 recdata.dptr += sizeof(*rechdr);
358                                 recdata.dsize -= sizeof(*rechdr);
359
360                                 /* 
361                                  * that other node couldnt delete the record
362                                  * so we should delete it and thereby remove it from the tree
363                                  */
364                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
365
366                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
367                         }           
368                 }
369
370                 /* 
371                  * The only records remaining in the tree would be those
372                  * records where all other nodes could successfully
373                  * delete them, so we can safely delete them on the
374                  * lmaster as well. Deletion implictely happens while
375                  * we repack the database. The repack algorithm revisits 
376                  * the tree in order to find the records that don't need
377                  * to be copied / repacked.
378                  */
379         }
380
381         /* this ensures we run our event queue */
382         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
383
384         return 0;
385 }
386
387
388 /*
389  * traverse function for repacking
390  */
391 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
392 {
393         struct vacuum_data *vdata = (struct vacuum_data *)private;
394
395         if (vdata->vacuum) {
396                 uint32_t hash = ctdb_hash(&key);
397                 struct delete_record_data *kd;
398                 /*
399                  * check if we can ignore this record because it's in the delete_tree
400                  */
401                 kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_tree, hash);
402                 /*
403                  * there might be hash collisions so we have to compare the keys here to be sure
404                  */
405                 if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
406                         struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
407                         /*
408                          * we have to check if the record hasn't changed in the meantime in order to
409                          * savely remove it from the database
410                          */
411                         if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
412                                 hdr->dmaster == kd->ctdb->pnn &&
413                                 ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
414                                 kd->hdr.rsn == hdr->rsn) {
415                                 vdata->vacuumed++;
416                                 return 0;
417                         }
418                 }
419         }
420         if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
421                 vdata->traverse_error = true;
422                 return -1;
423         }
424         vdata->copied++;
425         return 0;
426 }
427
428 /*
429  * repack a tdb
430  */
431 static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
432 {
433         struct tdb_context *tmp_db;
434
435         if (tdb_transaction_start(tdb) != 0) {
436                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
437                 return -1;
438         }
439
440         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
441                           TDB_INTERNAL|TDB_DISALLOW_NESTING,
442                           O_RDWR|O_CREAT, 0);
443         if (tmp_db == NULL) {
444                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
445                 tdb_transaction_cancel(tdb);
446                 return -1;
447         }
448
449         vdata->traverse_error = false;
450         vdata->dest_db = tmp_db;
451         vdata->vacuum = true;
452         vdata->vacuumed = 0;
453         vdata->copied = 0;
454
455         /*
456          * repack and vacuum on-the-fly by not writing the records that are
457          * no longer needed
458          */
459         if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
460                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
461                 tdb_transaction_cancel(tdb);
462                 tdb_close(tmp_db);
463                 return -1;              
464         }
465
466         DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
467         
468         if (vdata->traverse_error) {
469                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
470                 tdb_transaction_cancel(tdb);
471                 tdb_close(tmp_db);
472                 return -1;
473         }
474
475         if (tdb_wipe_all(tdb) != 0) {
476                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
477                 tdb_transaction_cancel(tdb);
478                 tdb_close(tmp_db);
479                 return -1;
480         }
481
482         vdata->traverse_error = false;
483         vdata->dest_db = tdb;
484         vdata->vacuum = false;
485         vdata->copied = 0;
486
487         if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
488                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
489                 tdb_transaction_cancel(tdb);
490                 tdb_close(tmp_db);
491                 return -1;              
492         }
493
494         if (vdata->traverse_error) {
495                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
496                 tdb_transaction_cancel(tdb);
497                 tdb_close(tmp_db);
498                 return -1;
499         }
500
501         tdb_close(tmp_db);
502
503
504         if (tdb_transaction_commit(tdb) != 0) {
505                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
506                 return -1;
507         }
508         DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
509
510         return 0;
511 }
512
513 static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata, uint32_t freelist)
514 {
515         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
516         TDB_CONTEXT *tune_tdb;
517         TDB_DATA key, value;
518         struct vacuum_tuning_data tdata;
519         struct vacuum_tuning_data *tptr;
520         char *vac_dbname;
521         int flags;
522
523         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
524                                      ctdb_db->ctdb->db_directory_state,
525                                      TUNINGDBNAME, ctdb_db->ctdb->pnn);
526         if (vac_dbname == NULL) {
527                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
528                 talloc_free(tmp_ctx);
529                 return -1;
530         }
531
532         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
533         flags |= TDB_DISALLOW_NESTING;
534         tune_tdb = tdb_open(vac_dbname, 0,
535                             flags,
536                             O_RDWR|O_CREAT, 0600);
537         if (tune_tdb == NULL) {
538                 DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542         
543         if (tdb_transaction_start(tune_tdb) != 0) {
544                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
545                 tdb_close(tune_tdb);
546                 return -1;
547         }
548         key.dptr = discard_const(ctdb_db->db_name);
549         key.dsize = strlen(ctdb_db->db_name);
550         value = tdb_fetch(tune_tdb, key);
551
552         if (value.dptr != NULL && value.dsize == sizeof(struct vacuum_tuning_data)) {
553                 tptr = (struct vacuum_tuning_data *)value.dptr;
554                 tdata = *tptr;
555
556                 /*
557                  * re-calc new vacuum interval:
558                  * in case no limit was reached we continously increase the interval
559                  * until vacuum_max_interval is reached
560                  * in case a limit was reached we divide the current interval by 2
561                  * unless vacuum_min_interval is reached
562                  */
563                 if (freelist < vdata->repack_limit &&
564                     vdata->delete_count < vdata->vacuum_limit) {
565                         if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
566                                 tdata.new_interval = tdata.last_interval * 110 / 100;
567                                 DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n", 
568                                         tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
569                         }
570                 } else {
571                         tdata.new_interval = tdata.last_interval / 2;
572                         if (tdata.new_interval < ctdb_db->ctdb->tunable.vacuum_min_interval ||
573                                 tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
574                                 tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
575                         }               
576                         DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n", 
577                                          tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
578                 }
579                 tdata.last_interval = tdata.new_interval;
580         } else {
581                 DEBUG(DEBUG_ERR,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
582                 tdata.last_num_repack = freelist;
583                 tdata.last_num_empty = vdata->delete_count;
584                 tdata.last_interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
585         }
586
587         if (value.dptr != NULL) {
588                 free(value.dptr);
589         }
590
591         tdata.last_start = vdata->start;
592         tdata.last_duration = timeval_elapsed(&vdata->start);
593
594         value.dptr = (unsigned char *)&tdata;
595         value.dsize = sizeof(tdata);
596
597         if (tdb_store(tune_tdb, key, value, 0) != 0) {
598                 DEBUG(DEBUG_ERR,(__location__ " Unable to store tundb record for %s\n", ctdb_db->db_name));
599                 tdb_transaction_cancel(tune_tdb);
600                 tdb_close(tune_tdb);
601                 talloc_free(tmp_ctx);
602                 return -1;
603         }
604         tdb_transaction_commit(tune_tdb);
605         tdb_close(tune_tdb);
606         talloc_free(tmp_ctx);
607
608         return 0;
609 }
610
611 /*
612  * repack and vaccum a db
613  * called from the child context
614  */
615 static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
616 {
617         uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
618         uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
619         const char *name = ctdb_db->db_name;
620         int size;
621         struct vacuum_data *vdata;
622
623         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
624         if (size == -1) {
625                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
626                 return -1;
627         }
628
629         vdata = talloc_zero(mem_ctx, struct vacuum_data);
630         if (vdata == NULL) {
631                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
632                 return -1;
633         }
634
635         vdata->ctdb = ctdb_db->ctdb;
636         vdata->vacuum_limit = vacuum_limit;
637         vdata->repack_limit = repack_limit;
638         vdata->delete_tree = trbt_create(vdata, 0);
639         vdata->ctdb_db = ctdb_db;
640         if (vdata->delete_tree == NULL) {
641                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
642                 talloc_free(vdata);
643                 return -1;
644         }
645
646         vdata->start = timeval_current();
647  
648         /*
649          * gather all records that can be deleted in vdata
650          */
651         if (ctdb_vacuum_db(ctdb_db, vdata) != 0) {
652                 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
653         }
654
655         /*
656          * decide if a repack is necessary
657          */
658         if (size < repack_limit && vdata->delete_count < vacuum_limit) {
659                 update_tuning_db(ctdb_db, vdata, size);
660                 talloc_free(vdata);
661                 return 0;
662         }
663
664         DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
665                         name, size, vdata->delete_count));
666
667         /*
668          * repack and implicitely get rid of the records we can delete
669          */
670         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
671                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
672                 update_tuning_db(ctdb_db, vdata, size);
673                 talloc_free(vdata);
674                 return -1;
675         }
676         update_tuning_db(ctdb_db, vdata, size);
677         talloc_free(vdata);
678
679         return 0;
680 }
681
682 static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
683 {
684         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
685         TDB_CONTEXT *tdb;
686         TDB_DATA key, value;
687         char *vac_dbname;
688         uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
689         struct ctdb_context *ctdb = ctdb_db->ctdb;
690         int flags;
691
692         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
693         if (vac_dbname == NULL) {
694                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
695                 talloc_free(tmp_ctx);
696                 return interval;
697         }
698
699         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
700         flags |= TDB_DISALLOW_NESTING;
701         tdb = tdb_open(vac_dbname, 0,
702                        flags,
703                        O_RDWR|O_CREAT, 0600);
704         if (!tdb) {
705                 DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval\n", vac_dbname));
706                 talloc_free(tmp_ctx);
707                 return interval;
708         }
709
710         key.dptr = discard_const(ctdb_db->db_name);
711         key.dsize = strlen(ctdb_db->db_name);
712
713         value = tdb_fetch(tdb, key);
714
715         if (value.dptr != NULL) {
716                 if (value.dsize == sizeof(struct vacuum_tuning_data)) {
717                         struct vacuum_tuning_data *tptr = (struct vacuum_tuning_data *)value.dptr;
718
719                         interval = tptr->new_interval;
720
721                         if (interval < ctdb->tunable.vacuum_min_interval) {
722                                 interval = ctdb->tunable.vacuum_min_interval;
723                         } 
724                         if (interval > ctdb->tunable.vacuum_max_interval) {
725                                 interval = ctdb->tunable.vacuum_max_interval;
726                         }
727                 }
728                 free(value.dptr);
729         }
730         tdb_close(tdb);
731
732         talloc_free(tmp_ctx);
733
734         return interval;
735 }
736
737 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
738 {
739         double l = timeval_elapsed(&child_ctx->start_time);
740         struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
741         struct ctdb_context *ctdb = ctdb_db->ctdb;
742
743         DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
744
745         if (child_ctx->child_pid != -1) {
746                 kill(child_ctx->child_pid, SIGKILL);
747         }
748
749         DLIST_REMOVE(ctdb->vacuumers, child_ctx);
750
751         event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
752                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
753                         ctdb_vacuum_event, child_ctx->vacuum_handle);
754
755         return 0;
756 }
757
758 /*
759  * this event is generated when a vacuum child process times out
760  */
761 static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
762                                          struct timeval t, void *private_data)
763 {
764         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
765
766         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
767
768         child_ctx->status = VACUUM_TIMEOUT;
769
770         talloc_free(child_ctx);
771 }
772
773
774 /*
775  * this event is generated when a vacuum child process has completed
776  */
777 static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
778                              uint16_t flags, void *private_data)
779 {
780         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
781         char c = 0;
782         int ret;
783
784         DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
785         child_ctx->child_pid = -1;
786
787         ret = read(child_ctx->fd[0], &c, 1);
788         if (ret != 1 || c != 0) {
789                 child_ctx->status = VACUUM_ERROR;
790                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
791         } else {
792                 child_ctx->status = VACUUM_OK;
793         }
794
795         talloc_free(child_ctx);
796 }
797
798 /*
799  * this event is called every time we need to start a new vacuum process
800  */
801 static void
802 ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
803                                struct timeval t, void *private_data)
804 {
805         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
806         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
807         struct ctdb_context *ctdb = ctdb_db->ctdb;
808         struct ctdb_vacuum_child_context *child_ctx;
809         struct tevent_fd *fde;
810         int ret;
811
812         /* we dont vacuum if we are in recovery mode, or db frozen */
813         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
814             ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_NONE) {
815                 DEBUG(DEBUG_INFO, ("Not vacuuming %s (%s)\n", ctdb_db->db_name,
816                                    ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ? "in recovery"
817                                    : ctdb->freeze_mode[ctdb_db->priority] == CTDB_FREEZE_PENDING
818                                    ? "freeze pending"
819                                    : "frozen"));
820                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
821                 return;
822         }
823
824         child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
825         if (child_ctx == NULL) {
826                 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
827                 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
828         }
829
830
831         ret = pipe(child_ctx->fd);
832         if (ret != 0) {
833                 talloc_free(child_ctx);
834                 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
835                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
836                 return;
837         }
838
839         child_ctx->child_pid = fork();
840         if (child_ctx->child_pid == (pid_t)-1) {
841                 close(child_ctx->fd[0]);
842                 close(child_ctx->fd[1]);
843                 talloc_free(child_ctx);
844                 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
845                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
846                 return;
847         }
848
849
850         if (child_ctx->child_pid == 0) {
851                 char cc = 0;
852                 close(child_ctx->fd[0]);
853
854                 DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
855         
856                 if (switch_from_server_to_client(ctdb, "vacuum-%s", ctdb_db->db_name) != 0) {
857                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
858                         _exit(1);
859                 }
860
861                 /* 
862                  * repack the db
863                  */
864                 cc = ctdb_repack_db(ctdb_db, child_ctx);
865
866                 write(child_ctx->fd[1], &cc, 1);
867                 _exit(0);
868         }
869
870         set_close_on_exec(child_ctx->fd[0]);
871         close(child_ctx->fd[1]);
872
873         child_ctx->status = VACUUM_RUNNING;
874         child_ctx->start_time = timeval_current();
875
876         DLIST_ADD(ctdb->vacuumers, child_ctx);
877         talloc_set_destructor(child_ctx, vacuum_child_destructor);
878
879         event_add_timed(ctdb->ev, child_ctx,
880                 timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
881                 vacuum_child_timeout, child_ctx);
882
883         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
884
885         fde = event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
886                            EVENT_FD_READ, vacuum_child_handler, child_ctx);
887         tevent_fd_set_auto_close(fde);
888
889         vacuum_handle->child_ctx = child_ctx;
890         child_ctx->vacuum_handle = vacuum_handle;
891 }
892
893 void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
894 {
895         /* Simply free them all. */
896         while (ctdb->vacuumers) {
897                 DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%i)\n",
898                            ctdb->vacuumers->vacuum_handle->ctdb_db->db_name,
899                            (int)ctdb->vacuumers->child_pid));
900                 /* vacuum_child_destructor kills it, removes from list */
901                 talloc_free(ctdb->vacuumers);
902         }
903 }
904
905 /* this function initializes the vacuuming context for a database
906  * starts the vacuuming events
907  */
908 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
909 {
910         if (ctdb_db->persistent != 0) {
911                 DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
912                 return 0;
913         }
914
915         ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
916         CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
917
918         ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
919
920         event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle, 
921                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
922                         ctdb_vacuum_event, ctdb_db->vacuum_handle);
923
924         return 0;
925 }