*/
#include "includes.h"
#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
#include "system/network.h"
#include "system/filesys.h"
-#include "ctdb_private.h"
-
+#include "../include/ctdb_private.h"
/*
queue a packet or die
/*
send an error reply
*/
+static void ctdb_send_error(struct ctdb_context *ctdb,
+ struct ctdb_req_header *hdr, uint32_t status,
+ const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
static void ctdb_send_error(struct ctdb_context *ctdb,
struct ctdb_req_header *hdr, uint32_t status,
const char *fmt, ...)
va_list ap;
struct ctdb_reply_error *r;
char *msg;
- int len;
+ int msglen, len;
va_start(ap, fmt);
msg = talloc_vasprintf(ctdb, fmt, ap);
}
va_end(ap);
- len = strlen(msg)+1;
- r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
+ msglen = strlen(msg)+1;
+ len = offsetof(struct ctdb_reply_error, msg);
+ r = ctdb->methods->allocate_pkt(ctdb, len + msglen);
CTDB_NO_MEMORY_FATAL(ctdb, r);
- r->hdr.length = sizeof(*r) + len;
+
+ r->hdr.length = len + msglen;
r->hdr.operation = CTDB_REPLY_ERROR;
r->hdr.destnode = hdr->srcnode;
r->hdr.srcnode = ctdb->vnn;
r->hdr.reqid = hdr->reqid;
r->status = status;
- r->msglen = len;
- memcpy(&r->msg[0], msg, len);
+ r->msglen = msglen;
+ memcpy(&r->msg[0], msg, msglen);
talloc_free(msg);
struct ctdb_req_dmaster *r;
int len;
- len = sizeof(*r) + key->dsize + data->dsize;
+ len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize;
r = ctdb->methods->allocate_pkt(ctdb, len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = len;
memcpy(&r->data[0], key->dptr, key->dsize);
memcpy(&r->data[key->dsize], data->dptr, data->dsize);
- if (r->hdr.destnode == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
+ if (r->hdr.destnode == ctdb->vnn) {
/* we are the lmaster - don't send to ourselves */
ctdb_request_dmaster(ctdb, &r->hdr);
} else {
{
struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
struct ctdb_reply_dmaster *r;
- TDB_DATA key, data;
+ TDB_DATA key, data, data2;
struct ctdb_ltdb_header header;
- int ret;
+ int ret, len;
key.dptr = c->data;
key.dsize = c->keylen;
data.dsize = c->datalen;
/* fetch the current record */
- ret = ctdb_ltdb_fetch(ctdb, key, &header, &data);
+ ret = ctdb_ltdb_fetch(ctdb, key, &header, &data2);
if (ret != 0) {
ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
return;
}
/* send the CTDB_REPLY_DMASTER */
- r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
+ len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize;
+ r = ctdb->methods->allocate_pkt(ctdb, len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
- r->hdr.length = sizeof(*r) + data.dsize;
+ r->hdr.length = len;
r->hdr.operation = CTDB_REPLY_DMASTER;
r->hdr.destnode = c->dmaster;
r->hdr.srcnode = ctdb->vnn;
r->datalen = data.dsize;
memcpy(&r->data[0], data.dptr, data.dsize);
- ctdb_queue_packet(ctdb, &r->hdr);
+ if (r->hdr.destnode == r->hdr.srcnode) {
+ ctdb_reply_dmaster(ctdb, &r->hdr);
+ } else {
+ ctdb_queue_packet(ctdb, &r->hdr);
+ }
talloc_free(r);
}
struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
TDB_DATA key, data, call_data, reply_data;
struct ctdb_reply_call *r;
- int ret;
+ int ret, len;
struct ctdb_ltdb_header header;
key.dptr = c->data;
/* if this nodes has done enough consecutive calls on the same record
then give them the record */
if (header.laccessor == c->hdr.srcnode &&
- header.lacount >= CTDB_MAX_LACOUNT) {
+ header.lacount >= ctdb->max_lacount) {
ctdb_call_send_dmaster(ctdb, c, &header, &key, &data);
talloc_free(data.dptr);
return;
call_data.dsize?&call_data:NULL,
&reply_data, c->hdr.srcnode);
- r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
+ len = offsetof(struct ctdb_reply_call, data) + reply_data.dsize;
+ r = ctdb->methods->allocate_pkt(ctdb, len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
- r->hdr.length = sizeof(*r) + reply_data.dsize;
+ r->hdr.length = len;
r->hdr.operation = CTDB_REPLY_CALL;
r->hdr.destnode = hdr->srcnode;
r->hdr.srcnode = hdr->destnode;
TDB_DATA reply_data;
state = idr_find(ctdb->idr, hdr->reqid);
+ if (state == NULL) return;
reply_data.dptr = c->data;
reply_data.dsize = c->datalen;
TDB_DATA data;
state = idr_find(ctdb->idr, hdr->reqid);
+ if (state == NULL) return;
data.dptr = c->data;
data.dsize = c->datalen;
struct ctdb_call_state *state;
state = idr_find(ctdb->idr, hdr->reqid);
+ if (state == NULL) return;
talloc_steal(state, c);
struct ctdb_call_state *state;
state = idr_find(ctdb->idr, hdr->reqid);
+ if (state == NULL) return;
talloc_steal(state, c);
{
struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state);
state->state = CTDB_CALL_ERROR;
- ctdb_set_error(state->node->ctdb, "ctdb_call timed out");
+ ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out",
+ state->c->hdr.reqid);
}
/*
state = talloc_zero(ctdb, struct ctdb_call_state);
CTDB_NO_MEMORY_NULL(ctdb, state);
- len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
+ len = offsetof(struct ctdb_req_call, data) + key.dsize + (call_data?call_data->dsize:0);
state->c = ctdb->methods->allocate_pkt(ctdb, len);
CTDB_NO_MEMORY_NULL(ctdb, state->c);
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
+#ifndef _CTDB_PRIVATE_H
+#define _CTDB_PRIVATE_H
+
+#include "ctdb.h"
/*
an installed ctdb remote call
const struct ctdb_methods *methods; /* transport methods */
const struct ctdb_upcalls *upcalls; /* transport upcalls */
void *private; /* private to transport */
+ unsigned max_lacount;
};
#define CTDB_NO_MEMORY(ctdb, p) do { if (!(p)) { \
/* number of consecutive calls from the same node before we give them
the record */
-#define CTDB_MAX_LACOUNT 7
+#define CTDB_DEFAULT_MAX_LACOUNT 7
/*
the extended header for records in the ltdb
uint32_t callid;
uint32_t keylen;
uint32_t calldatalen;
- uint8_t data[0]; /* key[] followed by calldata[] */
+ uint8_t data[1]; /* key[] followed by calldata[] */
};
struct ctdb_reply_call {
struct ctdb_req_header hdr;
uint32_t datalen;
- uint8_t data[0];
+ uint8_t data[1];
};
struct ctdb_reply_error {
struct ctdb_req_header hdr;
uint32_t status;
uint32_t msglen;
- uint8_t msg[0];
+ uint8_t msg[1];
};
struct ctdb_reply_redirect {
uint32_t dmaster;
uint32_t keylen;
uint32_t datalen;
- uint8_t data[0];
+ uint8_t data[1];
};
struct ctdb_reply_dmaster {
struct ctdb_req_header hdr;
uint32_t datalen;
- uint8_t data[0];
+ uint8_t data[1];
};
/* internal prototypes */
-void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...);
+void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...) PRINTF_ATTRIBUTE(2,3);
void ctdb_fatal(struct ctdb_context *ctdb, const char *msg);
bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2);
int ctdb_parse_address(struct ctdb_context *ctdb,
struct ctdb_ltdb_header *header, TDB_DATA data);
+#endif
#include "includes.h"
#include "lib/events/events.h"
+#include "lib/util/dlinklist.h"
+#include "lib/tdb/include/tdb.h"
#include "system/network.h"
#include "system/filesys.h"
-#include "ctdb_private.h"
+#include "../include/ctdb_private.h"
#include "ctdb_tcp.h"
struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
struct ctdb_tcp_packet *pkt;
+ uint32_t length2;
/* enforce the length and alignment rules from the tcp packet allocator */
- length = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
- *(uint32_t *)data = length;
+ length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
+ *(uint32_t *)data = length2;
+
+ if (length2 != length) {
+ memset(data+length, 0, length2-length);
+ }
/* if the queue is empty then try an immediate write, avoiding
queue overhead. This relies on non-blocking sockets */
if (tnode->queue == NULL && tnode->fd != -1) {
- ssize_t n = write(tnode->fd, data, length);
+ ssize_t n = write(tnode->fd, data, length2);
if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
event_add_timed(node->ctdb->ev, node, timeval_zero(),
ctdb_tcp_node_dead, node);
}
if (n > 0) {
data += n;
- length -= n;
+ length2 -= n;
}
- if (length == 0) return 0;
+ if (length2 == 0) return 0;
}
pkt = talloc(tnode, struct ctdb_tcp_packet);
CTDB_NO_MEMORY(node->ctdb, pkt);
- pkt->data = talloc_memdup(pkt, data, length);
+ pkt->data = talloc_memdup(pkt, data, length2);
CTDB_NO_MEMORY(node->ctdb, pkt->data);
- pkt->length = length;
+ pkt->length = length2;
if (tnode->queue == NULL && tnode->fd != -1) {
EVENT_FD_WRITEABLE(tnode->fde);