rxrpc: Don't hold a ref for call timer or workqueue
[sfrench/cifs-2.6.git] / net / rxrpc / call_object.c
index 6401cdf7a62469dad7d2b5b4deb532dafa28634a..9cd7e0190ef4c35c764a852d727bcdf9879b54b8 100644 (file)
@@ -52,10 +52,8 @@ static void rxrpc_call_timer_expired(struct timer_list *t)
        _enter("%d", call->debug_id);
 
        if (call->state < RXRPC_CALL_COMPLETE) {
-               trace_rxrpc_timer(call, rxrpc_timer_expired, jiffies);
-               __rxrpc_queue_call(call);
-       } else {
-               rxrpc_put_call(call, rxrpc_call_put);
+               trace_rxrpc_timer_expired(call, jiffies);
+               rxrpc_queue_call(call, rxrpc_call_queue_timer);
        }
 }
 
@@ -64,21 +62,14 @@ void rxrpc_reduce_call_timer(struct rxrpc_call *call,
                             unsigned long now,
                             enum rxrpc_timer_trace why)
 {
-       if (rxrpc_try_get_call(call, rxrpc_call_got_timer)) {
-               trace_rxrpc_timer(call, why, now);
-               if (timer_reduce(&call->timer, expire_at))
-                       rxrpc_put_call(call, rxrpc_call_put_notimer);
-       }
-}
-
-void rxrpc_delete_call_timer(struct rxrpc_call *call)
-{
-       if (del_timer_sync(&call->timer))
-               rxrpc_put_call(call, rxrpc_call_put_timer);
+       trace_rxrpc_timer(call, why, now);
+       timer_reduce(&call->timer, expire_at);
 }
 
 static struct lock_class_key rxrpc_call_user_mutex_lock_class_key;
 
+static void rxrpc_destroy_call(struct work_struct *);
+
 /*
  * find an extant server call
  * - called in process context with IRQs enabled
@@ -110,7 +101,7 @@ struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *rx,
        return NULL;
 
 found_extant_call:
-       rxrpc_get_call(call, rxrpc_call_got);
+       rxrpc_get_call(call, rxrpc_call_get_sendmsg);
        read_unlock(&rx->call_lock);
        _leave(" = %p [%d]", call, refcount_read(&call->ref));
        return call;
@@ -129,16 +120,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
        if (!call)
                return NULL;
 
-       call->rxtx_buffer = kcalloc(RXRPC_RXTX_BUFF_SIZE,
-                                   sizeof(struct sk_buff *),
-                                   gfp);
-       if (!call->rxtx_buffer)
-               goto nomem;
-
-       call->rxtx_annotations = kcalloc(RXRPC_RXTX_BUFF_SIZE, sizeof(u8), gfp);
-       if (!call->rxtx_annotations)
-               goto nomem_2;
-
        mutex_init(&call->user_mutex);
 
        /* Prevent lockdep reporting a deadlock false positive between the afs
@@ -149,43 +130,46 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
                                  &rxrpc_call_user_mutex_lock_class_key);
 
        timer_setup(&call->timer, rxrpc_call_timer_expired, 0);
-       INIT_WORK(&call->processor, &rxrpc_process_call);
+       INIT_WORK(&call->processor, rxrpc_process_call);
+       INIT_WORK(&call->destroyer, rxrpc_destroy_call);
        INIT_LIST_HEAD(&call->link);
        INIT_LIST_HEAD(&call->chan_wait_link);
        INIT_LIST_HEAD(&call->accept_link);
        INIT_LIST_HEAD(&call->recvmsg_link);
        INIT_LIST_HEAD(&call->sock_link);
+       INIT_LIST_HEAD(&call->tx_buffer);
+       skb_queue_head_init(&call->recvmsg_queue);
+       skb_queue_head_init(&call->rx_oos_queue);
        init_waitqueue_head(&call->waitq);
-       spin_lock_init(&call->lock);
        spin_lock_init(&call->notify_lock);
+       spin_lock_init(&call->tx_lock);
        spin_lock_init(&call->input_lock);
+       spin_lock_init(&call->acks_ack_lock);
        rwlock_init(&call->state_lock);
        refcount_set(&call->ref, 1);
        call->debug_id = debug_id;
        call->tx_total_len = -1;
        call->next_rx_timo = 20 * HZ;
        call->next_req_timo = 1 * HZ;
+       atomic64_set(&call->ackr_window, 0x100000001ULL);
 
        memset(&call->sock_node, 0xed, sizeof(call->sock_node));
 
-       /* Leave space in the ring to handle a maxed-out jumbo packet */
        call->rx_winsize = rxrpc_rx_window_size;
        call->tx_winsize = 16;
