This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
+ the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
#include "lib/events/events.h"
#include "system/filesys.h"
#include "messaging/messaging.h"
-#include "lib/util/dlinklist.h"
+#include "../lib/util/dlinklist.h"
#include "lib/socket/socket.h"
#include "librpc/gen_ndr/ndr_irpc.h"
#include "lib/messaging/irpc.h"
-#include "db_wrap.h"
-#include "lib/util/unix_privs.h"
+#include "tdb_wrap.h"
+#include "../lib/util/unix_privs.h"
#include "librpc/rpc/dcerpc.h"
-#include "lib/tdb/include/tdb.h"
-#include "lib/util/util_tdb.h"
-#include "lib/util/util_tdb.h"
+#include "../tdb/include/tdb.h"
+#include "../lib/util/util_tdb.h"
#include "cluster/cluster.h"
/* change the message version with any incompatible changes in the protocol */
uint32_t num_types;
struct idr_context *dispatch_tree;
struct messaging_rec *pending;
+ struct messaging_rec *retry_queue;
+ struct smb_iconv_convenience *iconv_convenience;
struct irpc_list *irpc;
struct idr_context *idr;
const char **names;
struct timeval start_time;
-
+ struct tevent_timer *retry_te;
struct {
- struct event_context *ev;
- struct fd_event *fde;
+ struct tevent_context *ev;
+ struct tevent_fd *fde;
} event;
};
struct dispatch_fn {
struct dispatch_fn *next, *prev;
uint32_t msg_type;
- void *private;
+ void *private_data;
msg_callback_t fn;
};
} *header;
DATA_BLOB packet;
+ uint32_t retries;
};
/*
A useful function for testing the message system.
*/
-static void ping_message(struct messaging_context *msg, void *private,
+static void ping_message(struct messaging_context *msg, void *private_data,
uint32_t msg_type, struct server_id src, DATA_BLOB *data)
{
DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n",
static NTSTATUS irpc_uptime(struct irpc_message *msg,
struct irpc_uptime *r)
{
- struct messaging_context *ctx = talloc_get_type(msg->private, struct messaging_context);
+ struct messaging_context *ctx = talloc_get_type(msg->private_data, struct messaging_context);
*r->out.start_time = timeval_to_nttime(&ctx->start_time);
return NT_STATUS_OK;
}
*/
static char *messaging_path(struct messaging_context *msg, struct server_id server_id)
{
- return talloc_asprintf(msg, "%s/msg.%u.%u", msg->base_path,
- (unsigned)server_id.node, (unsigned)server_id.id);
+ TALLOC_CTX *tmp_ctx = talloc_new(msg);
+ const char *id = cluster_id_string(tmp_ctx, server_id);
+ char *s;
+ if (id == NULL) {
+ return NULL;
+ }
+ s = talloc_asprintf(msg, "%s/msg.%s", msg->base_path, id);
+ talloc_steal(s, tmp_ctx);
+ return s;
}
/*
/* temporary IDs use an idtree, the rest use a array of pointers */
if (rec->header->msg_type >= MSG_TMP_BASE) {
- d = idr_find(msg->dispatch_tree, rec->header->msg_type);
+ d = (struct dispatch_fn *)idr_find(msg->dispatch_tree,
+ rec->header->msg_type);
} else if (rec->header->msg_type < msg->num_types) {
d = msg->dispatch[rec->header->msg_type];
} else {
next = d->next;
data.data = rec->packet.data + sizeof(*rec->header);
data.length = rec->header->length;
- d->fn(msg, d->private, d->msg_type, rec->header->from, &data);
+ d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data);
}
rec->header->length = 0;
}
rec->path = msg->path;
rec->header = (struct messaging_header *)packet.data;
rec->packet = packet;
+ rec->retries = 0;
if (packet.length != sizeof(*rec->header) + rec->header->length) {
DEBUG(0,("messaging: bad message header size %d should be %d\n",
return status;
}
+/*
+ retry backed off messages
+*/
+static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te,
+ struct timeval t, void *private_data)
+{
+ struct messaging_context *msg = talloc_get_type(private_data,
+ struct messaging_context);
+ msg->retry_te = NULL;
+
+ /* put the messages back on the main queue */
+ while (msg->retry_queue) {
+ struct messaging_rec *rec = msg->retry_queue;
+ DLIST_REMOVE(msg->retry_queue, rec);
+ DLIST_ADD_END(msg->pending, rec, struct messaging_rec *);
+ }
+
+ EVENT_FD_WRITEABLE(msg->event.fde);
+}
+
/*
handle a socket write event
*/
NTSTATUS status;
status = try_send(rec);
if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+ rec->retries++;
+ if (rec->retries > 3) {
+ /* we're getting continuous write errors -
+ backoff this record */
+ DLIST_REMOVE(msg->pending, rec);
+ DLIST_ADD_END(msg->retry_queue, rec,
+ struct messaging_rec *);
+ if (msg->retry_te == NULL) {
+ msg->retry_te =
+ event_add_timed(msg->event.ev, msg,
+ timeval_current_ofs(1, 0),
+ msg_retry_timer, msg);
+ }
+ }
break;
}
+ rec->retries = 0;
if (!NT_STATUS_IS_OK(status)) {
- DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n",
- rec->header->from.id, rec->header->to.id, rec->header->msg_type,
+ DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n",
+ cluster_id_string(debug_ctx(), rec->header->from),
+ cluster_id_string(debug_ctx(), rec->header->to),
+ rec->header->msg_type,
nt_errstr(status)));
}
DLIST_REMOVE(msg->pending, rec);
rec->path = msg->path;
rec->header = (struct messaging_header *)packet.data;
rec->packet = packet;
+ rec->retries = 0;
if (msize != sizeof(*rec->header) + rec->header->length) {
DEBUG(0,("messaging: bad message header size %d should be %d\n",
/*
handle a socket event
*/
-static void messaging_handler(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private)
+static void messaging_handler(struct tevent_context *ev, struct tevent_fd *fde,
+ uint16_t flags, void *private_data)
{
- struct messaging_context *msg = talloc_get_type(private,
+ struct messaging_context *msg = talloc_get_type(private_data,
struct messaging_context);
if (flags & EVENT_FD_WRITE) {
messaging_send_handler(msg);
/*
Register a dispatch function for a particular message type.
*/
-NTSTATUS messaging_register(struct messaging_context *msg, void *private,
+NTSTATUS messaging_register(struct messaging_context *msg, void *private_data,
uint32_t msg_type, msg_callback_t fn)
{
struct dispatch_fn *d;
d = talloc_zero(msg->dispatch, struct dispatch_fn);
NT_STATUS_HAVE_NO_MEMORY(d);
d->msg_type = msg_type;
- d->private = private;
+ d->private_data = private_data;
d->fn = fn;
DLIST_ADD(msg->dispatch[msg_type], d);
register a temporary message handler. The msg_type is allocated
above MSG_TMP_BASE
*/
-NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private,
+NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private_data,
msg_callback_t fn, uint32_t *msg_type)
{
struct dispatch_fn *d;
d = talloc_zero(msg->dispatch, struct dispatch_fn);
NT_STATUS_HAVE_NO_MEMORY(d);
- d->private = private;
+ d->private_data = private_data;
d->fn = fn;
id = idr_get_new_above(msg->dispatch_tree, d, MSG_TMP_BASE, UINT16_MAX);
/*
De-register the function for a particular message type.
*/
-void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private)
+void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private_data)
{
struct dispatch_fn *d, *next;
if (msg_type >= msg->num_types) {
- d = idr_find(msg->dispatch_tree, msg_type);
+ d = (struct dispatch_fn *)idr_find(msg->dispatch_tree,
+ msg_type);
if (!d) return;
idr_remove(msg->dispatch_tree, msg_type);
talloc_free(d);
for (d = msg->dispatch[msg_type]; d; d = next) {
next = d->next;
- if (d->private == private) {
+ if (d->private_data == private_data) {
DLIST_REMOVE(msg->dispatch[msg_type], d);
talloc_free(d);
}
return NT_STATUS_NO_MEMORY;
}
+ rec->retries = 0;
rec->msg = msg;
rec->header = (struct messaging_header *)rec->packet.data;
+ /* zero padding */
+ ZERO_STRUCTP(rec->header);
rec->header->version = MESSAGING_VERSION;
rec->header->msg_type = msg_type;
rec->header->from = msg->server_id;
{
DATA_BLOB blob;
- blob.data = (void *)&ptr;
+ blob.data = (uint8_t *)&ptr;
blob.length = sizeof(void *);
return messaging_send(msg, server, msg_type, &blob);
create the listening socket and setup the dispatcher
*/
struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
+ const char *dir,
struct server_id server_id,
- struct event_context *ev)
+ struct smb_iconv_convenience *iconv_convenience,
+ struct tevent_context *ev)
{
struct messaging_context *msg;
NTSTATUS status;
struct socket_address *path;
- char *dir;
+
+ if (ev == NULL) {
+ return NULL;
+ }
msg = talloc_zero(mem_ctx, struct messaging_context);
if (msg == NULL) {
return NULL;
}
- if (ev == NULL) {
- ev = event_context_init(msg);
- }
-
/* create the messaging directory if needed */
- dir = smbd_tmp_path(msg, "messaging");
mkdir(dir, 0700);
- talloc_free(dir);
- msg->base_path = smbd_tmp_path(msg, "messaging");
+ msg->base_path = talloc_reference(msg, dir);
msg->path = messaging_path(msg, server_id);
msg->server_id = server_id;
+ msg->iconv_convenience = iconv_convenience;
msg->idr = idr_init(msg);
msg->dispatch_tree = idr_init(msg);
msg->start_time = timeval_current();
}
/* it needs to be non blocking for sends */
- set_blocking(socket_get_fd(msg->sock), False);
+ set_blocking(socket_get_fd(msg->sock), false);
- msg->event.ev = talloc_reference(msg, ev);
+ msg->event.ev = ev;
msg->event.fde = event_add_fd(ev, msg, socket_get_fd(msg->sock),
EVENT_FD_READ, messaging_handler, msg);
A hack, for the short term until we get 'client only' messaging in place
*/
struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx,
- struct event_context *ev)
+ const char *dir,
+ struct smb_iconv_convenience *iconv_convenience,
+ struct tevent_context *ev)
{
struct server_id id;
ZERO_STRUCT(id);
id.id = random() % 0x10000000;
- return messaging_init(mem_ctx, id, ev);
+ return messaging_init(mem_ctx, dir, id, iconv_convenience, ev);
}
/*
a list of registered irpc server functions
struct irpc_list {
struct irpc_list *next, *prev;
struct GUID uuid;
- const struct dcerpc_interface_table *table;
+ const struct ndr_interface_table *table;
int callnum;
irpc_function_t fn;
- void *private;
+ void *private_data;
};
register a irpc server function
*/
NTSTATUS irpc_register(struct messaging_context *msg_ctx,
- const struct dcerpc_interface_table *table,
- int callnum, irpc_function_t fn, void *private)
+ const struct ndr_interface_table *table,
+ int callnum, irpc_function_t fn, void *private_data)
{
struct irpc_list *irpc;
irpc->table = table;
irpc->callnum = callnum;
irpc->fn = fn;
- irpc->private = private;
+ irpc->private_data = private_data;
irpc->uuid = irpc->table->syntax_id.uuid;
return NT_STATUS_OK;
static void irpc_handler_reply(struct messaging_context *msg_ctx, struct irpc_message *m)
{
struct irpc_request *irpc;
+ enum ndr_err_code ndr_err;
- irpc = idr_find(msg_ctx->idr, m->header.callid);
+ irpc = (struct irpc_request *)idr_find(msg_ctx->idr, m->header.callid);
if (irpc == NULL) return;
/* parse the reply data */
- irpc->status = irpc->table->calls[irpc->callnum].ndr_pull(m->ndr, NDR_OUT, irpc->r);
- if (NT_STATUS_IS_OK(irpc->status)) {
+ ndr_err = irpc->table->calls[irpc->callnum].ndr_pull(m->ndr, NDR_OUT, irpc->r);
+ if (NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
irpc->status = m->header.status;
talloc_steal(irpc->mem_ctx, m);
} else {
+ irpc->status = ndr_map_error2ntstatus(ndr_err);
talloc_steal(irpc, m);
}
- irpc->done = True;
+ irpc->done = true;
if (irpc->async.fn) {
irpc->async.fn(irpc);
}
{
struct ndr_push *push;
DATA_BLOB packet;
+ enum ndr_err_code ndr_err;
m->header.status = status;
/* setup the reply */
- push = ndr_push_init_ctx(m->ndr);
+ push = ndr_push_init_ctx(m->ndr, m->msg_ctx->iconv_convenience);
if (push == NULL) {
status = NT_STATUS_NO_MEMORY;
goto failed;
m->header.flags |= IRPC_FLAG_REPLY;
/* construct the packet */
- status = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header);
- if (!NT_STATUS_IS_OK(status)) goto failed;
+ ndr_err = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header);
+ if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+ status = ndr_map_error2ntstatus(ndr_err);
+ goto failed;
+ }
- status = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data);
- if (!NT_STATUS_IS_OK(status)) goto failed;
+ ndr_err = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data);
+ if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+ status = ndr_map_error2ntstatus(ndr_err);
+ goto failed;
+ }
/* send the reply message */
packet = ndr_push_blob(push);
{
struct irpc_list *i;
void *r;
- NTSTATUS status;
+ enum ndr_err_code ndr_err;
for (i=msg_ctx->irpc; i; i=i->next) {
if (GUID_equal(&i->uuid, &m->header.uuid) &&
if (r == NULL) goto failed;
/* parse the request data */
- status = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r);
- if (!NT_STATUS_IS_OK(status)) goto failed;
+ ndr_err = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r);
+ if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
/* make the call */
- m->private = i->private;
- m->defer_reply = False;
+ m->private_data= i->private_data;
+ m->defer_reply = false;
m->msg_ctx = msg_ctx;
m->irpc = i;
m->data = r;
/*
handle an incoming irpc message
*/
-static void irpc_handler(struct messaging_context *msg_ctx, void *private,
+static void irpc_handler(struct messaging_context *msg_ctx, void *private_data,
uint32_t msg_type, struct server_id src, DATA_BLOB *packet)
{
struct irpc_message *m;
- NTSTATUS status;
+ enum ndr_err_code ndr_err;
m = talloc(msg_ctx, struct irpc_message);
if (m == NULL) goto failed;
m->from = src;
- m->ndr = ndr_pull_init_blob(packet, m);
+ m->ndr = ndr_pull_init_blob(packet, m, msg_ctx->iconv_convenience);
if (m->ndr == NULL) goto failed;
m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
- status = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header);
- if (!NT_STATUS_IS_OK(status)) goto failed;
+ ndr_err = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header);
+ if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
if (m->header.flags & IRPC_FLAG_REPLY) {
irpc_handler_reply(msg_ctx, m);
*/
static int irpc_destructor(struct irpc_request *irpc)
{
- idr_remove(irpc->msg_ctx->idr, irpc->callid);
+ if (irpc->callid != -1) {
+ idr_remove(irpc->msg_ctx->idr, irpc->callid);
+ irpc->callid = -1;
+ }
+
+ if (irpc->reject_free) {
+ return -1;
+ }
return 0;
}
/*
timeout a irpc request
*/
-static void irpc_timeout(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private)
+static void irpc_timeout(struct tevent_context *ev, struct tevent_timer *te,
+ struct timeval t, void *private_data)
{
- struct irpc_request *irpc = talloc_get_type(private, struct irpc_request);
+ struct irpc_request *irpc = talloc_get_type(private_data, struct irpc_request);
irpc->status = NT_STATUS_IO_TIMEOUT;
- irpc->done = True;
+ irpc->done = true;
if (irpc->async.fn) {
irpc->async.fn(irpc);
}
*/
struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
struct server_id server_id,
- const struct dcerpc_interface_table *table,
+ const struct ndr_interface_table *table,
int callnum, void *r, TALLOC_CTX *ctx)
{
struct irpc_header header;
NTSTATUS status;
DATA_BLOB packet;
struct irpc_request *irpc;
+ enum ndr_err_code ndr_err;
irpc = talloc(msg_ctx, struct irpc_request);
if (irpc == NULL) goto failed;
irpc->callid = idr_get_new(msg_ctx->idr, irpc, UINT16_MAX);
if (irpc->callid == -1) goto failed;
irpc->r = r;
- irpc->done = False;
+ irpc->done = false;
irpc->async.fn = NULL;
irpc->mem_ctx = ctx;
+ irpc->reject_free = false;
talloc_set_destructor(irpc, irpc_destructor);
header.status = NT_STATUS_OK;
/* construct the irpc packet */
- ndr = ndr_push_init_ctx(irpc);
+ ndr = ndr_push_init_ctx(irpc, msg_ctx->iconv_convenience);
if (ndr == NULL) goto failed;
- status = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
- if (!NT_STATUS_IS_OK(status)) goto failed;
+ ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
+ if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
- status = table->calls[callnum].ndr_push(ndr, NDR_IN, r);
- if (!NT_STATUS_IS_OK(status)) goto failed;
+ ndr_err = table->calls[callnum].ndr_push(ndr, NDR_IN, r);
+ if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
/* and send it */
packet = ndr_push_blob(ndr);
NT_STATUS_HAVE_NO_MEMORY(irpc);
+ irpc->reject_free = true;
+
while (!irpc->done) {
if (event_loop_once(irpc->msg_ctx->event.ev) != 0) {
return NT_STATUS_CONNECTION_DISCONNECTED;
}
}
+
+ irpc->reject_free = false;
+
status = irpc->status;
talloc_free(irpc);
return status;
*/
NTSTATUS irpc_call(struct messaging_context *msg_ctx,
struct server_id server_id,
- const struct dcerpc_interface_table *table,
+ const struct ndr_interface_table *table,
int callnum, void *r,
TALLOC_CTX *mem_ctx)
{
/*
return a list of server ids for a server name
*/
-struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx,
+struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx,
+ TALLOC_CTX *mem_ctx,
const char *name)
{
struct tdb_wrap *t;
return NULL;
}
count = rec.dsize / sizeof(struct server_id);
- ret = talloc_array(msg_ctx, struct server_id, count+1);
+ ret = talloc_array(mem_ctx, struct server_id, count+1);
if (ret == NULL) {
tdb_unlock_bystring(t->tdb, name);
talloc_free(t);
for (i=0;i<count;i++) {
ret[i] = ((struct server_id *)rec.dptr)[i];
}
- ret[i] = cluster_id(0);
+ ret[i] = cluster_id(0, 0);
free(rec.dptr);
tdb_unlock_bystring(t->tdb, name);
talloc_free(t);
return;
}
rec = tdb_fetch_bystring(t->tdb, name);
+ if (rec.dptr == NULL) {
+ tdb_unlock_bystring(t->tdb, name);
+ talloc_free(t);
+ return;
+ }
count = rec.dsize / sizeof(struct server_id);
if (count == 0) {
+ free(rec.dptr);
tdb_unlock_bystring(t->tdb, name);
talloc_free(t);
return;
tdb_unlock_bystring(t->tdb, name);
talloc_free(t);
}
+
+struct server_id messaging_get_server_id(struct messaging_context *msg_ctx)
+{
+ return msg_ctx->server_id;
+}