Merge commit 'rusty/ports-from-1.0.112' into foo
[metze/ctdb/wip.git] / server / ctdb_traverse.c
1 /* 
2    efficient async ctdb traverse
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "db_wrap.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
28
29 typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
30
31 /*
32   handle returned to caller - freeing this handler will kill the child and 
33   terminate the traverse
34  */
35 struct ctdb_traverse_local_handle {
36         struct ctdb_traverse_local_handle *next, *prev;
37         struct ctdb_db_context *ctdb_db;
38         int fd[2];
39         pid_t child;
40         uint64_t srvid;
41         uint32_t client_reqid;
42         void *private_data;
43         ctdb_traverse_fn_t callback;
44         struct timeval start_time;
45         struct ctdb_queue *queue;
46 };
47
48 /*
49   called when data is available from the child
50  */
51 static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
52 {
53         struct ctdb_traverse_local_handle *h = talloc_get_type(private_data, 
54                                                                struct ctdb_traverse_local_handle);
55         TDB_DATA key, data;
56         ctdb_traverse_fn_t callback = h->callback;
57         void *p = h->private_data;
58         struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
59
60         if (rawdata == NULL || length < 4 || length != tdata->length) {
61                 /* end of traverse */
62                 talloc_free(h);
63                 callback(p, tdb_null, tdb_null);
64                 return;
65         }
66
67         key.dsize = tdata->keylen;
68         key.dptr  = &tdata->data[0];
69         data.dsize = tdata->datalen;
70         data.dptr = &tdata->data[tdata->keylen];
71
72         callback(p, key, data); 
73 }
74
75 /*
76   destroy a in-flight traverse operation
77  */
78 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
79 {
80         DLIST_REMOVE(h->ctdb_db->traverse, h);
81         kill(h->child, SIGKILL);
82         return 0;
83 }
84
85 /*
86   callback from tdb_traverse_read()
87  */
88 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
89 {
90         struct ctdb_traverse_local_handle *h = talloc_get_type(p, 
91                                                                struct ctdb_traverse_local_handle);
92         struct ctdb_rec_data *d;
93         struct ctdb_ltdb_header *hdr;
94
95         hdr = (struct ctdb_ltdb_header *)data.dptr;
96
97         if (h->ctdb_db->persistent == 0) {
98                 /* filter out zero-length records */
99                 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
100                         return 0;
101                 }
102
103                 /* filter out non-authoritative records */
104                 if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
105                         return 0;
106                 }
107         }
108
109         d = ctdb_marshall_record(h, 0, key, NULL, data);
110         if (d == NULL) {
111                 /* error handling is tricky in this child code .... */
112                 return -1;
113         }
114
115         if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
116                 return -1;
117         }
118         return 0;
119 }
120
/*
  state kept on a node while it serves a CTDB_CONTROL_TRAVERSE_ALL
  request by traversing its local ltdb
 */
