Merge commit 'rusty/vacuum-fix-master'
[metze/ctdb/wip.git] / server / ctdb_traverse.c
1 /* 
2    efficient async ctdb traverse
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "db_wrap.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
28
/* Record callback used by the traverse code below.  A call with tdb_null
   for both key and data marks the end of the traverse. */
typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);

/*
  handle returned to caller - freeing this handle will kill the child and
  terminate the traverse
 */
struct ctdb_traverse_local_handle {
	struct ctdb_traverse_local_handle *next, *prev; /* links in ctdb_db->traverse */
	struct ctdb_db_context *ctdb_db;  /* database being traversed */
	int fd[2];                        /* pipe: the child writes fd[1], the parent reads fd[0] */
	pid_t child;                      /* pid of the forked traverse child */
	uint64_t srvid;                   /* client srvid, matched by ctdb_control_traverse_kill() */
	uint32_t client_reqid;            /* client reqid, matched by ctdb_control_traverse_kill() */
	void *private_data;               /* passed to callback; the owning traverse_all_state */
	ctdb_traverse_fn_t callback;      /* called per record, then once with tdb_null */
	struct timeval start_time;        /* set once the traverse has been started */
	struct ctdb_queue *queue;         /* packet queue reading records from fd[0] */
};
47
48 /*
49   called when data is available from the child
50  */
51 static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
52 {
53         struct ctdb_traverse_local_handle *h = talloc_get_type(private_data, 
54                                                                struct ctdb_traverse_local_handle);
55         TDB_DATA key, data;
56         ctdb_traverse_fn_t callback = h->callback;
57         void *p = h->private_data;
58         struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
59
60         if (rawdata == NULL || length < 4 || length != tdata->length) {
61                 /* end of traverse */
62                 talloc_free(h);
63                 callback(p, tdb_null, tdb_null);
64                 return;
65         }
66
67         key.dsize = tdata->keylen;
68         key.dptr  = &tdata->data[0];
69         data.dsize = tdata->datalen;
70         data.dptr = &tdata->data[tdata->keylen];
71
72         callback(p, key, data); 
73 }
74
75 /*
76   destroy a in-flight traverse operation
77  */
78 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
79 {
80         DLIST_REMOVE(h->ctdb_db->traverse, h);
81         kill(h->child, SIGKILL);
82         return 0;
83 }
84
85 /*
86   callback from tdb_traverse_read()
87  */
88 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
89 {
90         struct ctdb_traverse_local_handle *h = talloc_get_type(p, 
91                                                                struct ctdb_traverse_local_handle);
92         struct ctdb_rec_data *d;
93         struct ctdb_ltdb_header *hdr;
94
95         hdr = (struct ctdb_ltdb_header *)data.dptr;
96
97         if (h->ctdb_db->persistent == 0) {
98                 /* filter out zero-length records */
99                 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
100                         return 0;
101                 }
102
103                 /* filter out non-authoritative records */
104                 if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
105                         return 0;
106                 }
107         }
108
109         d = ctdb_marshall_record(h, 0, key, NULL, data);
110         if (d == NULL) {
111                 /* error handling is tricky in this child code .... */
112                 return -1;
113         }
114
115         if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
116                 return -1;
117         }
118         return 0;
119 }
120
/* State for one local traverse started by an incoming
   CTDB_CONTROL_TRAVERSE_ALL request; the local handle is its talloc child. */
