ctdb: Introduce buffer.offset to avoid memmove
authorSwen Schillig <swen@vnet.ibm.com>
Mon, 12 Mar 2018 10:00:55 +0000 (11:00 +0100)
committerChristof Schmitt <cs@samba.org>
Fri, 7 Dec 2018 18:56:16 +0000 (19:56 +0100)
The memmove operation is quite expensive, therefore,
a new buffer attribute "offset" is introduced to support
an optimized buffer processing.
The optimization is to "walk" through the buffer and process
each packet until the buffer is fully processed (empty)
without requiring any memmove.
Only if a packet is in-complete, the buffer content is moved
and the new data is read from the queue.
This way almost all memmove operations are eliminated.

Signed-off-by: Swen Schillig <swen@vnet.ibm.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
Reviewed-by: Christof Schmitt <cs@samba.org>
ctdb/common/ctdb_io.c

index 32d8fc753a643f4d2525fad3d8cdac52320a40c7..5bed7a61b3177707f716e2d2f928173400f4d11d 100644 (file)
@@ -43,6 +43,7 @@ struct ctdb_buffer {
        uint8_t *data;
        uint32_t length;
        uint32_t size;
+       uint32_t offset;
 };
 
 struct ctdb_queue_pkt {
@@ -95,14 +96,14 @@ static void queue_process_event(struct tevent_context *ev, struct tevent_immedia
 static void queue_process(struct ctdb_queue *queue)
 {
        uint32_t pkt_size;
-       uint8_t *data;
+       uint8_t *data = NULL;
 
        if (queue->buffer.length < sizeof(pkt_size)) {
                return;
        }
 
        /* Did we at least read the size into the buffer */
-       pkt_size = *(uint32_t *)queue->buffer.data;
+       pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset);
        if (pkt_size == 0) {
                DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
                goto failed;
@@ -114,20 +115,26 @@ static void queue_process(struct ctdb_queue *queue)
        }
 
        /* Extract complete packet */
-       data = talloc_memdup(queue, queue->buffer.data, pkt_size);
+       data = talloc_memdup(queue,
+                            queue->buffer.data + queue->buffer.offset,
+                            pkt_size);
+
        if (data == NULL) {
                D_ERR("read error alloc failed for %u\n", pkt_size);
                return;
        }
 
-       /* Shift packet out from buffer */
-       if (queue->buffer.length > pkt_size) {
-               memmove(queue->buffer.data,
-                       queue->buffer.data + pkt_size,
-                       queue->buffer.length - pkt_size);
-       }
+       queue->buffer.offset += pkt_size;
        queue->buffer.length -= pkt_size;
 
+       if (queue->buffer.offset < pkt_size ||
+           queue->buffer.offset > queue->buffer.size) {
+               D_ERR("buffer offset overflow\n");
+               TALLOC_FREE(queue->buffer.data);
+               memset(&queue->buffer, 0, sizeof(queue->buffer));
+               goto failed;
+       }
+
        if (queue->buffer.length > 0) {
                /* There is more data to be processed, schedule an event */
                tevent_schedule_immediate(queue->im, queue->ctdb->ev,
@@ -137,6 +144,7 @@ static void queue_process(struct ctdb_queue *queue)
                        TALLOC_FREE(queue->buffer.data);
                        queue->buffer.size = 0;
                }
+               queue->buffer.offset = 0;
        }
 
        /* It is the responsibility of the callback to free 'data' */
@@ -145,10 +153,8 @@ static void queue_process(struct ctdb_queue *queue)
 
 failed:
        queue->callback(NULL, 0, queue->private_data);
-
 }
 
-
 /*
   called when an incoming connection is readable
   This function MUST be safe for reentry via the queue callback!
@@ -156,7 +162,7 @@ failed:
 static void queue_io_read(struct ctdb_queue *queue)
 {
        int num_ready = 0;
-       uint32_t pkt_size;
+       uint32_t pkt_size = 0;
        ssize_t nread;
        uint8_t *data;
 
@@ -185,12 +191,12 @@ static void queue_io_read(struct ctdb_queue *queue)
                goto data_read;
        }
 
-       if (queue->buffer.length < sizeof(pkt_size)) {
+       if (sizeof(pkt_size) > queue->buffer.length) {
                /* data read is not sufficient to gather message size */
-               goto data_read;
+               goto buffer_shift;
        }
 
-       pkt_size = *(uint32_t *)queue->buffer.data;
+       pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset);
        if (pkt_size > queue->buffer.size) {
                data = talloc_realloc_size(queue,
                                           queue->buffer.data,
@@ -201,6 +207,21 @@ static void queue_io_read(struct ctdb_queue *queue)
                }
                queue->buffer.data = data;
                queue->buffer.size = pkt_size;
+               /* fall through here as we might need to move the data as well */
+       }
+
+buffer_shift:
+       if (sizeof(pkt_size) > queue->buffer.size - queue->buffer.offset ||
+           pkt_size > queue->buffer.size - queue->buffer.offset) {
+               /* Either the offset has progressed too far to host at least
+                * the size information or the remaining space in the buffer
+                * is not sufficient for the full message.
+                * Therefore, move the data and try again.
+                */
+               memmove(queue->buffer.data,
+                       queue->buffer.data + queue->buffer.offset,
+                       queue->buffer.length);
+               queue->buffer.offset = 0;
        }
 
 data_read:
@@ -208,7 +229,9 @@ data_read:
 
        if (num_ready > 0) {
                nread = sys_read(queue->fd,
-                                queue->buffer.data + queue->buffer.length,
+                                queue->buffer.data +
+                                       queue->buffer.offset +
+                                       queue->buffer.length,
                                 num_ready);
                if (nread <= 0) {
                        DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));