ctdb: Adding memory pool for queue callback
[amitay/samba.git] / ctdb / common / ctdb_io.c
index 2ca84212ce120a4ad0c60985325ca9287c780ce9..d86540762ea78d88b5d187029e175766d7151982 100644 (file)
@@ -6,34 +6,44 @@
 
    Copyright (C) Andrew Tridgell  2006
 
-   This library is free software; you can redistribute it and/or
-   modify it under the terms of the GNU Lesser General Public
-   License as published by the Free Software Foundation; either
-   version 2 of the License, or (at your option) any later version.
-
-   This library is distributed in the hope that it will be useful,
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   Lesser General Public License for more details.
-
-   You should have received a copy of the GNU Lesser General Public
-   License along with this library; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
 
-#include "includes.h"
-#include "lib/tdb/include/tdb.h"
-#include "lib/events/events.h"
-#include "lib/util/dlinklist.h"
+#include "replace.h"
 #include "system/network.h"
 #include "system/filesys.h"
-#include "../include/ctdb_private.h"
-#include "../include/ctdb.h"
+
+#include <tdb.h>
+#include <talloc.h>
+#include <tevent.h>
+
+#include "lib/util/dlinklist.h"
+#include "lib/util/debug.h"
+#include "lib/util/sys_rw.h"
+
+#include "ctdb_private.h"
+#include "ctdb_client.h"
+
+#include "common/logging.h"
+#include "common/common.h"
 
 /* structures for packet queueing - see common/ctdb_io.c */
