ctdb: Remove an unnecessary cast
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5    Copyright (C) Michael Adam 2010-2013
6    Copyright (C) Stefan Metzmacher 2010-2011
7
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include "replace.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/time.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34 #include "lib/util/sys_rw.h"
35 #include "lib/util/util_process.h"
36
37 #include "ctdb_private.h"
38 #include "ctdb_client.h"
39
40 #include "protocol/protocol_private.h"
41
42 #include "common/rb_tree.h"
43 #include "common/common.h"
44 #include "common/logging.h"
45
46 #include "protocol/protocol_api.h"
47
/*
 * Timeout helper: timeval_current_ofs(10, 0), presumably "now plus 10
 * seconds".  NOTE(review): not referenced in this excerpt; confirm users.
 */
#define TIMELIMIT() timeval_current_ofs(10, 0)
49
/*
 * Status of a vacuuming child, stored in
 * struct ctdb_vacuum_child_context.status.
 * NOTE(review): the meaning of each value is only implied by its name
 * in this excerpt (running / finished ok / failed / timed out).
 */
enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
51
/*
 * Bookkeeping for a vacuuming child process (it records the child's pid,
 * its status and a start time).
 * NOTE(review): the fields are not used in this excerpt; the comments
 * below are inferred from the field names — confirm against the users.
 */
struct ctdb_vacuum_child_context {
	/* handle of the database being vacuumed */
	struct ctdb_vacuum_handle *vacuum_handle;
	/* fd child writes status to */
	int fd[2];
	pid_t child_pid;
	enum vacuum_child_status status;
	struct timeval start_time;
	bool scheduled;
};
61
/*
 * Vacuuming handle attached to one database (ctdb_db).
 * NOTE(review): fast_path_count and vacuum_interval are not used in this
 * excerpt; presumably they count fast-path runs and hold the vacuum
 * interval — confirm.
 */
struct ctdb_vacuum_handle {
	struct ctdb_db_context *ctdb_db;
	uint32_t fast_path_count;
	uint32_t vacuum_interval;
};
67
68
/*
 * State of one vacuuming run over a database: the records collected for
 * deletion, the per-node VACUUM_FETCH lists and the statistics counters
 * that are logged at the end of each phase.
 */
struct vacuum_data {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* NOTE(review): not used in this excerpt */
	struct tdb_context *dest_db;
	/* records to delete cluster-wide, keyed by ctdb_hash() of the key */
	trbt_tree_t *delete_list;
	/* one marshall buffer per node, indexed by ctdb_lmaster() */
	struct ctdb_marshall_buffer **vacuum_fetch_list;
	struct timeval start;
	/*
	 * Error flag set from inside traverse callbacks (e.g. on allocation
	 * failure); checked by ctdb_vacuum_traverse_db().
	 */
	bool traverse_error;
	bool vacuum;
	struct {
		/* outcomes of the fast-path traverse of the delete_queue */
		struct {
			uint32_t added_to_vacuum_fetch_list;
			uint32_t added_to_delete_list;
			uint32_t deleted;
			uint32_t skipped;
			uint32_t error;
			uint32_t total;
		} delete_queue;
		/* outcomes of the full read-only database traverse */
		struct {
			uint32_t scheduled;
			uint32_t skipped;
			uint32_t error;
			uint32_t total;
		} db_traverse;
		/* processing of the delete list; "left" counts down to 0 */
		struct {
			uint32_t total;
			uint32_t remote_error;
			uint32_t local_error;
			uint32_t deleted;
			uint32_t skipped;
			uint32_t left;
		} delete_list;
		/* NOTE(review): not updated in this excerpt */
		struct {
			uint32_t vacuumed;
			uint32_t copied;
		} repack;
	} count;
};
108
/*
 * One record queued for deletion: a copy of its header and key.
 *
 * The key bytes are stored inline: insert_delete_record_data_into_tree()
 * allocates offsetof(struct delete_record_data, keydata) + key length
 * bytes and points key.dptr at keydata.
 */
struct delete_record_data {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* header as it was when the record was queued */
	struct ctdb_ltdb_header hdr;
	/*
	 * Initialised to 0; delete_record_traverse() does not delete
	 * records for which this is non-zero.
	 */
	uint32_t remote_fail_count;
	TDB_DATA key;
	/* start of the inline key bytes (allocation is over-sized) */
	uint8_t keydata[1];
};
118
/*
 * Traverse parameter for delete_marshall_traverse(): the marshall buffer
 * the records are appended to, and the vacuum run they belong to.
 */
struct delete_records_list {
	struct ctdb_marshall_buffer *records;
	struct vacuum_data *vdata;
};
123
/*
 * Entry of a database's fetch_queue (see fetch_queue_traverse()): the key
 * of a record to be migrated to this node.
 * NOTE(review): presumably allocated like delete_record_data, with the
 * key bytes inline at keydata; the allocation is not in this excerpt.
 */
struct fetch_record_data {
	TDB_DATA key;
	uint8_t keydata[1];
};
128
/*
 * Forward declaration: add a record (header and key) to the database's
 * delete_queue.  vacuum_traverse() treats a non-zero return as failure.
 * As a static function used in this file, it is defined later in it.
 */
static int insert_record_into_delete_queue(struct ctdb_db_context *ctdb_db,
					   const struct ctdb_ltdb_header *hdr,
					   TDB_DATA key);
132
133 /**
134  * Store key and header in a tree, indexed by the key hash.
135  */
136 static int insert_delete_record_data_into_tree(struct ctdb_context *ctdb,
137                                                struct ctdb_db_context *ctdb_db,
138                                                trbt_tree_t *tree,
139                                                const struct ctdb_ltdb_header *hdr,
140                                                TDB_DATA key)
141 {
142         struct delete_record_data *dd;
143         uint32_t hash;
144         size_t len;
145
146         len = offsetof(struct delete_record_data, keydata) + key.dsize;
147
148         dd = (struct delete_record_data *)talloc_size(tree, len);
149         if (dd == NULL) {
150                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
151                 return -1;
152         }
153         talloc_set_name_const(dd, "struct delete_record_data");
154
155         dd->ctdb      = ctdb;
156         dd->ctdb_db   = ctdb_db;
157         dd->key.dsize = key.dsize;
158         dd->key.dptr  = dd->keydata;
159         memcpy(dd->keydata, key.dptr, key.dsize);
160
161         dd->hdr = *hdr;
162         dd->remote_fail_count = 0;
163
164         hash = ctdb_hash(&key);
165
166         trbt_insert32(tree, hash, dd);
167
168         return 0;
169 }
170
171 static int add_record_to_delete_list(struct vacuum_data *vdata, TDB_DATA key,
172                                      struct ctdb_ltdb_header *hdr)
173 {
174         struct ctdb_context *ctdb = vdata->ctdb;
175         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
176         uint32_t hash;
177         int ret;
178
179         hash = ctdb_hash(&key);
180
181         if (trbt_lookup32(vdata->delete_list, hash)) {
182                 DEBUG(DEBUG_INFO, (__location__ " Hash collision when vacuuming, skipping this record.\n"));
183                 return 0;
184         }
185
186         ret = insert_delete_record_data_into_tree(ctdb, ctdb_db,
187                                                   vdata->delete_list,
188                                                   hdr, key);
189         if (ret != 0) {
190                 return -1;
191         }
192
193         vdata->count.delete_list.total++;
194
195         return 0;
196 }
197
198 /**
199  * Add a record to the list of records to be sent
200  * to their lmaster with VACUUM_FETCH.
201  */
202 static int add_record_to_vacuum_fetch_list(struct vacuum_data *vdata,
203                                            TDB_DATA key)
204 {
205         struct ctdb_context *ctdb = vdata->ctdb;
206         uint32_t lmaster;
207         struct ctdb_marshall_buffer *vfl;
208
209         lmaster = ctdb_lmaster(ctdb, &key);
210
211         vfl = vdata->vacuum_fetch_list[lmaster];
212
213         vfl = ctdb_marshall_add(ctdb, vfl, vfl->db_id, ctdb->pnn,
214                                 key, NULL, tdb_null);
215         if (vfl == NULL) {
216                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
217                 vdata->traverse_error = true;
218                 return -1;
219         }
220
221         vdata->vacuum_fetch_list[lmaster] = vfl;
222
223         return 0;
224 }
225
226
/*
 * Forward declaration of the vacuuming event handler (it has the tevent
 * timer handler signature).  As a static function it is defined later
 * in this file; its users are outside this excerpt.
 */
static void ctdb_vacuum_event(struct tevent_context *ev,
			      struct tevent_timer *te,
			      struct timeval t, void *private_data);