struct traverse_all_state {
        struct ctdb_context *ctdb;
        /* handle of the local traverse that feeds this state */
        struct ctdb_traverse_local_handle *h;
        /* reqid stamped on every record sent back to srcnode */
        uint32_t reqid;
        /* node that requested the traverse */
        uint32_t srcnode;
        /* reqid and srvid of the originating client request */
        uint32_t client_reqid;
        uint64_t srvid;
};
129
130 /*
131   setup a non-blocking traverse of a local ltdb. The callback function
132   will be called on every record in the local ltdb. To stop the
133   travserse, talloc_free() the travserse_handle.
134
135   The traverse is finished when the callback is called with tdb_null for key and data
136  */
137 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
138                                                               ctdb_traverse_fn_t callback,
139                                                               struct traverse_all_state *all_state)
140 {
141         struct ctdb_traverse_local_handle *h;
142         int ret;
143
144         h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
145         if (h == NULL) {
146                 return NULL;
147         }
148
149         ret = pipe(h->fd);
150
151         if (ret != 0) {
152                 talloc_free(h);
153                 return NULL;
154         }
155
156         h->child = fork();
157
158         if (h->child == (pid_t)-1) {
159                 close(h->fd[0]);
160                 close(h->fd[1]);
161                 talloc_free(h);
162                 return NULL;
163         }
164
165         h->callback = callback;
166         h->private_data = all_state;
167         h->ctdb_db = ctdb_db;
168         h->client_reqid = all_state->client_reqid;
169         h->srvid = all_state->srvid;
170
171         if (h->child == 0) {
172                 /* start the traverse in the child */
173                 close(h->fd[0]);
174                 debug_extra = talloc_asprintf(NULL, "traverse_local-%s:",
175                                               ctdb_db->db_name);
176                 tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
177                 _exit(0);
178         }
179
180         close(h->fd[1]);
181         set_close_on_exec(h->fd[0]);
182
183         talloc_set_destructor(h, traverse_local_destructor);
184
185         DLIST_ADD(ctdb_db->traverse, h);
186
187         /*
188           setup a packet queue between the child and the parent. This
189           copes with all the async and packet boundary issues
190          */
191         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child traverse\n", h->fd[0]));
192
193         h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h,
194                                     "to-ctdbd");
195         if (h->queue == NULL) {
196                 talloc_free(h);
197                 return NULL;
198         }
199
200         h->start_time = timeval_current();
201
202         return h;
203 }
204
205
206 struct ctdb_traverse_all_handle {
207         struct ctdb_context *ctdb;
208         struct ctdb_db_context *ctdb_db;
209         uint32_t reqid;
210         ctdb_traverse_fn_t callback;
211         void *private_data;
212         uint32_t null_count;
213 };
214
215 /*
216   destroy a traverse_all op
217  */
218 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
219 {
220         ctdb_reqid_remove(state->ctdb, state->reqid);
221         return 0;
222 }
223
/*
  payload of a CTDB_CONTROL_TRAVERSE_ALL control. The structure is
  sent as raw bytes, so the order and types of the fields must not
  change.
 */
struct ctdb_traverse_all {
        /* database to traverse */
        uint32_t db_id;
        /* reqid of the traverse_all handle on the requesting node */
        uint32_t reqid;
        /* node number of the requesting node */
        uint32_t pnn;
        /* reqid and srvid of the originating client request */
        uint32_t client_reqid;
        uint64_t srvid;
};
231
232 /* called when a traverse times out */
233 static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te, 
234                                       struct timeval t, void *private_data)
235 {
236         struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
237
238         DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
239         state->ctdb->statistics.timeouts.traverse++;
240
241         state->callback(state->private_data, tdb_null, tdb_null);
242 }
243
244
/*
  state kept for a traverse started by a client through
  ctdb_control_traverse_start()
 */
