new version 1.3.1
[sahlberg/ctdb.git] / common / ctdb_io.c
index 6a5aa928b04f98dda8fce57c9f658bbcfda79ebf..81f9451396b1e62e888c7903077f8de0fe076155 100644 (file)
@@ -6,29 +6,29 @@
 
    Copyright (C) Andrew Tridgell  2006
 
-   This library is free software; you can redistribute it and/or
-   modify it under the terms of the GNU Lesser General Public
-   License as published by the Free Software Foundation; either
-   version 2 of the License, or (at your option) any later version.
-
-   This library is distributed in the hope that it will be useful,
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   Lesser General Public License for more details.
-
-   You should have received a copy of the GNU Lesser General Public
-   License along with this library; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "includes.h"
 #include "lib/tdb/include/tdb.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "lib/util/dlinklist.h"
 #include "system/network.h"
 #include "system/filesys.h"
 #include "../include/ctdb_private.h"
-#include "../include/ctdb.h"
+#include "../include/ctdb_client.h"
+#include <stdarg.h>
 
 /* structures for packet queueing - see common/ctdb_io.c */
 struct ctdb_partial {
@@ -40,28 +40,52 @@ struct ctdb_queue_pkt {
        struct ctdb_queue_pkt *next, *prev;
        uint8_t *data;
        uint32_t length;
+       uint32_t full_length;
 };
 
 struct ctdb_queue {
        struct ctdb_context *ctdb;
        struct ctdb_partial partial; /* partial input packet */
-       struct ctdb_queue_pkt *out_queue;
+       struct ctdb_queue_pkt *out_queue, *out_queue_tail;
+       uint32_t out_queue_length;
        struct fd_event *fde;
        int fd;
        size_t alignment;
        void *private_data;
        ctdb_queue_cb_fn_t callback;
+       bool *destroyed;
+       const char *name;
 };
 
 
 
+int ctdb_queue_length(struct ctdb_queue *queue)
+{
+       return queue->out_queue_length;
+}
+
+static void dump_packet(unsigned char *data, size_t len)
+{
+       size_t i;
+       char *p = talloc_array(NULL, char, len*3 + 1);
+       if (!p) {
+               DEBUG(DEBUG_CRIT,("Packet talloc fail"));
+               return;
+       }
+
+       for (i = 0; i < len; i++)
+               sprintf(p + i*3, " %02x", data[i]);
+       DEBUG(DEBUG_CRIT,("Contents: %s\n", p));
+       talloc_free(p);
+}
+
 /*
   called when an incoming connection is readable
 */
 static void queue_io_read(struct ctdb_queue *queue)
 {
        int num_ready = 0;
-       ssize_t nread;
+       ssize_t nread, totread, partlen;
        uint8_t *data, *data_base;
 
        if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
@@ -77,17 +101,19 @@ static void queue_io_read(struct ctdb_queue *queue)
                                                  num_ready + queue->partial.length);
 
        if (queue->partial.data == NULL) {
-               DEBUG(0,("read error alloc failed for %u\n", 
-                        num_ready + queue->partial.length));
+               DEBUG(DEBUG_ERR,("%s: read error alloc failed for %u\n",
+                       queue->name, num_ready + queue->partial.length));
                goto failed;
        }
 
        nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
        if (nread <= 0) {
-               DEBUG(0,("read error nread=%d\n", nread));
+               DEBUG(DEBUG_ERR,("%s: read error nread=%d\n",
+                                queue->name, (int)nread));
                goto failed;
        }
-
+       totread = nread;
+       partlen = queue->partial.length;
 
        data = queue->partial.data;
        nread += queue->partial.length;
@@ -108,14 +134,32 @@ static void queue_io_read(struct ctdb_queue *queue)
                /* we have at least one packet */
                uint8_t *d2;
                uint32_t len;
+               bool destroyed = false;
+
                len = *(uint32_t *)data;
+               if (len == 0) {
+                       /* bad packet! treat as EOF */
+                       DEBUG(DEBUG_CRIT,("%s: Invalid packet of length 0 (nread = %zu, totread = %zu, partlen = %zu)\n",
+                                         queue->name, nread, totread, partlen));
+                       dump_packet(data_base, totread + partlen);
+                       goto failed;
+               }
                d2 = talloc_memdup(queue, data, len);
                if (d2 == NULL) {
-                       DEBUG(0,("read error memdup failed for %u\n", len));
+                       DEBUG(DEBUG_ERR,("%s: read error memdup failed for %u\n",
+                                        queue->name, len));
                        /* sigh */
                        goto failed;
                }
+
+               queue->destroyed = &destroyed;
                queue->callback(d2, len, queue->private_data);
+               /* If callback freed us, don't do anything else. */
+               if (destroyed) {
+                       return;
+               }
+               queue->destroyed = NULL;
+
                data += len;
                nread -= len;           
        }