230
231 static int vacuum_record_parser(TDB_DATA key, TDB_DATA data, void *private_data)
232 {
233         struct ctdb_ltdb_header *header =
234                 (struct ctdb_ltdb_header *)private_data;
235
236         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
237                 return -1;
238         }
239
240         *header = *(struct ctdb_ltdb_header *)data.dptr;
241
242         return 0;
243 }
244
245 /*
246  * traverse function for gathering the records that can be deleted
247  */
248 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
249                            void *private_data)
250 {
251         struct vacuum_data *vdata = talloc_get_type(private_data,
252                                                     struct vacuum_data);
253         struct ctdb_context *ctdb = vdata->ctdb;
254         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
255         uint32_t lmaster;
256         struct ctdb_ltdb_header *hdr;
257         int res = 0;
258
259         vdata->count.db_traverse.total++;
260
261         lmaster = ctdb_lmaster(ctdb, &key);
262         if (lmaster >= ctdb->num_nodes) {
263                 vdata->count.db_traverse.error++;
264                 DEBUG(DEBUG_CRIT, (__location__
265                                    " lmaster[%u] >= ctdb->num_nodes[%u] for key"
266                                    " with hash[%u]!\n",
267                                    (unsigned)lmaster,
268                                    (unsigned)ctdb->num_nodes,
269                                    (unsigned)ctdb_hash(&key)));
270                 return -1;
271         }
272
273         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
274                 /* it is not a deleted record */
275                 vdata->count.db_traverse.skipped++;
276                 return 0;
277         }
278
279         hdr = (struct ctdb_ltdb_header *)data.dptr;
280
281         if (hdr->dmaster != ctdb->pnn) {
282                 vdata->count.db_traverse.skipped++;
283                 return 0;
284         }
285
286         /*
287          * Add the record to this process's delete_queue for processing
288          * in the subsequent traverse in the fast vacuum run.
289          */
290         res = insert_record_into_delete_queue(ctdb_db, hdr, key);
291         if (res != 0) {
292                 vdata->count.db_traverse.error++;
293         } else {
294                 vdata->count.db_traverse.scheduled++;
295         }
296
297         return 0;
298 }
299
300 /*
301  * traverse the tree of records to delete and marshall them into
302  * a blob
303  */
304 static int delete_marshall_traverse(void *param, void *data)
305 {
306         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
307         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
308         struct ctdb_marshall_buffer *m;
309
310         m = ctdb_marshall_add(recs, recs->records, recs->records->db_id,
311                               recs->records->db_id,
312                               dd->key, &dd->hdr, tdb_null);
313         if (m == NULL) {
314                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
315                 return -1;
316         }
317
318         recs->records = m;
319         return 0;
320 }
321
/*
 * Shared state while processing a database's fetch_queue.
 * count is the number of migration calls still outstanding: incremented
 * by fetch_queue_traverse(), decremented by
 * fetch_record_migrate_callback(), and waited on by
 * ctdb_process_fetch_queue().
 */
struct fetch_queue_state {
	struct ctdb_db_context *ctdb_db;
	int count;
};
326
/*
 * Per-call state of one outstanding vacuum migration call: the shared
 * fetch queue state and a private copy of the key (a talloc child of
 * this struct).  Freed by fetch_record_migrate_callback().
 */
struct fetch_record_migrate_state {
	struct fetch_queue_state *fetch_queue;
	TDB_DATA key;
};
331
332 static void fetch_record_migrate_callback(struct ctdb_client_call_state *state)
333 {
334         struct fetch_record_migrate_state *fetch = talloc_get_type_abort(
335                 state->async.private_data, struct fetch_record_migrate_state);
336         struct fetch_queue_state *fetch_queue = fetch->fetch_queue;
337         struct ctdb_ltdb_header hdr;
338         struct ctdb_call call = { 0 };
339         int ret;
340
341         ret = ctdb_call_recv(state, &call);
342         fetch_queue->count--;
343         if (ret != 0) {
344                 D_ERR("Failed to migrate record for vacuuming\n");
345                 goto done;
346         }
347
348         ret = tdb_chainlock_nonblock(fetch_queue->ctdb_db->ltdb->tdb,
349                                      fetch->key);
350         if (ret != 0) {
351                 goto done;
352         }
353
354         ret = tdb_parse_record(fetch_queue->ctdb_db->ltdb->tdb,
355                                fetch->key,
356                                vacuum_record_parser,
357                                &hdr);
358
359         tdb_chainunlock(fetch_queue->ctdb_db->ltdb->tdb, fetch->key);
360
361         if (ret != 0) {
362                 goto done;
363         }
364
365         D_INFO("Vacuum Fetch record, key=%.*s\n",
366                (int)fetch->key.dsize,
367                fetch->key.dptr);
368
369         (void) ctdb_local_schedule_for_deletion(fetch_queue->ctdb_db,
370                                                 &hdr,
371                                                 fetch->key);
372
373 done:
374         talloc_free(fetch);
375 }
376
377 static int fetch_record_parser(TDB_DATA key, TDB_DATA data, void *private_data)
378 {
379         struct ctdb_ltdb_header *header =
380                 (struct ctdb_ltdb_header *)private_data;
381
382         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
383                 return -1;
384         }
385
386         memcpy(header, data.dptr, sizeof(*header));
387         return 0;
388 }
389
390 /**
391  * traverse function for the traversal of the fetch_queue.
392  *
393  * Send a record migration request.
394  */
395 static int fetch_queue_traverse(void *param, void *data)
396 {
397         struct fetch_record_data *rd = talloc_get_type_abort(
398                 data, struct fetch_record_data);
399         struct fetch_queue_state *fetch_queue =
400                 (struct fetch_queue_state *)param;
401         struct ctdb_db_context *ctdb_db = fetch_queue->ctdb_db;
402         struct ctdb_client_call_state *state;
403         struct fetch_record_migrate_state *fetch;
404         struct ctdb_call call = { 0 };
405         struct ctdb_ltdb_header header;
406         int ret;
407
408         ret = tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, rd->key);
409         if (ret != 0) {
410                 return 0;
411         }
412
413         ret = tdb_parse_record(ctdb_db->ltdb->tdb,
414                                rd->key,
415                                fetch_record_parser,
416                                &header);
417
418         tdb_chainunlock(ctdb_db->ltdb->tdb, rd->key);
419
420         if (ret != 0) {
421                 goto skipped;
422         }
423
424         if (header.dmaster == ctdb_db->ctdb->pnn) {
425                 /* If the record is already migrated, skip */
426                 goto skipped;
427         }
428
429         fetch = talloc_zero(ctdb_db, struct fetch_record_migrate_state);
430         if (fetch == NULL) {
431                 D_ERR("Failed to setup fetch record migrate state\n");
432                 return 0;
433         }
434
435         fetch->fetch_queue = fetch_queue;
436
437         fetch->key.dsize = rd->key.dsize;
438         fetch->key.dptr = talloc_memdup(fetch, rd->key.dptr, rd->key.dsize);
439         if (fetch->key.dptr == NULL) {
440                 D_ERR("Memory error in fetch_queue_traverse\n");
441                 talloc_free(fetch);
442                 return 0;
443         }
444
445         call.call_id = CTDB_NULL_FUNC;
446         call.flags = CTDB_IMMEDIATE_MIGRATION |
447                      CTDB_CALL_FLAG_VACUUM_MIGRATION;
448         call.key = fetch->key;
449
450         state = ctdb_call_send(ctdb_db, &call);
451         if (state == NULL) {
452                 DEBUG(DEBUG_ERR, ("Failed to setup vacuum fetch call\n"));
453                 talloc_free(fetch);
454                 return 0;
455         }
456
457         state->async.fn = fetch_record_migrate_callback;
458         state->async.private_data = fetch;
459
460         fetch_queue->count++;
461
462         return 0;
463
464 skipped:
465         D_INFO("Skipped Fetch record, key=%.*s\n",
466                (int)rd->key.dsize,
467                rd->key.dptr);
468         return 0;
469 }
470
471 /**
472  * Traverse the fetch.
473  * Records are migrated to the local node and
474  * added to delete queue for further processing.
475  */
476 static void ctdb_process_fetch_queue(struct ctdb_db_context *ctdb_db)
477 {
478         struct fetch_queue_state state;
479         int ret;
480
481         state.ctdb_db = ctdb_db;
482         state.count = 0;
483
484         ret = trbt_traversearray32(ctdb_db->fetch_queue, 1,
485                                    fetch_queue_traverse, &state);
486         if (ret != 0) {
487                 DEBUG(DEBUG_ERR, (__location__ " Error traversing "
488                       "the fetch queue.\n"));
489         }
490
491         /* Wait for all migrations to complete */
492         while (state.count > 0) {
493                 tevent_loop_once(ctdb_db->ctdb->ev);
494         }
495 }
496
/**
 * traverse function for the traversal of the delete_queue,
 * the fast-path vacuuming list.
 *
 *  - If the record has been migrated off the node
 *    or has been revived (filled with data) on the node,
 *    then skip the record.
 *
 *  - If the current node is the record's lmaster and it is
 *    a record that has never been migrated with data, then
 *    delete the record from the local tdb.
 *
 *  - If the current node is the record's lmaster and it has
 *    been migrated with data, then schedule it for the normal
 *    vacuuming procedure (i.e. add it to the delete_list).
 *
 *  - If the current node is NOT the record's lmaster then
 *    add it to the list of records that are to be sent to
 *    the lmaster with the VACUUM_FETCH message.
 *
 * @param param  the struct vacuum_data of the current run
 * @param data   a struct delete_record_data from the delete_queue
 * @return always 0 so that the traverse continues; each outcome is
 *         counted in vdata->count.delete_queue instead.
 *
 * The record's chain lock is taken without blocking.  If it is busy the
 * record is counted as an error and left alone.  Otherwise the lock is
 * held for all checks and actions below and released at "done".
 */
