r3183: moved the unlink of the messaging unixdom socket to the messaging destructor
[bbaumbach/samba-autobuild/.git] / source4 / lib / messaging / messaging.c
index 7e87ea45df028936511fff8fd5fa65c4beb0920e..13c1a049f8470afb3923829ce0a1e435433a39d6 100644 (file)
@@ -49,8 +49,6 @@ struct dispatch_fn {
 
 /* an individual message */
 struct messaging_rec {
-       struct messaging_rec *next, *prev;
-
        struct messaging_state *msg;
        struct socket_context *sock;
        struct fd_event *fde;
@@ -97,8 +95,9 @@ static char *messaging_path(TALLOC_CTX *mem_ctx, servid_t server_id)
 */
 static void messaging_dispatch(struct messaging_state *msg, struct messaging_rec *rec)
 {
-       struct dispatch_fn *d;
-       for (d=msg->dispatch;d;d=d->next) {
+       struct dispatch_fn *d, *next;
+       for (d=msg->dispatch;d;d=next) {
+               next = d->next;
                if (d->msg_type == rec->header.msg_type) {
                        d->fn(msg, d->private, d->msg_type, rec->header.from, &rec->data);
                }
@@ -252,14 +251,15 @@ void messaging_register(void *ctx, void *private,
 /*
   De-register the function for a particular message type.
 */
-void messaging_deregister(void *ctx, uint32_t msg_type)
+void messaging_deregister(void *ctx, uint32_t msg_type, void *private)
 {
        struct messaging_state *msg = ctx;
        struct dispatch_fn *d, *next;
 
        for (d = msg->dispatch; d; d = next) {
                next = d->next;
-               if (d->msg_type == msg_type) {
+               if (d->msg_type == msg_type && 
+                   d->private == private) {
                        DLIST_REMOVE(msg->dispatch, d);
                        talloc_free(d);
                }
@@ -323,6 +323,43 @@ static void messaging_send_handler(struct event_context *ev, struct fd_event *fd
 }
 
 
+/*
+  when the servers listen queue is full we use this to backoff the message
+*/
+static void messaging_backoff_handler(struct event_context *ev, struct timed_event *te, time_t t)
+{
+       struct messaging_rec *rec = te->private;
+       struct messaging_state *msg = rec->msg;
+       NTSTATUS status;
+       struct fd_event fde;
+
+       status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
+       if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+               /* backoff again */
+               te->next_event = t+1;
+               return;
+       }
+
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(1,("messaging: Lost message from %u to %u of type %u after backoff - %s\n", 
+                        rec->header.from, rec->header.to, rec->header.msg_type, nt_errstr(status)));
+               talloc_free(rec);
+               return;
+       }
+
+       fde.private     = rec;
+       fde.fd          = socket_get_fd(rec->sock);
+       fde.flags       = EVENT_FD_WRITE;
+       fde.handler     = messaging_send_handler;
+
+       rec->fde        = event_add_fd(msg->event.ev, &fde);
+
+       talloc_set_destructor(rec, rec_destructor);
+
+       messaging_send_handler(msg->event.ev, rec->fde, 0, EVENT_FD_WRITE);
+}
+
+
 /*
   Send a message to a particular server
 */
@@ -361,6 +398,16 @@ NTSTATUS messaging_send(void *msg_ctx, servid_t server, uint32_t msg_type, DATA_
        rec->path = messaging_path(rec, server);
 
        status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
+       if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+               /* backoff on this message - the servers listen queue is full */
+               struct timed_event te;
+               te.next_event = time(NULL)+1;
+               te.handler = messaging_backoff_handler;
+               te.private = rec;
+               event_add_timed(msg->event.ev, &te);
+               return NT_STATUS_OK;
+       }
+
        if (!NT_STATUS_IS_OK(status)) {
                talloc_free(rec);
                return status;
@@ -375,6 +422,8 @@ NTSTATUS messaging_send(void *msg_ctx, servid_t server, uint32_t msg_type, DATA_
 
        talloc_set_destructor(rec, rec_destructor);
 
+       messaging_send_handler(msg->event.ev, rec->fde, 0, EVENT_FD_WRITE);
+
        return NT_STATUS_OK;
 }
 
@@ -386,6 +435,7 @@ static int messaging_destructor(void *msg_ctx)
 {
        struct messaging_state *msg = msg_ctx;
        event_remove_fd(msg->event.ev, msg->event.fde);
+       unlink(msg->path);
        return 0;
 }