@@ -128,8 +172,8 @@ static void queue_io_read(struct ctdb_queue *queue)
                } else {
                        queue->partial.data = talloc_memdup(queue, data, nread);
                        if (queue->partial.data == NULL) {
-                               DEBUG(0,("read error memdup partial failed for %u\n", 
-                                        nread));
+                               DEBUG(DEBUG_ERR,("%s: read error memdup partial failed for %u\n",
+                                                queue->name, (unsigned)nread));
                                goto failed;
                        }
                        queue->partial.length = nread;
@@ -170,9 +214,17 @@ static void queue_io_write(struct ctdb_queue *queue)
                }
 
                if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       if (pkt->length != pkt->full_length) {
+                               /* partial packet sent - we have to drop it */
+                               DLIST_REMOVE(queue->out_queue, pkt);
+                               queue->out_queue_length--;
+                               talloc_free(pkt);
+                       }
+                       talloc_free(queue->fde);
+                       queue->fde = NULL;
+                       queue->fd = -1;
                        event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
                                        queue_dead, queue);
-                       EVENT_FD_NOT_WRITEABLE(queue->fde);
                        return;
                }
                if (n <= 0) return;
@@ -184,6 +236,7 @@ static void queue_io_write(struct ctdb_queue *queue)
                }
 
                DLIST_REMOVE(queue->out_queue, pkt);
+               queue->out_queue_length--;
                talloc_free(pkt);
        }
 
@@ -212,15 +265,21 @@ static void queue_io_handler(struct event_context *ev, struct fd_event *fde,
 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
 {
        struct ctdb_queue_pkt *pkt;
-       uint32_t length2;
+       uint32_t length2, full_length;
 
-       /* enforce the length and alignment rules from the tcp packet allocator */
-       length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
-       *(uint32_t *)data = length2;
+       if (queue->alignment) {
+               /* enforce the length and alignment rules from the tcp packet allocator */
+               length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
+               *(uint32_t *)data = length2;
+       } else {
+               length2 = length;
+       }
 
        if (length2 != length) {
                memset(data+length, 0, length2-length);
        }
+
+       full_length = length2;
        
        /* if the queue is empty then try an immediate write, avoiding
           queue overhead. This relies on non-blocking sockets */
@@ -228,6 +287,9 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
            !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
                ssize_t n = write(queue->fd, data, length2);
                if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       talloc_free(queue->fde);
+                       queue->fde = NULL;
+                       queue->fd = -1;
                        event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
                                        queue_dead, queue);
                        /* yes, we report success, as the dead node is 
@@ -248,12 +310,38 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
        CTDB_NO_MEMORY(queue->ctdb, pkt->data);
 
        pkt->length = length2;
+       pkt->full_length = full_length;
 
        if (queue->out_queue == NULL && queue->fd != -1) {
                EVENT_FD_WRITEABLE(queue->fde);
        }
 
-       DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *);
+       DLIST_ADD_END(queue->out_queue, pkt, NULL);
+
+       queue->out_queue_length++;
+
+       if (queue->ctdb->tunable.verbose_memory_names != 0) {
+               struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data;
+               switch (hdr->operation) {
+               case CTDB_REQ_CONTROL: {
+                       struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
+                       talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
+                                       queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
+                       break;
+               }
+               case CTDB_REQ_MESSAGE: {
+                       struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
+                       talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
+                                       queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
+                       break;
+               }
+               default:
+                       talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
+                                       queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
+                                       (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
+                       break;
+               }
+       }
 
        return 0;
 }
@@ -269,11 +357,12 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
        queue->fde = NULL;
 
        if (fd != -1) {
-               queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ, 
+               queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
                                          queue_io_handler, queue);
                if (queue->fde == NULL) {
                        return -1;
                }
+               tevent_fd_set_auto_close(queue->fde);
 
                if (queue->out_queue) {
                        EVENT_FD_WRITEABLE(queue->fde);         
@@ -283,7 +372,13 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
        return 0;
 }
 
-
+/* If someone sets up this pointer, they want to know if the queue is freed */
+static int queue_destructor(struct ctdb_queue *queue)
+{
+       if (queue->destroyed != NULL)
+               *queue->destroyed = true;
+       return 0;
+}
 
 /*
   setup a packet queue on a socket
@@ -292,12 +387,17 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
                                    TALLOC_CTX *mem_ctx, int fd, int alignment,
                                    
                                    ctdb_queue_cb_fn_t callback,
-                                   void *private_data)
+                                   void *private_data, const char *fmt, ...)
 {
        struct ctdb_queue *queue;
+       va_list ap;
 
        queue = talloc_zero(mem_ctx, struct ctdb_queue);
        CTDB_NO_MEMORY_NULL(ctdb, queue);
+       va_start(ap, fmt);
+       queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
+       va_end(ap);
+       CTDB_NO_MEMORY_NULL(ctdb, queue->name);
 
        queue->ctdb = ctdb;
        queue->fd = fd;
@@ -310,6 +410,7 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
                        return NULL;
                }
        }
+       talloc_set_destructor(queue, queue_destructor);
 
        return queue;
 }