static int delete_queue_traverse(void *param, void *data)
{
	struct delete_record_data *dd =
		talloc_get_type(data, struct delete_record_data);
	struct vacuum_data *vdata = talloc_get_type(param, struct vacuum_data);
	struct ctdb_db_context *ctdb_db = dd->ctdb_db;
	struct ctdb_context *ctdb = ctdb_db->ctdb; /* or dd->ctdb ??? */
	int res;
	struct ctdb_ltdb_header header;
	uint32_t lmaster;
	/* only used in log messages */
	uint32_t hash = ctdb_hash(&(dd->key));

	vdata->count.delete_queue.total++;

	res = tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key);
	if (res != 0) {
		vdata->count.delete_queue.error++;
		return 0;
	}

	/*
	 * vacuum_record_parser fails unless the record is exactly one
	 * header, so a record that has been refilled with data is skipped.
	 */
	res = tdb_parse_record(ctdb_db->ltdb->tdb, dd->key,
			       vacuum_record_parser, &header);
	if (res != 0) {
		goto skipped;
	}

	if (header.dmaster != ctdb->pnn) {
		/* The record has been migrated off the node. Skip. */
		goto skipped;
	}

	if (header.rsn != dd->hdr.rsn) {
		/*
		 * The record has been migrated off the node and back again.
		 * But not requeued for deletion. Skip it.
		 */
		goto skipped;
	}

	/*
	 * We are dmaster, and the record has no data, and it has
	 * not been migrated after it has been queued for deletion.
	 *
	 * At this stage, the record could still have been revived locally
	 * and last been written with empty data. This can only be
	 * fixed with the addition of an active or delete flag. (TODO)
	 */

	lmaster = ctdb_lmaster(ctdb_db->ctdb, &dd->key);

	if (lmaster != ctdb->pnn) {
		/* not lmaster: ask the lmaster to fetch it (VACUUM_FETCH) */
		res = add_record_to_vacuum_fetch_list(vdata, dd->key);

		if (res != 0) {
			DEBUG(DEBUG_ERR,
			      (__location__ " Error adding record to list "
			       "of records to send to lmaster.\n"));
			vdata->count.delete_queue.error++;
		} else {
			vdata->count.delete_queue.added_to_vacuum_fetch_list++;
		}
		goto done;
	}

	/*
	 * use header->flags or dd->hdr.flags ??
	 * (dd->hdr is the header captured when the record was queued,
	 * header is the one just read from the local tdb.)
	 */
	if (dd->hdr.flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
		/* other nodes may hold copies: use the full delete list */
		res = add_record_to_delete_list(vdata, dd->key, &dd->hdr);

		if (res != 0) {
			DEBUG(DEBUG_ERR,
			      (__location__ " Error adding record to list "
			       "of records for deletion on lmaster.\n"));
			vdata->count.delete_queue.error++;
		} else {
			vdata->count.delete_queue.added_to_delete_list++;
		}
	} else {
		/* never migrated with data: delete it locally right away */
		res = tdb_delete(ctdb_db->ltdb->tdb, dd->key);

		if (res != 0) {
			DEBUG(DEBUG_ERR,
			      (__location__ " Error deleting record with key "
			       "hash [0x%08x] from local data base db[%s].\n",
			       hash, ctdb_db->db_name));
			vdata->count.delete_queue.error++;
			goto done;
		}

		DEBUG(DEBUG_DEBUG,
		      (__location__ " Deleted record with key hash "
		       "[0x%08x] from local data base db[%s].\n",
		       hash, ctdb_db->db_name));
		vdata->count.delete_queue.deleted++;
	}

	goto done;

skipped:
	vdata->count.delete_queue.skipped++;

done:
	/* every path that took the chain lock ends here */
	tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);

	return 0;
}
622
/**
 * Delete the records that we are lmaster and dmaster for and
 * that could be deleted on all other nodes via the TRY_DELETE_RECORDS
 * control.
 *
 * @param param  the struct vacuum_data of the current run
 * @param data   a struct delete_record_data from the delete list
 * @return always 0 so that the traverse continues; outcomes are counted
 *         in vdata->count.delete_list.
 *
 * On every path dd is freed and vdata->count.delete_list.left is
 * decremented.  NOTE(review): freeing dd during the traverse relies on
 * the tree traversal tolerating removal of the current entry — confirm
 * against rb_tree.
 */
static int delete_record_traverse(void *param, void *data)
{
	struct delete_record_data *dd =
		talloc_get_type(data, struct delete_record_data);
	struct vacuum_data *vdata = talloc_get_type(param, struct vacuum_data);
	struct ctdb_db_context *ctdb_db = dd->ctdb_db;
	struct ctdb_context *ctdb = ctdb_db->ctdb;
	int res;
	struct ctdb_ltdb_header header;
	uint32_t lmaster;
	/* only used in log messages */
	uint32_t hash = ctdb_hash(&(dd->key));

	/* deletion failed on some remote node: keep the local copy */
	if (dd->remote_fail_count > 0) {
		vdata->count.delete_list.remote_error++;
		vdata->count.delete_list.left--;
		talloc_free(dd);
		return 0;
	}

	/* unlike delete_queue_traverse(), wait for the chain lock */
	res = tdb_chainlock(ctdb_db->ltdb->tdb, dd->key);
	if (res != 0) {
		DEBUG(DEBUG_ERR,
		      (__location__ " Error getting chainlock on record with "
		       "key hash [0x%08x] on database db[%s].\n",
		       hash, ctdb_db->db_name));
		vdata->count.delete_list.local_error++;
		vdata->count.delete_list.left--;
		talloc_free(dd);
		return 0;
	}

	/*
	 * Verify that the record is still empty, its RSN has not
	 * changed and that we are still its lmaster and dmaster.
	 */

	res = tdb_parse_record(ctdb_db->ltdb->tdb, dd->key,
			       vacuum_record_parser, &header);
	if (res != 0) {
		goto skip;
	}

	if (header.flags & CTDB_REC_RO_FLAGS) {
		DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
				   "on database db[%s] has read-only flags. "
				   "skipping.\n",
				   hash, ctdb_db->db_name));
		goto skip;
	}

	if (header.dmaster != ctdb->pnn) {
		DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
				   "on database db[%s] has been migrated away. "
				   "skipping.\n",
				   hash, ctdb_db->db_name));
		goto skip;
	}

	if (header.rsn != dd->hdr.rsn) {
		/*
		 * The record has been migrated off the node and back again.
		 * But not requeued for deletion. Skip it.
		 */
		DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
				   "on database db[%s] seems to have been "
				   "migrated away and back again (with empty "
				   "data). skipping.\n",
				   hash, ctdb_db->db_name));
		goto skip;
	}

	lmaster = ctdb_lmaster(ctdb_db->ctdb, &dd->key);

	if (lmaster != ctdb->pnn) {
		DEBUG(DEBUG_INFO, (__location__ ": not lmaster for record in "
				   "delete list (key hash [0x%08x], db[%s]). "
				   "Strange! skipping.\n",
				   hash, ctdb_db->db_name));
		goto skip;
	}

	/* all checks passed while holding the lock: delete locally */
	res = tdb_delete(ctdb_db->ltdb->tdb, dd->key);

	if (res != 0) {
		DEBUG(DEBUG_ERR,
		      (__location__ " Error deleting record with key hash "
		       "[0x%08x] from local data base db[%s].\n",
		       hash, ctdb_db->db_name));
		vdata->count.delete_list.local_error++;
		goto done;
	}

	DEBUG(DEBUG_DEBUG,
	      (__location__ " Deleted record with key hash [0x%08x] from "
	       "local data base db[%s].\n", hash, ctdb_db->db_name));

	vdata->count.delete_list.deleted++;
	goto done;

skip:
	vdata->count.delete_list.skipped++;