-       call->rx_expect_next = 1;
 
-       call->cong_cwnd = 2;
-       call->cong_ssthresh = RXRPC_RXTX_BUFF_SIZE - 1;
+       if (RXRPC_TX_SMSS > 2190)
+               call->cong_cwnd = 2;
+       else if (RXRPC_TX_SMSS > 1095)
+               call->cong_cwnd = 3;
+       else
+               call->cong_cwnd = 4;
+       call->cong_ssthresh = RXRPC_TX_MAX_WINDOW;
 
        call->rxnet = rxnet;
        call->rtt_avail = RXRPC_CALL_RTT_AVAIL_MASK;
        atomic_inc(&rxnet->nr_calls);
        return call;
-
-nomem_2:
-       kfree(call->rxtx_buffer);
-nomem:
-       kmem_cache_free(rxrpc_call_jar, call);
-       return NULL;
 }
 
 /*
@@ -206,7 +190,6 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
                return ERR_PTR(-ENOMEM);
        call->state = RXRPC_CALL_CLIENT_AWAIT_CONN;
        call->service_id = srx->srx_service;
-       call->tx_phase = true;
        now = ktime_get_real();
        call->acks_latest_ts = now;
        call->cong_tstamp = now;
@@ -223,7 +206,7 @@ static void rxrpc_start_call_timer(struct rxrpc_call *call)
        unsigned long now = jiffies;
        unsigned long j = now + MAX_JIFFY_OFFSET;
 
-       call->ack_at = j;
+       call->delay_ack_at = j;
        call->ack_lost_at = j;
        call->resend_at = j;
        call->ping_at = j;
@@ -279,7 +262,6 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
        struct rxrpc_net *rxnet;
        struct semaphore *limiter;
        struct rb_node *parent, **pp;
-       const void *here = __builtin_return_address(0);
        int ret;
 
        _enter("%p,%lx", rx, p->user_call_ID);
@@ -300,9 +282,8 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 
        call->interruptibility = p->interruptibility;
        call->tx_total_len = p->tx_total_len;
-       trace_rxrpc_call(call->debug_id, rxrpc_call_new_client,
-                        refcount_read(&call->ref),
-                        here, (const void *)p->user_call_ID);
+       trace_rxrpc_call(call->debug_id, refcount_read(&call->ref),
+                        p->user_call_ID, rxrpc_call_new_client);
        if (p->kernel)
                __set_bit(RXRPC_CALL_KERNEL, &call->flags);
 
@@ -331,7 +312,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
        rcu_assign_pointer(call->socket, rx);
        call->user_call_ID = p->user_call_ID;
        __set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
-       rxrpc_get_call(call, rxrpc_call_got_userid);
+       rxrpc_get_call(call, rxrpc_call_get_userid);
        rb_link_node(&call->sock_node, parent, pp);
        rb_insert_color(&call->sock_node, &rx->calls);
        list_add(&call->sock_link, &rx->sock_calls);
@@ -353,13 +334,10 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
        if (ret < 0)
                goto error_attached_to_socket;
 
-       trace_rxrpc_call(call->debug_id, rxrpc_call_connected,
-                        refcount_read(&call->ref), here, NULL);
+       rxrpc_see_call(call, rxrpc_call_see_connected);
 
        rxrpc_start_call_timer(call);
 
-       _net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
-
        _leave(" = %p [new]", call);
        return call;
 
@@ -373,11 +351,11 @@ error_dup_user_ID:
        release_sock(&rx->sk);
        __rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
                                    RX_CALL_DEAD, -EEXIST);
-       trace_rxrpc_call(call->debug_id, rxrpc_call_error,
-                        refcount_read(&call->ref), here, ERR_PTR(-EEXIST));
+       trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), 0,
+                        rxrpc_call_see_userid_exists);
        rxrpc_release_call(rx, call);
        mutex_unlock(&call->user_mutex);
-       rxrpc_put_call(call, rxrpc_call_put);
+       rxrpc_put_call(call, rxrpc_call_put_userid_exists);
        _leave(" = -EEXIST");
        return ERR_PTR(-EEXIST);
 
@@ -387,8 +365,8 @@ error_dup_user_ID:
         * leave the error to recvmsg() to deal with.
         */
 error_attached_to_socket:
-       trace_rxrpc_call(call->debug_id, rxrpc_call_error,
-                        refcount_read(&call->ref), here, ERR_PTR(ret));
+       trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), ret,
+                        rxrpc_call_see_connect_failed);
        set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
        __rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
                                    RX_CALL_DEAD, ret);
@@ -428,98 +406,63 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
        conn->channels[chan].call_id = call->call_id;
        rcu_assign_pointer(conn->channels[chan].call, call);
 
-       spin_lock(&conn->params.peer->lock);
-       hlist_add_head_rcu(&call->error_link, &conn->params.peer->error_targets);
-       spin_unlock(&conn->params.peer->lock);
-
-       _net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
+       spin_lock(&conn->peer->lock);
+       hlist_add_head_rcu(&call->error_link, &conn->peer->error_targets);
+       spin_unlock(&conn->peer->lock);
 
        rxrpc_start_call_timer(call);
        _leave("");
 }
 
 /*
- * Queue a call's work processor, getting a ref to pass to the work queue.
- */
-bool rxrpc_queue_call(struct rxrpc_call *call)
-{
-       const void *here = __builtin_return_address(0);
-       int n;
-
-       if (!__refcount_inc_not_zero(&call->ref, &n))
-               return false;
-       if (rxrpc_queue_work(&call->processor))
-               trace_rxrpc_call(call->debug_id, rxrpc_call_queued, n + 1,
-                                here, NULL);
-       else
-               rxrpc_put_call(call, rxrpc_call_put_noqueue);
-       return true;
-}
-
-/*
- * Queue a call's work processor, passing the callers ref to the work queue.
+ * Queue a call's work processor.
  */
-bool __rxrpc_queue_call(struct rxrpc_call *call)
+void rxrpc_queue_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
 {
-       const void *here = __builtin_return_address(0);
-       int n = refcount_read(&call->ref);
-       ASSERTCMP(n, >=, 1);
        if (rxrpc_queue_work(&call->processor))
-               trace_rxrpc_call(call->debug_id, rxrpc_call_queued_ref, n,
-                                here, NULL);
-       else
-               rxrpc_put_call(call, rxrpc_call_put_noqueue);
-       return true;
+               trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), 0, why);
 }
 
 /*
  * Note the re-emergence of a call.
  */
-void rxrpc_see_call(struct rxrpc_call *call)
+void rxrpc_see_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
 {
-       const void *here = __builtin_return_address(0);
        if (call) {
-               int n = refcount_read(&call->ref);
+               int r = refcount_read(&call->ref);
 
-               trace_rxrpc_call(call->debug_id, rxrpc_call_seen, n,
-                                here, NULL);
+               trace_rxrpc_call(call->debug_id, r, 0, why);
        }
 }
 
-bool rxrpc_try_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
+bool rxrpc_try_get_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
 {
-       const void *here = __builtin_return_address(0);
-       int n;
+       int r;
 
-       if (!__refcount_inc_not_zero(&call->ref, &n))
+       if (!__refcount_inc_not_zero(&call->ref, &r))
                return false;
-       trace_rxrpc_call(call->debug_id, op, n + 1, here, NULL);
+       trace_rxrpc_call(call->debug_id, r + 1, 0, why);
        return true;
 }
 
 /*
  * Note the addition of a ref on a call.
  */
