/*
queue a packet or die
*/
-static void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+static inline void ctdb_queue_packet(struct ctdb_node *node, struct ctdb_req_header *hdr)
{
- struct ctdb_node *node;
- node = ctdb->nodes[hdr->destnode];
- if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
- ctdb_fatal(ctdb, "Unable to queue packet\n");
+ if (node->ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
+ ctdb_fatal(node->ctdb, "Unable to queue packet\n");
}
}
struct ctdb_reply_error *r;
char *msg;
int len;
+ struct ctdb_node *node;
+
+ node = ctdb->nodes[hdr->srcnode];
va_start(ap, fmt);
msg = talloc_vasprintf(ctdb, fmt, ap);
va_end(ap);
len = strlen(msg)+1;
- r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
+ r = ctdb->methods->allocate_pkt(node, sizeof(*r) + len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r) + len;
r->hdr.operation = CTDB_REPLY_ERROR;
talloc_free(msg);
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(r);
+ ctdb_queue_packet(node, &r->hdr);
+ ctdb->methods->dealloc_pkt(node, r);
}
struct ctdb_ltdb_header *header)
{
struct ctdb_reply_redirect *r;
+ struct ctdb_node *node;
- r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
+ node = ctdb->nodes[c->hdr.srcnode];
+
+ r = ctdb->methods->allocate_pkt(node, sizeof(*r));
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r);
r->hdr.operation = CTDB_REPLY_REDIRECT;
r->hdr.reqid = c->hdr.reqid;
r->dmaster = header->dmaster;
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(r);
+ ctdb_queue_packet(node, &r->hdr);
+ ctdb->methods->dealloc_pkt(node, r);
}
/*
{
struct ctdb_req_dmaster *r;
int len;
+ struct ctdb_node *node;
+ uint32_t destnode;
+
+ destnode = ctdb_lmaster(ctdb, key);
+ node = ctdb->nodes[destnode];
len = sizeof(*r) + key->dsize + data->dsize;
- r = ctdb->methods->allocate_pkt(ctdb, len);
+ r = ctdb->methods->allocate_pkt(node, len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = len;
r->hdr.operation = CTDB_REQ_DMASTER;
- r->hdr.destnode = ctdb_lmaster(ctdb, key);
+ r->hdr.destnode = destnode;
r->hdr.srcnode = ctdb->vnn;
r->hdr.reqid = c->hdr.reqid;
r->dmaster = header->laccessor;
/* we are the lmaster - don't send to ourselves */
ctdb_request_dmaster(ctdb, &r->hdr);
} else {
- ctdb_queue_packet(ctdb, &r->hdr);
+ ctdb_queue_packet(node, &r->hdr);
/* update the ltdb to record the new dmaster */
header->dmaster = r->hdr.destnode;
ctdb_ltdb_store(ctdb, *key, header, *data);
}
- talloc_free(r);
+ ctdb->methods->dealloc_pkt(node, r);
}
TDB_DATA key, data;
struct ctdb_ltdb_header header;
int ret;
+ struct ctdb_node *node;
+ node = ctdb->nodes[c->dmaster];
key.dptr = c->data;
key.dsize = c->keylen;
data.dptr = c->data + c->keylen;
}
/* send the CTDB_REPLY_DMASTER */
- r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
+ r = ctdb->methods->allocate_pkt(node, sizeof(*r) + data.dsize);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r) + data.dsize;
r->hdr.operation = CTDB_REPLY_DMASTER;
r->datalen = data.dsize;
memcpy(&r->data[0], data.dptr, data.dsize);
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(r);
+ ctdb_queue_packet(node, &r->hdr);
+ ctdb->methods->dealloc_pkt(node, r);
}
struct ctdb_reply_call *r;
int ret;
struct ctdb_ltdb_header header;
+ struct ctdb_node *node;
+ node = ctdb->nodes[hdr->srcnode];
key.dptr = c->data;
key.dsize = c->keylen;
call_data.dptr = c->data + c->keylen;
call_data.dsize?&call_data:NULL,
&reply_data, c->hdr.srcnode);
- r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
+ r = ctdb->methods->allocate_pkt(node, sizeof(*r) + reply_data.dsize);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r) + reply_data.dsize;
r->hdr.operation = CTDB_REPLY_CALL;
r->datalen = reply_data.dsize;
memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
- ctdb_queue_packet(ctdb, &r->hdr);
+ ctdb_queue_packet(node, &r->hdr);
+ ctdb->methods->dealloc_pkt(node, r);
talloc_free(reply_data.dptr);
- talloc_free(r);
}
enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
{
struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
struct ctdb_call_state *state;
-
+ struct ctdb_node *node;
+#ifdef USE_INFINIBAND
+ uint8_t *r;
+#endif /* USE_INFINIBAND */
state = idr_find(ctdb->idr, hdr->reqid);
talloc_steal(state, c);
/* send it off again */
state->node = ctdb->nodes[c->dmaster];
- ctdb_queue_packet(ctdb, &state->c->hdr);
+ node = ctdb->nodes[state->c->hdr.destnode];
+
+#ifdef USE_INFINIBAND
+ r = ctdb->methods->allocate_pkt(node, state->c->hdr.length);
+ memcpy(r, &state->c->hdr, state->c->hdr.length);
+#endif /* USE_INFINIBAND */
+
+ ctdb_queue_packet(node, &state->c->hdr);
+
+#ifdef USE_INFINIBAND
+ ctdb->methods->dealloc_pkt(node, r);
+#endif /* USE_INFINIBAND */
}
/*
int ret;
struct ctdb_ltdb_header header;
TDB_DATA data;
+ struct ctdb_node *node;
/*
if we are the dmaster for this key then we don't need to
state = talloc_zero(ctdb, struct ctdb_call_state);
CTDB_NO_MEMORY_NULL(ctdb, state);
+ node = ctdb->nodes[header.dmaster];
len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
- state->c = ctdb->methods->allocate_pkt(ctdb, len);
+ state->c = ctdb->methods->allocate_pkt(node, len);
CTDB_NO_MEMORY_NULL(ctdb, state->c);
state->c->hdr.length = len;
talloc_set_destructor(state, ctdb_call_destructor);
- ctdb_queue_packet(ctdb, &state->c->hdr);
+ ctdb_queue_packet(node, &state->c->hdr);
+
+#ifdef USE_INFINIBAND
+ ctdb->methods->dealloc_pkt(node, state->c);
+ state->c = NULL;
+#endif /* USE_INFINIBAND */
event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
ctdb_call_timeout, state);
#include "ibwrapper.h"
#include "ibw_ctdb.h"
+/* not nice; temporary workaround for the current implementation... */
+static void *last_key = NULL;
+
static int ctdb_ibw_listen(struct ctdb_context *ctdb, int backlog)
{
struct ibw_ctx *ictx = talloc_get_type(ctdb->private, struct ibw_ctx);
/*
* transport packet allocator - allows transport to control memory for packets
*/
-static void *ctdb_ibw_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+static void *ctdb_ibw_allocate_pkt(struct ctdb_node *node, size_t size)
{
struct ibw_conn *conn = NULL;
void *buf = NULL;
- void *key; /* TODO: expand the param list with this */
- /* TODO2: !!! I need "node" or ibw_conn here */
- if (ibw_alloc_send_buf(conn, &buf, &key, (int)size))
+ if (ibw_alloc_send_buf(conn, &buf, &last_key, size))
return NULL;
return buf;
static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
{
struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
- void *key = NULL; /* TODO: expand the param list with this */
+ int rc;
+
+ rc = ibw_send(conn, data, last_key, length);
+ last_key = NULL;
+
+ return rc;
+}
- assert(conn!=NULL);
- return ibw_send(conn, data, key, length);
+static void ctdb_ibw_dealloc_pkt(struct ctdb_node *node, void *data)
+{
+ if (last_key) {
+ struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
+
+ assert(conn!=NULL);
+ ibw_cancel_send_buf(conn, data, last_key);
+ } /* else ibw_send is already using it and will free it after completion */
+}
+
+static int ctdb_ibw_stop(struct ctdb_context *cctx)
+{
+ struct ibw_ctx *ictx = talloc_get_type(cctx->private, struct ibw_ctx);
+
+ assert(ictx!=NULL);
+ return ibw_stop(ictx);
}
static const struct ctdb_methods ctdb_ibw_methods = {
.start = ctdb_ibw_start,
.add_node = ctdb_ibw_add_node,
.queue_pkt = ctdb_ibw_queue_pkt,
- .allocate_pkt = ctdb_ibw_allocate_pkt
-
-// .dealloc_pkt = ctdb_ibw_dealloc_pkt
-// .stop = ctdb_ibw_stop
+ .allocate_pkt = ctdb_ibw_allocate_pkt,
+
+ .dealloc_pkt = ctdb_ibw_dealloc_pkt,
+ .stop = ctdb_ibw_stop
};
/*
int ctdb_ibw_init(struct ctdb_context *ctdb)
{
struct ibw_ctx *ictx;
-
+
ictx = ibw_init(
NULL, //struct ibw_initattr *attr, /* TODO */
0, //int nattr, /* TODO */