done:
	/* every path that took the chain lock ends here */
	tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);

	talloc_free(dd);
	vdata->count.delete_list.left--;

	return 0;
}
738
739 /**
740  * Traverse the delete_queue.
741  * Records are either deleted directly or filled
742  * into the delete list or the vacuum fetch lists
743  * for further processing.
744  */
745 static void ctdb_process_delete_queue(struct ctdb_db_context *ctdb_db,
746                                       struct vacuum_data *vdata)
747 {
748         uint32_t sum;
749         int ret;
750
751         ret = trbt_traversearray32(ctdb_db->delete_queue, 1,
752                                    delete_queue_traverse, vdata);
753
754         if (ret != 0) {
755                 DEBUG(DEBUG_ERR, (__location__ " Error traversing "
756                       "the delete queue.\n"));
757         }
758
759         sum = vdata->count.delete_queue.deleted
760             + vdata->count.delete_queue.skipped
761             + vdata->count.delete_queue.error
762             + vdata->count.delete_queue.added_to_delete_list
763             + vdata->count.delete_queue.added_to_vacuum_fetch_list;
764
765         if (vdata->count.delete_queue.total != sum) {
766                 DEBUG(DEBUG_ERR, (__location__ " Inconsistency in fast vacuum "
767                       "counts for db[%s]: total[%u] != sum[%u]\n",
768                       ctdb_db->db_name,
769                       (unsigned)vdata->count.delete_queue.total,
770                       (unsigned)sum));
771         }
772
773         if (vdata->count.delete_queue.total > 0) {
774                 DEBUG(DEBUG_INFO,
775                       (__location__
776                        " fast vacuuming delete_queue traverse statistics: "
777                        "db[%s] "
778                        "total[%u] "
779                        "del[%u] "
780                        "skp[%u] "
781                        "err[%u] "
782                        "adl[%u] "
783                        "avf[%u]\n",
784                        ctdb_db->db_name,
785                        (unsigned)vdata->count.delete_queue.total,
786                        (unsigned)vdata->count.delete_queue.deleted,
787                        (unsigned)vdata->count.delete_queue.skipped,
788                        (unsigned)vdata->count.delete_queue.error,
789                        (unsigned)vdata->count.delete_queue.added_to_delete_list,
790                        (unsigned)vdata->count.delete_queue.added_to_vacuum_fetch_list));
791         }
792
793         return;
794 }
795
796 /**
797  * read-only traverse of the database, looking for records that
798  * might be able to be vacuumed.
799  *
800  * This is not done each time but only every tunable
801  * VacuumFastPathCount times.
802  */
803 static void ctdb_vacuum_traverse_db(struct ctdb_db_context *ctdb_db,
804                                     struct vacuum_data *vdata)
805 {
806         int ret;
807
808         ret = tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata);
809         if (ret == -1 || vdata->traverse_error) {
810                 DEBUG(DEBUG_ERR, (__location__ " Traverse error in vacuuming "
811                                   "'%s'\n", ctdb_db->db_name));
812                 return;
813         }
814
815         if (vdata->count.db_traverse.total > 0) {
816                 DEBUG(DEBUG_INFO,
817                       (__location__
818                        " full vacuuming db traverse statistics: "
819                        "db[%s] "
820                        "total[%u] "
821                        "skp[%u] "
822                        "err[%u] "
823                        "sched[%u]\n",
824                        ctdb_db->db_name,
825                        (unsigned)vdata->count.db_traverse.total,
826                        (unsigned)vdata->count.db_traverse.skipped,
827                        (unsigned)vdata->count.db_traverse.error,
828                        (unsigned)vdata->count.db_traverse.scheduled));
829         }
830
831         return;
832 }
833
834 /**
835  * Process the vacuum fetch lists:
836  * For records for which we are not the lmaster, tell the lmaster to
837  * fetch the record.
838  */
839 static void ctdb_process_vacuum_fetch_lists(struct ctdb_db_context *ctdb_db,
840                                             struct vacuum_data *vdata)
841 {
842         unsigned int i;
843         struct ctdb_context *ctdb = ctdb_db->ctdb;
844         int ret, res;
845
846         for (i = 0; i < ctdb->num_nodes; i++) {
847                 TDB_DATA data;
848                 struct ctdb_marshall_buffer *vfl = vdata->vacuum_fetch_list[i];
849
850                 if (ctdb->nodes[i]->pnn == ctdb->pnn) {
851                         continue;
852                 }
853
854                 if (vfl->count == 0) {
855                         continue;
856                 }
857
858                 DEBUG(DEBUG_INFO, ("Found %u records for lmaster %u in '%s'\n",
859                                    vfl->count, ctdb->nodes[i]->pnn,
860                                    ctdb_db->db_name));
861
862                 data = ctdb_marshall_finish(vfl);
863
864                 ret = ctdb_control(ctdb, ctdb->nodes[i]->pnn, 0,
865                                    CTDB_CONTROL_VACUUM_FETCH, 0,
866                                    data, NULL, NULL, &res, NULL, NULL);
867                 if (ret != 0 || res != 0) {
868                         DEBUG(DEBUG_ERR, ("Failed to send vacuum "
869                                           "fetch control to node %u\n",
870                                           ctdb->nodes[i]->pnn));
871                 }
872         }
873 }
874
875 /**
876  * Process the delete list:
877  *
878  * This is the last step of vacuuming that consistently deletes
879  * those records that have been migrated with data and can hence
880  * not be deleted when leaving a node.
881  *
882  * In this step, the lmaster does the final deletion of those empty
883  * records that it is also dmaster for. It has usually received
884  * at least some of these records previously from the former dmasters
885  * with the vacuum fetch message.
886  *
887  *  1) Send the records to all active nodes with the TRY_DELETE_RECORDS
888  *     control. The remote notes delete their local copy.
889  *  2) The lmaster locally deletes its copies of all records that
890  *     could successfully be deleted remotely in step #2.
891  */
892 static void ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
893                                      struct vacuum_data *vdata)
894 {
895         int ret, i;
896         struct ctdb_context *ctdb = ctdb_db->ctdb;
897         struct delete_records_list *recs;
898         TDB_DATA indata;
899         struct ctdb_node_map_old *nodemap;
900         uint32_t *active_nodes;
901         int num_active_nodes;
902         TALLOC_CTX *tmp_ctx;
903         uint32_t sum;
904
905         if (vdata->count.delete_list.total == 0) {
906                 return;
907         }
908
909         tmp_ctx = talloc_new(vdata);
910         if (tmp_ctx == NULL) {
911                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
912                 return;
913         }
914
915         vdata->count.delete_list.left = vdata->count.delete_list.total;
916
917         /*
918          * get the list of currently active nodes
919          */
920
921         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
922                                    CTDB_CURRENT_NODE,
923                                    tmp_ctx,
924                                    &nodemap);
925         if (ret != 0) {
926                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
927                 goto done;
928         }
929
930         active_nodes = list_of_active_nodes(ctdb, nodemap,
931                                             nodemap, /* talloc context */
932                                             false /* include self */);
933         /* yuck! ;-) */
934         num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
935
936         /*
937          * Now delete the records all active nodes in a two-phase process:
938          * 1) tell all active remote nodes to delete all their copy
939          * 2) if all remote nodes deleted their record copy, delete it locally
940          */
941
942         recs = talloc_zero(tmp_ctx, struct delete_records_list);
943         if (recs == NULL) {
944                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
945                 goto done;
946         }
947
948         /*
949          * Step 1:
950          * Send all records to all active nodes for deletion.
951          */
952
953         /*
954          * Create a marshall blob from the remaining list of records to delete.
955          */
956
957         recs->records = (struct ctdb_marshall_buffer *)
958                 talloc_zero_size(recs,
959                                  offsetof(struct ctdb_marshall_buffer, data));
960         if (recs->records == NULL) {
961                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
962                 goto done;
963         }
964         recs->records->db_id = ctdb_db->db_id;
965
966         ret = trbt_traversearray32(vdata->delete_list, 1,
967                                    delete_marshall_traverse, recs);
968         if (ret != 0) {
969                 DEBUG(DEBUG_ERR, (__location__ " Error traversing the "
970                       "delete list for second marshalling.\n"));
971                 goto done;
972         }
973
974         indata = ctdb_marshall_finish(recs->records);
975
976         for (i = 0; i < num_active_nodes; i++) {
977                 struct ctdb_marshall_buffer *records;
978                 struct ctdb_rec_data_old *rec;
979                 int32_t res;
980                 TDB_DATA outdata;
981
982                 ret = ctdb_control(ctdb, active_nodes[i], 0,
983                                 CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
984                                 indata, recs, &outdata, &res,
985                                 NULL, NULL);
986                 if (ret != 0 || res != 0) {
987                         DEBUG(DEBUG_ERR, ("Failed to delete records on "
988                                           "node %u: ret[%d] res[%d]\n",
989                                           active_nodes[i], ret, res));
990                         goto done;
991                 }
992
993                 /*
994                  * outdata contains the list of records coming back
995                  * from the node: These are the records that the
996                  * remote node could not delete. We remove these from
997                  * the list to delete locally.
998                  */
999                 records = (struct ctdb_marshall_buffer *)outdata.dptr;
1000                 rec = (struct ctdb_rec_data_old *)&records->data[0];
1001                 while (records->count-- > 0) {
1002                         TDB_DATA reckey, recdata;
1003                         struct ctdb_ltdb_header *rechdr;
1004                         struct delete_record_data *dd;
1005
1006                         reckey.dptr = &rec->data[0];
1007                         reckey.dsize = rec->keylen;
1008                         recdata.dptr = &rec->data[reckey.dsize];
1009                         recdata.dsize = rec->datalen;
1010
1011                         if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
1012                                 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
1013                                 goto done;
1014                         }
1015                         rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
1016                         recdata.dptr += sizeof(*rechdr);
1017                         recdata.dsize -= sizeof(*rechdr);
1018
1019                         dd = (struct delete_record_data *)trbt_lookup32(
1020                                         vdata->delete_list,
1021                                         ctdb_hash(&reckey));
1022                         if (dd != NULL) {
1023                                 /*
1024                                  * The remote node could not delete the
1025                                  * record.  Since other remote nodes can
1026                                  * also fail, we just mark the record.
1027                                  */
1028                                 dd->remote_fail_count++;
1029                         } else {
1030                                 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1031                                       "find record with hash 0x%08x coming "
1032                                       "back from TRY_DELETE_RECORDS "
1033                                       "control in delete list.\n",
1034                                       ctdb_hash(&reckey)));
1035                         }
1036
1037                         rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1038                 }
1039         }
1040
1041         /*
1042          * Step 2:
1043          * Delete the remaining records locally.
1044          *
1045          * These records have successfully been deleted on all
1046          * active remote nodes.
1047          */
1048
1049         ret = trbt_traversearray32(vdata->delete_list, 1,
1050                                    delete_record_traverse, vdata);
1051         if (ret != 0) {
1052                 DEBUG(DEBUG_ERR, (__location__ " Error traversing the "
1053                       "delete list for deletion.\n"));
1054         }
1055
1056         if (vdata->count.delete_list.left != 0) {
1057                 DEBUG(DEBUG_ERR, (__location__ " Vacuum db[%s] error: "
1058                       "there are %u records left for deletion after "
1059                       "processing delete list\n",
1060                       ctdb_db->db_name,
1061                       (unsigned)vdata->count.delete_list.left));
1062         }
1063
1064         sum = vdata->count.delete_list.deleted
1065             + vdata->count.delete_list.skipped
1066             + vdata->count.delete_list.remote_error
1067             + vdata->count.delete_list.local_error
1068             + vdata->count.delete_list.left;
1069
1070         if (vdata->count.delete_list.total != sum) {
1071                 DEBUG(DEBUG_ERR, (__location__ " Inconsistency in vacuum "
1072                       "delete list counts for db[%s]: total[%u] != sum[%u]\n",
1073                       ctdb_db->db_name,
1074                       (unsigned)vdata->count.delete_list.total,
1075                       (unsigned)sum));
1076         }
1077
1078         if (vdata->count.delete_list.total > 0) {
1079                 DEBUG(DEBUG_INFO,
1080                       (__location__
1081                        " vacuum delete list statistics: "
1082                        "db[%s] "
1083                        "total[%u] "
1084                        "del[%u] "
1085                        "skip[%u] "
1086                        "rem.err[%u] "
1087                        "loc.err[%u] "
1088                        "left[%u]\n",
1089                        ctdb_db->db_name,
1090                        (unsigned)vdata->count.delete_list.total,
1091                        (unsigned)vdata->count.delete_list.deleted,
1092                        (unsigned)vdata->count.delete_list.skipped,
1093                        (unsigned)vdata->count.delete_list.remote_error,
1094                        (unsigned)vdata->count.delete_list.local_error,
1095                        (unsigned)vdata->count.delete_list.left));
1096         }
1097
1098 done:
1099         talloc_free(tmp_ctx);
1100
1101         return;
1102 }
1103
1104 /**
1105  * initialize the vacuum_data
1106  */
1107 static struct vacuum_data *ctdb_vacuum_init_vacuum_data(
1108                                         struct ctdb_db_context *ctdb_db,
1109                                         TALLOC_CTX *mem_ctx)
1110 {
1111         unsigned int i;
1112         struct ctdb_context *ctdb = ctdb_db->ctdb;
1113         struct vacuum_data *vdata;
1114
1115         vdata = talloc_zero(mem_ctx, struct vacuum_data);
1116         if (vdata == NULL) {
1117                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1118                 return NULL;
1119         }
1120
1121         vdata->ctdb = ctdb_db->ctdb;
1122         vdata->ctdb_db = ctdb_db;
1123         vdata->delete_list = trbt_create(vdata, 0);
1124         if (vdata->delete_list == NULL) {
1125                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1126                 goto fail;
1127         }
1128
1129         vdata->start = timeval_current();
1130
1131         vdata->count.delete_queue.added_to_delete_list = 0;
1132         vdata->count.delete_queue.added_to_vacuum_fetch_list = 0;
1133         vdata->count.delete_queue.deleted = 0;
1134         vdata->count.delete_queue.skipped = 0;
1135         vdata->count.delete_queue.error = 0;
1136         vdata->count.delete_queue.total = 0;
1137         vdata->count.db_traverse.scheduled = 0;
1138         vdata->count.db_traverse.skipped = 0;
1139         vdata->count.db_traverse.error = 0;
1140         vdata->count.db_traverse.total = 0;
1141         vdata->count.delete_list.total = 0;
1142         vdata->count.delete_list.left = 0;
1143         vdata->count.delete_list.remote_error = 0;
1144         vdata->count.delete_list.local_error = 0;
1145         vdata->count.delete_list.skipped = 0;
1146         vdata->count.delete_list.deleted = 0;
1147
1148         /* the list needs to be of length num_nodes */
1149         vdata->vacuum_fetch_list = talloc_zero_array(vdata,
1150                                                 struct ctdb_marshall_buffer *,
1151                                                 ctdb->num_nodes);
1152         if (vdata->vacuum_fetch_list == NULL) {
1153                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1154                 goto fail;
1155         }
1156         for (i = 0; i < ctdb->num_nodes; i++) {
1157                 vdata->vacuum_fetch_list[i] = (struct ctdb_marshall_buffer *)
1158                         talloc_zero_size(vdata->vacuum_fetch_list,
1159                                          offsetof(struct ctdb_marshall_buffer, data));
1160                 if (vdata->vacuum_fetch_list[i] == NULL) {
1161                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1162                         talloc_free(vdata);
1163                         return NULL;
1164                 }
1165                 vdata->vacuum_fetch_list[i]->db_id = ctdb_db->db_id;
1166         }
1167
1168         return vdata;
1169
1170 fail:
1171         talloc_free(vdata);
1172         return NULL;
1173 }
1174
1175 /**
1176  * Vacuum a DB:
1177  *  - Always do the fast vacuuming run, which traverses
1178  *    - the in-memory fetch queue: these records have been
1179  *      scheduled for migration
1180  *    - the in-memory delete queue: these records have been
1181  *      scheduled for deletion.
1182  *  - Only if explicitly requested, the database is traversed
1183  *    in order to use the traditional heuristics on empty records
1184  *    to trigger deletion.
1185  *    This is done only every VacuumFastPathCount'th vacuuming run.
1186  *
1187  * The traverse runs fill two lists:
1188  *
1189  * - The delete_list:
1190  *   This is the list of empty records the current
1191  *   node is lmaster and dmaster for. These records are later
1192  *   deleted first on other nodes and then locally.
1193  *
1194  *   The fast vacuuming run has a short cut for those records
1195  *   that have never been migrated with data: these records
1196  *   are immediately deleted locally, since they have left
1197  *   no trace on other nodes.
1198  *
1199  * - The vacuum_fetch lists
1200  *   (one for each other lmaster node):
1201  *   The records in this list are sent for deletion to
1202  *   their lmaster in a bulk VACUUM_FETCH control.
1203  *
1204  *   The lmaster then migrates all these records to itelf
1205  *   so that they can be vacuumed there.
1206  *
1207  * This executes in the child context.
1208  */
1209 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db,
1210                           bool full_vacuum_run)
1211 {
1212         struct ctdb_context *ctdb = ctdb_db->ctdb;
1213         int ret, pnn;
1214         struct vacuum_data *vdata;
1215         TALLOC_CTX *tmp_ctx;
1216
1217         DEBUG(DEBUG_INFO, (__location__ " Entering %s vacuum run for db "
1218                            "%s db_id[0x%08x]\n",
1219                            full_vacuum_run ? "full" : "fast",
1220                            ctdb_db->db_name, ctdb_db->db_id));
1221
1222         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
1223         if (ret != 0) {
1224                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
1225                 return ret;
1226         }
1227
1228         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1229         if (pnn == -1) {
1230                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
1231                 return -1;
1232         }
1233
1234         ctdb->pnn = pnn;
1235
1236         tmp_ctx = talloc_new(ctdb_db);
1237         if (tmp_ctx == NULL) {
1238                 DEBUG(DEBUG_ERR, ("Out of memory!\n"));
1239                 return -1;
1240         }
1241
1242         vdata = ctdb_vacuum_init_vacuum_data(ctdb_db, tmp_ctx);
1243         if (vdata == NULL) {
1244                 talloc_free(tmp_ctx);
1245                 return -1;
1246         }
1247
1248         if (full_vacuum_run) {
1249                 ctdb_vacuum_traverse_db(ctdb_db, vdata);
1250         }
1251
1252         ctdb_process_fetch_queue(ctdb_db);
1253
1254         ctdb_process_delete_queue(ctdb_db, vdata);
1255
1256         ctdb_process_vacuum_fetch_lists(ctdb_db, vdata);
1257
1258         ctdb_process_delete_list(ctdb_db, vdata);
1259
1260         talloc_free(tmp_ctx);
1261
1262         return 0;
1263 }
1264
1265 /*
1266  * repack and vacuum a db
1267  * called from the child context
1268  */
1269 static int ctdb_vacuum_and_repack_db(struct ctdb_db_context *ctdb_db,
1270                                      bool full_vacuum_run)
1271 {
1272         uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
1273         const char *name = ctdb_db->db_name;
1274         int freelist_size = 0;
1275         int ret;
1276
1277         if (ctdb_vacuum_db(ctdb_db, full_vacuum_run) != 0) {
1278                 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
1279         }
1280
1281         freelist_size = tdb_freelist_size(ctdb_db->ltdb->tdb);
1282         if (freelist_size == -1) {
1283                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
1284                 return -1;
1285         }
1286
1287         /*
1288          * decide if a repack is necessary
1289          */
1290         if ((repack_limit == 0 || (uint32_t)freelist_size < repack_limit))
1291         {
1292                 return 0;
1293         }
1294
1295         D_NOTICE("Repacking %s with %u freelist entries\n",
1296                  name,
1297                  freelist_size);
1298
1299         ret = tdb_repack(ctdb_db->ltdb->tdb);
1300         if (ret != 0) {
1301                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
1302                 return -1;
1303         }
1304
1305         return 0;
1306 }
1307
1308 static uint32_t get_vacuum_interval(struct ctdb_db_context *ctdb_db)
1309 {
1310         uint32_t interval = ctdb_db->ctdb->tunable.vacuum_interval;
1311
1312         return interval;
1313 }
1314
1315 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
1316 {
1317         double l = timeval_elapsed(&child_ctx->start_time);
1318         struct ctdb_vacuum_handle *vacuum_handle = child_ctx->vacuum_handle;
1319         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
1320         struct ctdb_context *ctdb = ctdb_db->ctdb;
1321
1322         CTDB_UPDATE_DB_LATENCY(ctdb_db, "vacuum", vacuum.latency, l);
1323         DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
1324
1325         if (child_ctx->child_pid != -1) {
1326                 ctdb_kill(ctdb, child_ctx->child_pid, SIGKILL);
1327         } else {
1328                 /* Bump the number of successful fast-path runs. */
1329                 vacuum_handle->fast_path_count++;
1330         }
1331
1332         ctdb->vacuumer = NULL;
1333
1334         if (child_ctx->scheduled) {
1335                 vacuum_handle->vacuum_interval = get_vacuum_interval(ctdb_db);
1336
1337                 tevent_add_timer(
1338                         ctdb->ev,
1339                         vacuum_handle,
1340                         timeval_current_ofs(vacuum_handle->vacuum_interval, 0),
1341                         ctdb_vacuum_event,
1342                         vacuum_handle);
1343         }
1344
1345         return 0;
1346 }
1347
1348 /*
1349  * this event is generated when a vacuum child process times out
1350  */
1351 static void vacuum_child_timeout(struct tevent_context *ev,
1352                                  struct tevent_timer *te,
1353                                  struct timeval t, void *private_data)
1354 {
1355         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
1356
1357         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
1358
1359         child_ctx->status = VACUUM_TIMEOUT;
1360
1361         talloc_free(child_ctx);
1362 }
1363
1364
1365 /*
1366  * this event is generated when a vacuum child process has completed
1367  */
1368 static void vacuum_child_handler(struct tevent_context *ev,
1369                                  struct tevent_fd *fde,
1370                                  uint16_t flags, void *private_data)
1371 {
1372         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
1373         char c = 0;
1374         int ret;
1375
1376         DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
1377         child_ctx->child_pid = -1;
1378
1379         ret = sys_read(child_ctx->fd[0], &c, 1);
1380         if (ret != 1 || c != 0) {
1381                 child_ctx->status = VACUUM_ERROR;
1382                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
1383         } else {
1384                 child_ctx->status = VACUUM_OK;
1385         }
1386
1387         talloc_free(child_ctx);
1388 }
1389
1390 /*
1391  * this event is called every time we need to start a new vacuum process
1392  */
1393 static int vacuum_db_child(TALLOC_CTX *mem_ctx,
1394                            struct ctdb_db_context *ctdb_db,
1395                            bool scheduled,
1396                            bool full_vacuum_run,
1397                            struct ctdb_vacuum_child_context **out)
1398 {
1399         struct ctdb_context *ctdb = ctdb_db->ctdb;
1400         struct ctdb_vacuum_child_context *child_ctx;
1401         struct tevent_fd *fde;
1402         int ret;
1403
1404         /* we don't vacuum if we are in recovery mode, or db frozen */
1405         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
1406             ctdb_db_frozen(ctdb_db)) {
1407                 D_INFO("Not vacuuming %s (%s)\n", ctdb_db->db_name,
1408                        ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ?
1409                        "in recovery" : "frozen");
1410                 return EAGAIN;
1411         }
1412
1413         /* Do not allow multiple vacuuming child processes to be active at the
1414          * same time.  If there is vacuuming child process active, delay
1415          * new vacuuming event to stagger vacuuming events.
1416          */
1417         if (ctdb->vacuumer != NULL) {
1418                 return EBUSY;
1419         }
1420
1421         child_ctx = talloc_zero(mem_ctx, struct ctdb_vacuum_child_context);
1422         if (child_ctx == NULL) {
1423                 DBG_ERR("Failed to allocate child context for vacuuming of %s\n",
1424                         ctdb_db->db_name);
1425                 return ENOMEM;
1426         }
1427
1428
1429         ret = pipe(child_ctx->fd);
1430         if (ret != 0) {
1431                 talloc_free(child_ctx);
1432                 D_ERR("Failed to create pipe for vacuum child process.\n");
1433                 return EAGAIN;
1434         }
1435
1436         child_ctx->child_pid = ctdb_fork(ctdb);
1437         if (child_ctx->child_pid == (pid_t)-1) {
1438                 close(child_ctx->fd[0]);
1439                 close(child_ctx->fd[1]);
1440                 talloc_free(child_ctx);
1441                 D_ERR("Failed to fork vacuum child process.\n");
1442                 return EAGAIN;
1443         }
1444
1445
1446         if (child_ctx->child_pid == 0) {
1447                 char cc = 0;
1448                 close(child_ctx->fd[0]);
1449
1450                 D_INFO("Vacuuming child process %d for db %s started\n",
1451                        getpid(),
1452                        ctdb_db->db_name);
1453                 prctl_set_comment("ctdb_vacuum");
1454                 ret = switch_from_server_to_client(ctdb);
1455                 if (ret != 0) {
1456                         DBG_ERR("ERROR: failed to switch vacuum daemon "
1457                                 "into client mode.\n");
1458                         return EIO;
1459                 }
1460
1461                 cc = ctdb_vacuum_and_repack_db(ctdb_db, full_vacuum_run);
1462
1463                 sys_write(child_ctx->fd[1], &cc, 1);
1464                 _exit(0);
1465         }
1466
1467         set_close_on_exec(child_ctx->fd[0]);
1468         close(child_ctx->fd[1]);
1469
1470         child_ctx->status = VACUUM_RUNNING;
1471         child_ctx->scheduled = scheduled;
1472         child_ctx->start_time = timeval_current();
1473
1474         ctdb->vacuumer = child_ctx;
1475         talloc_set_destructor(child_ctx, vacuum_child_destructor);
1476
1477         /*
1478          * Clear the fastpath vacuuming list in the parent.
1479          */
1480         talloc_free(ctdb_db->delete_queue);
1481         ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
1482         if (ctdb_db->delete_queue == NULL) {
1483                 DBG_ERR("Out of memory when re-creating vacuum tree\n");
1484                 return ENOMEM;
1485         }
1486
1487         talloc_free(ctdb_db->fetch_queue);
1488         ctdb_db->fetch_queue = trbt_create(ctdb_db, 0);
1489         if (ctdb_db->fetch_queue == NULL) {
1490                 ctdb_fatal(ctdb, "Out of memory when re-create fetch queue "
1491                                  " in parent context. Shutting down\n");
1492         }
1493
1494         tevent_add_timer(ctdb->ev, child_ctx,
1495                          timeval_current_ofs(ctdb->tunable.vacuum_max_run_time,
1496                                              0),
1497                          vacuum_child_timeout, child_ctx);
1498
1499         DBG_DEBUG(" Created PIPE FD:%d to child vacuum process\n",
1500                   child_ctx->fd[0]);
1501
1502         fde = tevent_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
1503                             TEVENT_FD_READ, vacuum_child_handler, child_ctx);
1504         tevent_fd_set_auto_close(fde);
1505
1506         child_ctx->vacuum_handle = ctdb_db->vacuum_handle;
1507
1508         *out = child_ctx;
1509         return 0;
1510 }
1511
1512 static void ctdb_vacuum_event(struct tevent_context *ev,
1513                               struct tevent_timer *te,
1514                               struct timeval t, void *private_data)
1515 {
1516         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(
1517                 private_data, struct ctdb_vacuum_handle);
1518         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
1519         struct ctdb_context *ctdb = ctdb_db->ctdb;
1520         struct ctdb_vacuum_child_context *child_ctx = NULL;
1521         uint32_t fast_path_max = ctdb->tunable.vacuum_fast_path_count;
1522         uint32_t vacuum_interval = get_vacuum_interval(ctdb_db);
1523         bool full_vacuum_run = false;
1524         int ret;
1525
1526         if (vacuum_interval > vacuum_handle->vacuum_interval) {
1527                 uint32_t d = vacuum_interval - vacuum_handle->vacuum_interval;
1528
1529                 DBG_INFO("Vacuum interval increased from "
1530                          "%"PRIu32" to %"PRIu32", rescheduling\n",
1531                          vacuum_handle->vacuum_interval,
1532                          vacuum_interval);
1533                 vacuum_handle->vacuum_interval = vacuum_interval;
1534                 tevent_add_timer(ctdb->ev,
1535                                  vacuum_handle,
1536                                  timeval_current_ofs(d, 0),
1537                                  ctdb_vacuum_event,
1538                                  vacuum_handle);
1539                 return;
1540         }
1541
1542         vacuum_handle->vacuum_interval = vacuum_interval;
1543
1544         if (vacuum_handle->fast_path_count >= fast_path_max) {
1545                 if (fast_path_max > 0) {
1546                         full_vacuum_run = true;
1547                 }
1548                 vacuum_handle->fast_path_count = 0;
1549         }
1550
1551         ret = vacuum_db_child(vacuum_handle,
1552                               ctdb_db,
1553                               true,
1554                               full_vacuum_run,
1555                               &child_ctx);
1556
1557         if (ret == 0) {
1558                 return;
1559         }
1560
1561         switch (ret) {
1562         case EBUSY:
1563                 /* Stagger */
1564                 tevent_add_timer(ctdb->ev,
1565                                  vacuum_handle,
1566                                  timeval_current_ofs(0, 500*1000),
1567                                  ctdb_vacuum_event,
1568                                  vacuum_handle);
1569                 break;
1570
1571         default:
1572                 /* Temporary failure, schedule next attempt */
1573                 tevent_add_timer(ctdb->ev,
1574                                  vacuum_handle,
1575                                  timeval_current_ofs(
1576                                          vacuum_handle->vacuum_interval, 0),
1577                                  ctdb_vacuum_event,
1578                                  vacuum_handle);
1579         }
1580
1581 }
1582
/*
 * State of an asynchronous CTDB_CONTROL_DB_VACUUM request.
 *
 * ctdb_control_db_vacuum() talloc-parents this to the vacuum child
 * context.  Its destructor (vacuum_control_state_destructor) sends the
 * control reply when the child context is freed.
 */