struct traverse_all_state {
	struct ctdb_context *ctdb;
	struct ctdb_traverse_local_handle *h; /* the running local traverse */
	uint32_t reqid;        /* originator's reqid, stamped on every record sent back */
	uint32_t srcnode;      /* node that asked for the traverse; records are sent there */
	uint32_t client_reqid; /* client request id, copied into the local handle */
	uint64_t srvid;        /* client srvid, copied into the local handle */
};
129
130 /*
131   setup a non-blocking traverse of a local ltdb. The callback function
132   will be called on every record in the local ltdb. To stop the
133   travserse, talloc_free() the travserse_handle.
134
135   The traverse is finished when the callback is called with tdb_null for key and data
136  */
137 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
138                                                               ctdb_traverse_fn_t callback,
139                                                               struct traverse_all_state *all_state)
140 {
141         struct ctdb_traverse_local_handle *h;
142         int ret;
143
144         h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
145         if (h == NULL) {
146                 return NULL;
147         }
148
149         ret = pipe(h->fd);
150
151         if (ret != 0) {
152                 talloc_free(h);
153                 return NULL;
154         }
155
156         h->child = fork();
157
158         if (h->child == (pid_t)-1) {
159                 close(h->fd[0]);
160                 close(h->fd[1]);
161                 talloc_free(h);
162                 return NULL;
163         }
164
165         h->callback = callback;
166         h->private_data = all_state;
167         h->ctdb_db = ctdb_db;
168         h->client_reqid = all_state->client_reqid;
169         h->srvid = all_state->srvid;
170
171         if (h->child == 0) {
172                 /* start the traverse in the child */
173                 close(h->fd[0]);
174                 tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
175                 _exit(0);
176         }
177
178         close(h->fd[1]);
179         set_close_on_exec(h->fd[0]);
180
181         talloc_set_destructor(h, traverse_local_destructor);
182
183         DLIST_ADD(ctdb_db->traverse, h);
184
185         /*
186           setup a packet queue between the child and the parent. This
187           copes with all the async and packet boundary issues
188          */
189         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child traverse\n", h->fd[0]));
190
191         h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h,
192                                     "to-ctdbd");
193         if (h->queue == NULL) {
194                 talloc_free(h);
195                 return NULL;
196         }
197
198         h->start_time = timeval_current();
199
200         return h;
201 }
202
203
/* Originator-side state of a cluster-wide traverse; allocated as a talloc
   child of the traverse_start_state that started it. */
struct ctdb_traverse_all_handle {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	uint32_t reqid;              /* from ctdb_reqid_new(); incoming TRAVERSE_DATA is matched on it */
	ctdb_traverse_fn_t callback; /* receives each record, then tdb_null at the end */
	void *private_data;          /* the owning traverse_start_state */
	uint32_t null_count;         /* end-of-traverse markers received so far */
};
212
213 /*
214   destroy a traverse_all op
215  */
216 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
217 {
218         ctdb_reqid_remove(state->ctdb, state->reqid);
219         return 0;
220 }
221
/* Payload of CTDB_CONTROL_TRAVERSE_ALL.  The struct is sent as raw bytes and
   the receiver checks for an exact sizeof() match, so every node must agree
   on its layout. */
struct ctdb_traverse_all {
	uint32_t db_id;        /* database to traverse */
	uint32_t reqid;        /* originator's traverse_all reqid */
	uint32_t pnn;          /* originating node; records are sent back to it */
	uint32_t client_reqid; /* client's request id */
	uint64_t srvid;        /* client's srvid */
};
229
230 /* called when a traverse times out */
231 static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te, 
232                                       struct timeval t, void *private_data)
233 {
234         struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
235
236         DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
237         state->ctdb->statistics.timeouts.traverse++;
238
239         state->callback(state->private_data, tdb_null, tdb_null);
240 }
241
242
/* State for a traverse requested by a local client; allocated as a talloc
   child of the client. */
