Merge commit 'origin/master'
[sahlberg/ctdb.git] / server / ctdb_traverse.c
1 /* 
2    efficient async ctdb traverse
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
#include "includes.h"
#include <stddef.h>
#include "lib/events/events.h"
#include "system/filesys.h"
#include "system/wait.h"
#include "db_wrap.h"
#include "lib/tdb/include/tdb.h"
#include "../include/ctdb_private.h"
#include "lib/util/dlinklist.h"
28
29 typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
30
31 /*
32   handle returned to caller - freeing this handler will kill the child and 
33   terminate the traverse
34  */
35 struct ctdb_traverse_local_handle {
36         struct ctdb_traverse_local_handle *next, *prev;
37         struct ctdb_db_context *ctdb_db;
38         int fd[2];
39         pid_t child;
40         uint64_t srvid;
41         uint32_t client_reqid;
42         void *private_data;
43         ctdb_traverse_fn_t callback;
44         struct timeval start_time;
45         struct ctdb_queue *queue;
46 };
47
48 /*
49   called when data is available from the child
50  */
51 static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
52 {
53         struct ctdb_traverse_local_handle *h = talloc_get_type(private_data, 
54                                                                struct ctdb_traverse_local_handle);
55         TDB_DATA key, data;
56         ctdb_traverse_fn_t callback = h->callback;
57         void *p = h->private_data;
58         struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
59
60         if (rawdata == NULL || length < 4 || length != tdata->length) {
61                 /* end of traverse */
62                 talloc_free(h);
63                 callback(p, tdb_null, tdb_null);
64                 return;
65         }
66
67         key.dsize = tdata->keylen;
68         key.dptr  = &tdata->data[0];
69         data.dsize = tdata->datalen;
70         data.dptr = &tdata->data[tdata->keylen];
71
72         callback(p, key, data); 
73 }
74
75 /*
76   destroy a in-flight traverse operation
77  */
78 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
79 {
80         DLIST_REMOVE(h->ctdb_db->traverse, h);
81         kill(h->child, SIGKILL);
82         return 0;
83 }
84
85 /*
86   callback from tdb_traverse_read()
87  */
88 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
89 {
90         struct ctdb_traverse_local_handle *h = talloc_get_type(p, 
91                                                                struct ctdb_traverse_local_handle);
92         struct ctdb_rec_data *d;
93         struct ctdb_ltdb_header *hdr;
94
95         hdr = (struct ctdb_ltdb_header *)data.dptr;
96
97         if (h->ctdb_db->persistent == 0) {
98                 /* filter out zero-length records */
99                 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
100                         return 0;
101                 }
102
103                 /* filter out non-authoritative records */
104                 if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
105                         return 0;
106                 }
107         }
108
109         d = ctdb_marshall_record(h, 0, key, NULL, data);
110         if (d == NULL) {
111                 /* error handling is tricky in this child code .... */
112                 return -1;
113         }
114
115         if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
116                 return -1;
117         }
118         return 0;
119 }
120
/*
  state kept on a node while it serves a CTDB_CONTROL_TRAVERSE_ALL
  request
 */
struct traverse_all_state {
        struct ctdb_context *ctdb;

        /* the local traverse that produces the records */
        struct ctdb_traverse_local_handle *h;

        /* reqid of the traverse on the requesting node, put into
           every record sent back */
        uint32_t reqid;

        /* node that asked for the traverse and receives the records */
        uint32_t srcnode;

