/* an individual message */
struct messaging_rec {
- struct messaging_rec *next, *prev;
-
struct messaging_state *msg;
struct socket_context *sock;
struct fd_event *fde;
*/
static void messaging_dispatch(struct messaging_state *msg, struct messaging_rec *rec)
{
- struct dispatch_fn *d;
- for (d=msg->dispatch;d;d=d->next) {
+ struct dispatch_fn *d, *next;
+ for (d=msg->dispatch;d;d=next) {
+ next = d->next;
if (d->msg_type == rec->header.msg_type) {
d->fn(msg, d->private, d->msg_type, rec->header.from, &rec->data);
}
/*
De-register the function for a particular message type.
*/
-void messaging_deregister(void *ctx, uint32_t msg_type)
+void messaging_deregister(void *ctx, uint32_t msg_type, void *private)
{
struct messaging_state *msg = ctx;
struct dispatch_fn *d, *next;
for (d = msg->dispatch; d; d = next) {
next = d->next;
- if (d->msg_type == msg_type) {
+ if (d->msg_type == msg_type &&
+ d->private == private) {
DLIST_REMOVE(msg->dispatch, d);
talloc_free(d);
}
}
+/*
+ when the servers listen queue is full we use this to backoff the message
+*/
+static void messaging_backoff_handler(struct event_context *ev, struct timed_event *te, time_t t)
+{
+ struct messaging_rec *rec = te->private;
+ struct messaging_state *msg = rec->msg;
+ NTSTATUS status;
+ struct fd_event fde;
+
+ status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
+ if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+ /* backoff again */
+ te->next_event = t+1;
+ return;
+ }
+
+ if (!NT_STATUS_IS_OK(status)) {
+ DEBUG(1,("messaging: Lost message from %u to %u of type %u after backoff - %s\n",
+ rec->header.from, rec->header.to, rec->header.msg_type, nt_errstr(status)));
+ talloc_free(rec);
+ return;
+ }
+
+ fde.private = rec;
+ fde.fd = socket_get_fd(rec->sock);
+ fde.flags = EVENT_FD_WRITE;
+ fde.handler = messaging_send_handler;
+
+ rec->fde = event_add_fd(msg->event.ev, &fde);
+
+ talloc_set_destructor(rec, rec_destructor);
+
+ messaging_send_handler(msg->event.ev, rec->fde, 0, EVENT_FD_WRITE);
+}
+
+
/*
Send a message to a particular server
*/
rec->path = messaging_path(rec, server);
status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
+ if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+ /* backoff on this message - the servers listen queue is full */
+ struct timed_event te;
+ te.next_event = time(NULL)+1;
+ te.handler = messaging_backoff_handler;
+ te.private = rec;
+ event_add_timed(msg->event.ev, &te);
+ return NT_STATUS_OK;
+ }
+
if (!NT_STATUS_IS_OK(status)) {
talloc_free(rec);
return status;
talloc_set_destructor(rec, rec_destructor);
+ messaging_send_handler(msg->event.ev, rec->fde, 0, EVENT_FD_WRITE);
+
return NT_STATUS_OK;
}
{
struct messaging_state *msg = msg_ctx;
event_remove_fd(msg->event.ev, msg->event.fde);
+ unlink(msg->path);
return 0;
}