-void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
+void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
 {
-       const void *here = __builtin_return_address(0);
-       int n;
+       int r;
 
-       __refcount_inc(&call->ref, &n);
-       trace_rxrpc_call(call->debug_id, op, n + 1, here, NULL);
+       __refcount_inc(&call->ref, &r);
+       trace_rxrpc_call(call->debug_id, r + 1, 0, why);
 }
 
 /*
- * Clean up the RxTx skb ring.
+ * Clean up the Rx skb ring.
  */
 static void rxrpc_cleanup_ring(struct rxrpc_call *call)
 {
-       int i;
-
-       for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++) {
-               rxrpc_free_skb(call->rxtx_buffer[i], rxrpc_skb_cleaned);
-               call->rxtx_buffer[i] = NULL;
-       }
+       skb_queue_purge(&call->recvmsg_queue);
+       skb_queue_purge(&call->rx_oos_queue);
 }
 
 /*
@@ -527,25 +470,21 @@ static void rxrpc_cleanup_ring(struct rxrpc_call *call)
  */
 void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 {
-       const void *here = __builtin_return_address(0);
        struct rxrpc_connection *conn = call->conn;
        bool put = false;
 
        _enter("{%d,%d}", call->debug_id, refcount_read(&call->ref));
 
-       trace_rxrpc_call(call->debug_id, rxrpc_call_release,
-                        refcount_read(&call->ref),
-                        here, (const void *)call->flags);
+       trace_rxrpc_call(call->debug_id, refcount_read(&call->ref),
+                        call->flags, rxrpc_call_see_release);
 
        ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
 
-       spin_lock_bh(&call->lock);
        if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
                BUG();
-       spin_unlock_bh(&call->lock);
 
        rxrpc_put_call_slot(call);
-       rxrpc_delete_call_timer(call);
+       del_timer_sync(&call->timer);
 
        /* Make sure we don't get any more notifications */
        write_lock_bh(&rx->recvmsg_lock);
@@ -563,14 +502,14 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 
        write_unlock_bh(&rx->recvmsg_lock);
        if (put)
-               rxrpc_put_call(call, rxrpc_call_put);
+               rxrpc_put_call(call, rxrpc_call_put_unnotify);
 
        write_lock(&rx->call_lock);
 
        if (test_and_clear_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
                rb_erase(&call->sock_node, &rx->calls);
                memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
-               rxrpc_put_call(call, rxrpc_call_put_userid);
+               rxrpc_put_call(call, rxrpc_call_put_userid_exists);
        }
 
        list_del(&call->sock_link);
@@ -599,17 +538,17 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
                                  struct rxrpc_call, accept_link);
                list_del(&call->accept_link);
                rxrpc_abort_call("SKR", call, 0, RX_CALL_DEAD, -ECONNRESET);
-               rxrpc_put_call(call, rxrpc_call_put);
+               rxrpc_put_call(call, rxrpc_call_put_release_sock_tba);
        }
 
        while (!list_empty(&rx->sock_calls)) {
                call = list_entry(rx->sock_calls.next,
                                  struct rxrpc_call, sock_link);
-               rxrpc_get_call(call, rxrpc_call_got);
+               rxrpc_get_call(call, rxrpc_call_get_release_sock);
                rxrpc_abort_call("SKT", call, 0, RX_CALL_DEAD, -ECONNRESET);
                rxrpc_send_abort_packet(call);
                rxrpc_release_call(rx, call);
-               rxrpc_put_call(call, rxrpc_call_put);
+               rxrpc_put_call(call, rxrpc_call_put_release_sock);
        }
 
        _leave("");
@@ -618,20 +557,18 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
 /*
  * release a call
  */
