#include "includes.h"
#include "lib/tdb/include/tdb.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
#include "lib/util/dlinklist.h"
#include "system/network.h"
#include "system/filesys.h"
#include "../include/ctdb_private.h"
-#include "../include/ctdb.h"
+#include "../include/ctdb_client.h"
+#include <stdarg.h>
/* structures for packet queueing - see common/ctdb_io.c */
struct ctdb_partial {
struct ctdb_queue {
struct ctdb_context *ctdb;
struct ctdb_partial partial; /* partial input packet */
- struct ctdb_queue_pkt *out_queue;
+ struct ctdb_queue_pkt *out_queue, *out_queue_tail;
+ uint32_t out_queue_length;
struct fd_event *fde;
int fd;
size_t alignment;
void *private_data;
ctdb_queue_cb_fn_t callback;
+ bool *destroyed;
+ const char *name;
};
+int ctdb_queue_length(struct ctdb_queue *queue)
+{
+ return queue->out_queue_length;
+}
+
+static void dump_packet(unsigned char *data, size_t len)
+{
+ size_t i;
+ char *p = talloc_array(NULL, char, len*3 + 1);
+ if (!p) {
+ DEBUG(DEBUG_CRIT,("Packet talloc fail"));
+ return;
+ }
+
+ for (i = 0; i < len; i++)
+ sprintf(p + i*3, " %02x", data[i]);
+ DEBUG(DEBUG_CRIT,("Contents: %s\n", p));
+ talloc_free(p);
+}
+
/*
called when an incoming connection is readable
*/
static void queue_io_read(struct ctdb_queue *queue)
{
int num_ready = 0;
- ssize_t nread;
+ ssize_t nread, totread, partlen;
uint8_t *data, *data_base;
if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
num_ready + queue->partial.length);
if (queue->partial.data == NULL) {
- DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
- num_ready + queue->partial.length));
+ DEBUG(DEBUG_ERR,("%s: read error alloc failed for %u\n",
+ queue->name, num_ready + queue->partial.length));
goto failed;
}
nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
if (nread <= 0) {
- DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread));
+ DEBUG(DEBUG_ERR,("%s: read error nread=%d\n",
+ queue->name, (int)nread));
goto failed;
}
-
+ totread = nread;
+ partlen = queue->partial.length;
data = queue->partial.data;
nread += queue->partial.length;
/* we have at least one packet */
uint8_t *d2;
uint32_t len;
+ bool destroyed = false;
+
len = *(uint32_t *)data;
if (len == 0) {
/* bad packet! treat as EOF */
- DEBUG(DEBUG_CRIT,("Invalid packet of length 0\n"));
+ DEBUG(DEBUG_CRIT,("%s: Invalid packet of length 0 (nread = %zu, totread = %zu, partlen = %zu)\n",
+ queue->name, nread, totread, partlen));
+ dump_packet(data_base, totread + partlen);
goto failed;
}
d2 = talloc_memdup(queue, data, len);
if (d2 == NULL) {
- DEBUG(DEBUG_ERR,("read error memdup failed for %u\n", len));
+ DEBUG(DEBUG_ERR,("%s: read error memdup failed for %u\n",
+ queue->name, len));
/* sigh */
goto failed;
}
+
+ queue->destroyed = &destroyed;
queue->callback(d2, len, queue->private_data);
+ /* If callback freed us, don't do anything else. */
+ if (destroyed) {
+ return;
+ }
+ queue->destroyed = NULL;
+
data += len;
nread -= len;
}
} else {
queue->partial.data = talloc_memdup(queue, data, nread);
if (queue->partial.data == NULL) {
- DEBUG(DEBUG_ERR,("read error memdup partial failed for %u\n",
- (unsigned)nread));
+ DEBUG(DEBUG_ERR,("%s: read error memdup partial failed for %u\n",
+ queue->name, (unsigned)nread));
goto failed;
}
queue->partial.length = nread;
if (pkt->length != pkt->full_length) {
/* partial packet sent - we have to drop it */
DLIST_REMOVE(queue->out_queue, pkt);
+ queue->out_queue_length--;
talloc_free(pkt);
}
talloc_free(queue->fde);
}
DLIST_REMOVE(queue->out_queue, pkt);
+ queue->out_queue_length--;
talloc_free(pkt);
}
EVENT_FD_WRITEABLE(queue->fde);
}
- DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *);
+ DLIST_ADD_END(queue->out_queue, pkt, NULL);
+
+ queue->out_queue_length++;
if (queue->ctdb->tunable.verbose_memory_names != 0) {
struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data;
switch (hdr->operation) {
case CTDB_REQ_CONTROL: {
struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
- talloc_set_name(pkt, "ctdb_queue_pkt: control opcode=%u srvid=%llu datalen=%u",
- (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
+ talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
+ queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
break;
}
case CTDB_REQ_MESSAGE: {
struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
- talloc_set_name(pkt, "ctdb_queue_pkt: message srvid=%llu datalen=%u",
- (unsigned long long)m->srvid, (unsigned)m->datalen);
+ talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
+ queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
break;
}
default:
- talloc_set_name(pkt, "ctdb_queue_pkt: operation=%u length=%u src=%u dest=%u",
- (unsigned)hdr->operation, (unsigned)hdr->length,
+ talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
+ queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
(unsigned)hdr->srcnode, (unsigned)hdr->destnode);
break;
}
queue->fde = NULL;
if (fd != -1) {
- queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
+ queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
queue_io_handler, queue);
if (queue->fde == NULL) {
return -1;
}
+ tevent_fd_set_auto_close(queue->fde);
if (queue->out_queue) {
EVENT_FD_WRITEABLE(queue->fde);
return 0;
}
-
+/* If someone sets up this pointer, they want to know if the queue is freed */
+static int queue_destructor(struct ctdb_queue *queue)
+{
+ if (queue->destroyed != NULL)
+ *queue->destroyed = true;
+ return 0;
+}
/*
setup a packet queue on a socket
TALLOC_CTX *mem_ctx, int fd, int alignment,
ctdb_queue_cb_fn_t callback,
- void *private_data)
+ void *private_data, const char *fmt, ...)
{
struct ctdb_queue *queue;
+ va_list ap;
queue = talloc_zero(mem_ctx, struct ctdb_queue);
CTDB_NO_MEMORY_NULL(ctdb, queue);
+ va_start(ap, fmt);
+ queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
+ va_end(ap);
+ CTDB_NO_MEMORY_NULL(ctdb, queue->name);
queue->ctdb = ctdb;
queue->fd = fd;
return NULL;
}
}
+ talloc_set_destructor(queue, queue_destructor);
return queue;
}