-struct ctdb_partial {
+struct ctdb_buffer {
        uint8_t *data;
        uint32_t length;
+       uint32_t size;
+       uint32_t offset;
 };
 
 struct ctdb_queue_pkt {
@@ -41,110 +51,197 @@ struct ctdb_queue_pkt {
        uint8_t *data;
        uint32_t length;
        uint32_t full_length;
+       uint8_t buf[];
 };
 
 struct ctdb_queue {
        struct ctdb_context *ctdb;
-       struct ctdb_partial partial; /* partial input packet */
-       struct ctdb_queue_pkt *out_queue;
-       struct fd_event *fde;
+       struct tevent_immediate *im;
+       struct ctdb_buffer buffer; /* input buffer */
+       struct ctdb_queue_pkt *out_queue, *out_queue_tail;
+       uint32_t out_queue_length;
+       struct tevent_fd *fde;
        int fd;
        size_t alignment;
        void *private_data;
        ctdb_queue_cb_fn_t callback;
+       TALLOC_CTX *data_pool;
+       const char *name;
+       uint32_t buffer_size;
 };
 
 
 
+int ctdb_queue_length(struct ctdb_queue *queue)
+{
+       return queue->out_queue_length;
+}
+
+static void queue_process(struct ctdb_queue *queue);
+
+static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
+                               void *private_data)
+{
+       struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
+
+       queue_process(queue);
+}
+
 /*
-  called when an incoming connection is readable
-*/
-static void queue_io_read(struct ctdb_queue *queue)
+ * This function is used to process data in queue buffer.
+ *
+ * Queue callback function can end up freeing the queue, there should not be a
+ * loop processing packets from queue buffer.  Instead set up a timed event for
+ * immediate run to process remaining packets from buffer.
+ */
+static void queue_process(struct ctdb_queue *queue)
 {
-       int num_ready = 0;
-       ssize_t nread;
-       uint8_t *data, *data_base;
+       uint32_t pkt_size;
+       uint8_t *data = NULL;
 
-       if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
+       if (queue->buffer.length < sizeof(pkt_size)) {
                return;
        }
-       if (num_ready == 0) {
-               /* the descriptor has been closed */
+
+       /* Did we at least read the size into the buffer */
+       pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset);
+       if (pkt_size == 0) {
+               DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
                goto failed;
        }
 
+       /* the buffer doesn't contain the full packet, return to get the rest */
+       if (queue->buffer.length < pkt_size) {
+               return;
+       }
 
-       queue->partial.data = talloc_realloc_size(queue, queue->partial.data, 
-                                                 num_ready + queue->partial.length);
+       /* Extract complete packet */
+       data = talloc_memdup(queue->data_pool,
+                            queue->buffer.data + queue->buffer.offset,
+                            pkt_size);
 
-       if (queue->partial.data == NULL) {
-               DEBUG(0,("read error alloc failed for %u\n", 
-                        num_ready + queue->partial.length));
-               goto failed;
+       if (data == NULL) {
+               D_ERR("read error alloc failed for %u\n", pkt_size);
+               return;
        }
 
-       nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
-       if (nread <= 0) {
-               DEBUG(0,("read error nread=%d\n", nread));
+       queue->buffer.offset += pkt_size;
+       queue->buffer.length -= pkt_size;
+
+       if (queue->buffer.offset < pkt_size ||
+           queue->buffer.offset > queue->buffer.size) {
+               D_ERR("buffer offset overflow\n");
+               TALLOC_FREE(queue->buffer.data);
+               memset(&queue->buffer, 0, sizeof(queue->buffer));
                goto failed;
        }
 
+       if (queue->buffer.length > 0) {
+               /* There is more data to be processed, schedule an event */
+               tevent_schedule_immediate(queue->im, queue->ctdb->ev,
+                                         queue_process_event, queue);
+       } else {
+               if (queue->buffer.size > queue->buffer_size) {
+                       TALLOC_FREE(queue->buffer.data);
+                       queue->buffer.size = 0;
+               }
+               queue->buffer.offset = 0;
+       }
 
-       data = queue->partial.data;
-       nread += queue->partial.length;
+       /* It is the responsibility of the callback to free 'data' */
+       queue->callback(data, pkt_size, queue->private_data);
+       return;
 
-       queue->partial.data = NULL;
-       queue->partial.length = 0;
+failed:
+       queue->callback(NULL, 0, queue->private_data);
+}
 
-       if (nread >= 4 && *(uint32_t *)data == nread) {
-               /* it is the responsibility of the incoming packet
-                function to free 'data' */
-               queue->callback(data, nread, queue->private_data);
+/*
+  called when an incoming connection is readable
+  This function MUST be safe for reentry via the queue callback!
+*/
+static void queue_io_read(struct ctdb_queue *queue)
+{
+       int num_ready = 0;
+       uint32_t pkt_size = 0;
+       ssize_t nread;
+       uint8_t *data;
+
+       /* check how much data is available on the socket for immediately
+          guaranteed nonblocking access.
+          as long as we are careful never to try to read more than this
+          we know all reads will be successful and will neither block
+          nor fail with a "data not available right now" error
+       */
+       if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
                return;
        }
+       if (num_ready == 0) {
+               /* the descriptor has been closed */
+               goto failed;
+       }
 
-       data_base = data;
-
-       while (nread >= 4 && *(uint32_t *)data <= nread) {
-               /* we have at least one packet */
-               uint8_t *d2;
-               uint32_t len;
-               len = *(uint32_t *)data;
-               if (len == 0) {
-                       /* bad packet! treat as EOF */
-                       DEBUG(0,("Invalid packet of length 0\n"));
+       if (queue->buffer.data == NULL) {
+               /* starting fresh, allocate buf to read data */
+               queue->buffer.data = talloc_size(queue, queue->buffer_size);
+               if (queue->buffer.data == NULL) {
+                       DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
                        goto failed;
                }
-               d2 = talloc_memdup(queue, data, len);
-               if (d2 == NULL) {
-                       DEBUG(0,("read error memdup failed for %u\n", len));
-                       /* sigh */
+               queue->buffer.size = queue->buffer_size;
+               goto data_read;
+       }
+
+       if (sizeof(pkt_size) > queue->buffer.length) {
+               /* data read is not sufficient to gather message size */
+               goto buffer_shift;
+       }
+
+       pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset);
+       if (pkt_size > queue->buffer.size) {
+               data = talloc_realloc_size(queue,
+                                          queue->buffer.data,
+                                          pkt_size);
+               if (data == NULL) {
+                       DBG_ERR("read error realloc failed for %u\n", pkt_size);
                        goto failed;
                }
-               queue->callback(d2, len, queue->private_data);
-               data += len;
-               nread -= len;           
+               queue->buffer.data = data;
+               queue->buffer.size = pkt_size;
+               /* fall through here as we might need to move the data as well */
        }
 
-       if (nread > 0) {
-               /* we have only part of a packet */
-               if (data_base == data) {
-                       queue->partial.data = data;
-                       queue->partial.length = nread;
-               } else {
-                       queue->partial.data = talloc_memdup(queue, data, nread);
-                       if (queue->partial.data == NULL) {
-                               DEBUG(0,("read error memdup partial failed for %u\n", 
-                                        nread));
-                               goto failed;
-                       }
-                       queue->partial.length = nread;
-                       talloc_free(data_base);
+buffer_shift:
+       if (sizeof(pkt_size) > queue->buffer.size - queue->buffer.offset ||
+           pkt_size > queue->buffer.size - queue->buffer.offset) {
+               /* Either the offset has progressed too far to host at least
+                * the size information or the remaining space in the buffer
+                * is not sufficient for the full message.
+                * Therefore, move the data and try again.
+                */
+               memmove(queue->buffer.data,
+                       queue->buffer.data + queue->buffer.offset,
+                       queue->buffer.length);
+               queue->buffer.offset = 0;
+       }
+
+data_read:
+       num_ready = MIN(num_ready, queue->buffer.size - queue->buffer.length);
+
+       if (num_ready > 0) {
+               nread = sys_read(queue->fd,
+                                queue->buffer.data +
+                                       queue->buffer.offset +
+                                       queue->buffer.length,
+                                num_ready);
+               if (nread <= 0) {
+                       DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
+                       goto failed;
                }
-               return;
+               queue->buffer.length += nread;
        }
 
-       talloc_free(data_base);
+       queue_process(queue);
        return;
 
 failed:
@@ -153,8 +250,8 @@ failed:
 
 
 /* used when an event triggers a dead queue */
-static void queue_dead(struct event_context *ev, struct timed_event *te, 
-                      struct timeval t, void *private_data)
+static void queue_dead(struct tevent_context *ev, struct tevent_immediate *im,
+                      void *private_data)
 {
        struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
        queue->callback(NULL, 0, queue->private_data);
@@ -179,13 +276,14 @@ static void queue_io_write(struct ctdb_queue *queue)
                        if (pkt->length != pkt->full_length) {
                                /* partial packet sent - we have to drop it */
                                DLIST_REMOVE(queue->out_queue, pkt);
+                               queue->out_queue_length--;
                                talloc_free(pkt);
                        }
                        talloc_free(queue->fde);
                        queue->fde = NULL;
                        queue->fd = -1;
-                       event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
-                                       queue_dead, queue);
+                       tevent_schedule_immediate(queue->im, queue->ctdb->ev,
+                                                 queue_dead, queue);
                        return;
                }
                if (n <= 0) return;
@@ -197,21 +295,22 @@ static void queue_io_write(struct ctdb_queue *queue)
                }
 
                DLIST_REMOVE(queue->out_queue, pkt);
+               queue->out_queue_length--;
                talloc_free(pkt);
        }
 
-       EVENT_FD_NOT_WRITEABLE(queue->fde);
+       TEVENT_FD_NOT_WRITEABLE(queue->fde);
 }
 
 /*
   called when an incoming connection is readable or writeable
 */
-static void queue_io_handler(struct event_context *ev, struct fd_event *fde, 
+static void queue_io_handler(struct tevent_context *ev, struct tevent_fd *fde,
                             uint16_t flags, void *private_data)
 {
        struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
 
-       if (flags & EVENT_FD_READ) {
+       if (flags & TEVENT_FD_READ) {
                queue_io_read(queue);
        } else {
                queue_io_write(queue);
@@ -224,9 +323,15 @@ static void queue_io_handler(struct event_context *ev, struct fd_event *fde,
 */
 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
 {
+       struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
        struct ctdb_queue_pkt *pkt;
        uint32_t length2, full_length;
 
+       /* If the queue does not have valid fd, no point queueing a packet */
+       if (queue->fd == -1) {
+               return 0;
+       }
+
        if (queue->alignment) {
                /* enforce the length and alignment rules from the tcp packet allocator */
                length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
@@ -250,8 +355,8 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
                        talloc_free(queue->fde);
                        queue->fde = NULL;
                        queue->fd = -1;
-                       event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
-                                       queue_dead, queue);
+                       tevent_schedule_immediate(queue->im, queue->ctdb->ev,
+                                                 queue_dead, queue);
                        /* yes, we report success, as the dead node is 
                           handled via a separate event */
                        return 0;
@@ -263,20 +368,46 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
                if (length2 == 0) return 0;
        }
 
-       pkt = talloc(queue, struct ctdb_queue_pkt);
+       pkt = talloc_size(
+               queue, offsetof(struct ctdb_queue_pkt, buf) + length2);
        CTDB_NO_MEMORY(queue->ctdb, pkt);
+       talloc_set_name_const(pkt, "struct ctdb_queue_pkt");
 
-       pkt->data = talloc_memdup(pkt, data, length2);
-       CTDB_NO_MEMORY(queue->ctdb, pkt->data);
+       pkt->data = pkt->buf;
+       memcpy(pkt->data, data, length2);
 
        pkt->length = length2;
        pkt->full_length = full_length;
 
        if (queue->out_queue == NULL && queue->fd != -1) {
-               EVENT_FD_WRITEABLE(queue->fde);
+               TEVENT_FD_WRITEABLE(queue->fde);
        }
 
-       DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *);
+       DLIST_ADD_END(queue->out_queue, pkt);
+
+       queue->out_queue_length++;
+
+       if (queue->ctdb->tunable.verbose_memory_names != 0) {
+               switch (hdr->operation) {
+               case CTDB_REQ_CONTROL: {
+                       struct ctdb_req_control_old *c = (struct ctdb_req_control_old *)hdr;
+                       talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
+                                       queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
+                       break;
+               }
+               case CTDB_REQ_MESSAGE: {
+                       struct ctdb_req_message_old *m = (struct ctdb_req_message_old *)hdr;
+                       talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
+                                       queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
+                       break;
+               }
+               default:
+                       talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
+                                       queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
+                                       (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
+                       break;
+               }
+       }
 
        return 0;
 }
@@ -292,35 +423,42 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
        queue->fde = NULL;
 
        if (fd != -1) {
-               queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
-                                         queue_io_handler, queue);
+               queue->fde = tevent_add_fd(queue->ctdb->ev, queue, fd,
+                                          TEVENT_FD_READ,
+                                          queue_io_handler, queue);
                if (queue->fde == NULL) {
                        return -1;
                }
+               tevent_fd_set_auto_close(queue->fde);
 
                if (queue->out_queue) {
-                       EVENT_FD_WRITEABLE(queue->fde);         
+                       TEVENT_FD_WRITEABLE(queue->fde);
                }
        }
 
        return 0;
 }
 
-
-
 /*
   setup a packet queue on a socket
  */
 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
                                    TALLOC_CTX *mem_ctx, int fd, int alignment,
-                                   
                                    ctdb_queue_cb_fn_t callback,
-                                   void *private_data)
+                                   void *private_data, const char *fmt, ...)
 {
        struct ctdb_queue *queue;
+       va_list ap;
 
        queue = talloc_zero(mem_ctx, struct ctdb_queue);
        CTDB_NO_MEMORY_NULL(ctdb, queue);
+       va_start(ap, fmt);
+       queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
+       va_end(ap);
+       CTDB_NO_MEMORY_NULL(ctdb, queue->name);
+
+       queue->im= tevent_create_immediate(queue);
+       CTDB_NO_MEMORY_NULL(ctdb, queue->im);
 
        queue->ctdb = ctdb;
        queue->fd = fd;
@@ -334,5 +472,19 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
                }
        }
 
+       queue->buffer_size = ctdb->tunable.queue_buffer_size;
+       /* In client code, ctdb->tunable is not initialized.
+        * This does not affect recovery daemon.
+        */
+       if (queue->buffer_size == 0) {
+               queue->buffer_size = 1024;
+       }
+
+       queue->data_pool = talloc_pool(queue, queue->buffer_size);
+       if (queue->data_pool == NULL) {
+               TALLOC_FREE(queue);
+               return NULL;
+       }
+
        return queue;
 }