2 efficient async ctdb traverse
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "../include/ctdb_private.h"
/*
  Per-record traverse callback. private_data is the caller's pointer,
  passed through unchanged on every call. A call with tdb_null for both
  key and data signals that the traverse has finished (or, for a
  cluster-wide traverse, that it has timed out).
*/
28 typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
31 handle returned to caller - freeing this handle will kill the child and
32 terminate the traverse
34 struct ctdb_traverse_local_handle {
35 struct ctdb_db_context *ctdb_db;
39 ctdb_traverse_fn_t callback;
40 struct timeval start_time;
41 struct ctdb_queue *queue;
45 called when data is available from the child
47 static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
49 struct ctdb_traverse_local_handle *h = talloc_get_type(private_data,
50 struct ctdb_traverse_local_handle);
52 ctdb_traverse_fn_t callback = h->callback;
53 void *p = h->private_data;
54 struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
56 if (rawdata == NULL || length < 4 || length != tdata->length) {
59 callback(p, tdb_null, tdb_null);
63 key.dsize = tdata->keylen;
64 key.dptr = &tdata->data[0];
65 data.dsize = tdata->datalen;
66 data.dptr = &tdata->data[tdata->keylen];
68 callback(p, key, data);
72 destroy an in-flight traverse operation
74 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
76 kill(h->child, SIGKILL);
81 callback from tdb_traverse_read()
83 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
85 struct ctdb_traverse_local_handle *h = talloc_get_type(p,
86 struct ctdb_traverse_local_handle);
87 struct ctdb_rec_data *d;
88 struct ctdb_ltdb_header *hdr;
91 hdr = (struct ctdb_ltdb_header *)data.dptr;
93 if (h->ctdb_db->persistent == 0) {
94 /* filter out zero-length records */
95 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
99 /* filter out non-authoritative records */
100 if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
105 d = ctdb_marshall_record(h, 0, key, NULL, data);
107 /* error handling is tricky in this child code .... */
111 if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
119 set up a non-blocking traverse of a local ltdb. The callback function
120 will be called on every record in the local ltdb. To stop the
121 traverse, talloc_free() the returned traverse handle.
123 The traverse is finished when the callback is called with tdb_null for key and data.
125 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
126 ctdb_traverse_fn_t callback,
129 struct ctdb_traverse_local_handle *h;
132 h = talloc_zero(ctdb_db, struct ctdb_traverse_local_handle);
146 if (h->child == (pid_t)-1) {
153 h->callback = callback;
154 h->private_data = private_data;
155 h->ctdb_db = ctdb_db;
158 /* start the traverse in the child */
160 tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
165 talloc_set_destructor(h, traverse_local_destructor);
168 setup a packet queue between the child and the parent. This
169 copes with all the async and packet boundary issues
171 h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h);
172 if (h->queue == NULL) {
177 h->start_time = timeval_current();
183 struct ctdb_traverse_all_handle {
184 struct ctdb_context *ctdb;
185 struct ctdb_db_context *ctdb_db;
187 ctdb_traverse_fn_t callback;
193 destroy a traverse_all op
195 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
197 ctdb_reqid_remove(state->ctdb, state->reqid);
201 struct ctdb_traverse_all {
207 /* called when a traverse times out */
208 static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te,
209 struct timeval t, void *private_data)
211 struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
213 state->ctdb->statistics.timeouts.traverse++;
215 state->callback(state->private_data, tdb_null, tdb_null);
220 set up a cluster-wide non-blocking traverse of a ctdb. The callback
221 function will be called on every record sent back by the traversed nodes
222 (all vnnmap nodes for a normal database, one node for a persistent one).
224 To stop the traverse, talloc_free() the traverse handle. The traverse is finished (or has timed out) when the callback is called with tdb_null for key and data.
227 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
228 ctdb_traverse_fn_t callback,
231 struct ctdb_traverse_all_handle *state;
232 struct ctdb_context *ctdb = ctdb_db->ctdb;
235 struct ctdb_traverse_all r;
236 uint32_t destination;
238 state = talloc(ctdb_db, struct ctdb_traverse_all_handle);
244 state->ctdb_db = ctdb_db;
245 state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state);
246 state->callback = callback;
247 state->private_data = private_data;
248 state->null_count = 0;
250 talloc_set_destructor(state, ctdb_traverse_all_destructor);
252 r.db_id = ctdb_db->db_id;
253 r.reqid = state->reqid;
256 data.dptr = (uint8_t *)&r;
257 data.dsize = sizeof(r);
259 if (ctdb_db->persistent == 0) {
260 /* normal database, traverse all nodes */
261 destination = CTDB_BROADCAST_VNNMAP;
264 /* persistent database, traverse one node, preferably
267 destination = ctdb->pnn;
268 /* check we are in the vnnmap */
269 for (i=0; i < ctdb->vnn_map->size; i++) {
270 if (ctdb->vnn_map->map[i] == ctdb->pnn) {
274 /* if we are not in the vnn map we just pick the first
277 if (i == ctdb->vnn_map->size) {
278 destination = ctdb->vnn_map->map[0];
282 /* tell all the nodes in the cluster to start sending records to this
283 * node, or if it is a persistent database, just tell the local
286 ret = ctdb_daemon_send_control(ctdb, destination, 0,
287 CTDB_CONTROL_TRAVERSE_ALL,
288 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
295 /* timeout the traverse */
296 event_add_timed(ctdb->ev, state,
297 timeval_current_ofs(ctdb->tunable.traverse_timeout, 0),
298 ctdb_traverse_all_timeout, state);
303 struct traverse_all_state {
304 struct ctdb_context *ctdb;
305 struct ctdb_traverse_local_handle *h;
311 called for each record during a traverse all
313 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
315 struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
317 struct ctdb_rec_data *d;
320 d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
323 DEBUG(DEBUG_ERR,("Out of memory in traverse_all_callback\n"));
327 cdata.dptr = (uint8_t *)d;
328 cdata.dsize = d->length;
330 ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
331 0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
333 DEBUG(DEBUG_ERR,("Failed to send traverse data\n"));
336 if (key.dsize == 0 && data.dsize == 0) {
343 called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
344 set up a traverse of our local ltdb, sending the records as
345 CTDB_CONTROL_TRAVERSE_DATA records back to the originator
347 int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
349 struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
350 struct traverse_all_state *state;
351 struct ctdb_db_context *ctdb_db;
353 if (data.dsize != sizeof(struct ctdb_traverse_all)) {
354 DEBUG(DEBUG_ERR,("Invalid size in ctdb_control_traverse_all\n"));
358 ctdb_db = find_ctdb_db(ctdb, c->db_id);
359 if (ctdb_db == NULL) {
363 state = talloc(ctdb_db, struct traverse_all_state);
368 state->reqid = c->reqid;
369 state->srcnode = c->pnn;
372 state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
373 if (state->h == NULL) {
383 called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
384 call the traverse_all callback with the record
386 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
388 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
389 struct ctdb_traverse_all_handle *state;
391 ctdb_traverse_fn_t callback;
394 if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
395 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
399 state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
400 if (state == NULL || d->reqid != state->reqid) {
401 /* traverse might have been terminated already */
405 key.dsize = d->keylen;
406 key.dptr = &d->data[0];
407 data.dsize = d->datalen;
408 data.dptr = &d->data[d->keylen];
410 if (key.dsize == 0 && data.dsize == 0) {
412 /* Persistent databases are only scanned on one node (the local
415 if (state->ctdb_db->persistent == 0) {
416 if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
422 callback = state->callback;
423 private_data = state->private_data;
425 callback(private_data, key, data);
426 if (key.dsize == 0 && data.dsize == 0) {
427 /* we've received all of the null replies, so all
428 nodes are finished */
434 struct traverse_start_state {
435 struct ctdb_context *ctdb;
436 struct ctdb_traverse_all_handle *h;
443 callback which sends records as messages to the client
445 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
447 struct traverse_start_state *state;
448 struct ctdb_rec_data *d;
451 state = talloc_get_type(p, struct traverse_start_state);
453 d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
458 cdata.dptr = (uint8_t *)d;
459 cdata.dsize = d->length;
461 ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
462 if (key.dsize == 0 && data.dsize == 0) {
463 /* end of traverse */
469 start a traverse_all - called as a control from a client
471 int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data,
472 TDB_DATA *outdata, uint32_t srcnode)
474 struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
475 struct traverse_start_state *state;
476 struct ctdb_db_context *ctdb_db;
478 if (data.dsize != sizeof(*d)) {
479 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
483 ctdb_db = find_ctdb_db(ctdb, d->db_id);
484 if (ctdb_db == NULL) {
488 state = talloc(ctdb_db, struct traverse_start_state);
493 state->srcnode = srcnode;
494 state->reqid = d->reqid;
495 state->srvid = d->srvid;
498 state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
499 if (state->h == NULL) {