-void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
+void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
 {
        struct rxrpc_net *rxnet = call->rxnet;
-       const void *here = __builtin_return_address(0);
        unsigned int debug_id = call->debug_id;
        bool dead;
-       int n;
+       int r;
 
        ASSERT(call != NULL);
 
-       dead = __refcount_dec_and_test(&call->ref, &n);
-       trace_rxrpc_call(debug_id, op, n, here, NULL);
+       dead = __refcount_dec_and_test(&call->ref, &r);
+       trace_rxrpc_call(debug_id, r - 1, 0, why);
        if (dead) {
-               _debug("call %d dead", call->debug_id);
                ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
 
                if (!list_empty(&call->link)) {
@@ -645,38 +582,41 @@ void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
 }
 
 /*
- * Final call destruction - but must be done in process context.
+ * Free up the call under RCU.
  */
-static void rxrpc_destroy_call(struct work_struct *work)
+static void rxrpc_rcu_free_call(struct rcu_head *rcu)
 {
-       struct rxrpc_call *call = container_of(work, struct rxrpc_call, processor);
-       struct rxrpc_net *rxnet = call->rxnet;
-
-       rxrpc_delete_call_timer(call);
+       struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
+       struct rxrpc_net *rxnet = READ_ONCE(call->rxnet);
 
-       rxrpc_put_connection(call->conn);
-       rxrpc_put_peer(call->peer);
-       kfree(call->rxtx_buffer);
-       kfree(call->rxtx_annotations);
        kmem_cache_free(rxrpc_call_jar, call);
        if (atomic_dec_and_test(&rxnet->nr_calls))
                wake_up_var(&rxnet->nr_calls);
 }
 
 /*
- * Final call destruction under RCU.
+ * Final call destruction - but must be done in process context.
  */
-static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
+static void rxrpc_destroy_call(struct work_struct *work)
 {
-       struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
+       struct rxrpc_call *call = container_of(work, struct rxrpc_call, destroyer);
+       struct rxrpc_txbuf *txb;
 
-       if (in_softirq()) {
-               INIT_WORK(&call->processor, rxrpc_destroy_call);
-               if (!rxrpc_queue_work(&call->processor))
-                       BUG();
-       } else {
-               rxrpc_destroy_call(&call->processor);
+       del_timer_sync(&call->timer);
+       cancel_work_sync(&call->processor); /* The processor may restart the timer */
+       del_timer_sync(&call->timer);
+
+       rxrpc_cleanup_ring(call);
+       while ((txb = list_first_entry_or_null(&call->tx_buffer,
+                                              struct rxrpc_txbuf, call_link))) {
+               list_del(&txb->call_link);
+               rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
        }
+       rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
+       rxrpc_free_skb(call->acks_soft_tbl, rxrpc_skb_put_ack);
+       rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
+       rxrpc_put_peer(call->peer, rxrpc_peer_put_call);
+       call_rcu(&call->rcu, rxrpc_rcu_free_call);
 }
 
 /*
@@ -684,17 +624,21 @@ static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
  */
 void rxrpc_cleanup_call(struct rxrpc_call *call)
 {
-       _net("DESTROY CALL %d", call->debug_id);
-
        memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
 
        ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
        ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
 
-       rxrpc_cleanup_ring(call);
-       rxrpc_free_skb(call->tx_pending, rxrpc_skb_cleaned);
+       del_timer_sync(&call->timer);
+       cancel_work(&call->processor);
 
-       call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
+       if (in_softirq() || work_busy(&call->processor))
+               /* Can't use the rxrpc workqueue as we need to cancel/flush
+                * something that may be running/waiting there.
+                */
+               schedule_work(&call->destroyer);
+       else
+               rxrpc_destroy_call(&call->destroyer);
 }
 
 /*
@@ -716,7 +660,7 @@ void rxrpc_destroy_all_calls(struct rxrpc_net *rxnet)
                                          struct rxrpc_call, link);
                        _debug("Zapping call %p", call);
 
-                       rxrpc_see_call(call);
+                       rxrpc_see_call(call, rxrpc_call_see_zap);
                        list_del_init(&call->link);
 
                        pr_err("Call %p still in use (%d,%s,%lx,%lx)!\n",