struct traverse_start_state {
        struct ctdb_context *ctdb;
        /* handle of the cluster-wide traverse */
        struct ctdb_traverse_all_handle *h;
        /* srcnode passed to the control that started the traverse */
        uint32_t srcnode;
        /* reqid, database and srvid taken from the client's request */
        uint32_t reqid;
        uint32_t db_id;
        uint64_t srvid;
};
253
254
255 /*
256   setup a cluster-wide non-blocking traverse of a ctdb. The
257   callback function will be called on every record in the local
258   ltdb. To stop the travserse, talloc_free() the traverse_handle.
259
260   The traverse is finished when the callback is called with tdb_null
261   for key and data
262  */
263 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
264                                                                  ctdb_traverse_fn_t callback,
265                                                                  struct traverse_start_state *start_state)
266 {
267         struct ctdb_traverse_all_handle *state;
268         struct ctdb_context *ctdb = ctdb_db->ctdb;
269         int ret;
270         TDB_DATA data;
271         struct ctdb_traverse_all r;
272         uint32_t destination;
273
274         state = talloc(start_state, struct ctdb_traverse_all_handle);
275         if (state == NULL) {
276                 return NULL;
277         }
278
279         state->ctdb         = ctdb;
280         state->ctdb_db      = ctdb_db;
281         state->reqid        = ctdb_reqid_new(ctdb_db->ctdb, state);
282         state->callback     = callback;
283         state->private_data = start_state;
284         state->null_count   = 0;
285         
286         talloc_set_destructor(state, ctdb_traverse_all_destructor);
287
288         r.db_id = ctdb_db->db_id;
289         r.reqid = state->reqid;
290         r.pnn   = ctdb->pnn;
291         r.client_reqid = start_state->reqid;
292         r.srvid = start_state->srvid;
293
294         data.dptr = (uint8_t *)&r;
295         data.dsize = sizeof(r);
296
297         if (ctdb_db->persistent == 0) {
298                 /* normal database, traverse all nodes */         
299                 destination = CTDB_BROADCAST_VNNMAP;
300         } else {
301                 int i;
302                 /* persistent database, traverse one node, preferably
303                  * the local one
304                  */
305                 destination = ctdb->pnn;
306                 /* check we are in the vnnmap */
307                 for (i=0; i < ctdb->vnn_map->size; i++) {
308                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
309                                 break;
310                         }
311                 }
312                 /* if we are not in the vnn map we just pick the first
313                  * node instead
314                  */
315                 if (i == ctdb->vnn_map->size) {
316                         destination = ctdb->vnn_map->map[0];
317                 }
318         }
319
320         /* tell all the nodes in the cluster to start sending records to this
321          * node, or if it is a persistent database, just tell the local
322          * node
323          */
324         ret = ctdb_daemon_send_control(ctdb, destination, 0, 
325                                CTDB_CONTROL_TRAVERSE_ALL,
326                                0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
327
328         if (ret != 0) {
329                 talloc_free(state);
330                 return NULL;
331         }
332
333         /* timeout the traverse */
334         event_add_timed(ctdb->ev, state, 
335                         timeval_current_ofs(ctdb->tunable.traverse_timeout, 0), 
336                         ctdb_traverse_all_timeout, state);
337
338         return state;
339 }
340
341 /*
342   called for each record during a traverse all 
343  */
344 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
345 {
346         struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
347         int ret;
348         struct ctdb_rec_data *d;
349         TDB_DATA cdata;
350
351         d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
352         if (d == NULL) {
353                 /* darn .... */
354                 DEBUG(DEBUG_ERR,("Out of memory in traverse_all_callback\n"));
355                 return;
356         }
357
358         cdata.dptr = (uint8_t *)d;
359         cdata.dsize = d->length;
360
361         ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
362                                        0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
363         if (ret != 0) {
364                 DEBUG(DEBUG_ERR,("Failed to send traverse data\n"));
365         }
366
367         if (key.dsize == 0 && data.dsize == 0) {
368                 /* we're done */
369                 talloc_free(state);
370         }
371 }
372
373 /*
374   called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
375   setup a traverse of our local ltdb, sending the records as
376   CTDB_CONTROL_TRAVERSE_DATA records back to the originator
377  */
378 int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
379 {
380         struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
381         struct traverse_all_state *state;
382         struct ctdb_db_context *ctdb_db;
383
384         if (data.dsize != sizeof(struct ctdb_traverse_all)) {
385                 DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
386                 return -1;
387         }
388
389         ctdb_db = find_ctdb_db(ctdb, c->db_id);
390         if (ctdb_db == NULL) {
391                 return -1;
392         }
393
394         if (ctdb_db->unhealthy_reason) {
395                 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
396                         DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
397                                         ctdb_db->db_name, ctdb_db->unhealthy_reason));
398                         return -1;
399                 }
400                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
401                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
402         }
403
404         state = talloc(ctdb_db, struct traverse_all_state);
405         if (state == NULL) {
406                 return -1;
407         }
408
409         state->reqid = c->reqid;
410         state->srcnode = c->pnn;
411         state->ctdb = ctdb;
412         state->client_reqid = c->client_reqid;
413         state->srvid = c->srvid;
414
415         state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
416         if (state->h == NULL) {
417                 talloc_free(state);
418                 return -1;
419         }
420
421         return 0;
422 }
423
424
425 /*
426   called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
427   call the traverse_all callback with the record
428  */
429 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
430 {
431         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
432         struct ctdb_traverse_all_handle *state;
433         TDB_DATA key;
434         ctdb_traverse_fn_t callback;
435         void *private_data;
436
437         if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
438                 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
439                 return -1;
440         }
441
442         state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
443         if (state == NULL || d->reqid != state->reqid) {
444                 /* traverse might have been terminated already */
445                 return -1;
446         }
447
448         key.dsize = d->keylen;
449         key.dptr  = &d->data[0];
450         data.dsize = d->datalen;
451         data.dptr = &d->data[d->keylen];
452
453         if (key.dsize == 0 && data.dsize == 0) {
454                 state->null_count++;
455                 /* Persistent databases are only scanned on one node (the local
456                  * node)
457                  */
458                 if (state->ctdb_db->persistent == 0) {
459                         if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
460                                 return 0;
461                         }
462                 }
463         }
464
465         callback = state->callback;
466         private_data = state->private_data;
467
468         callback(private_data, key, data);
469         return 0;
470 }       
471
472 /*
473   kill a in-progress traverse, used when a client disconnects
474  */
475 int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data, 
476                                    TDB_DATA *outdata, uint32_t srcnode)
477 {
478         struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
479         struct ctdb_db_context *ctdb_db;
480         struct ctdb_traverse_local_handle *t;
481
482         ctdb_db = find_ctdb_db(ctdb, d->db_id);
483         if (ctdb_db == NULL) {
484                 return -1;
485         }
486
487         for (t=ctdb_db->traverse; t; t=t->next) {
488                 if (t->client_reqid == d->reqid &&
489                     t->srvid == d->srvid) {
490                         talloc_free(t);
491                         break;
492                 }
493         }
494
495         return 0;
496 }
497
498
499 /*
500   this is called when a client disconnects during a traverse
501   we need to notify all the nodes taking part in the search that they
502   should kill their traverse children
503  */
504 static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
505 {
506         struct ctdb_traverse_start r;
507         TDB_DATA data;
508
509         DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
510         r.db_id = state->db_id;
511         r.reqid = state->reqid;
512         r.srvid = state->srvid;
513
514         data.dptr = (uint8_t *)&r;
515         data.dsize = sizeof(r);
516
517         ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0, 
518                                  CTDB_CONTROL_TRAVERSE_KILL, 
519                                  0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
520         return 0;
521 }
522
523 /*
524   callback which sends records as messages to the client
525  */
526 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
527 {
528         struct traverse_start_state *state;
529         struct ctdb_rec_data *d;
530         TDB_DATA cdata;
531
532         state = talloc_get_type(p, struct traverse_start_state);
533
534         d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
535         if (d == NULL) {
536                 return;
537         }
538
539         cdata.dptr = (uint8_t *)d;
540         cdata.dsize = d->length;
541
542         ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
543         if (key.dsize == 0 && data.dsize == 0) {
544                 /* end of traverse */
545                 talloc_set_destructor(state, NULL);
546                 talloc_free(state);
547         }
548 }
549
550
551 /*
552   start a traverse_all - called as a control from a client
553  */
554 int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data, 
555                                     TDB_DATA *outdata, uint32_t srcnode, uint32_t client_id)
556 {
557         struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
558         struct traverse_start_state *state;
559         struct ctdb_db_context *ctdb_db;
560         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
561
562         if (client == NULL) {
563                 DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
564                 return -1;              
565         }
566
567         if (data.dsize != sizeof(*d)) {
568                 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
569                 return -1;
570         }
571
572         ctdb_db = find_ctdb_db(ctdb, d->db_id);
573         if (ctdb_db == NULL) {
574                 return -1;
575         }
576
577         if (ctdb_db->unhealthy_reason) {
578                 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
579                         DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
580                                         ctdb_db->db_name, ctdb_db->unhealthy_reason));
581                         return -1;
582                 }
583                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
584                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
585         }
586
587         state = talloc(client, struct traverse_start_state);
588         if (state == NULL) {
589                 return -1;
590         }
591         
592         state->srcnode = srcnode;
593         state->reqid = d->reqid;
594         state->srvid = d->srvid;
595         state->db_id = d->db_id;
596         state->ctdb = ctdb;
597
598         state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
599         if (state->h == NULL) {
600                 talloc_free(state);
601                 return -1;
602         }
603
604         talloc_set_destructor(state, ctdb_traverse_start_destructor);
605
606         return 0;
607 }