2 efficient async ctdb traverse
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
/*
  Callback type used by every traverse in this file. It is invoked with the
  caller-supplied private_data pointer and the key and data of one record.
  A call with tdb_null for both key and data marks the end of the traverse.
*/
29 typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
32 handle returned to caller - freeing this handle will kill the child and
33 terminate the traverse
/*
  State of one local traverse.

  next/prev    - links for the per-database list (ctdb_db->traverse)
  ctdb_db      - database being traversed
  client_reqid - request id of the originating client; matched, together
                 with srvid, when a traverse is killed
  callback     - function called for each record read from the child
  start_time   - set with timeval_current() once the traverse is set up
  queue        - packet queue attached to the pipe descriptor fd[0]

  Other code in this file also uses the members fd, child, srvid and
  private_data of this structure.
*/
35 struct ctdb_traverse_local_handle {
36 struct ctdb_traverse_local_handle *next, *prev;
37 struct ctdb_db_context *ctdb_db;
41 uint32_t client_reqid;
43 ctdb_traverse_fn_t callback;
44 struct timeval start_time;
45 struct ctdb_queue *queue;
49 called when data is available from the child
/*
  Queue handler for a local traverse; it is the function registered with
  ctdb_queue_setup() in ctdb_traverse_local().

  rawdata, length - one packet from the queue, read as a struct ctdb_rec_data
  private_data    - the struct ctdb_traverse_local_handle of the traverse

  The callback and its private pointer are copied into locals before use.

  A NULL packet, a packet shorter than 4 bytes, or a packet whose length
  differs from tdata->length results in callback(p, tdb_null, tdb_null),
  which is the end-of-traverse signal. The length < 4 test comes first in
  the || chain, so tdata->length is only read for packets of at least
  4 bytes.

  Otherwise the key is taken from the start of tdata->data (keylen bytes)
  and the data from offset keylen (datalen bytes), and both are passed to
  the callback.
*/
51 static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
53 struct ctdb_traverse_local_handle *h = talloc_get_type(private_data,
54 struct ctdb_traverse_local_handle);
56 ctdb_traverse_fn_t callback = h->callback;
57 void *p = h->private_data;
58 struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
60 if (rawdata == NULL || length < 4 || length != tdata->length) {
63 callback(p, tdb_null, tdb_null);
67 key.dsize = tdata->keylen;
68 key.dptr = &tdata->data[0];
69 data.dsize = tdata->datalen;
70 data.dptr = &tdata->data[tdata->keylen];
72 callback(p, key, data);
76 destroy an in-flight traverse operation
/*
  talloc destructor for a local traverse handle, installed with
  talloc_set_destructor() in ctdb_traverse_local().

  Unlinks h from the ctdb_db->traverse list and sends SIGKILL to the
  process recorded in h->child.
*/
78 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
80 DLIST_REMOVE(h->ctdb_db->traverse, h);
81 kill(h->child, SIGKILL);
86 callback from tdb_traverse_read()
/*
  Per-record function handed to tdb_traverse_read() by
  ctdb_traverse_local(); p is the struct ctdb_traverse_local_handle of the
  traverse.

  For a non-persistent database (persistent == 0) two kinds of record are
  filtered:
    - records whose data is no larger than a struct ctdb_ltdb_header
    - records whose header names a dmaster other than this node's pnn

  A record is marshalled with ctdb_marshall_record(), passing 0 where the
  other callers in this file pass a request id, and is written to the pipe
  end h->fd[1]. The result of write() is compared with d->length, so a
  failed or short write is detected.
*/
88 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
90 struct ctdb_traverse_local_handle *h = talloc_get_type(p,
91 struct ctdb_traverse_local_handle);
92 struct ctdb_rec_data *d;
93 struct ctdb_ltdb_header *hdr;
95 hdr = (struct ctdb_ltdb_header *)data.dptr;
97 if (h->ctdb_db->persistent == 0) {
98 /* filter out zero-length records */
99 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
103 /* filter out non-authoritative records */
104 if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
109 d = ctdb_marshall_record(h, 0, key, NULL, data);
111 /* error handling is tricky in this child code .... */
115 if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
/*
  State kept on a node while it serves a CTDB_CONTROL_TRAVERSE_ALL request.

  ctdb         - daemon context used when sending records back
  h            - handle of the local traverse started for the request
  client_reqid - client request id copied from the incoming request

  Other code in this file also uses the members reqid, srcnode and srvid
  of this structure.
*/
121 struct traverse_all_state {
122 struct ctdb_context *ctdb;
123 struct ctdb_traverse_local_handle *h;
126 uint32_t client_reqid;
131 set up a non-blocking traverse of a local ltdb. The callback function
132 will be called on every record in the local ltdb. To stop the
133 traverse, talloc_free() the traverse_handle.
135 The traverse is finished when the callback is called with tdb_null for key and data
/*
  Start a traverse of the local tdb of ctdb_db.

  ctdb_db   - database whose ltdb is traversed
  callback  - stored in the handle; called for each record delivered and
              with tdb_null/tdb_null at the end
  all_state - talloc parent of the handle and the private pointer handed
              back to callback; client_reqid and srvid are copied from it

  Returns the new handle; the caller in this file treats NULL as failure.

  The handle is allocated zeroed, and h->child is checked against (pid_t)-1.
  The record scan is tdb_traverse_read() with ctdb_traverse_local_fn and
  runs in the child. The remaining steps mark fd[0] close-on-exec, install
  traverse_local_destructor, link the handle into ctdb_db->traverse and
  attach a ctdb queue to fd[0] with ctdb_traverse_local_handler as its
  handler. start_time is recorded once the queue exists.
*/
137 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
138 ctdb_traverse_fn_t callback,
139 struct traverse_all_state *all_state)
141 struct ctdb_traverse_local_handle *h;
144 h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
158 if (h->child == (pid_t)-1) {
165 h->callback = callback;
166 h->private_data = all_state;
167 h->ctdb_db = ctdb_db;
168 h->client_reqid = all_state->client_reqid;
169 h->srvid = all_state->srvid;
172 /* start the traverse in the child */
174 tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
179 set_close_on_exec(h->fd[0]);
181 talloc_set_destructor(h, traverse_local_destructor);
183 DLIST_ADD(ctdb_db->traverse, h);
186 setup a packet queue between the child and the parent. This
187 copes with all the async and packet boundary issues
189 DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to child traverse\n", h->fd[0]));
191 h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h);
192 if (h->queue == NULL) {
197 h->start_time = timeval_current();
/*
  State of a traverse started by ctdb_daemon_traverse_all().

  ctdb     - daemon context
  ctdb_db  - database being traversed
  callback - function that receives the records arriving as
             CTDB_CONTROL_TRAVERSE_DATA

  Other code in this file also uses the members reqid, private_data and
  null_count of this structure.
*/
203 struct ctdb_traverse_all_handle {
204 struct ctdb_context *ctdb;
205 struct ctdb_db_context *ctdb_db;
207 ctdb_traverse_fn_t callback;
213 destroy a traverse_all op
/*
  talloc destructor for a ctdb_traverse_all_handle, installed in
  ctdb_daemon_traverse_all(). Releases the request id that was obtained
  there with ctdb_reqid_new().
*/
215 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
217 ctdb_reqid_remove(state->ctdb, state->reqid);
/*
  Payload of the CTDB_CONTROL_TRAVERSE_ALL control: it is sent as raw bytes
  by ctdb_daemon_traverse_all() and size-checked on receipt by
  ctdb_control_traverse_all().

  client_reqid - request id of the client that asked for the traverse

  Other code in this file also uses the members db_id, reqid, pnn and srvid
  of this structure.
*/
221 struct ctdb_traverse_all {
225 uint32_t client_reqid;
/*
  Timed-event handler registered with event_add_timed() in
  ctdb_daemon_traverse_all(); private_data is the ctdb_traverse_all_handle.

  Logs the timeout together with the database name, increments the
  statistics.timeouts.traverse counter and calls the stored callback with
  tdb_null for key and data, which is the end-of-traverse signal.
*/
229 /* called when a traverse times out */
230 static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te,
231 struct timeval t, void *private_data)
233 struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
235 DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
236 state->ctdb->statistics.timeouts.traverse++;
238 state->callback(state->private_data, tdb_null, tdb_null);
/*
  State of a traverse requested by a client through
  ctdb_control_traverse_start().

  ctdb - daemon context
  h    - handle returned by ctdb_daemon_traverse_all()

  Other code in this file also uses the members srcnode, reqid, srvid and
  db_id of this structure.
*/
242 struct traverse_start_state {
243 struct ctdb_context *ctdb;
244 struct ctdb_traverse_all_handle *h;
253 set up a cluster-wide non-blocking traverse of a ctdb. The
254 callback function will be called on every record in the local
255 ltdb. To stop the traverse, talloc_free() the traverse_handle.
257 The traverse is finished when the callback is called with tdb_null for key and data
/*
  Start a traverse of ctdb_db on behalf of a client.

  ctdb_db     - database to traverse
  callback    - stored in the returned handle; receives the records
  start_state - talloc parent of the handle and the private pointer for
                callback; its reqid and srvid are forwarded in the request
                as client_reqid and srvid

  Returns the new handle; the caller in this file treats NULL as failure.

  A request id is registered for the handle with ctdb_reqid_new() and is
  released again by ctdb_traverse_all_destructor. The request, a
  struct ctdb_traverse_all, is sent as CTDB_CONTROL_TRAVERSE_ALL with
  CTDB_CTRL_FLAG_NOREPLY:
    - non-persistent database: to CTDB_BROADCAST_VNNMAP
    - persistent database: to this node's pnn, or to the first entry of the
      vnn map when the search of the map for this pnn reaches the end
  A timed event owned by the handle calls ctdb_traverse_all_timeout once
  the traverse_timeout tunable has elapsed.

  NOTE(review): the handle comes from talloc(), not talloc_zero() - confirm
  that every member is assigned before it is read.
*/
260 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
261 ctdb_traverse_fn_t callback,
262 struct traverse_start_state *start_state)
264 struct ctdb_traverse_all_handle *state;
265 struct ctdb_context *ctdb = ctdb_db->ctdb;
268 struct ctdb_traverse_all r;
269 uint32_t destination;
271 state = talloc(start_state, struct ctdb_traverse_all_handle);
277 state->ctdb_db = ctdb_db;
278 state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state);
279 state->callback = callback;
280 state->private_data = start_state;
281 state->null_count = 0;
283 talloc_set_destructor(state, ctdb_traverse_all_destructor);
285 r.db_id = ctdb_db->db_id;
286 r.reqid = state->reqid;
288 r.client_reqid = start_state->reqid;
289 r.srvid = start_state->srvid;
291 data.dptr = (uint8_t *)&r;
292 data.dsize = sizeof(r);
294 if (ctdb_db->persistent == 0) {
295 /* normal database, traverse all nodes */
296 destination = CTDB_BROADCAST_VNNMAP;
299 /* persistent database, traverse one node, preferably
302 destination = ctdb->pnn;
303 /* check we are in the vnnmap */
304 for (i=0; i < ctdb->vnn_map->size; i++) {
305 if (ctdb->vnn_map->map[i] == ctdb->pnn) {
309 /* if we are not in the vnn map we just pick the first
312 if (i == ctdb->vnn_map->size) {
313 destination = ctdb->vnn_map->map[0];
317 /* tell all the nodes in the cluster to start sending records to this
318 * node, or if it is a persistent database, just tell the local
321 ret = ctdb_daemon_send_control(ctdb, destination, 0,
322 CTDB_CONTROL_TRAVERSE_ALL,
323 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
330 /* timeout the traverse */
331 event_add_timed(ctdb->ev, state,
332 timeval_current_ofs(ctdb->tunable.traverse_timeout, 0),
333 ctdb_traverse_all_timeout, state);
339 called for each record during a traverse all
/*
  Record callback passed to ctdb_traverse_local() by
  ctdb_control_traverse_all(); p is the struct traverse_all_state.

  Each record is marshalled with state->reqid and sent to state->srcnode as
  a CTDB_CONTROL_TRAVERSE_DATA control with CTDB_CTRL_FLAG_NOREPLY. An
  allocation failure and a send failure are each logged at DEBUG_ERR.

  An empty key together with empty data is the end-of-traverse marker; it
  is tested for after the send, so the marker itself is forwarded too.
*/
341 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
343 struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
345 struct ctdb_rec_data *d;
348 d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
351 DEBUG(DEBUG_ERR,("Out of memory in traverse_all_callback\n"));
355 cdata.dptr = (uint8_t *)d;
356 cdata.dsize = d->length;
358 ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
359 0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
361 DEBUG(DEBUG_ERR,("Failed to send traverse data\n"));
364 if (key.dsize == 0 && data.dsize == 0) {
371 called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
372 setup a traverse of our local ltdb, sending the records as
373 CTDB_CONTROL_TRAVERSE_DATA records back to the originator
/*
  Handler for the CTDB_CONTROL_TRAVERSE_ALL control.

  ctdb - daemon context
  data - must be exactly sizeof(struct ctdb_traverse_all); any other size
         is logged as invalid

  The database is looked up by c->db_id. A traverse_all_state is allocated
  as a talloc child of the database context and filled from the request:
  reqid, pnn (stored as srcnode), client_reqid and srvid. A local traverse
  is then started with traverse_all_callback, and a NULL handle is treated
  as failure.
*/
375 int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
377 struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
378 struct traverse_all_state *state;
379 struct ctdb_db_context *ctdb_db;
381 if (data.dsize != sizeof(struct ctdb_traverse_all)) {
382 DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
386 ctdb_db = find_ctdb_db(ctdb, c->db_id);
387 if (ctdb_db == NULL) {
391 state = talloc(ctdb_db, struct traverse_all_state);
396 state->reqid = c->reqid;
397 state->srcnode = c->pnn;
399 state->client_reqid = c->client_reqid;
400 state->srvid = c->srvid;
402 state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
403 if (state->h == NULL) {
413 called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
414 call the traverse_all callback with the record
/*
  Handler for the CTDB_CONTROL_TRAVERSE_DATA control.

  ctdb - daemon context
  data - one marshalled struct ctdb_rec_data; rejected with a log message
         when it is smaller than a uint32_t or when its size differs from
         d->length

  The traverse is looked up by d->reqid. If no handle is found, or its
  reqid does not match, the traverse may already have been terminated.
  The key and data are carved out of d->data (key first, data at offset
  keylen) and passed to the callback stored in the handle.

  For the empty end-of-traverse record on a non-persistent database,
  state->null_count is compared with ctdb_get_num_active_nodes().

  NOTE(review): d->keylen and d->datalen are not range-checked against
  d->length in the lines shown - confirm that a malformed record cannot
  point past the end of the buffer.
*/
416 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
418 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
419 struct ctdb_traverse_all_handle *state;
421 ctdb_traverse_fn_t callback;
424 if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
425 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
429 state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
430 if (state == NULL || d->reqid != state->reqid) {
431 /* traverse might have been terminated already */
435 key.dsize = d->keylen;
436 key.dptr = &d->data[0];
437 data.dsize = d->datalen;
438 data.dptr = &d->data[d->keylen];
440 if (key.dsize == 0 && data.dsize == 0) {
442 /* Persistent databases are only scanned on one node (the local
445 if (state->ctdb_db->persistent == 0) {
446 if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
452 callback = state->callback;
453 private_data = state->private_data;
455 callback(private_data, key, data);
460 kill an in-progress traverse, used when a client disconnects
// Handler for the CTDB_CONTROL_TRAVERSE_KILL control.
//
// ctdb - daemon context
// data - read as a struct ctdb_traverse_start (db_id, reqid, srvid)
//
// The database is looked up by d->db_id, and its list of local traverses
// (ctdb_db->traverse) is searched for entries whose client_reqid and srvid
// both match the request.
//
// NOTE(review): no size check on data is visible before d is dereferenced
// - confirm that data.dsize is validated against the structure size.
462 int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data,
463 TDB_DATA *outdata, uint32_t srcnode)
465 struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
466 struct ctdb_db_context *ctdb_db;
467 struct ctdb_traverse_local_handle *t;
469 ctdb_db = find_ctdb_db(ctdb, d->db_id);
470 if (ctdb_db == NULL) {
474 for (t=ctdb_db->traverse; t; t=t->next) {
475 if (t->client_reqid == d->reqid &&
476 t->srvid == d->srvid) {
487 this is called when a client disconnects during a traverse
488 we need to notify all the nodes taking part in the search that they
489 should kill their traverse children
// talloc destructor for a traverse_start_state, installed by
// ctdb_control_traverse_start().
//
// Logs that the traverse was cancelled, including the database id, and
// sends CTDB_CONTROL_TRAVERSE_KILL with CTDB_CTRL_FLAG_NOREPLY to
// CTDB_BROADCAST_CONNECTED. The payload is a struct ctdb_traverse_start
// carrying the db_id, reqid and srvid of the traverse. The result of the
// send is not checked.
491 static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
493 struct ctdb_traverse_start r;
496 DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
497 r.db_id = state->db_id;
498 r.reqid = state->reqid;
499 r.srvid = state->srvid;
501 data.dptr = (uint8_t *)&r;
502 data.dsize = sizeof(r);
504 ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0,
505 CTDB_CONTROL_TRAVERSE_KILL,
506 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
511 callback which sends records as messages to the client
// Record callback passed to ctdb_daemon_traverse_all() by
// ctdb_control_traverse_start(); p is the struct traverse_start_state.
//
// Each record is marshalled with state->reqid and delivered to
// state->srvid through ctdb_dispatch_message().
//
// On the end-of-traverse marker (empty key and empty data) the talloc
// destructor on state is removed, presumably so that a normal finish does
// not broadcast a traverse kill - confirm against the statements that
// follow.
513 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
515 struct traverse_start_state *state;
516 struct ctdb_rec_data *d;
519 state = talloc_get_type(p, struct traverse_start_state);
521 d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
526 cdata.dptr = (uint8_t *)d;
527 cdata.dsize = d->length;
529 ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
530 if (key.dsize == 0 && data.dsize == 0) {
531 /* end of traverse */
532 talloc_set_destructor(state, NULL);
539 start a traverse_all - called as a control from a client
/*
  Handler for the traverse-start control sent by a client.

  ctdb      - daemon context
  data      - must be exactly sizeof(struct ctdb_traverse_start); any other
              size is logged as a bad record size
  srcnode   - stored in the traverse state
  client_id - request id used to find the struct ctdb_client; a missing
              client is logged

  The database is looked up by d->db_id. The traverse state is allocated as
  a talloc child of the client and filled with srcnode, reqid, srvid and
  db_id. The traverse is then started with ctdb_daemon_traverse_all() and
  traverse_start_callback, and a NULL handle is treated as failure.

  ctdb_traverse_start_destructor is installed only after the traverse has
  been started.
*/
541 int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data,
542 TDB_DATA *outdata, uint32_t srcnode, uint32_t client_id)
544 struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
545 struct traverse_start_state *state;
546 struct ctdb_db_context *ctdb_db;
547 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
549 if (client == NULL) {
550 DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
554 if (data.dsize != sizeof(*d)) {
555 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
559 ctdb_db = find_ctdb_db(ctdb, d->db_id);
560 if (ctdb_db == NULL) {
564 state = talloc(client, struct traverse_start_state);
569 state->srcnode = srcnode;
570 state->reqid = d->reqid;
571 state->srvid = d->srvid;
572 state->db_id = d->db_id;
575 state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
576 if (state->h == NULL) {
581 talloc_set_destructor(state, ctdb_traverse_start_destructor);