        /* reqid and srvid of the client that started the traverse */
        uint32_t client_reqid;
        uint64_t srvid;
};
129
130 /*
131   setup a non-blocking traverse of a local ltdb. The callback function
132   will be called on every record in the local ltdb. To stop the
133   travserse, talloc_free() the travserse_handle.
134
135   The traverse is finished when the callback is called with tdb_null for key and data
136  */
137 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
138                                                               ctdb_traverse_fn_t callback,
139                                                               struct traverse_all_state *all_state)
140 {
141         struct ctdb_traverse_local_handle *h;
142         int ret;
143
144         h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
145         if (h == NULL) {
146                 return NULL;
147         }
148
149         ret = pipe(h->fd);
150
151         if (ret != 0) {
152                 talloc_free(h);
153                 return NULL;
154         }
155
156         h->child = fork();
157
158         if (h->child == (pid_t)-1) {
159                 close(h->fd[0]);
160                 close(h->fd[1]);
161                 talloc_free(h);
162                 return NULL;
163         }
164
165         h->callback = callback;
166         h->private_data = all_state;
167         h->ctdb_db = ctdb_db;
168         h->client_reqid = all_state->client_reqid;
169         h->srvid = all_state->srvid;
170
171         if (h->child == 0) {
172                 /* start the traverse in the child */
173                 close(h->fd[0]);
174                 tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
175                 _exit(0);
176         }
177
178         close(h->fd[1]);
179         set_close_on_exec(h->fd[0]);
180
181         talloc_set_destructor(h, traverse_local_destructor);
182
183         DLIST_ADD(ctdb_db->traverse, h);
184
185         /*
186           setup a packet queue between the child and the parent. This
187           copes with all the async and packet boundary issues
188          */
189         DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to child traverse\n", h->fd[0]));
190
191         h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h);
192         if (h->queue == NULL) {
193                 talloc_free(h);
194                 return NULL;
195         }
196
197         h->start_time = timeval_current();
198
199         return h;
200 }
201
202
203 struct ctdb_traverse_all_handle {
204         struct ctdb_context *ctdb;
205         struct ctdb_db_context *ctdb_db;
206         uint32_t reqid;
207         ctdb_traverse_fn_t callback;
208         void *private_data;
209         uint32_t null_count;
210 };
211
212 /*
213   destroy a traverse_all op
214  */
215 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
216 {
217         ctdb_reqid_remove(state->ctdb, state->reqid);
218         return 0;
219 }
220
/*
  payload of a CTDB_CONTROL_TRAVERSE_ALL control. The structure is sent
  as-is between nodes, so the field order must not be changed.
 */
struct ctdb_traverse_all {
        uint32_t db_id;         /* database to traverse */
        uint32_t reqid;         /* reqid of the traverse on the requesting node */
        uint32_t pnn;           /* node the records are sent back to */
        uint32_t client_reqid;  /* reqid used by the requesting client */
        uint64_t srvid;         /* srvid the client receives the records on */
};
228
229 /* called when a traverse times out */
230 static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te, 
231                                       struct timeval t, void *private_data)
232 {
233         struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
234
235         DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
236         state->ctdb->statistics.timeouts.traverse++;
237
238         state->callback(state->private_data, tdb_null, tdb_null);
239 }
240
241
/*
  state of a traverse started by a client through the
  CTDB_CONTROL_TRAVERSE_START control
 */
struct traverse_start_state {
        struct ctdb_context *ctdb;

        /* the cluster-wide traverse feeding this client */
        struct ctdb_traverse_all_handle *h;

        /* node the start control came from */
        uint32_t srcnode;

