Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net
[sfrench/cifs-2.6.git] / net / rds / send.c
index 44672befc0ee29a3e04ca01768c087fd0abd2f36..e9430f537f9c2bb23bbaeeb66933e1e85058bd34 100644 (file)
@@ -140,8 +140,11 @@ int rds_send_xmit(struct rds_connection *conn)
        struct scatterlist *sg;
        int ret = 0;
        LIST_HEAD(to_be_dropped);
+       int batch_count;
+       unsigned long send_gen = 0;
 
 restart:
+       batch_count = 0;
 
        /*
         * sendmsg calls here after having queued its message on the send
@@ -156,6 +159,17 @@ restart:
                goto out;
        }
 
+       /*
+        * we record the send generation after doing the xmit acquire.
+        * if someone else manages to jump in and do some work, we'll use
+        * this to avoid a goto restart farther down.
+        *
+        * The acquire_in_xmit() check above ensures that only one
+        * caller can increment c_send_gen at any time.
+        */
+       conn->c_send_gen++;
+       send_gen = conn->c_send_gen;
+
        /*
         * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
         * we do the opposite to avoid races.
@@ -202,6 +216,16 @@ restart:
                if (!rm) {
                        unsigned int len;
 
+                       batch_count++;
+
+                       /* we want to process as big a batch as we can, but
+                        * we also want to avoid softlockups.  If we've been
+                        * through a lot of messages, lets back off and see
+                        * if anyone else jumps in
+                        */
+                       if (batch_count >= 1024)
+                               goto over_batch;
+
                        spin_lock_irqsave(&conn->c_lock, flags);
 
                        if (!list_empty(&conn->c_send_queue)) {
@@ -357,9 +381,9 @@ restart:
                }
        }
 
+over_batch:
        if (conn->c_trans->xmit_complete)
                conn->c_trans->xmit_complete(conn);
-
        release_in_xmit(conn);
 
        /* Nuke any messages we decided not to retransmit. */
@@ -380,10 +404,15 @@ restart:
         * If the transport cannot continue (i.e ret != 0), then it must
         * call us when more room is available, such as from the tx
         * completion handler.
+        *
+        * We have an extra generation check here so that if someone manages
+        * to jump in after our release_in_xmit, we'll see that they have done
+        * some work and we will skip our goto
         */
        if (ret == 0) {
                smp_mb();
-               if (!list_empty(&conn->c_send_queue)) {
+               if (!list_empty(&conn->c_send_queue) &&
+                   send_gen == conn->c_send_gen) {
                        rds_stats_inc(s_send_lock_queue_raced);
                        goto restart;
                }