Merge commit 'ronnie/master'
[sahlberg/ctdb.git] / server / ctdb_traverse.c
1 /* 
2    efficient async ctdb traverse
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "db_wrap.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "../include/ctdb_private.h"
27
28 typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
29
30 /*
31   handle returned to caller - freeing this handler will kill the child and 
32   terminate the traverse
33  */
34 struct ctdb_traverse_local_handle {
35         struct ctdb_db_context *ctdb_db;
36         int fd[2];
37         pid_t child;
38         void *private_data;
39         ctdb_traverse_fn_t callback;
40         struct timeval start_time;
41         struct ctdb_queue *queue;
42 };
43
44 /*
45   called when data is available from the child
46  */
47 static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
48 {
49         struct ctdb_traverse_local_handle *h = talloc_get_type(private_data, 
50                                                                struct ctdb_traverse_local_handle);
51         TDB_DATA key, data;
52         ctdb_traverse_fn_t callback = h->callback;
53         void *p = h->private_data;
54         struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
55
56         if (rawdata == NULL || length < 4 || length != tdata->length) {
57                 /* end of traverse */
58                 talloc_free(h);
59                 callback(p, tdb_null, tdb_null);
60                 return;
61         }
62
63         key.dsize = tdata->keylen;
64         key.dptr  = &tdata->data[0];
65         data.dsize = tdata->datalen;
66         data.dptr = &tdata->data[tdata->keylen];
67
68         callback(p, key, data); 
69 }
70
71 /*
72   destroy a in-flight traverse operation
73  */
74 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
75 {
76         kill(h->child, SIGKILL);
77         return 0;
78 }
79
80 /*
81   callback from tdb_traverse_read()
82  */
83 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
84 {
85         struct ctdb_traverse_local_handle *h = talloc_get_type(p, 
86                                                                struct ctdb_traverse_local_handle);
87         struct ctdb_rec_data *d;
88         struct ctdb_ltdb_header *hdr;
89
90         
91         hdr = (struct ctdb_ltdb_header *)data.dptr;
92
93         if (h->ctdb_db->persistent == 0) {
94                 /* filter out zero-length records */
95                 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
96                         return 0;
97                 }
98
99                 /* filter out non-authoritative records */
100                 if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
101                         return 0;
102                 }
103         }
104
105         d = ctdb_marshall_record(h, 0, key, NULL, data);
106         if (d == NULL) {
107                 /* error handling is tricky in this child code .... */
108                 return -1;
109         }
110
111         if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
112                 return -1;
113         }
114         return 0;
115 }
116
117
118 /*
119   setup a non-blocking traverse of a local ltdb. The callback function
120   will be called on every record in the local ltdb. To stop the
121   travserse, talloc_free() the travserse_handle.
122
123   The traverse is finished when the callback is called with tdb_null for key and data
124  */
125 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
126                                                               ctdb_traverse_fn_t callback,
127                                                               void *private_data)
128 {
129         struct ctdb_traverse_local_handle *h;
130         int ret;
131
132         h = talloc_zero(ctdb_db, struct ctdb_traverse_local_handle);
133         if (h == NULL) {
134                 return NULL;
135         }
136
137         ret = pipe(h->fd);
138
139         if (ret != 0) {
140                 talloc_free(h);
141                 return NULL;
142         }
143
144         h->child = fork();
145
146         if (h->child == (pid_t)-1) {
147                 close(h->fd[0]);
148                 close(h->fd[1]);
149                 talloc_free(h);
150                 return NULL;
151         }
152
153         h->callback = callback;
154         h->private_data = private_data;
155         h->ctdb_db = ctdb_db;
156
157         if (h->child == 0) {
158                 /* start the traverse in the child */
159                 close(h->fd[0]);
160                 tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
161                 _exit(0);
162         }
163
164         close(h->fd[1]);
165         talloc_set_destructor(h, traverse_local_destructor);
166
167         /*
168           setup a packet queue between the child and the parent. This
169           copes with all the async and packet boundary issues
170          */
171         h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h);
172         if (h->queue == NULL) {
173                 talloc_free(h);
174                 return NULL;
175         }
176
177         h->start_time = timeval_current();
178
179         return h;
180 }
181
182
183 struct ctdb_traverse_all_handle {
184         struct ctdb_context *ctdb;
185         struct ctdb_db_context *ctdb_db;
186         uint32_t reqid;
187         ctdb_traverse_fn_t callback;
188         void *private_data;
189         uint32_t null_count;
190 };
191
192 /*
193   destroy a traverse_all op
194  */
195 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
196 {
197         ctdb_reqid_remove(state->ctdb, state->reqid);
198         return 0;
199 }
200
201 struct ctdb_traverse_all {
202         uint32_t db_id;
203         uint32_t reqid;
204         uint32_t pnn;
205 };
206
207 /* called when a traverse times out */
208 static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te, 
209                                       struct timeval t, void *private_data)
210 {
211         struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
212
213         state->ctdb->statistics.timeouts.traverse++;
214
215         state->callback(state->private_data, tdb_null, tdb_null);
216         talloc_free(state);
217 }
218
219 /*
220   setup a cluster-wide non-blocking traverse of a ctdb. The
221   callback function will be called on every record in the local
222   ltdb. To stop the travserse, talloc_free() the traverse_handle.
223
224   The traverse is finished when the callback is called with tdb_null
225   for key and data
226  */
227 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
228                                                                  ctdb_traverse_fn_t callback,
229                                                                  void *private_data)
230 {
231         struct ctdb_traverse_all_handle *state;
232         struct ctdb_context *ctdb = ctdb_db->ctdb;
233         int ret;
234         TDB_DATA data;
235         struct ctdb_traverse_all r;
236         uint32_t destination;
237
238         state = talloc(ctdb_db, struct ctdb_traverse_all_handle);
239         if (state == NULL) {
240                 return NULL;
241         }
242
243         state->ctdb         = ctdb;
244         state->ctdb_db      = ctdb_db;
245         state->reqid        = ctdb_reqid_new(ctdb_db->ctdb, state);
246         state->callback     = callback;
247         state->private_data = private_data;
248         state->null_count   = 0;
249         
250         talloc_set_destructor(state, ctdb_traverse_all_destructor);
251
252         r.db_id = ctdb_db->db_id;
253         r.reqid = state->reqid;
254         r.pnn   = ctdb->pnn;
255
256         data.dptr = (uint8_t *)&r;
257         data.dsize = sizeof(r);
258
259         if (ctdb_db->persistent == 0) {
260                 /* normal database, traverse all nodes */         
261                 destination = CTDB_BROADCAST_VNNMAP;
262         } else {
263                 int i;
264                 /* persistent database, traverse one node, preferably
265                  * the local one
266                  */
267                 destination = ctdb->pnn;
268                 /* check we are in the vnnmap */
269                 for (i=0; i < ctdb->vnn_map->size; i++) {
270                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
271                                 break;
272                         }
273                 }
274                 /* if we are not in the vnn map we just pick the first
275                  * node instead
276                  */
277                 if (i == ctdb->vnn_map->size) {
278                         destination = ctdb->vnn_map->map[0];
279                 }
280         }
281
282         /* tell all the nodes in the cluster to start sending records to this
283          * node, or if it is a persistent database, just tell the local
284          * node
285          */
286         ret = ctdb_daemon_send_control(ctdb, destination, 0, 
287                                CTDB_CONTROL_TRAVERSE_ALL,
288                                0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
289
290         if (ret != 0) {
291                 talloc_free(state);
292                 return NULL;
293         }
294
295         /* timeout the traverse */
296         event_add_timed(ctdb->ev, state, 
297                         timeval_current_ofs(ctdb->tunable.traverse_timeout, 0), 
298                         ctdb_traverse_all_timeout, state);
299
300         return state;
301 }
302
303 struct traverse_all_state {
304         struct ctdb_context *ctdb;
305         struct ctdb_traverse_local_handle *h;
306         uint32_t reqid;
307         uint32_t srcnode;
308 };
309
310 /*
311   called for each record during a traverse all 
312  */
313 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
314 {
315         struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
316         int ret;
317         struct ctdb_rec_data *d;
318         TDB_DATA cdata;
319
320         d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
321         if (d == NULL) {
322                 /* darn .... */
323                 DEBUG(DEBUG_ERR,("Out of memory in traverse_all_callback\n"));
324                 return;
325         }
326
327         cdata.dptr = (uint8_t *)d;
328         cdata.dsize = d->length;
329
330         ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
331                                        0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
332         if (ret != 0) {
333                 DEBUG(DEBUG_ERR,("Failed to send traverse data\n"));
334         }
335
336         if (key.dsize == 0 && data.dsize == 0) {
337                 /* we're done */
338                 talloc_free(state);
339         }
340 }
341
342 /*
343   called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
344   setup a traverse of our local ltdb, sending the records as
345   CTDB_CONTROL_TRAVERSE_DATA records back to the originator
346  */
347 int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
348 {
349         struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
350         struct traverse_all_state *state;
351         struct ctdb_db_context *ctdb_db;
352
353         if (data.dsize != sizeof(struct ctdb_traverse_all)) {
354                 DEBUG(DEBUG_ERR,("Invalid size in ctdb_control_traverse_all\n"));
355                 return -1;
356         }
357
358         ctdb_db = find_ctdb_db(ctdb, c->db_id);
359         if (ctdb_db == NULL) {
360                 return -1;
361         }
362
363         state = talloc(ctdb_db, struct traverse_all_state);
364         if (state == NULL) {
365                 return -1;
366         }
367
368         state->reqid = c->reqid;
369         state->srcnode = c->pnn;
370         state->ctdb = ctdb;
371
372         state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
373         if (state->h == NULL) {
374                 talloc_free(state);
375                 return -1;
376         }
377
378         return 0;
379 }
380
381
382 /*
383   called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
384   call the traverse_all callback with the record
385  */
386 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
387 {
388         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
389         struct ctdb_traverse_all_handle *state;
390         TDB_DATA key;
391         ctdb_traverse_fn_t callback;
392         void *private_data;
393
394         if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
395                 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
396                 return -1;
397         }
398
399         state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
400         if (state == NULL || d->reqid != state->reqid) {
401                 /* traverse might have been terminated already */
402                 return -1;
403         }
404
405         key.dsize = d->keylen;
406         key.dptr  = &d->data[0];
407         data.dsize = d->datalen;
408         data.dptr = &d->data[d->keylen];
409
410         if (key.dsize == 0 && data.dsize == 0) {
411                 state->null_count++;
412                 /* Persistent databases are only scanned on one node (the local
413                  * node)
414                  */
415                 if (state->ctdb_db->persistent == 0) {
416                         if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
417                                 return 0;
418                         }
419                 }
420         }
421
422         callback = state->callback;
423         private_data = state->private_data;
424
425         callback(private_data, key, data);
426         if (key.dsize == 0 && data.dsize == 0) {
427                 /* we've received all of the null replies, so all
428                    nodes are finished */
429                 talloc_free(state);
430         }
431         return 0;
432 }       
433
434 struct traverse_start_state {
435         struct ctdb_context *ctdb;
436         struct ctdb_traverse_all_handle *h;
437         uint32_t srcnode;
438         uint32_t reqid;
439         uint64_t srvid;
440 };
441
442 /*
443   callback which sends records as messages to the client
444  */
445 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
446 {
447         struct traverse_start_state *state;
448         struct ctdb_rec_data *d;
449         TDB_DATA cdata;
450
451         state = talloc_get_type(p, struct traverse_start_state);
452
453         d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
454         if (d == NULL) {
455                 return;
456         }
457
458         cdata.dptr = (uint8_t *)d;
459         cdata.dsize = d->length;
460
461         ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
462         if (key.dsize == 0 && data.dsize == 0) {
463                 /* end of traverse */
464                 talloc_free(state);
465         }
466 }
467
468 /*
469   start a traverse_all - called as a control from a client
470  */
471 int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data, 
472                                     TDB_DATA *outdata, uint32_t srcnode)
473 {
474         struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
475         struct traverse_start_state *state;
476         struct ctdb_db_context *ctdb_db;
477
478         if (data.dsize != sizeof(*d)) {
479                 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
480                 return -1;
481         }
482
483         ctdb_db = find_ctdb_db(ctdb, d->db_id);
484         if (ctdb_db == NULL) {
485                 return -1;
486         }
487
488         state = talloc(ctdb_db, struct traverse_start_state);
489         if (state == NULL) {
490                 return -1;
491         }
492         
493         state->srcnode = srcnode;
494         state->reqid = d->reqid;
495         state->srvid = d->srvid;
496         state->ctdb = ctdb;
497
498         state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
499         if (state->h == NULL) {
500                 talloc_free(state);
501                 return -1;
502         }
503
504         return 0;
505 }