struct vacuum_control_state {
	/* vacuum child whose final status decides the reply */
	struct ctdb_vacuum_child_context *child_ctx;
	/* the pending control request to answer */
	struct ctdb_req_control_old *c;
	struct ctdb_context *ctdb;
};
1588
1589 static int vacuum_control_state_destructor(struct vacuum_control_state *state)
1590 {
1591         struct ctdb_vacuum_child_context *child_ctx = state->child_ctx;
1592         int32_t status;
1593
1594         status = (child_ctx->status == VACUUM_OK ? 0 : -1);
1595         ctdb_request_control_reply(state->ctdb, state->c, NULL, status, NULL);
1596
1597         return 0;
1598 }
1599
1600 int32_t ctdb_control_db_vacuum(struct ctdb_context *ctdb,
1601                                struct ctdb_req_control_old *c,
1602                                TDB_DATA indata,
1603                                bool *async_reply)
1604 {
1605         struct ctdb_db_context *ctdb_db;
1606         struct ctdb_vacuum_child_context *child_ctx = NULL;
1607         struct ctdb_db_vacuum *db_vacuum;
1608         struct vacuum_control_state *state;
1609         size_t np;
1610         int ret;
1611
1612         ret = ctdb_db_vacuum_pull(indata.dptr,
1613                                   indata.dsize,
1614                                   ctdb,
1615                                   &db_vacuum,
1616                                   &np);
1617         if (ret != 0) {
1618                 DBG_ERR("Invalid data\n");
1619                 return -1;
1620         }
1621
1622         ctdb_db = find_ctdb_db(ctdb, db_vacuum->db_id);
1623         if (ctdb_db == NULL) {
1624                 DBG_ERR("Unknown db id 0x%08x\n", db_vacuum->db_id);
1625                 talloc_free(db_vacuum);
1626                 return -1;
1627         }
1628
1629         state = talloc(ctdb, struct vacuum_control_state);
1630         if (state == NULL) {
1631                 DBG_ERR("Memory allocation error\n");
1632                 return -1;
1633         }
1634
1635         ret = vacuum_db_child(ctdb_db,
1636                               ctdb_db,
1637                               false,
1638                               db_vacuum->full_vacuum_run,
1639                               &child_ctx);
1640
1641         talloc_free(db_vacuum);
1642
1643         if (ret == 0) {
1644                 (void) talloc_steal(child_ctx, state);
1645
1646                 state->child_ctx = child_ctx;
1647                 state->c = talloc_steal(state, c);
1648                 state->ctdb = ctdb;
1649
1650                 talloc_set_destructor(state, vacuum_control_state_destructor);
1651
1652                 *async_reply = true;
1653                 return 0;
1654         }
1655
1656         talloc_free(state);
1657
1658         switch (ret) {
1659         case EBUSY:
1660                 DBG_WARNING("Vacuuming collision\n");
1661                 break;
1662
1663         default:
1664                 DBG_ERR("Temporary vacuuming failure, ret=%d\n", ret);
1665         }
1666
1667         return -1;
1668 }
1669
1670 void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
1671 {
1672         if (ctdb->vacuumer != NULL) {
1673                 D_INFO("Aborting vacuuming for %s (%i)\n",
1674                        ctdb->vacuumer->vacuum_handle->ctdb_db->db_name,
1675                        (int)ctdb->vacuumer->child_pid);
1676                 /* vacuum_child_destructor kills it, removes from list */
1677                 talloc_free(ctdb->vacuumer);
1678         }
1679 }
1680
1681 /* this function initializes the vacuuming context for a database
1682  * starts the vacuuming events
1683  */
1684 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
1685 {
1686         struct ctdb_vacuum_handle *vacuum_handle;
1687
1688         if (! ctdb_db_volatile(ctdb_db)) {
1689                 DEBUG(DEBUG_ERR,
1690                       ("Vacuuming is disabled for non-volatile database %s\n",
1691                        ctdb_db->db_name));
1692                 return 0;
1693         }
1694
1695         vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
1696         if (vacuum_handle == NULL) {
1697                 DBG_ERR("Memory allocation error\n");
1698                 return -1;
1699         }
1700
1701         vacuum_handle->ctdb_db = ctdb_db;
1702         vacuum_handle->fast_path_count = 0;
1703         vacuum_handle->vacuum_interval = get_vacuum_interval(ctdb_db);
1704
1705         ctdb_db->vacuum_handle = vacuum_handle;
1706
1707         tevent_add_timer(ctdb_db->ctdb->ev,
1708                          vacuum_handle,
1709                          timeval_current_ofs(vacuum_handle->vacuum_interval, 0),
1710                          ctdb_vacuum_event,
1711                          vacuum_handle);
1712
1713         return 0;
1714 }
1715
1716 static void remove_record_from_delete_queue(struct ctdb_db_context *ctdb_db,
1717                                             const struct ctdb_ltdb_header *hdr,
1718                                             const TDB_DATA key)
1719 {
1720         struct delete_record_data *kd;
1721         uint32_t hash;
1722
1723         hash = (uint32_t)ctdb_hash(&key);
1724
1725         DEBUG(DEBUG_DEBUG, (__location__
1726                             " remove_record_from_delete_queue: "
1727                             "db[%s] "
1728                             "db_id[0x%08x] "
1729                             "key_hash[0x%08x] "
1730                             "lmaster[%u] "
1731                             "migrated_with_data[%s]\n",
1732                              ctdb_db->db_name, ctdb_db->db_id,
1733                              hash,
1734                              ctdb_lmaster(ctdb_db->ctdb, &key),
1735                              hdr->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA ? "yes" : "no"));
1736
1737         kd = (struct delete_record_data *)trbt_lookup32(ctdb_db->delete_queue, hash);
1738         if (kd == NULL) {
1739                 DEBUG(DEBUG_DEBUG, (__location__
1740                                     " remove_record_from_delete_queue: "
1741                                     "record not in queue (hash[0x%08x])\n.",
1742                                     hash));
1743                 return;
1744         }
1745
1746         if ((kd->key.dsize != key.dsize) ||
1747             (memcmp(kd->key.dptr, key.dptr, key.dsize) != 0))
1748         {
1749                 DEBUG(DEBUG_DEBUG, (__location__
1750                                     " remove_record_from_delete_queue: "
1751                                     "hash collision for key with hash[0x%08x] "
1752                                     "in db[%s] - skipping\n",
1753                                     hash, ctdb_db->db_name));
1754                 return;
1755         }
1756
1757         DEBUG(DEBUG_DEBUG, (__location__
1758                             " remove_record_from_delete_queue: "
1759                             "removing key with hash[0x%08x]\n",
1760                              hash));
1761
1762         talloc_free(kd);
1763
1764         return;
1765 }
1766
1767 /**
1768  * Insert a record into the ctdb_db context's delete queue,
1769  * handling hash collisions.
1770  */
1771 static int insert_record_into_delete_queue(struct ctdb_db_context *ctdb_db,
1772                                            const struct ctdb_ltdb_header *hdr,
1773                                            TDB_DATA key)
1774 {
1775         struct delete_record_data *kd;
1776         uint32_t hash;
1777         int ret;
1778
1779         hash = (uint32_t)ctdb_hash(&key);
1780
1781         DEBUG(DEBUG_DEBUG, (__location__ " schedule for deletion: db[%s] "
1782                             "db_id[0x%08x] "
1783                             "key_hash[0x%08x] "
1784                             "lmaster[%u] "
1785                             "migrated_with_data[%s]\n",
1786                             ctdb_db->db_name, ctdb_db->db_id,
1787                             hash,
1788                             ctdb_lmaster(ctdb_db->ctdb, &key),
1789                             hdr->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA ? "yes" : "no"));
1790
1791         kd = (struct delete_record_data *)trbt_lookup32(ctdb_db->delete_queue, hash);
1792         if (kd != NULL) {
1793                 if ((kd->key.dsize != key.dsize) ||
1794                     (memcmp(kd->key.dptr, key.dptr, key.dsize) != 0))
1795                 {
1796                         DEBUG(DEBUG_INFO,
1797                               (__location__ " schedule for deletion: "
1798                                "hash collision for key hash [0x%08x]. "
1799                                "Skipping the record.\n", hash));
1800                         return 0;
1801                 } else {
1802                         DEBUG(DEBUG_DEBUG,
1803                               (__location__ " schedule for deletion: "
1804                                "updating entry for key with hash [0x%08x].\n",
1805                                hash));
1806                 }
1807         }
1808
1809         ret = insert_delete_record_data_into_tree(ctdb_db->ctdb, ctdb_db,
1810                                                   ctdb_db->delete_queue,
1811                                                   hdr, key);
1812         if (ret != 0) {
1813                 DEBUG(DEBUG_INFO,
1814                       (__location__ " schedule for deletion: error "
1815                        "inserting key with hash [0x%08x] into delete queue\n",
1816                        hash));
1817                 return -1;
1818         }
1819
1820         return 0;
1821 }
1822
1823 /**
1824  * Schedule a record for deletion.
1825  * Called from the parent context.
1826  */
1827 int32_t ctdb_control_schedule_for_deletion(struct ctdb_context *ctdb,
1828                                            TDB_DATA indata)
1829 {
1830         struct ctdb_control_schedule_for_deletion *dd;
1831         struct ctdb_db_context *ctdb_db;
1832         int ret;
1833         TDB_DATA key;
1834
1835         dd = (struct ctdb_control_schedule_for_deletion *)indata.dptr;
1836
1837         ctdb_db = find_ctdb_db(ctdb, dd->db_id);
1838         if (ctdb_db == NULL) {
1839                 DEBUG(DEBUG_ERR, (__location__ " Unknown db id 0x%08x\n",
1840                                   dd->db_id));
1841                 return -1;
1842         }
1843
1844         key.dsize = dd->keylen;
1845         key.dptr = dd->key;
1846
1847         ret = insert_record_into_delete_queue(ctdb_db, &dd->hdr, key);
1848
1849         return ret;
1850 }
1851
1852 int32_t ctdb_local_schedule_for_deletion(struct ctdb_db_context *ctdb_db,
1853                                          const struct ctdb_ltdb_header *hdr,
1854                                          TDB_DATA key)
1855 {
1856         int ret;
1857         struct ctdb_control_schedule_for_deletion *dd;
1858         TDB_DATA indata;
1859         int32_t status;
1860
1861         if (ctdb_db->ctdb->ctdbd_pid == getpid()) {
1862                 /* main daemon - directly queue */
1863                 ret = insert_record_into_delete_queue(ctdb_db, hdr, key);
1864
1865                 return ret;
1866         }
1867
1868         /* if we don't have a connection to the daemon we can not send
1869            a control. For example sometimes from update_record control child
1870            process.
1871         */
1872         if (!ctdb_db->ctdb->can_send_controls) {
1873                 return -1;
1874         }
1875
1876
1877         /* child process: send the main daemon a control */
1878         indata.dsize = offsetof(struct ctdb_control_schedule_for_deletion, key) + key.dsize;
1879         indata.dptr = talloc_zero_array(ctdb_db, uint8_t, indata.dsize);
1880         if (indata.dptr == NULL) {
1881                 DEBUG(DEBUG_ERR, (__location__ " out of memory\n"));
1882                 return -1;
1883         }
1884         dd = (struct ctdb_control_schedule_for_deletion *)(void *)indata.dptr;
1885         dd->db_id = ctdb_db->db_id;
1886         dd->hdr = *hdr;
1887         dd->keylen = key.dsize;
1888         memcpy(dd->key, key.dptr, key.dsize);
1889
1890         ret = ctdb_control(ctdb_db->ctdb,
1891                            CTDB_CURRENT_NODE,
1892                            ctdb_db->db_id,
1893                            CTDB_CONTROL_SCHEDULE_FOR_DELETION,
1894                            CTDB_CTRL_FLAG_NOREPLY, /* flags */
1895                            indata,
1896                            NULL, /* mem_ctx */
1897                            NULL, /* outdata */
1898                            &status,
1899                            NULL, /* timeout : NULL == wait forever */
1900                            NULL); /* error message */
1901
1902         talloc_free(indata.dptr);
1903
1904         if (ret != 0 || status != 0) {
1905                 DEBUG(DEBUG_ERR, (__location__ " Error sending "
1906                                   "SCHEDULE_FOR_DELETION "
1907                                   "control.\n"));
1908                 if (status != 0) {
1909                         ret = -1;
1910                 }
1911         }
1912
1913         return ret;
1914 }
1915
1916 void ctdb_local_remove_from_delete_queue(struct ctdb_db_context *ctdb_db,
1917                                          const struct ctdb_ltdb_header *hdr,
1918                                          const TDB_DATA key)
1919 {
1920         if (ctdb_db->ctdb->ctdbd_pid != getpid()) {
1921                 /*
1922                  * Only remove the record from the delete queue if called
1923                  * in the main daemon.
1924                  */
1925                 return;
1926         }
1927
1928         remove_record_from_delete_queue(ctdb_db, hdr, key);
1929
1930         return;
1931 }
1932
1933 static int vacuum_fetch_parser(uint32_t reqid,
1934                                struct ctdb_ltdb_header *header,
1935                                TDB_DATA key, TDB_DATA data,
1936                                void *private_data)
1937 {
1938         struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
1939                 private_data, struct ctdb_db_context);
1940         struct fetch_record_data *rd;
1941         size_t len;
1942         uint32_t hash;
1943
1944         len = offsetof(struct fetch_record_data, keydata) + key.dsize;
1945
1946         rd = (struct fetch_record_data *)talloc_size(ctdb_db->fetch_queue,
1947                                                      len);
1948         if (rd == NULL) {
1949                 DEBUG(DEBUG_ERR, (__location__ " Memory error\n"));
1950                 return -1;
1951         }
1952         talloc_set_name_const(rd, "struct fetch_record_data");
1953
1954         rd->key.dsize = key.dsize;
1955         rd->key.dptr = rd->keydata;
1956         memcpy(rd->keydata, key.dptr, key.dsize);
1957
1958         hash = ctdb_hash(&key);
1959
1960         trbt_insert32(ctdb_db->fetch_queue, hash, rd);
1961
1962         return 0;
1963 }
1964
1965 int32_t ctdb_control_vacuum_fetch(struct ctdb_context *ctdb, TDB_DATA indata)
1966 {
1967         struct ctdb_rec_buffer *recbuf;
1968         struct ctdb_db_context *ctdb_db;
1969         size_t npull;
1970         int ret;
1971
1972         ret = ctdb_rec_buffer_pull(indata.dptr, indata.dsize, ctdb, &recbuf,
1973                                    &npull);
1974         if (ret != 0) {
1975                 DEBUG(DEBUG_ERR, ("Invalid data in vacuum_fetch\n"));
1976                 return -1;
1977         }
1978
1979         ctdb_db = find_ctdb_db(ctdb, recbuf->db_id);
1980         if (ctdb_db == NULL) {
1981                 talloc_free(recbuf);
1982                 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1983                                   recbuf->db_id));
1984                 return -1;
1985         }
1986
1987         ret = ctdb_rec_buffer_traverse(recbuf, vacuum_fetch_parser, ctdb_db);
1988         talloc_free(recbuf);
1989         return ret;
1990 }