3 Utility functions to read/write blobs of data from a file descriptor
4 and handle the case where we might need multiple read/writes to get all the
7 Copyright (C) Andrew Tridgell 2006
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, see <http://www.gnu.org/licenses/>.
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../include/ctdb_client.h"
32 #include "common/system.h"
34 #define QUEUE_BUFFER_SIZE (16*1024)
36 /* structures for packet queueing - see common/ctdb_io.c */
/* NOTE(review): this listing is incomplete — the embedded line numbering
 * skips, so fields referenced elsewhere in the file (data, length,
 * full_length, buf, fd, name, alignment, fde, destroyed, private_data)
 * are on lines not visible here. Only the visible fields are annotated. */
/* One packet awaiting transmission; linked into queue->out_queue via the
 * DLIST_* macros used in queue_io_write()/ctdb_queue_send() below. */
44 struct ctdb_queue_pkt {
45 struct ctdb_queue_pkt *next, *prev;
/* (struct ctdb_queue's opening line is outside this extract; the fields
 * below belong to it.) */
53 struct ctdb_context *ctdb;          /* owning ctdb context; supplies ->ev and ->flags */
54 struct tevent_immediate *im;        /* immediate event used to resume buffer processing */
55 struct ctdb_buffer buffer; /* input buffer */
56 struct ctdb_queue_pkt *out_queue, *out_queue_tail; /* pending-output list head/tail */
57 uint32_t out_queue_length;          /* number of packets in out_queue */
62 ctdb_queue_cb_fn_t callback;        /* invoked per complete packet; NULL/0 signals EOF/error */
/* Return the number of packets currently queued for output on this queue.
 * (Function braces are on lines missing from this extract.) */
69 int ctdb_queue_length(struct ctdb_queue *queue)
71 return queue->out_queue_length;
/* Forward declaration: queue_process_event below re-enters queue_process. */
74 static void queue_process(struct ctdb_queue *queue);
/* Immediate-event trampoline scheduled by queue_process(); recovers the
 * queue from private_data and (presumably, on lines missing here) calls
 * queue_process(queue) again — TODO confirm against the full source. */
76 static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
79 struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
85 * This function is used to process data in queue buffer.
87 * Queue callback function can end up freeing the queue, there should not be a
88 * loop processing packets from queue buffer. Instead set up a timed event for
89 * immediate run to process remaining packets from buffer.
/* Extract exactly one complete packet from queue->buffer and hand it to the
 * callback. The leading uint32_t of the buffered data is the packet length
 * (read via a uint32_t* cast — NOTE(review): relies on the buffer start
 * being suitably aligned). Because the callback may free the queue, at most
 * one packet is processed per call; leftover data triggers an immediate
 * event (queue_process_event) to continue. Several interior lines (early
 * returns, braces) are missing from this extract. */
91 static void queue_process(struct ctdb_queue *queue)
/* Not enough bytes yet to even read the length prefix — wait for more. */
96 if (queue->buffer.length < sizeof(pkt_size)) {
100 pkt_size = *(uint32_t *)queue->buffer.data;
/* A zero-length packet can never complete; treated as fatal input. */
102 DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
/* Packet not fully buffered yet; if it exceeds the default buffer size,
 * request a buffer extension that queue_io_read() will perform. */
106 if (queue->buffer.length < pkt_size) {
107 if (pkt_size > QUEUE_BUFFER_SIZE) {
108 queue->buffer.extend = pkt_size;
113 /* Extract complete packet */
114 data = talloc_size(queue, pkt_size);
116 DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size));
119 memcpy(data, queue->buffer.data, pkt_size);
121 /* Shift packet out from buffer */
/* memmove (not memcpy) because source and destination overlap. */
122 if (queue->buffer.length > pkt_size) {
123 memmove(queue->buffer.data,
124 queue->buffer.data + pkt_size,
125 queue->buffer.length - pkt_size);
127 queue->buffer.length -= pkt_size;
129 if (queue->buffer.length > 0) {
130 /* There is more data to be processed, schedule an event */
131 tevent_schedule_immediate(queue->im, queue->ctdb->ev,
132 queue_process_event, queue);
/* Buffer drained: if it was grown past the default size, release it so a
 * single oversized packet does not pin memory permanently. */
134 if (queue->buffer.size > QUEUE_BUFFER_SIZE) {
135 TALLOC_FREE(queue->buffer.data);
136 queue->buffer.size = 0;
140 /* It is the responsibility of the callback to free 'data' */
141 queue->callback(data, pkt_size, queue->private_data);
/* NOTE(review): this NULL/0 callback appears to be an error path (likely
 * the alloc failure above) signalling a dead connection — the branch
 * structure is on lines missing from this extract; confirm upstream. */
145 queue->callback(NULL, 0, queue->private_data);
151 called when an incoming connection is readable
152 This function MUST be safe for reentry via the queue callback!
/* Read whatever the socket can deliver without blocking into queue->buffer,
 * then let queue_process() carve packets out of it. Uses FIONREAD to learn
 * the exact readable byte count up front so sys_read() cannot block.
 * Interior lines (declarations, early returns, clamping of num_ready to
 * navail) are missing from this extract. */
154 static void queue_io_read(struct ctdb_queue *queue)
161 /* check how much data is available on the socket for immediately
162 guaranteed nonblocking access.
163 as long as we are careful never to try to read more than this
164 we know all reads will be successful and will neither block
165 nor fail with a "data not available right now" error
167 if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
/* FIONREAD reporting 0 readable bytes after a read event means EOF. */
170 if (num_ready == 0) {
171 /* the descriptor has been closed */
/* Lazily allocate the input buffer at the default size on first read. */
175 if (queue->buffer.data == NULL) {
176 /* starting fresh, allocate buf to read data */
177 queue->buffer.data = talloc_size(queue, QUEUE_BUFFER_SIZE);
178 if (queue->buffer.data == NULL) {
179 DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
182 queue->buffer.size = QUEUE_BUFFER_SIZE;
/* queue_process() requested a larger buffer (buffer.extend) for an
 * oversized packet; grow in place via talloc_realloc_size. */
183 } else if (queue->buffer.extend > 0) {
184 /* extending buffer */
185 data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
187 DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
190 queue->buffer.data = data;
191 queue->buffer.size = queue->buffer.extend;
192 queue->buffer.extend = 0;
/* Read at most the free space remaining in the buffer; the clamp of
 * num_ready to navail is on a line missing from this extract. */
195 navail = queue->buffer.size - queue->buffer.length;
196 if (num_ready > navail) {
201 nread = sys_read(queue->fd,
202 queue->buffer.data + queue->buffer.length,
205 DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
208 queue->buffer.length += nread;
/* Attempt to dispatch any now-complete packet(s). */
211 queue_process(queue);
/* NOTE(review): failure path — NULL/0 tells the callback the connection
 * is dead; the goto/label structure is on lines missing here. */
215 queue->callback(NULL, 0, queue->private_data);
219 /* used when an event triggers a dead queue */
/* Immediate-event handler fired after a write failure (scheduled in
 * queue_io_write/ctdb_queue_send): notify the owner that the connection
 * died by invoking the callback with NULL/0. */
220 static void queue_dead(struct event_context *ev, struct tevent_immediate *im,
223 struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
224 queue->callback(NULL, 0, queue->private_data);
229 called when an incoming connection is writeable
/* Flush as much of the output queue as the (non-blocking) socket accepts.
 * Under CTDB_FLAG_TORTURE only one byte is written per packet per pass, to
 * exercise the partial-write paths. Interior lines (short-write bookkeeping,
 * pkt freeing, EAGAIN early return) are missing from this extract. */
231 static void queue_io_write(struct ctdb_queue *queue)
233 while (queue->out_queue) {
234 struct ctdb_queue_pkt *pkt = queue->out_queue;
236 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
237 n = write(queue->fd, pkt->data, 1);
239 n = write(queue->fd, pkt->data, pkt->length);
/* Hard write error (not EAGAIN/EWOULDBLOCK): the connection is dead. */
242 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
/* pkt->length is the remaining bytes; != full_length means part of this
 * packet already went out, so it cannot be retried on a new fd — drop it. */
243 if (pkt->length != pkt->full_length) {
244 /* partial packet sent - we have to drop it */
245 DLIST_REMOVE(queue->out_queue, pkt);
246 queue->out_queue_length--;
/* Freeing the fde tears down event monitoring on the dead fd... */
249 talloc_free(queue->fde);
/* ...and the owner learns about it via queue_dead on an immediate event
 * (NOTE(review): handler argument is on a missing line; presumably
 * queue_dead — confirm upstream). */
252 tevent_schedule_immediate(queue->im, queue->ctdb->ev,
/* Short write: socket full for now; remaining-length adjustment is on
 * lines missing from this extract. */
258 if (n != pkt->length) {
/* Fully sent: unlink (freeing of pkt is on a missing line). */
264 DLIST_REMOVE(queue->out_queue, pkt);
265 queue->out_queue_length--;
/* Queue drained: stop asking for writability events. */
269 EVENT_FD_NOT_WRITEABLE(queue->fde);
273 called when an incoming connection is readable or writeable
/* fd event dispatcher: routes READ events to queue_io_read and (on lines
 * partly missing here) write events to queue_io_write. Read is checked
 * first; queue_io_read may consume the event entirely. */
275 static void queue_io_handler(struct event_context *ev, struct fd_event *fde,
276 uint16_t flags, void *private_data)
278 struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
280 if (flags & EVENT_FD_READ) {
281 queue_io_read(queue);
283 queue_io_write(queue);
289 queue a packet for sending
/* Queue (or immediately send) one outgoing packet.
 * - Pads 'length' up to the queue's alignment and rewrites the leading
 *   uint32_t length word in 'data' to the padded value.
 * - If nothing is queued, tries a direct non-blocking write first to skip
 *   queueing overhead; a hard error tears down the fde and schedules the
 *   dead-queue notification, but still returns success (see comment below).
 * - Otherwise copies the payload into a talloc'd ctdb_queue_pkt appended to
 *   out_queue, enabling writability events when the queue was empty.
 * Returns 0 on success (error-return lines are missing from this extract).
 * NOTE(review): several interior lines (partial-write requeue after the
 * direct write, returns, braces) are not visible here. */
291 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
293 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
294 struct ctdb_queue_pkt *pkt;
295 uint32_t length2, full_length;
297 if (queue->alignment) {
298 /* enforce the length and alignment rules from the tcp packet allocator */
/* Round length up to the next multiple of alignment (alignment is
 * presumably a power of two for this mask trick to be valid — confirm). */
299 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
300 *(uint32_t *)data = length2;
/* Zero the padding bytes so the peer never sees stale memory. */
305 if (length2 != length) {
306 memset(data+length, 0, length2-length);
309 full_length = length2;
311 /* if the queue is empty then try an immediate write, avoiding
312 queue overhead. This relies on non-blocking sockets */
313 if (queue->out_queue == NULL && queue->fd != -1 &&
314 !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
315 ssize_t n = write(queue->fd, data, length2);
316 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
317 talloc_free(queue->fde);
/* NOTE(review): handler argument is on a missing line; presumably
 * queue_dead — confirm upstream. */
320 tevent_schedule_immediate(queue->im, queue->ctdb->ev,
322 /* yes, we report success, as the dead node is
323 handled via a separate event */
/* Everything was written (or nothing remained): done, nothing to queue. */
330 if (length2 == 0) return 0;
/* Allocate header + trailing payload in one talloc block (flexible
 * buf member; the talloc call's first line is missing from this extract). */
334 queue, offsetof(struct ctdb_queue_pkt, buf) + length2);
335 CTDB_NO_MEMORY(queue->ctdb, pkt);
336 talloc_set_name_const(pkt, "struct ctdb_queue_pkt");
338 pkt->data = pkt->buf;
339 memcpy(pkt->data, data, length2);
341 pkt->length = length2;
342 pkt->full_length = full_length;
/* First pending packet: start asking for writability events. */
344 if (queue->out_queue == NULL && queue->fd != -1) {
345 EVENT_FD_WRITEABLE(queue->fde);
348 DLIST_ADD_END(queue->out_queue, pkt, NULL);
350 queue->out_queue_length++;
/* Debug aid: give the packet a descriptive talloc name so talloc memory
 * reports identify what is sitting in the queue. */
352 if (queue->ctdb->tunable.verbose_memory_names != 0) {
353 switch (hdr->operation) {
354 case CTDB_REQ_CONTROL: {
355 struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
356 talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
357 queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
360 case CTDB_REQ_MESSAGE: {
361 struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
362 talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
363 queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
/* default case (label on a missing line): generic header description. */
367 talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
368 queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
369 (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
379 setup the fd used by the queue
/* (Re)attach the queue to a file descriptor: drop any old fd event, register
 * a read handler on the new fd (auto-closed when the fde is freed), and if
 * output is already pending, immediately request writability events too.
 * Return-value lines are missing from this extract. */
381 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
384 talloc_free(queue->fde);
388 queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
389 queue_io_handler, queue);
390 if (queue->fde == NULL) {
/* Ensure the fd is closed automatically when the event is freed —
 * matches the talloc_free(queue->fde) teardown used on write errors. */
393 tevent_fd_set_auto_close(queue->fde);
395 if (queue->out_queue) {
396 EVENT_FD_WRITEABLE(queue->fde);
403 /* If someone sets up this pointer, they want to know if the queue is freed */
/* talloc destructor for struct ctdb_queue: release the input buffer and,
 * if the owner registered a 'destroyed' flag pointer, set it so code that
 * re-enters after a callback can detect the queue is gone. */
404 static int queue_destructor(struct ctdb_queue *queue)
406 TALLOC_FREE(queue->buffer.data);
407 queue->buffer.length = 0;
408 queue->buffer.size = 0;
409 if (queue->destroyed != NULL)
410 *queue->destroyed = true;
415 setup a packet queue on a socket
/* Create a packet queue bound to 'fd' (pass -1 for no fd yet; see
 * ctdb_queue_set_fd). 'fmt'/... name the queue for diagnostics; 'alignment'
 * drives the padding in ctdb_queue_send; 'callback' receives each complete
 * inbound packet (or NULL/0 on connection death). Allocated under mem_ctx.
 * NOTE(review): this definition continues past the end of this extract
 * (final return and error handling not visible); lines are also missing
 * inside (va_start/va_end, fd assignment). */
417 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
418 TALLOC_CTX *mem_ctx, int fd, int alignment,
419 ctdb_queue_cb_fn_t callback,
420 void *private_data, const char *fmt, ...)
422 struct ctdb_queue *queue;
425 queue = talloc_zero(mem_ctx, struct ctdb_queue);
426 CTDB_NO_MEMORY_NULL(ctdb, queue);
/* NOTE(review): the name is allocated under mem_ctx, not under queue —
 * it outlives the queue; confirm this is intentional upstream. */
428 queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
430 CTDB_NO_MEMORY_NULL(ctdb, queue->name);
/* Immediate event reused by queue_process/queue_io_write scheduling. */
432 queue->im= tevent_create_immediate(queue);
433 CTDB_NO_MEMORY_NULL(ctdb, queue->im);
437 queue->alignment = alignment;
438 queue->private_data = private_data;
439 queue->callback = callback;
441 if (ctdb_queue_set_fd(queue, fd) != 0) {
/* Destructor flags *queue->destroyed for re-entrancy detection. */
446 talloc_set_destructor(queue, queue_destructor);