struct traverse_start_state {
	struct ctdb_context *ctdb;
	struct ctdb_traverse_all_handle *h; /* cluster-wide traverse; talloc child of this */
	uint32_t srcnode;
	uint32_t reqid;                     /* client's request id, stamped on forwarded records */
	uint32_t db_id;
	uint64_t srvid;                     /* client srvid the records are dispatched to */
};
251
252
253 /*
254   setup a cluster-wide non-blocking traverse of a ctdb. The
255   callback function will be called on every record in the local
256   ltdb. To stop the travserse, talloc_free() the traverse_handle.
257
258   The traverse is finished when the callback is called with tdb_null
259   for key and data
260  */
261 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
262                                                                  ctdb_traverse_fn_t callback,
263                                                                  struct traverse_start_state *start_state)
264 {
265         struct ctdb_traverse_all_handle *state;
266         struct ctdb_context *ctdb = ctdb_db->ctdb;
267         int ret;
268         TDB_DATA data;
269         struct ctdb_traverse_all r;
270         uint32_t destination;
271
272         state = talloc(start_state, struct ctdb_traverse_all_handle);
273         if (state == NULL) {
274                 return NULL;
275         }
276
277         state->ctdb         = ctdb;
278         state->ctdb_db      = ctdb_db;
279         state->reqid        = ctdb_reqid_new(ctdb_db->ctdb, state);
280         state->callback     = callback;
281         state->private_data = start_state;
282         state->null_count   = 0;
283         
284         talloc_set_destructor(state, ctdb_traverse_all_destructor);
285
286         r.db_id = ctdb_db->db_id;
287         r.reqid = state->reqid;
288         r.pnn   = ctdb->pnn;
289         r.client_reqid = start_state->reqid;
290         r.srvid = start_state->srvid;
291
292         data.dptr = (uint8_t *)&r;
293         data.dsize = sizeof(r);
294
295         if (ctdb_db->persistent == 0) {
296                 /* normal database, traverse all nodes */         
297                 destination = CTDB_BROADCAST_VNNMAP;
298         } else {
299                 int i;
300                 /* persistent database, traverse one node, preferably
301                  * the local one
302                  */
303                 destination = ctdb->pnn;
304                 /* check we are in the vnnmap */
305                 for (i=0; i < ctdb->vnn_map->size; i++) {
306                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
307                                 break;
308                         }
309                 }
310                 /* if we are not in the vnn map we just pick the first
311                  * node instead
312                  */
313                 if (i == ctdb->vnn_map->size) {
314                         destination = ctdb->vnn_map->map[0];
315                 }
316         }
317
318         /* tell all the nodes in the cluster to start sending records to this
319          * node, or if it is a persistent database, just tell the local
320          * node
321          */
322         ret = ctdb_daemon_send_control(ctdb, destination, 0, 
323                                CTDB_CONTROL_TRAVERSE_ALL,
324                                0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
325
326         if (ret != 0) {
327                 talloc_free(state);
328                 return NULL;
329         }
330
331         /* timeout the traverse */
332         event_add_timed(ctdb->ev, state, 
333                         timeval_current_ofs(ctdb->tunable.traverse_timeout, 0), 
334                         ctdb_traverse_all_timeout, state);
335
336         return state;
337 }
338
339 /*
340   called for each record during a traverse all 
341  */
342 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
343 {
344         struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
345         int ret;
346         struct ctdb_rec_data *d;
347         TDB_DATA cdata;
348
349         d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
350         if (d == NULL) {
351                 /* darn .... */
352                 DEBUG(DEBUG_ERR,("Out of memory in traverse_all_callback\n"));
353                 return;
354         }
355
356         cdata.dptr = (uint8_t *)d;
357         cdata.dsize = d->length;
358
359         ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
360                                        0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
361         if (ret != 0) {
362                 DEBUG(DEBUG_ERR,("Failed to send traverse data\n"));
363         }
364
365         if (key.dsize == 0 && data.dsize == 0) {
366                 /* we're done */
367                 talloc_free(state);
368         }
369 }
370
371 /*
372   called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
373   setup a traverse of our local ltdb, sending the records as
374   CTDB_CONTROL_TRAVERSE_DATA records back to the originator
375  */
376 int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
377 {
378         struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
379         struct traverse_all_state *state;
380         struct ctdb_db_context *ctdb_db;
381
382         if (data.dsize != sizeof(struct ctdb_traverse_all)) {
383                 DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
384                 return -1;
385         }
386
387         ctdb_db = find_ctdb_db(ctdb, c->db_id);
388         if (ctdb_db == NULL) {
389                 return -1;
390         }
391
392         if (ctdb_db->unhealthy_reason) {
393                 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
394                         DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
395                                         ctdb_db->db_name, ctdb_db->unhealthy_reason));
396                         return -1;
397                 }
398                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
399                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
400         }
401
402         state = talloc(ctdb_db, struct traverse_all_state);
403         if (state == NULL) {
404                 return -1;
405         }
406
407         state->reqid = c->reqid;
408         state->srcnode = c->pnn;
409         state->ctdb = ctdb;
410         state->client_reqid = c->client_reqid;
411         state->srvid = c->srvid;
412
413         state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
414         if (state->h == NULL) {
415                 talloc_free(state);
416                 return -1;
417         }
418
419         return 0;
420 }
421
422
423 /*
424   called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
425   call the traverse_all callback with the record
426  */
427 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
428 {
429         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
430         struct ctdb_traverse_all_handle *state;
431         TDB_DATA key;
432         ctdb_traverse_fn_t callback;
433         void *private_data;
434
435         if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
436                 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
437                 return -1;
438         }
439
440         state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
441         if (state == NULL || d->reqid != state->reqid) {
442                 /* traverse might have been terminated already */
443                 return -1;
444         }
445
446         key.dsize = d->keylen;
447         key.dptr  = &d->data[0];
448         data.dsize = d->datalen;
449         data.dptr = &d->data[d->keylen];
450
451         if (key.dsize == 0 && data.dsize == 0) {
452                 state->null_count++;
453                 /* Persistent databases are only scanned on one node (the local
454                  * node)
455                  */
456                 if (state->ctdb_db->persistent == 0) {
457                         if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
458                                 return 0;
459                         }
460                 }
461         }
462
463         callback = state->callback;
464         private_data = state->private_data;
465
466         callback(private_data, key, data);
467         return 0;
468 }       
469
470 /*
471   kill a in-progress traverse, used when a client disconnects
472  */
473 int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data, 
474                                    TDB_DATA *outdata, uint32_t srcnode)
475 {
476         struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
477         struct ctdb_db_context *ctdb_db;
478         struct ctdb_traverse_local_handle *t;
479
480         ctdb_db = find_ctdb_db(ctdb, d->db_id);
481         if (ctdb_db == NULL) {
482                 return -1;
483         }
484
485         for (t=ctdb_db->traverse; t; t=t->next) {
486                 if (t->client_reqid == d->reqid &&
487                     t->srvid == d->srvid) {
488                         talloc_free(t);
489                         break;
490                 }
491         }
492
493         return 0;
494 }
495
496
497 /*
498   this is called when a client disconnects during a traverse
499   we need to notify all the nodes taking part in the search that they
500   should kill their traverse children
501  */
502 static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
503 {
504         struct ctdb_traverse_start r;
505         TDB_DATA data;
506
507         DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
508         r.db_id = state->db_id;
509         r.reqid = state->reqid;
510         r.srvid = state->srvid;
511
512         data.dptr = (uint8_t *)&r;
513         data.dsize = sizeof(r);
514
515         ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0, 
516                                  CTDB_CONTROL_TRAVERSE_KILL, 
517                                  0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
518         return 0;
519 }
520
521 /*
522   callback which sends records as messages to the client
523  */
524 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
525 {
526         struct traverse_start_state *state;
527         struct ctdb_rec_data *d;
528         TDB_DATA cdata;
529
530         state = talloc_get_type(p, struct traverse_start_state);
531
532         d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
533         if (d == NULL) {
534                 return;
535         }
536
537         cdata.dptr = (uint8_t *)d;
538         cdata.dsize = d->length;
539
540         ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
541         if (key.dsize == 0 && data.dsize == 0) {
542                 /* end of traverse */
543                 talloc_set_destructor(state, NULL);
544                 talloc_free(state);
545         }
546 }
547
548
549 /*
550   start a traverse_all - called as a control from a client
551  */
552 int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data, 
553                                     TDB_DATA *outdata, uint32_t srcnode, uint32_t client_id)
554 {
555         struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
556         struct traverse_start_state *state;
557         struct ctdb_db_context *ctdb_db;
558         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
559
560         if (client == NULL) {
561                 DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
562                 return -1;              
563         }
564
565         if (data.dsize != sizeof(*d)) {
566                 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
567                 return -1;
568         }
569
570         ctdb_db = find_ctdb_db(ctdb, d->db_id);
571         if (ctdb_db == NULL) {
572                 return -1;
573         }
574
575         if (ctdb_db->unhealthy_reason) {
576                 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
577                         DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
578                                         ctdb_db->db_name, ctdb_db->unhealthy_reason));
579                         return -1;
580                 }
581                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
582                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
583         }
584
585         state = talloc(client, struct traverse_start_state);
586         if (state == NULL) {
587                 return -1;
588         }
589         
590         state->srcnode = srcnode;
591         state->reqid = d->reqid;
592         state->srvid = d->srvid;
593         state->db_id = d->db_id;
594         state->ctdb = ctdb;
595
596         state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
597         if (state->h == NULL) {
598                 talloc_free(state);
599                 return -1;
600         }
601
602         talloc_set_destructor(state, ctdb_traverse_start_destructor);
603
604         return 0;
605 }