2 efficient async ctdb traverse
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
/* Per-record traverse callback type. It receives the caller-supplied
   private_data pointer together with the key and data of one record.
   Elsewhere in this file the same callback type is also invoked with
   tdb_null for both key and data (end of traverse, and on timeout). */
29 typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
32 handle returned to caller - freeing this handle will kill the child and
33 terminate the traverse
/* State for one in-flight local traverse.
   NOTE(review): this listing is incomplete (the embedded numbering skips
   lines). Code in this file also uses h->fd, h->child, h->srvid and
   h->private_data, whose declarations are not visible here. */
35 struct ctdb_traverse_local_handle {
/* linkage for the per-database list ctdb_db->traverse (DLIST_ADD / DLIST_REMOVE) */
36 struct ctdb_traverse_local_handle *next, *prev;
/* database whose local tdb is being traversed */
37 struct ctdb_db_context *ctdb_db;
/* copied from the traverse_all_state; matched by ctdb_control_traverse_kill() */
41 uint32_t client_reqid;
/* invoked by ctdb_traverse_local_handler() for every packet read from the child */
43 ctdb_traverse_fn_t callback;
/* set from timeval_current() once the traverse has been set up */
44 struct timeval start_time;
/* packet queue created by ctdb_queue_setup() on the read end of the pipe */
45 struct ctdb_queue *queue;
49 called when data is available from the child
/* Queue callback registered by ctdb_traverse_local() for packets arriving
   from the traverse child.

   rawdata/length  one packet as delivered by the queue; it is interpreted
                   as a struct ctdb_rec_data.
   private_data    the struct ctdb_traverse_local_handle for this traverse.

   A NULL packet, a packet shorter than 4 bytes, or one whose length does
   not match tdata->length is handled by calling the callback with tdb_null
   for key and data (the end-of-traverse marker). Otherwise the record is
   split into key and data and passed to the callback.

   NOTE(review): some lines of this function are not visible in this
   listing (the embedded numbering skips), including the declaration of
   key/data and whatever else happens in the end-of-traverse branch. */
51 static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
53 struct ctdb_traverse_local_handle *h = talloc_get_type(private_data,
54 struct ctdb_traverse_local_handle);
/* copy callback and its argument into locals before use */
56 ctdb_traverse_fn_t callback = h->callback;
57 void *p = h->private_data;
58 struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
/* the NULL test comes first, so tdata->length is only read for a non-NULL packet */
60 if (rawdata == NULL || length < 4 || length != tdata->length) {
63 callback(p, tdb_null, tdb_null);
/* key occupies the first keylen bytes of data[], the record data follows it.
   NOTE(review): no visible check that keylen + datalen fits inside length */
67 key.dsize = tdata->keylen;
68 key.dptr = &tdata->data[0];
69 data.dsize = tdata->datalen;
70 data.dptr = &tdata->data[tdata->keylen];
72 callback(p, key, data);
76 destroy an in-flight traverse operation
/* talloc destructor installed on the handle by ctdb_traverse_local().
   Unlinks the handle from the database's list of running traverses and
   sends SIGKILL to the traverse child.
   NOTE(review): the return statement is not visible in this listing, and
   no reaping of the child (waitpid) is visible here - confirm it is
   handled elsewhere. */
78 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
80 DLIST_REMOVE(h->ctdb_db->traverse, h);
81 kill(h->child, SIGKILL);
86 callback from tdb_traverse_read()
/* Record callback handed to tdb_traverse_read() by ctdb_traverse_local();
   per the comments in this file it runs in the traverse child.

   tdb        tdb being traversed (not used in the visible lines)
   key, data  the current record; data is expected to start with a
              struct ctdb_ltdb_header
   p          the struct ctdb_traverse_local_handle

   For non-persistent databases two filters are tested: records whose data
   is no larger than the ltdb header, and records whose dmaster is not this
   node's pnn. The record is then marshalled and written to the pipe fd[1].

   NOTE(review): the bodies of the filter branches, the NULL check on the
   marshalled record and the return values are not visible in this listing;
   presumably filtered records are skipped and a failed write aborts the
   traverse - confirm against the full file. */
88 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
90 struct ctdb_traverse_local_handle *h = talloc_get_type(p,
91 struct ctdb_traverse_local_handle);
92 struct ctdb_rec_data *d;
93 struct ctdb_ltdb_header *hdr;
/* hdr is only a cast here; it is dereferenced further down, after the size test */
95 hdr = (struct ctdb_ltdb_header *)data.dptr;
97 if (h->ctdb_db->persistent == 0) {
98 /* filter out zero-length records */
99 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
103 /* filter out non-authoritative records */
104 if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
/* reqid 0 and no header argument: the whole tdb data blob is marshalled as the record data */
109 d = ctdb_marshall_record(h, 0, key, NULL, data);
111 /* error handling is tricky in this child code .... */
/* NOTE(review): write() returns ssize_t while d->length is presumably
   unsigned, so this is a signed/unsigned comparison; a short write is
   treated the same as an error */
115 if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
/* State kept on a node that is serving a CTDB_CONTROL_TRAVERSE_ALL request.
   NOTE(review): this listing is incomplete; code in this file also uses
   state->reqid, state->srcnode and state->srvid, whose declarations are
   not visible here. */
121 struct traverse_all_state {
122 struct ctdb_context *ctdb;
/* local traverse started by ctdb_traverse_local() for this request */
123 struct ctdb_traverse_local_handle *h;
/* copied from the incoming struct ctdb_traverse_all and passed on to the local handle */
126 uint32_t client_reqid;
131 setup a non-blocking traverse of a local ltdb. The callback function
132 will be called on every record in the local ltdb. To stop the
133 traverse, talloc_free() the traverse_handle.
135 The traverse is finished when the callback is called with tdb_null for key and data
/* Start an asynchronous traverse of the local tdb of ctdb_db.

   ctdb_db    database to traverse
   callback   called for every record received from the child, and with
              tdb_null key/data when the traverse ends
   all_state  talloc parent of the new handle; also stored as the callback's
              private data and the source of client_reqid and srvid

   Returns the new handle; the caller in this file treats NULL as failure.

   NOTE(review): several lines are not visible in this listing (the embedded
   numbering skips). Judging by the visible code they presumably create the
   pipe h->fd, fork the child and handle the error returns - confirm against
   the full file. */
137 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
138 ctdb_traverse_fn_t callback,
139 struct traverse_all_state *all_state)
141 struct ctdb_traverse_local_handle *h;
/* zero-initialised, owned by all_state */
144 h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
/* (pid_t)-1 presumably signals that creating the child failed */
158 if (h->child == (pid_t)-1) {
165 h->callback = callback;
166 h->private_data = all_state;
167 h->ctdb_db = ctdb_db;
168 h->client_reqid = all_state->client_reqid;
169 h->srvid = all_state->srvid;
172 /* start the traverse in the child */
/* ctdb_traverse_local_fn() writes each marshalled record to fd[1] */
174 tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
/* read end of the pipe */
179 set_close_on_exec(h->fd[0]);
/* freeing the handle unlinks it and kills the child */
181 talloc_set_destructor(h, traverse_local_destructor);
/* track the traverse so ctdb_control_traverse_kill() can find it */
183 DLIST_ADD(ctdb_db->traverse, h);
186 setup a packet queue between the child and the parent. This
187 copes with all the async and packet boundary issues
189 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child traverse\n", h->fd[0]));
/* packets read from fd[0] are delivered to ctdb_traverse_local_handler() */
191 h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h,
193 if (h->queue == NULL) {
198 h->start_time = timeval_current();
/* State of a traverse-all operation on the node that started it.
   NOTE(review): this listing is incomplete; code in this file also uses
   state->reqid, state->private_data and state->null_count, whose
   declarations are not visible here. */
204 struct ctdb_traverse_all_handle {
205 struct ctdb_context *ctdb;
206 struct ctdb_db_context *ctdb_db;
/* called for each record arriving via CTDB_CONTROL_TRAVERSE_DATA, and with tdb_null on timeout */
208 ctdb_traverse_fn_t callback;
214 destroy a traverse_all op
/* talloc destructor installed by ctdb_daemon_traverse_all(): removes the
   request id that was allocated there with ctdb_reqid_new(). After this,
   ctdb_control_traverse_data() will no longer find the handle by reqid.
   NOTE(review): the return statement is not visible in this listing. */
216 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
218 ctdb_reqid_remove(state->ctdb, state->reqid);
/* Payload of a CTDB_CONTROL_TRAVERSE_ALL control: it is sent as raw bytes
   by ctdb_daemon_traverse_all() and size-checked by
   ctdb_control_traverse_all().
   NOTE(review): this listing is incomplete; code in this file also uses
   the members db_id, reqid, pnn and srvid, whose declarations are not
   visible here. */
222 struct ctdb_traverse_all {
/* request id of the originating client, taken from the traverse_start_state */
226 uint32_t client_reqid;
/* Timed-event handler armed by ctdb_daemon_traverse_all().

   ev, te, t     event context, timer and firing time (not used in the
                 visible lines)
   private_data  the struct ctdb_traverse_all_handle

   Logs the timeout, bumps the traverse timeout statistic and then invokes
   the callback with tdb_null key and data, i.e. the same call that marks a
   normal end of traverse. */
230 /* called when a traverse times out */
231 static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te,
232 struct timeval t, void *private_data)
234 struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
236 DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
237 state->ctdb->statistics.timeouts.traverse++;
239 state->callback(state->private_data, tdb_null, tdb_null);
/* Per-client state for a traverse started with ctdb_control_traverse_start().
   NOTE(review): this listing is incomplete; code in this file also uses
   state->srcnode, state->reqid, state->srvid and state->db_id, whose
   declarations are not visible here. */
243 struct traverse_start_state {
244 struct ctdb_context *ctdb;
/* cluster-wide traverse created by ctdb_daemon_traverse_all() */
245 struct ctdb_traverse_all_handle *h;
254 setup a cluster-wide non-blocking traverse of a ctdb. The
255 callback function will be called on every record in the local
256 ltdb. To stop the traverse, talloc_free() the traverse_handle.
258 The traverse is finished when the callback is called with tdb_null for key and data
/* Start a traverse of ctdb_db across the cluster.

   ctdb_db      database to traverse
   callback     called with each record that comes back in a
                CTDB_CONTROL_TRAVERSE_DATA control, and with tdb_null
                key/data on timeout
   start_state  talloc parent of the new handle; stored as the callback's
                private data and the source of client_reqid and srvid

   Steps visible in this function:
     - allocate the handle and register a request id for it; the
       destructor removes that id again
     - fill a struct ctdb_traverse_all request
     - choose the destination: CTDB_BROADCAST_VNNMAP for a non-persistent
       database; for a persistent one this node's pnn if it appears in
       the vnn map, otherwise the first entry of the vnn map
     - send CTDB_CONTROL_TRAVERSE_ALL with CTDB_CTRL_FLAG_NOREPLY
     - arm a timer of tunable.traverse_timeout seconds that fires
       ctdb_traverse_all_timeout()

   The caller in this file treats a NULL return as failure.

   NOTE(review): several lines are not visible in this listing (the embedded
   numbering skips), among them the error returns, the declarations of data,
   ret and i, and the closing lines of some multi-line comments. The handle
   comes from talloc(), not talloc_zero(), so every member must be assigned
   explicitly; the assignment of state->ctdb is not visible - confirm. */
261 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
262 ctdb_traverse_fn_t callback,
263 struct traverse_start_state *start_state)
265 struct ctdb_traverse_all_handle *state;
266 struct ctdb_context *ctdb = ctdb_db->ctdb;
269 struct ctdb_traverse_all r;
270 uint32_t destination;
272 state = talloc(start_state, struct ctdb_traverse_all_handle);
278 state->ctdb_db = ctdb_db;
/* the reqid lets ctdb_control_traverse_data() map incoming records back to this handle */
279 state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state);
280 state->callback = callback;
281 state->private_data = start_state;
282 state->null_count = 0;
284 talloc_set_destructor(state, ctdb_traverse_all_destructor);
286 r.db_id = ctdb_db->db_id;
287 r.reqid = state->reqid;
289 r.client_reqid = start_state->reqid;
290 r.srvid = start_state->srvid;
/* the request struct is sent as raw bytes */
292 data.dptr = (uint8_t *)&r;
293 data.dsize = sizeof(r);
295 if (ctdb_db->persistent == 0) {
296 /* normal database, traverse all nodes */
297 destination = CTDB_BROADCAST_VNNMAP;
300 /* persistent database, traverse one node, preferably
303 destination = ctdb->pnn;
304 /* check we are in the vnnmap */
305 for (i=0; i < ctdb->vnn_map->size; i++) {
306 if (ctdb->vnn_map->map[i] == ctdb->pnn) {
310 /* if we are not in the vnn map we just pick the first
313 if (i == ctdb->vnn_map->size) {
314 destination = ctdb->vnn_map->map[0];
318 /* tell all the nodes in the cluster to start sending records to this
319 * node, or if it is a persistent database, just tell the local
322 ret = ctdb_daemon_send_control(ctdb, destination, 0,
323 CTDB_CONTROL_TRAVERSE_ALL,
324 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
331 /* timeout the traverse */
332 event_add_timed(ctdb->ev, state,
333 timeval_current_ofs(ctdb->tunable.traverse_timeout, 0),
334 ctdb_traverse_all_timeout, state);
340 called for each record during a traverse all
/* Callback given to ctdb_traverse_local() by ctdb_control_traverse_all().

   p          the struct traverse_all_state
   key, data  one record from the local traverse, or empty key and data at
              the end of the traverse

   Marshals the record tagged with state->reqid and sends it to
   state->srcnode as a CTDB_CONTROL_TRAVERSE_DATA control with
   CTDB_CTRL_FLAG_NOREPLY. The end-of-traverse marker is forwarded the same
   way before the final empty-record test.

   NOTE(review): the error-branch conditions and the body of the final test
   are not visible in this listing; presumably the state is freed once the
   end marker has been sent - confirm against the full file. */
342 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
344 struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
346 struct ctdb_rec_data *d;
349 d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
352 DEBUG(DEBUG_ERR,("Out of memory in traverse_all_callback\n"));
/* the marshalled record is sent as the raw control payload */
356 cdata.dptr = (uint8_t *)d;
357 cdata.dsize = d->length;
359 ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
360 0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
362 DEBUG(DEBUG_ERR,("Failed to send traverse data\n"));
/* empty key and data mark the end of the local traverse */
365 if (key.dsize == 0 && data.dsize == 0) {
372 called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
373 setup a traverse of our local ltdb, sending the records as
374 CTDB_CONTROL_TRAVERSE_DATA records back to the originator
/* Handler for an incoming CTDB_CONTROL_TRAVERSE_ALL control.

   ctdb     daemon context
   data     payload; must be exactly one struct ctdb_traverse_all
   outdata  not used in the visible lines

   Validates the payload size, looks up the database by db_id and checks the
   database's unhealthy_reason. If it is set and the allow_unhealthy_db_read
   tunable is 0, an error is logged; otherwise only a warning is logged. It
   then allocates a traverse_all_state owned by the database, fills it from
   the request and starts a local traverse whose records are forwarded by
   traverse_all_callback().

   NOTE(review): the return statements and error-branch bodies are not
   visible in this listing (the embedded numbering skips). The state comes
   from talloc(), not talloc_zero(); the assignment of state->ctdb is not
   visible - confirm. */
376 int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
/* c is only a cast here; it is dereferenced after the size check below */
378 struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
379 struct traverse_all_state *state;
380 struct ctdb_db_context *ctdb_db;
382 if (data.dsize != sizeof(struct ctdb_traverse_all)) {
383 DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
387 ctdb_db = find_ctdb_db(ctdb, c->db_id);
388 if (ctdb_db == NULL) {
392 if (ctdb_db->unhealthy_reason) {
393 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
394 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
395 ctdb_db->db_name, ctdb_db->unhealthy_reason));
398 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
399 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* owned by the database context */
402 state = talloc(ctdb_db, struct traverse_all_state);
/* reqid identifies the originator's handle; srcnode is where records are sent back */
407 state->reqid = c->reqid;
408 state->srcnode = c->pnn;
410 state->client_reqid = c->client_reqid;
411 state->srvid = c->srvid;
413 state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
414 if (state->h == NULL) {
424 called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
425 call the traverse_all callback with the record
/* Handler for an incoming CTDB_CONTROL_TRAVERSE_DATA control.

   ctdb     daemon context
   data     payload; one marshalled struct ctdb_rec_data
   outdata  not used in the visible lines

   Checks that the payload holds at least a uint32_t and that its size
   equals d->length, then looks up the traverse-all handle by d->reqid. A
   missing handle or a reqid mismatch is treated as "traverse already
   terminated". The record is split into key and data. For an empty record
   on a non-persistent database, null_count is compared with the number of
   active nodes. Finally the handle's callback is called with the record.

   NOTE(review): several lines are not visible in this listing (the embedded
   numbering skips): the return statements, the declarations of key and
   private_data, and where null_count is incremented. Presumably the end
   marker is only passed on once every active node has sent one - confirm.
   No visible check that keylen + datalen fits inside d->length. */
427 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
429 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
430 struct ctdb_traverse_all_handle *state;
432 ctdb_traverse_fn_t callback;
/* the size test comes first, so d->length is only read when at least 4 bytes are present */
435 if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
436 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
440 state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
441 if (state == NULL || d->reqid != state->reqid) {
442 /* traverse might have been terminated already */
/* from here on, data is reused to describe the record's data portion */
446 key.dsize = d->keylen;
447 key.dptr = &d->data[0];
448 data.dsize = d->datalen;
449 data.dptr = &d->data[d->keylen];
/* empty key and data: end-of-traverse marker from one node */
451 if (key.dsize == 0 && data.dsize == 0) {
453 /* Persistent databases are only scanned on one node (the local
456 if (state->ctdb_db->persistent == 0) {
457 if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
463 callback = state->callback;
464 private_data = state->private_data;
466 callback(private_data, key, data);
471 kill an in-progress traverse, used when a client disconnects
/* Handler for an incoming CTDB_CONTROL_TRAVERSE_KILL control.

   ctdb     daemon context
   data     payload, interpreted as a struct ctdb_traverse_start
   outdata  not used in the visible lines
   srcnode  not used in the visible lines

   Looks up the database by d->db_id and walks its list of running local
   traverses for one whose client_reqid and srvid both match the request.

   NOTE(review): the action on a match and the return statements are not
   visible in this listing; presumably the matching handle is freed, which
   kills its child through the handle destructor - confirm.
   NOTE(review): unlike ctdb_control_traverse_start(), no check of
   data.dsize against sizeof(*d) is visible before d is dereferenced;
   consider validating the payload size first. */
473 int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data,
474 TDB_DATA *outdata, uint32_t srcnode)
476 struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
477 struct ctdb_db_context *ctdb_db;
478 struct ctdb_traverse_local_handle *t;
480 ctdb_db = find_ctdb_db(ctdb, d->db_id);
481 if (ctdb_db == NULL) {
/* ctdb_db->traverse is the list that ctdb_traverse_local() adds its handles to */
485 for (t=ctdb_db->traverse; t; t=t->next) {
486 if (t->client_reqid == d->reqid &&
487 t->srvid == d->srvid) {
498 this is called when a client disconnects during a traverse
499 we need to notify all the nodes taking part in the search that they
500 should kill their traverse children
/* talloc destructor installed on the traverse_start_state by
   ctdb_control_traverse_start(). traverse_start_callback() clears it again
   at the normal end of a traverse, so it only runs when the state is freed
   while the traverse is still in progress.

   Logs the cancellation, then broadcasts a CTDB_CONTROL_TRAVERSE_KILL
   carrying the db_id, reqid and srvid of this traverse to
   CTDB_BROADCAST_CONNECTED with CTDB_CTRL_FLAG_NOREPLY. The return value
   of the send is not checked.

   NOTE(review): the declaration of data and the return statement are not
   visible in this listing. */
502 static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
504 struct ctdb_traverse_start r;
507 DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
/* same three values that ctdb_control_traverse_kill() matches against */
508 r.db_id = state->db_id;
509 r.reqid = state->reqid;
510 r.srvid = state->srvid;
512 data.dptr = (uint8_t *)&r;
513 data.dsize = sizeof(r);
515 ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0,
516 CTDB_CONTROL_TRAVERSE_KILL,
517 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
522 callback which sends records as messages to the client
/* Callback given to ctdb_daemon_traverse_all() by
   ctdb_control_traverse_start().

   p          the struct traverse_start_state
   key, data  one record, or empty key and data at the end of the traverse

   Marshals the record tagged with state->reqid and dispatches it as a
   message on state->srvid. On the end-of-traverse marker the state's
   destructor is cleared, so a later free does not broadcast a kill.

   NOTE(review): the NULL check on the marshalled record, the declaration
   of cdata and the rest of the end-of-traverse branch are not visible in
   this listing (the embedded numbering skips). */
524 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
526 struct traverse_start_state *state;
527 struct ctdb_rec_data *d;
530 state = talloc_get_type(p, struct traverse_start_state);
532 d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
537 cdata.dptr = (uint8_t *)d;
538 cdata.dsize = d->length;
/* the end-of-traverse marker is dispatched too, before the test below */
540 ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
541 if (key.dsize == 0 && data.dsize == 0) {
542 /* end of traverse */
543 talloc_set_destructor(state, NULL);
550 start a traverse_all - called as a control from a client
/* Handler for the traverse-start control sent by a client.

   ctdb       daemon context
   data       payload; must be exactly one struct ctdb_traverse_start
   outdata    not used in the visible lines
   srcnode    stored in the new state
   client_id  request id used to look up the struct ctdb_client

   Looks up the client, validates the payload size, finds the database and
   checks the database's unhealthy_reason. If it is set and the
   allow_unhealthy_db_read tunable is 0, an error is logged; otherwise only
   a warning is logged. It then allocates a traverse_start_state owned by
   the client, starts the cluster-wide traverse with
   traverse_start_callback(), and installs the destructor that broadcasts a
   kill if the state is freed before the traverse ends.

   NOTE(review): the return statements and error-branch bodies are not
   visible in this listing (the embedded numbering skips). The state comes
   from talloc(), not talloc_zero(); the assignment of state->ctdb is not
   visible - confirm. */
552 int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data,
553 TDB_DATA *outdata, uint32_t srcnode, uint32_t client_id)
/* d is only a cast here; it is dereferenced after the size check below */
555 struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
556 struct traverse_start_state *state;
557 struct ctdb_db_context *ctdb_db;
558 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
560 if (client == NULL) {
561 DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
565 if (data.dsize != sizeof(*d)) {
566 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
570 ctdb_db = find_ctdb_db(ctdb, d->db_id);
571 if (ctdb_db == NULL) {
575 if (ctdb_db->unhealthy_reason) {
576 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
577 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
578 ctdb_db->db_name, ctdb_db->unhealthy_reason));
581 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
582 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* owned by the client, so freeing the client frees the state as well */
585 state = talloc(client, struct traverse_start_state);
590 state->srcnode = srcnode;
591 state->reqid = d->reqid;
592 state->srvid = d->srvid;
593 state->db_id = d->db_id;
596 state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
597 if (state->h == NULL) {
/* installed only after the traverse has started */
602 talloc_set_destructor(state, ctdb_traverse_start_destructor);