From 382705f495dd7f196efc8bb24b9cee3649b44836 Mon Sep 17 00:00:00 2001 From: Swen Schillig Date: Mon, 12 Mar 2018 11:00:55 +0100 Subject: [PATCH] ctdb: Introduce buffer.offset to avoid memmove The memmove operation is quite expensive; therefore, a new buffer attribute "offset" is introduced to support optimized buffer processing. The optimization is to "walk" through the buffer and process each packet until the buffer is fully processed (empty) without requiring any memmove. Only if a packet is incomplete is the buffer content moved and new data read from the queue. This way almost all memmove operations are eliminated. Signed-off-by: Swen Schillig Reviewed-by: Martin Schwenke Reviewed-by: Christof Schmitt --- ctdb/common/ctdb_io.c | 55 ++++++++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c index 32d8fc753a6..5bed7a61b31 100644 --- a/ctdb/common/ctdb_io.c +++ b/ctdb/common/ctdb_io.c @@ -43,6 +43,7 @@ struct ctdb_buffer { uint8_t *data; uint32_t length; uint32_t size; + uint32_t offset; }; struct ctdb_queue_pkt { @@ -95,14 +96,14 @@ static void queue_process_event(struct tevent_context *ev, struct tevent_immedia static void queue_process(struct ctdb_queue *queue) { uint32_t pkt_size; - uint8_t *data; + uint8_t *data = NULL; if (queue->buffer.length < sizeof(pkt_size)) { return; } /* Did we at least read the size into the buffer */ - pkt_size = *(uint32_t *)queue->buffer.data; + pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset); if (pkt_size == 0) { DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n")); goto failed; @@ -114,20 +115,26 @@ static void queue_process(struct ctdb_queue *queue) } /* Extract complete packet */ - data = talloc_memdup(queue, queue->buffer.data, pkt_size); + data = talloc_memdup(queue, + queue->buffer.data + queue->buffer.offset, + pkt_size); + if (data == NULL) { D_ERR("read error alloc failed for %u\n", pkt_size); 
return; } - /* Shift packet out from buffer */ - if (queue->buffer.length > pkt_size) { - memmove(queue->buffer.data, - queue->buffer.data + pkt_size, - queue->buffer.length - pkt_size); - } + queue->buffer.offset += pkt_size; queue->buffer.length -= pkt_size; + if (queue->buffer.offset < pkt_size || + queue->buffer.offset > queue->buffer.size) { + D_ERR("buffer offset overflow\n"); + TALLOC_FREE(queue->buffer.data); + memset(&queue->buffer, 0, sizeof(queue->buffer)); + goto failed; + } + if (queue->buffer.length > 0) { /* There is more data to be processed, schedule an event */ tevent_schedule_immediate(queue->im, queue->ctdb->ev, @@ -137,6 +144,7 @@ static void queue_process(struct ctdb_queue *queue) TALLOC_FREE(queue->buffer.data); queue->buffer.size = 0; } + queue->buffer.offset = 0; } /* It is the responsibility of the callback to free 'data' */ @@ -145,10 +153,8 @@ static void queue_process(struct ctdb_queue *queue) failed: queue->callback(NULL, 0, queue->private_data); - } - /* called when an incoming connection is readable This function MUST be safe for reentry via the queue callback! 
@@ -156,7 +162,7 @@ failed: static void queue_io_read(struct ctdb_queue *queue) { int num_ready = 0; - uint32_t pkt_size; + uint32_t pkt_size = 0; ssize_t nread; uint8_t *data; @@ -185,12 +191,12 @@ static void queue_io_read(struct ctdb_queue *queue) goto data_read; } - if (queue->buffer.length < sizeof(pkt_size)) { + if (sizeof(pkt_size) > queue->buffer.length) { /* data read is not sufficient to gather message size */ - goto data_read; + goto buffer_shift; } - pkt_size = *(uint32_t *)queue->buffer.data; + pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset); if (pkt_size > queue->buffer.size) { data = talloc_realloc_size(queue, queue->buffer.data, @@ -201,6 +207,21 @@ static void queue_io_read(struct ctdb_queue *queue) } queue->buffer.data = data; queue->buffer.size = pkt_size; + /* fall through here as we might need to move the data as well */ + } + +buffer_shift: + if (sizeof(pkt_size) > queue->buffer.size - queue->buffer.offset || + pkt_size > queue->buffer.size - queue->buffer.offset) { + /* Either the offset has progressed too far to host at least + * the size information or the remaining space in the buffer + * is not sufficient for the full message. + * Therefore, move the data and try again. + */ + memmove(queue->buffer.data, + queue->buffer.data + queue->buffer.offset, + queue->buffer.length); + queue->buffer.offset = 0; } data_read: @@ -208,7 +229,9 @@ data_read: if (num_ready > 0) { nread = sys_read(queue->fd, - queue->buffer.data + queue->buffer.length, + queue->buffer.data + + queue->buffer.offset + + queue->buffer.length, num_ready); if (nread <= 0) { DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread)); -- 2.34.1