3 Utility functions to read/write blobs of data from a file descriptor
4 and handle the case where we might need multiple read/writes to get all the
7 Copyright (C) Andrew Tridgell 2006
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, see <http://www.gnu.org/licenses/>.
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../include/ctdb_client.h"
/*
 * Packet-queue data structures.
 *
 * NOTE(review): this view of the file is lossy -- the opening line of
 * struct ctdb_queue and several member lines are not visible here. From the
 * visible code:
 *  - struct ctdb_queue_pkt is a doubly linked list node (next/prev) for the
 *    outgoing queue; elsewhere it is also used via pkt->data, pkt->length
 *    and pkt->full_length.
 *  - struct ctdb_queue holds the owning ctdb context, the partially read
 *    input packet, the outgoing packet list (out_queue, plus an
 *    out_queue_tail member that the visible code never uses), a count of
 *    queued packets, and the callback that receives each complete incoming
 *    packet (or NULL/0 on failure).
 * Members used elsewhere but not shown here include fd, fde, alignment,
 * private_data, name and destroyed -- TODO confirm against the full file.
 */
32 /* structures for packet queueing - see common/ctdb_io.c */
38 struct ctdb_queue_pkt {
39 struct ctdb_queue_pkt *next, *prev;
46 struct ctdb_context *ctdb;
47 struct ctdb_partial partial; /* partial input packet */
48 struct ctdb_queue_pkt *out_queue, *out_queue_tail;
49 uint32_t out_queue_length;
/* number of packets in out_queue: incremented by ctdb_queue_send(),
   decremented by queue_io_write(), reported by ctdb_queue_length() */
54 ctdb_queue_cb_fn_t callback;
/*
 * Return the number of packets currently waiting in the outgoing queue.
 * The counter is incremented by ctdb_queue_send() when a packet is queued
 * and decremented by queue_io_write() when a packet leaves the queue.
 */
61 int ctdb_queue_length(struct ctdb_queue *queue)
63 return queue->out_queue_length;
/*
 * queue_io_read - read as much of one incoming packet as is available.
 *
 * Uses ioctl(FIONREAD) to learn how many bytes can be read without blocking
 * and never reads more than that. The first sizeof(pkt_size) bytes of a
 * packet are its length prefix; they are accumulated in queue->partial,
 * the buffer is then grown with talloc_realloc_size(), and the payload is
 * read into the same buffer. Once queue->partial.length reaches pkt_size,
 * queue->partial is reset and the buffer is handed to
 * queue->callback(data, pkt_size, private_data), which becomes responsible
 * for freeing it. A partially received packet stays in queue->partial until
 * the next readable event.
 *
 * The trailing callback(NULL, 0, private_data) appears to be the shared
 * failure path (allocation failure, read error, zero-length packet, closed
 * descriptor).
 *
 * NOTE(review): local declarations, the failure-path jumps and closing
 * braces are not visible in this view; the description above is limited to
 * what the visible lines show.
 */
67 called when an incoming connection is readable
68 This function MUST be safe for reentry via the queue callback!
70 static void queue_io_read(struct ctdb_queue *queue)
73 uint32_t sz_bytes_req;
75 uint32_t pkt_bytes_remaining;
80 /* check how much data is available on the socket for immediately
81 guaranteed nonblocking access.
82 as long as we are careful never to try to read more than this
83 we know all reads will be successful and will neither block
84 nor fail with a "data not available right now" error
86 if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
90 /* the descriptor has been closed */
94 if (queue->partial.data == NULL) {
95 /* starting fresh, allocate buf for size bytes */
96 sz_bytes_req = sizeof(pkt_size);
97 queue->partial.data = talloc_size(queue, sz_bytes_req);
98 if (queue->partial.data == NULL) {
99 DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
103 } else if (queue->partial.length < sizeof(pkt_size)) {
104 /* yet to find out the packet length */
105 sz_bytes_req = sizeof(pkt_size) - queue->partial.length;
107 /* partial packet, length known, full buf allocated */
110 data = queue->partial.data;
112 if (sz_bytes_req > 0) {
113 to_read = MIN(sz_bytes_req, num_ready);
114 nread = read(queue->fd, data + queue->partial.length,
117 DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread));
120 queue->partial.length += nread;
122 if (nread < sz_bytes_req) {
123 /* not enough to know the length */
124 DEBUG(DEBUG_DEBUG,("Partial packet length read\n"));
127 /* size now known, allocate buffer for the full packet */
/* NOTE(review): the new-size argument of this realloc is on a line not
   visible here -- presumably the peer-supplied length prefix. It does not
   appear to be validated (minimum of sizeof(pkt_size), or any upper bound)
   before the buffer is resized; TODO confirm. */
128 queue->partial.data = talloc_realloc_size(queue, data,
130 if (queue->partial.data == NULL) {
131 DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
135 data = queue->partial.data;
139 pkt_size = *(uint32_t *)data;
141 DEBUG(DEBUG_CRIT,("Invalid packet of length 0\n"));
/* NOTE(review): only a zero length is rejected above. If the peer sends a
   length smaller than queue->partial.length (e.g. 1..3 for a 32-bit
   prefix), this unsigned subtraction wraps to a huge value, so to_read is
   limited only by num_ready, and the read below writes at
   data + queue->partial.length into a buffer presumably already shrunk to
   pkt_size bytes -- a heap overrun. Consider rejecting
   pkt_size < sizeof(pkt_size) before the realloc above. TODO confirm. */
145 pkt_bytes_remaining = pkt_size - queue->partial.length;
146 to_read = MIN(pkt_bytes_remaining, num_ready);
147 nread = read(queue->fd, data + queue->partial.length,
150 DEBUG(DEBUG_ERR,("read error nread=%d\n",
154 queue->partial.length += nread;
156 if (queue->partial.length < pkt_size) {
157 DEBUG(DEBUG_DEBUG,("Partial packet data read\n"));
/* Reset the partial-packet state BEFORE invoking the callback, so that a
   re-entrant call into this function from the callback starts on a fresh
   packet instead of appending to the one being delivered. */
161 queue->partial.data = NULL;
162 queue->partial.length = 0;
163 /* it is the responsibility of the callback to free 'data' */
164 queue->callback(data, pkt_size, queue->private_data);
168 queue->callback(NULL, 0, queue->private_data);
/*
 * Timed-event handler that reports a dead queue: it invokes the queue
 * callback with (NULL, 0), the same signal queue_io_read() uses on
 * failure. Presumably this is the handler passed to the zero-delay
 * event_add_timed() calls in queue_io_write() and ctdb_queue_send() after a
 * fatal write error (the handler argument is not visible there), so that
 * the failure is reported outside the current I/O path.
 */
172 /* used when an event triggers a dead queue */
173 static void queue_dead(struct event_context *ev, struct timed_event *te,
174 struct timeval t, void *private_data)
176 struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
177 queue->callback(NULL, 0, queue->private_data);
/*
 * queue_io_write - flush as much of the outgoing queue as the socket accepts.
 *
 * Walks queue->out_queue from the head and writes each packet. When
 * CTDB_FLAG_TORTURE is set, only one byte is written per call, presumably to
 * exercise the partial-write path. A packet whose write completes is
 * unlinked from the list and out_queue_length is decremented. After the
 * loop, the fd is marked as no longer writeable.
 *
 * On a write error other than EAGAIN/EWOULDBLOCK:
 *  - a packet that was already partially sent (length != full_length) is
 *    unlinked, because it cannot be resent from the start;
 *  - the fd event is freed;
 *  - a zero-delay timed event is scheduled to report the dead queue.
 *
 * NOTE(review): the declaration of n, the short-write handling after the
 * n != pkt->length check, and the closing braces are not visible in this
 * view. TODO confirm those paths, and whether a dropped packet is freed,
 * against the full source.
 */
182 called when an incoming connection is writeable
184 static void queue_io_write(struct ctdb_queue *queue)
186 while (queue->out_queue) {
187 struct ctdb_queue_pkt *pkt = queue->out_queue;
189 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
190 n = write(queue->fd, pkt->data, 1);
192 n = write(queue->fd, pkt->data, pkt->length);
195 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
196 if (pkt->length != pkt->full_length) {
197 /* partial packet sent - we have to drop it */
198 DLIST_REMOVE(queue->out_queue, pkt);
199 queue->out_queue_length--;
202 talloc_free(queue->fde);
205 event_add_timed(queue->ctdb->ev, queue, timeval_zero(),
211 if (n != pkt->length) {
217 DLIST_REMOVE(queue->out_queue, pkt);
218 queue->out_queue_length--;
222 EVENT_FD_NOT_WRITEABLE(queue->fde);
/*
 * fd event handler for the queue socket. When flags contains EVENT_FD_READ,
 * incoming data is handled by queue_io_read(). The queue_io_write() call
 * sits on a branch whose condition line is not visible in this view
 * (presumably the else / writeable case) -- TODO confirm.
 */
226 called when an incoming connection is readable or writeable
228 static void queue_io_handler(struct event_context *ev, struct fd_event *fde,
229 uint16_t flags, void *private_data)
231 struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
233 if (flags & EVENT_FD_READ) {
234 queue_io_read(queue);
236 queue_io_write(queue);
/*
 * ctdb_queue_send - queue a packet for transmission on the queue's socket.
 *
 * @queue:  the packet queue
 * @data:   packet bytes; the first uint32_t is the packet length field
 * @length: number of valid bytes in data
 *
 * Alignment: if queue->alignment is non-zero, the length is rounded up
 * using the mask (length + (a-1)) & ~(a-1). That mask trick is only correct
 * when alignment is a power of two -- NOTE(review): not checked here. The
 * rounded length is stored into the leading uint32_t of data, and the
 * padding bytes are zeroed. That memset writes past data[length - 1], so
 * the caller's buffer must already have room for the padding (presumably
 * guaranteed by the packet allocator the comment below refers to; confirm).
 *
 * Immediate write: if the outgoing queue is empty, the fd is valid, and
 * torture mode is off, an immediate write is attempted to avoid queueing
 * overhead. On a write error other than EAGAIN/EWOULDBLOCK, the fd event is
 * freed, a zero-delay timed event is scheduled, and success is still
 * returned; the dead connection is reported through that event instead.
 *
 * Queueing: the remaining bytes (how much remains after a partial immediate
 * write is set on lines not visible here) are copied with talloc_memdup()
 * into a new ctdb_queue_pkt, which is appended to out_queue. The fd is
 * marked writeable if the list was empty before this append.
 *
 * When tunable.verbose_memory_names is non-zero, the queued packet's talloc
 * name is set from its header (control opcode, message srvid, or generic
 * operation/length/src/dest) to make memory reports readable.
 *
 * Returns 0 on success. On allocation failure, CTDB_NO_MEMORY presumably
 * returns an error from this function (macro not visible here).
 */
242 queue a packet for sending
244 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
246 struct ctdb_queue_pkt *pkt;
247 uint32_t length2, full_length;
249 if (queue->alignment) {
250 /* enforce the length and alignment rules from the tcp packet allocator */
251 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
252 *(uint32_t *)data = length2;
257 if (length2 != length) {
258 memset(data+length, 0, length2-length);
261 full_length = length2;
263 /* if the queue is empty then try an immediate write, avoiding
264 queue overhead. This relies on non-blocking sockets */
265 if (queue->out_queue == NULL && queue->fd != -1 &&
266 !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
267 ssize_t n = write(queue->fd, data, length2);
268 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
269 talloc_free(queue->fde);
272 event_add_timed(queue->ctdb->ev, queue, timeval_zero(),
274 /* yes, we report success, as the dead node is
275 handled via a separate event */
282 if (length2 == 0) return 0;
285 pkt = talloc(queue, struct ctdb_queue_pkt);
286 CTDB_NO_MEMORY(queue->ctdb, pkt);
288 pkt->data = talloc_memdup(pkt, data, length2);
289 CTDB_NO_MEMORY(queue->ctdb, pkt->data);
291 pkt->length = length2;
292 pkt->full_length = full_length;
294 if (queue->out_queue == NULL && queue->fd != -1) {
295 EVENT_FD_WRITEABLE(queue->fde);
298 DLIST_ADD_END(queue->out_queue, pkt, NULL);
300 queue->out_queue_length++;
302 if (queue->ctdb->tunable.verbose_memory_names != 0) {
303 struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data;
304 switch (hdr->operation) {
305 case CTDB_REQ_CONTROL: {
306 struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
307 talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
308 queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
311 case CTDB_REQ_MESSAGE: {
312 struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
313 talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
314 queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
318 talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
319 queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
320 (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
/*
 * ctdb_queue_set_fd - (re)attach the queue to a file descriptor.
 *
 * Frees any existing fd event. It then registers a new EVENT_FD_READ event
 * with queue_io_handler and enables tevent auto-close on it (per tevent's
 * documented behaviour, the descriptor is closed when the event is freed).
 * The lines that store fd and guard against fd == -1 are not visible in
 * this view -- TODO confirm.
 *
 * If packets are already waiting in out_queue, the fd is also marked
 * writeable so they get flushed.
 *
 * Callers treat a non-zero return as failure (see ctdb_queue_setup);
 * event_add_fd() returning NULL is one such failure.
 */
330 setup the fd used by the queue
332 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
335 talloc_free(queue->fde);
339 queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
340 queue_io_handler, queue);
341 if (queue->fde == NULL) {
344 tevent_fd_set_auto_close(queue->fde);
346 if (queue->out_queue) {
347 EVENT_FD_WRITEABLE(queue->fde);
/*
 * talloc destructor for struct ctdb_queue (installed by ctdb_queue_setup).
 * If queue->destroyed points at a flag, that flag is set to true so that
 * whoever owns the flag can detect that the queue has been freed. The
 * return statement is not visible in this view; a talloc destructor must
 * return 0 to allow the free to proceed.
 */
354 /* If someone sets up this pointer, they want to know if the queue is freed */
355 static int queue_destructor(struct ctdb_queue *queue)
357 if (queue->destroyed != NULL)
358 *queue->destroyed = true;
/*
 * ctdb_queue_setup - allocate and initialise a packet queue on a socket.
 *
 * @ctdb:         ctdb context, used for out-of-memory reporting
 * @mem_ctx:      talloc parent for the new queue
 * @fd:           descriptor to attach (via ctdb_queue_set_fd)
 * @alignment:    packet length alignment used by ctdb_queue_send (0 = none)
 * @callback:     called for each complete incoming packet, or with NULL/0
 *                on failure
 * @private_data: opaque pointer passed back to callback
 * @fmt, ...:     printf-style format for the queue's name, used in debug
 *                and talloc names
 *
 * The queue is zero-allocated on mem_ctx. Its name, alignment, callback and
 * private_data are filled in, the fd is attached, and queue_destructor is
 * installed. Returns the new queue, or NULL on failure (the lines after the
 * failed ctdb_queue_set_fd check are not visible here -- TODO confirm that
 * they free the queue).
 *
 * NOTE(review): line 367 of the parameter list, the va_start/va_end lines,
 * and the assignments of queue->ctdb and queue->fd are not visible in this
 * view.
 */
363 setup a packet queue on a socket
365 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
366 TALLOC_CTX *mem_ctx, int fd, int alignment,
368 ctdb_queue_cb_fn_t callback,
369 void *private_data, const char *fmt, ...)
371 struct ctdb_queue *queue;
374 queue = talloc_zero(mem_ctx, struct ctdb_queue);
375 CTDB_NO_MEMORY_NULL(ctdb, queue);
/* NOTE(review): the name is allocated on mem_ctx, not on queue, so it is
   not freed with the queue and lives as long as mem_ctx. Also, if this
   allocation fails, CTDB_NO_MEMORY_NULL presumably returns NULL while
   queue stays attached to mem_ctx. Allocating on queue
   (talloc_vasprintf(queue, fmt, ap)) would tie both lifetimes to the
   queue -- TODO confirm no caller relies on the name outliving it. */
377 queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
379 CTDB_NO_MEMORY_NULL(ctdb, queue->name);
383 queue->alignment = alignment;
384 queue->private_data = private_data;
385 queue->callback = callback;
387 if (ctdb_queue_set_fd(queue, fd) != 0) {
392 talloc_set_destructor(queue, queue_destructor);