update lib/replace from samba4
[vlendec/samba-autobuild/.git] / ctdb / server / ctdb_traverse.c
1 /* 
2    efficient async ctdb traverse
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "db_wrap.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "../include/ctdb_private.h"
27
28 typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
29
30 /*
31   handle returned to caller - freeing this handler will kill the child and 
32   terminate the traverse
33  */
34 struct ctdb_traverse_local_handle {
35         struct ctdb_db_context *ctdb_db;
36         int fd[2];
37         pid_t child;
38         void *private_data;
39         ctdb_traverse_fn_t callback;
40         struct timeval start_time;
41         struct ctdb_queue *queue;
42 };
43
44 /*
45   called when data is available from the child
46  */
47 static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
48 {
49         struct ctdb_traverse_local_handle *h = talloc_get_type(private_data, 
50                                                                struct ctdb_traverse_local_handle);
51         TDB_DATA key, data;
52         ctdb_traverse_fn_t callback = h->callback;
53         void *p = h->private_data;
54         struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
55
56         if (rawdata == NULL || length < 4 || length != tdata->length) {
57                 /* end of traverse */
58                 talloc_free(h);
59                 callback(p, tdb_null, tdb_null);
60                 return;
61         }
62
63         key.dsize = tdata->keylen;
64         key.dptr  = &tdata->data[0];
65         data.dsize = tdata->datalen;
66         data.dptr = &tdata->data[tdata->keylen];
67
68         callback(p, key, data); 
69 }
70
71 /*
72   destroy a in-flight traverse operation
73  */
74 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
75 {
76         kill(h->child, SIGKILL);
77         waitpid(h->child, NULL, 0);
78         return 0;
79 }
80
81 /*
82   callback from tdb_traverse_read()
83  */
84 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
85 {
86         struct ctdb_traverse_local_handle *h = talloc_get_type(p, 
87                                                                struct ctdb_traverse_local_handle);
88         struct ctdb_rec_data *d;
89         struct ctdb_ltdb_header *hdr;
90
91         /* filter out non-authoritative and zero-length records */
92         hdr = (struct ctdb_ltdb_header *)data.dptr;
93         if (data.dsize <= sizeof(struct ctdb_ltdb_header) ||
94             hdr->dmaster != h->ctdb_db->ctdb->vnn) {
95                 return 0;
96         }
97
98         d = ctdb_marshall_record(h, 0, key, data);
99         if (d == NULL) {
100                 /* error handling is tricky in this child code .... */
101                 return -1;
102         }
103
104         if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
105                 return -1;
106         }
107         return 0;
108 }
109
110
111 /*
112   setup a non-blocking traverse of a local ltdb. The callback function
113   will be called on every record in the local ltdb. To stop the
114   travserse, talloc_free() the travserse_handle.
115
116   The traverse is finished when the callback is called with tdb_null for key and data
117  */
118 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
119                                                               ctdb_traverse_fn_t callback,
120                                                               void *private_data)
121 {
122         struct ctdb_traverse_local_handle *h;
123         int ret;
124
125         h = talloc_zero(ctdb_db, struct ctdb_traverse_local_handle);
126         if (h == NULL) {
127                 return NULL;
128         }
129
130         ret = pipe(h->fd);
131
132         if (ret != 0) {
133                 talloc_free(h);
134                 return NULL;
135         }
136
137         h->child = fork();
138
139         if (h->child == (pid_t)-1) {
140                 close(h->fd[0]);
141                 close(h->fd[1]);
142                 talloc_free(h);
143                 return NULL;
144         }
145
146         h->callback = callback;
147         h->private_data = private_data;
148         h->ctdb_db = ctdb_db;
149
150         if (h->child == 0) {
151                 /* start the traverse in the child */
152                 close(h->fd[0]);
153                 tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
154                 _exit(0);
155         }
156
157         close(h->fd[1]);
158         talloc_set_destructor(h, traverse_local_destructor);
159
160         /*
161           setup a packet queue between the child and the parent. This
162           copes with all the async and packet boundary issues
163          */
164         h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h);
165         if (h->queue == NULL) {
166                 talloc_free(h);
167                 return NULL;
168         }
169
170         h->start_time = timeval_current();
171
172         return h;
173 }
174
175
176 struct ctdb_traverse_all_handle {
177         struct ctdb_context *ctdb;
178         uint32_t reqid;
179         ctdb_traverse_fn_t callback;
180         void *private_data;
181         uint32_t null_count;
182 };
183
184 /*
185   destroy a traverse_all op
186  */
187 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
188 {
189         ctdb_reqid_remove(state->ctdb, state->reqid);
190         return 0;
191 }
192
193 struct ctdb_traverse_all {
194         uint32_t db_id;
195         uint32_t reqid;
196         uint32_t vnn;
197 };
198
199 /* called when a traverse times out */
200 static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te, 
201                                       struct timeval t, void *private_data)
202 {
203         struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
204
205         state->ctdb->statistics.timeouts.traverse++;
206
207         state->callback(state->private_data, tdb_null, tdb_null);
208         talloc_free(state);
209 }
210
211 /*
212   setup a cluster-wide non-blocking traverse of a ctdb. The
213   callback function will be called on every record in the local
214   ltdb. To stop the travserse, talloc_free() the traverse_handle.
215
216   The traverse is finished when the callback is called with tdb_null
217   for key and data
218  */
219 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
220                                                                  ctdb_traverse_fn_t callback,
221                                                                  void *private_data)
222 {
223         struct ctdb_traverse_all_handle *state;
224         struct ctdb_context *ctdb = ctdb_db->ctdb;
225         int ret;
226         TDB_DATA data;
227         struct ctdb_traverse_all r;
228
229         state = talloc(ctdb_db, struct ctdb_traverse_all_handle);
230         if (state == NULL) {
231                 return NULL;
232         }
233
234         state->ctdb = ctdb;
235         state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state);
236         state->callback = callback;
237         state->private_data = private_data;
238         state->null_count = 0;
239         
240         talloc_set_destructor(state, ctdb_traverse_all_destructor);
241
242         r.db_id = ctdb_db->db_id;
243         r.reqid = state->reqid;
244         r.vnn   = ctdb->vnn;
245
246         data.dptr = (uint8_t *)&r;
247         data.dsize = sizeof(r);
248
249         /* tell all the nodes in the cluster to start sending records to this node */
250         ret = ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0, 
251                                        CTDB_CONTROL_TRAVERSE_ALL,
252                                        0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
253         if (ret != 0) {
254                 talloc_free(state);
255                 return NULL;
256         }
257
258         /* timeout the traverse */
259         event_add_timed(ctdb->ev, state, 
260                         timeval_current_ofs(ctdb->tunable.traverse_timeout, 0), 
261                         ctdb_traverse_all_timeout, state);
262
263         return state;
264 }
265
266 struct traverse_all_state {
267         struct ctdb_context *ctdb;
268         struct ctdb_traverse_local_handle *h;
269         uint32_t reqid;
270         uint32_t srcnode;
271 };
272
273 /*
274   called for each record during a traverse all 
275  */
276 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
277 {
278         struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
279         int ret;
280         struct ctdb_rec_data *d;
281         TDB_DATA cdata;
282
283         d = ctdb_marshall_record(state, state->reqid, key, data);
284         if (d == NULL) {
285                 /* darn .... */
286                 DEBUG(0,("Out of memory in traverse_all_callback\n"));
287                 return;
288         }
289
290         cdata.dptr = (uint8_t *)d;
291         cdata.dsize = d->length;
292
293         ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
294                                        0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
295         if (ret != 0) {
296                 DEBUG(0,("Failed to send traverse data\n"));
297         }
298
299         if (key.dsize == 0 && data.dsize == 0) {
300                 /* we're done */
301                 talloc_free(state);
302         }
303 }
304
305 /*
306   called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
307   setup a traverse of our local ltdb, sending the records as
308   CTDB_CONTROL_TRAVERSE_DATA records back to the originator
309  */
310 int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
311 {
312         struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
313         struct traverse_all_state *state;
314         struct ctdb_db_context *ctdb_db;
315
316         if (data.dsize != sizeof(struct ctdb_traverse_all)) {
317                 DEBUG(0,("Invalid size in ctdb_control_traverse_all\n"));
318                 return -1;
319         }
320
321         ctdb_db = find_ctdb_db(ctdb, c->db_id);
322         if (ctdb_db == NULL) {
323                 return -1;
324         }
325
326         state = talloc(ctdb_db, struct traverse_all_state);
327         if (state == NULL) {
328                 return -1;
329         }
330
331         state->reqid = c->reqid;
332         state->srcnode = c->vnn;
333         state->ctdb = ctdb;
334
335         state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
336         if (state->h == NULL) {
337                 talloc_free(state);
338                 return -1;
339         }
340
341         return 0;
342 }
343
344
345 /*
346   called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
347   call the traverse_all callback with the record
348  */
349 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
350 {
351         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
352         struct ctdb_traverse_all_handle *state;
353         TDB_DATA key;
354         ctdb_traverse_fn_t callback;
355         void *private_data;
356
357         if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
358                 DEBUG(0,("Bad record size in ctdb_control_traverse_data\n"));
359                 return -1;
360         }
361
362         state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
363         if (state == NULL || d->reqid != state->reqid) {
364                 /* traverse might have been terminated already */
365                 return -1;
366         }
367
368         key.dsize = d->keylen;
369         key.dptr  = &d->data[0];
370         data.dsize = d->datalen;
371         data.dptr = &d->data[d->keylen];
372
373         if (key.dsize == 0 && data.dsize == 0) {
374                 state->null_count++;
375                 if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
376                         return 0;
377                 }
378         }
379
380         callback = state->callback;
381         private_data = state->private_data;
382
383         callback(private_data, key, data);
384         if (key.dsize == 0 && data.dsize == 0) {
385                 /* we've received all of the null replies, so all
386                    nodes are finished */
387                 talloc_free(state);
388         }
389         return 0;
390 }       
391
392 struct traverse_start_state {
393         struct ctdb_context *ctdb;
394         struct ctdb_traverse_all_handle *h;
395         uint32_t srcnode;
396         uint32_t reqid;
397         uint64_t srvid;
398 };
399
400 /*
401   callback which sends records as messages to the client
402  */
403 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
404 {
405         struct traverse_start_state *state;
406         struct ctdb_rec_data *d;
407         TDB_DATA cdata;
408
409         state = talloc_get_type(p, struct traverse_start_state);
410
411         d = ctdb_marshall_record(state, state->reqid, key, data);
412         if (d == NULL) {
413                 return;
414         }
415
416         cdata.dptr = (uint8_t *)d;
417         cdata.dsize = d->length;
418
419         ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
420         if (key.dsize == 0 && data.dsize == 0) {
421                 /* end of traverse */
422                 talloc_free(state);
423         }
424 }
425
426 /*
427   start a traverse_all - called as a control from a client
428  */
429 int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data, 
430                                     TDB_DATA *outdata, uint32_t srcnode)
431 {
432         struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
433         struct traverse_start_state *state;
434         struct ctdb_db_context *ctdb_db;
435
436         if (data.dsize != sizeof(*d)) {
437                 DEBUG(0,("Bad record size in ctdb_control_traverse_start\n"));
438                 return -1;
439         }
440
441         ctdb_db = find_ctdb_db(ctdb, d->db_id);
442         if (ctdb_db == NULL) {
443                 return -1;
444         }
445
446         state = talloc(ctdb_db, struct traverse_start_state);
447         if (state == NULL) {
448                 return -1;
449         }
450         
451         state->srcnode = srcnode;
452         state->reqid = d->reqid;
453         state->srvid = d->srvid;
454         state->ctdb = ctdb;
455
456         state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
457         if (state->h == NULL) {
458                 talloc_free(state);
459                 return -1;
460         }
461
462         return 0;
463 }