        /* reqid, database id and srvid taken from the client's request */
        uint32_t reqid;
        uint32_t db_id;
        uint64_t srvid;
};
250
251
252 /*
253   setup a cluster-wide non-blocking traverse of a ctdb. The
254   callback function will be called on every record in the local
255   ltdb. To stop the travserse, talloc_free() the traverse_handle.
256
257   The traverse is finished when the callback is called with tdb_null
258   for key and data
259  */
260 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
261                                                                  ctdb_traverse_fn_t callback,
262                                                                  struct traverse_start_state *start_state)
263 {
264         struct ctdb_traverse_all_handle *state;
265         struct ctdb_context *ctdb = ctdb_db->ctdb;
266         int ret;
267         TDB_DATA data;
268         struct ctdb_traverse_all r;
269         uint32_t destination;
270
271         state = talloc(start_state, struct ctdb_traverse_all_handle);
272         if (state == NULL) {
273                 return NULL;
274         }
275
276         state->ctdb         = ctdb;
277         state->ctdb_db      = ctdb_db;
278         state->reqid        = ctdb_reqid_new(ctdb_db->ctdb, state);
279         state->callback     = callback;
280         state->private_data = start_state;
281         state->null_count   = 0;
282         
283         talloc_set_destructor(state, ctdb_traverse_all_destructor);
284
285         r.db_id = ctdb_db->db_id;
286         r.reqid = state->reqid;
287         r.pnn   = ctdb->pnn;
288         r.client_reqid = start_state->reqid;
289         r.srvid = start_state->srvid;
290
291         data.dptr = (uint8_t *)&r;
292         data.dsize = sizeof(r);
293
294         if (ctdb_db->persistent == 0) {
295                 /* normal database, traverse all nodes */         
296                 destination = CTDB_BROADCAST_VNNMAP;
297         } else {
298                 int i;
299                 /* persistent database, traverse one node, preferably
300                  * the local one
301                  */
302                 destination = ctdb->pnn;
303                 /* check we are in the vnnmap */
304                 for (i=0; i < ctdb->vnn_map->size; i++) {
305                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
306                                 break;
307                         }
308                 }
309                 /* if we are not in the vnn map we just pick the first
310                  * node instead
311                  */
312                 if (i == ctdb->vnn_map->size) {
313                         destination = ctdb->vnn_map->map[0];
314                 }
315         }
316
317         /* tell all the nodes in the cluster to start sending records to this
318          * node, or if it is a persistent database, just tell the local
319          * node
320          */
321         ret = ctdb_daemon_send_control(ctdb, destination, 0, 
322                                CTDB_CONTROL_TRAVERSE_ALL,
323                                0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
324
325         if (ret != 0) {
326                 talloc_free(state);
327                 return NULL;
328         }
329
330         /* timeout the traverse */
331         event_add_timed(ctdb->ev, state, 
332                         timeval_current_ofs(ctdb->tunable.traverse_timeout, 0), 
333                         ctdb_traverse_all_timeout, state);
334
335         return state;
336 }
337
338 /*
339   called for each record during a traverse all 
340  */
341 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
342 {
343         struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
344         int ret;
345         struct ctdb_rec_data *d;
346         TDB_DATA cdata;
347
348         d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
349         if (d == NULL) {
350                 /* darn .... */
351                 DEBUG(DEBUG_ERR,("Out of memory in traverse_all_callback\n"));
352                 return;
353         }
354
355         cdata.dptr = (uint8_t *)d;
356         cdata.dsize = d->length;
357
358         ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
359                                        0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
360         if (ret != 0) {
361                 DEBUG(DEBUG_ERR,("Failed to send traverse data\n"));
362         }
363
364         if (key.dsize == 0 && data.dsize == 0) {
365                 /* we're done */
366                 talloc_free(state);
367         }
368 }
369
370 /*
371   called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
372   setup a traverse of our local ltdb, sending the records as
373   CTDB_CONTROL_TRAVERSE_DATA records back to the originator
374  */
375 int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
376 {
377         struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
378         struct traverse_all_state *state;
379         struct ctdb_db_context *ctdb_db;
380
381         if (data.dsize != sizeof(struct ctdb_traverse_all)) {
382                 DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
383                 return -1;
384         }
385
386         ctdb_db = find_ctdb_db(ctdb, c->db_id);
387         if (ctdb_db == NULL) {
388                 return -1;
389         }
390
391         state = talloc(ctdb_db, struct traverse_all_state);
392         if (state == NULL) {
393                 return -1;
394         }
395
396         state->reqid = c->reqid;
397         state->srcnode = c->pnn;
398         state->ctdb = ctdb;
399         state->client_reqid = c->client_reqid;
400         state->srvid = c->srvid;
401
402         state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
403         if (state->h == NULL) {
404                 talloc_free(state);
405                 return -1;
406         }
407
408         return 0;
409 }
410
411
412 /*
413   called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
414   call the traverse_all callback with the record
415  */
416 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
417 {
418         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
419         struct ctdb_traverse_all_handle *state;
420         TDB_DATA key;
421         ctdb_traverse_fn_t callback;
422         void *private_data;
423
424         if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
425                 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
426                 return -1;
427         }
428
429         state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
430         if (state == NULL || d->reqid != state->reqid) {
431                 /* traverse might have been terminated already */
432                 return -1;
433         }
434
435         key.dsize = d->keylen;
436         key.dptr  = &d->data[0];
437         data.dsize = d->datalen;
438         data.dptr = &d->data[d->keylen];
439
440         if (key.dsize == 0 && data.dsize == 0) {
441                 state->null_count++;
442                 /* Persistent databases are only scanned on one node (the local
443                  * node)
444                  */
445                 if (state->ctdb_db->persistent == 0) {
446                         if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
447                                 return 0;
448                         }
449                 }
450         }
451
452         callback = state->callback;
453         private_data = state->private_data;
454
455         callback(private_data, key, data);
456         return 0;
457 }       
458
459 /*
460   kill a in-progress traverse, used when a client disconnects
461  */
462 int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data, 
463                                    TDB_DATA *outdata, uint32_t srcnode)
464 {
465         struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
466         struct ctdb_db_context *ctdb_db;
467         struct ctdb_traverse_local_handle *t;
468
469         ctdb_db = find_ctdb_db(ctdb, d->db_id);
470         if (ctdb_db == NULL) {
471                 return -1;
472         }
473
474         for (t=ctdb_db->traverse; t; t=t->next) {
475                 if (t->client_reqid == d->reqid &&
476                     t->srvid == d->srvid) {
477                         talloc_free(t);
478                         break;
479                 }
480         }
481
482         return 0;
483 }
484
485
486 /*
487   this is called when a client disconnects during a traverse
488   we need to notify all the nodes taking part in the search that they
489   should kill their traverse children
490  */
491 static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
492 {
493         struct ctdb_traverse_start r;
494         TDB_DATA data;
495
496         DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
497         r.db_id = state->db_id;
498         r.reqid = state->reqid;
499         r.srvid = state->srvid;
500
501         data.dptr = (uint8_t *)&r;
502         data.dsize = sizeof(r);
503
504         ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0, 
505                                  CTDB_CONTROL_TRAVERSE_KILL, 
506                                  0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
507         return 0;
508 }
509
510 /*
511   callback which sends records as messages to the client
512  */
513 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
514 {
515         struct traverse_start_state *state;
516         struct ctdb_rec_data *d;
517         TDB_DATA cdata;
518
519         state = talloc_get_type(p, struct traverse_start_state);
520
521         d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
522         if (d == NULL) {
523                 return;
524         }
525
526         cdata.dptr = (uint8_t *)d;
527         cdata.dsize = d->length;
528
529         ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
530         if (key.dsize == 0 && data.dsize == 0) {
531                 /* end of traverse */
532                 talloc_set_destructor(state, NULL);
533                 talloc_free(state);
534         }
535 }
536
537
538 /*
539   start a traverse_all - called as a control from a client
540  */
541 int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data, 
542                                     TDB_DATA *outdata, uint32_t srcnode, uint32_t client_id)
543 {
544         struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
545         struct traverse_start_state *state;
546         struct ctdb_db_context *ctdb_db;
547         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
548
549         if (client == NULL) {
550                 DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
551                 return -1;              
552         }
553
554         if (data.dsize != sizeof(*d)) {
555                 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
556                 return -1;
557         }
558
559         ctdb_db = find_ctdb_db(ctdb, d->db_id);
560         if (ctdb_db == NULL) {
561                 return -1;
562         }
563
564         state = talloc(client, struct traverse_start_state);
565         if (state == NULL) {
566                 return -1;
567         }
568         
569         state->srcnode = srcnode;
570         state->reqid = d->reqid;
571         state->srvid = d->srvid;
572         state->db_id = d->db_id;
573         state->ctdb = ctdb;
574
575         state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
576         if (state->h == NULL) {
577                 talloc_free(state);
578                 return -1;
579         }
580
581         talloc_set_destructor(state, ctdb_traverse_start_destructor);
582
583         return 0;
584 }