r23153: a first cut at a fix for the dgram flood problem that Volker
author Andrew Tridgell <tridge@samba.org>
Sat, 26 May 2007 08:47:27 +0000 (08:47 +0000)
committer Gerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 19:52:58 +0000 (14:52 -0500)
found. Not sure this is the best solution, but it should work.
(This used to be commit 80002cd12a64fa2679e48c58906cb9b26ad17e49)

source4/lib/messaging/messaging.c

index 58b5e5243e472a4d35dcb2a2bffeecb965b13f78..ab94a30acea82355971137a80e3bf7e703f438e7 100644 (file)
@@ -48,11 +48,12 @@ struct messaging_context {
        uint32_t num_types;
        struct idr_context *dispatch_tree;
        struct messaging_rec *pending;
+       struct messaging_rec *retry_queue;
        struct irpc_list *irpc;
        struct idr_context *idr;
        const char **names;
        struct timeval start_time;
-
+       struct timed_event *retry_te;
        struct {
                struct event_context *ev;
                struct fd_event *fde;
@@ -83,6 +84,7 @@ struct messaging_rec {
        } *header;
 
        DATA_BLOB packet;
+       uint32_t retries;
 };
 
 
@@ -168,6 +170,7 @@ static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB pac
        rec->path          = msg->path;
        rec->header        = (struct messaging_header *)packet.data;
        rec->packet        = packet;
+       rec->retries       = 0;
 
        if (packet.length != sizeof(*rec->header) + rec->header->length) {
                DEBUG(0,("messaging: bad message header size %d should be %d\n", 
@@ -210,6 +213,26 @@ static NTSTATUS try_send(struct messaging_rec *rec)
        return status;
 }
 
+/*
+  retry backed off messages
+*/
+static void msg_retry_timer(struct event_context *ev, struct timed_event *te, 
+                           struct timeval t, void *private)
+{
+       struct messaging_context *msg = talloc_get_type(private, 
+                                                       struct messaging_context);
+       msg->retry_te = NULL;
+
+       /* put the messages back on the main queue */
+       while (msg->retry_queue) {
+               struct messaging_rec *rec = msg->retry_queue;
+               DLIST_REMOVE(msg->retry_queue, rec);
+               DLIST_ADD_END(msg->pending, rec, struct messaging_rec *);
+       }
+
+       EVENT_FD_WRITEABLE(msg->event.fde);     
+}
+
 /*
   handle a socket write event
 */
@@ -220,8 +243,23 @@ static void messaging_send_handler(struct messaging_context *msg)
                NTSTATUS status;
                status = try_send(rec);
                if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+                       rec->retries++;
+                       if (rec->retries > 3) {
+                               /* we're getting continuous write errors -
+                                  backoff this record */
+                               DLIST_REMOVE(msg->pending, rec);
+                               DLIST_ADD_END(msg->retry_queue, rec, 
+                                             struct messaging_rec *);
+                               if (msg->retry_te == NULL) {
+                                       msg->retry_te = 
+                                               event_add_timed(msg->event.ev, msg, 
+                                                               timeval_current_ofs(1, 0), 
+                                                               msg_retry_timer, msg);
+                               }
+                       }
                        break;
                }
+               rec->retries = 0;
                if (!NT_STATUS_IS_OK(status)) {
                        DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n", 
                                 rec->header->from.id, rec->header->to.id, rec->header->msg_type, 
@@ -281,6 +319,7 @@ static void messaging_recv_handler(struct messaging_context *msg)
        rec->path          = msg->path;
        rec->header        = (struct messaging_header *)packet.data;
        rec->packet        = packet;
+       rec->retries       = 0;
 
        if (msize != sizeof(*rec->header) + rec->header->length) {
                DEBUG(0,("messaging: bad message header size %d should be %d\n", 
@@ -415,6 +454,7 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
                return NT_STATUS_NO_MEMORY;
        }
 
+       rec->retries       = 0;
        rec->msg              = msg;
        rec->header           = (struct messaging_header *)rec->packet.data;
        rec->header->version  = MESSAGING_VERSION;