net/smc: add SMC-D support in data transfer
authorHans Wippel <hwippel@linux.ibm.com>
Thu, 28 Jun 2018 17:05:10 +0000 (19:05 +0200)
committerDavid S. Miller <davem@davemloft.net>
Sat, 30 Jun 2018 11:42:26 +0000 (20:42 +0900)
The data transfer and CDC message headers differ in SMC-R and SMC-D.
This patch adds support for the SMC-D data transfer to the existing SMC
code. It consists of the following:

* SMC-D CDC support
* SMC-D tx support
* SMC-D rx support

The CDC header is stored at the beginning of the receive buffer. Thus, a
rx_offset variable is added for the CDC header offset within the buffer
(0 for SMC-R).

Signed-off-by: Hans Wippel <hwippel@linux.ibm.com>
Signed-off-by: Ursula Braun <ubraun@linux.ibm.com>
Suggested-by: Thomas Richter <tmricht@linux.ibm.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/smc/smc.h
net/smc/smc_cdc.c
net/smc/smc_cdc.h
net/smc/smc_core.c
net/smc/smc_ism.c
net/smc/smc_rx.c
net/smc/smc_tx.c
net/smc/smc_tx.h

index 7c86f716a92ea13e464136746b9fa5d2f3dbeab3..8c6231011779ce6208e729ca16fe39118d34f62a 100644 (file)
@@ -183,6 +183,11 @@ struct smc_connection {
        spinlock_t              acurs_lock;     /* protect cursors */
 #endif
        struct work_struct      close_work;     /* peer sent some closing */
+       struct tasklet_struct   rx_tsklet;      /* Receiver tasklet for SMC-D */
+       u8                      rx_off;         /* receive offset:
+                                                * 0 for SMC-R, 32 for SMC-D
+                                                */
+       u64                     peer_token;     /* SMC-D token of peer */
 };
 
 struct smc_sock {                              /* smc sock container */
index a7e8d63fc8aebe61c094c232d4b6f31439eed3e2..621d8cca570ba38568295fdaaef0380830ab0990 100644 (file)
@@ -117,7 +117,7 @@ int smc_cdc_msg_send(struct smc_connection *conn,
        return rc;
 }
 
-int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
+static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn)
 {
        struct smc_cdc_tx_pend *pend;
        struct smc_wr_buf *wr_buf;
@@ -130,6 +130,21 @@ int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
        return smc_cdc_msg_send(conn, wr_buf, pend);
 }
 
+int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
+{
+       int rc;
+
+       if (conn->lgr->is_smcd) {
+               spin_lock_bh(&conn->send_lock);
+               rc = smcd_cdc_msg_send(conn);
+               spin_unlock_bh(&conn->send_lock);
+       } else {
+               rc = smcr_cdc_get_slot_and_msg_send(conn);
+       }
+
+       return rc;
+}
+
 static bool smc_cdc_tx_filter(struct smc_wr_tx_pend_priv *tx_pend,
                              unsigned long data)
 {
@@ -157,6 +172,45 @@ void smc_cdc_tx_dismiss_slots(struct smc_connection *conn)
                                (unsigned long)conn);
 }
 
+/* Send a SMC-D CDC header.
+ * This increments the free space available in our send buffer.
+ * Also update the confirmed receive buffer with what was sent to the peer.
+ */
+int smcd_cdc_msg_send(struct smc_connection *conn)
+{
+       struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
+       struct smcd_cdc_msg cdc;
+       int rc, diff;
+
+       memset(&cdc, 0, sizeof(cdc));
+       cdc.common.type = SMC_CDC_MSG_TYPE;
+       cdc.prod_wrap = conn->local_tx_ctrl.prod.wrap;
+       cdc.prod_count = conn->local_tx_ctrl.prod.count;
+
+       cdc.cons_wrap = conn->local_tx_ctrl.cons.wrap;
+       cdc.cons_count = conn->local_tx_ctrl.cons.count;
+       cdc.prod_flags = conn->local_tx_ctrl.prod_flags;
+       cdc.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
+       rc = smcd_tx_ism_write(conn, &cdc, sizeof(cdc), 0, 1);
+       if (rc)
+               return rc;
+       smc_curs_write(&conn->rx_curs_confirmed,
+                      smc_curs_read(&conn->local_tx_ctrl.cons, conn), conn);
+       /* Calculate transmitted data and increment free send buffer space */
+       diff = smc_curs_diff(conn->sndbuf_desc->len, &conn->tx_curs_fin,
+                            &conn->tx_curs_sent);
+       /* increased by confirmed number of bytes */
+       smp_mb__before_atomic();
+       atomic_add(diff, &conn->sndbuf_space);
+       /* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
+       smp_mb__after_atomic();
+       smc_curs_write(&conn->tx_curs_fin,
+                      smc_curs_read(&conn->tx_curs_sent, conn), conn);
+
+       smc_tx_sndbuf_nonfull(smc);
+       return rc;
+}
+
 /********************************* receive ***********************************/
 
 static inline bool smc_cdc_before(u16 seq1, u16 seq2)
@@ -178,7 +232,7 @@ static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc,
        if (!sock_flag(&smc->sk, SOCK_URGINLINE))
                /* we'll skip the urgent byte, so don't account for it */
                (*diff_prod)--;
-       base = (char *)conn->rmb_desc->cpu_addr;
+       base = (char *)conn->rmb_desc->cpu_addr + conn->rx_off;
        if (conn->urg_curs.count)
                conn->urg_rx_byte = *(base + conn->urg_curs.count - 1);
        else
@@ -276,6 +330,34 @@ static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc)
        sock_put(&smc->sk); /* no free sk in softirq-context */
 }
 
+/* Schedule a tasklet for this connection. Triggered from the ISM device IRQ
+ * handler to indicate update in the DMBE.
+ *
+ * Context:
+ * - tasklet context
+ */
+static void smcd_cdc_rx_tsklet(unsigned long data)
+{
+       struct smc_connection *conn = (struct smc_connection *)data;
+       struct smcd_cdc_msg cdc;
+       struct smc_sock *smc;
+
+       if (!conn)
+               return;
+
+       memcpy(&cdc, conn->rmb_desc->cpu_addr, sizeof(cdc));
+       smc = container_of(conn, struct smc_sock, conn);
+       smc_cdc_msg_recv(smc, (struct smc_cdc_msg *)&cdc);
+}
+
+/* Initialize receive tasklet. Called from ISM device IRQ handler to start
+ * receiver side.
+ */
+void smcd_cdc_rx_init(struct smc_connection *conn)
+{
+       tasklet_init(&conn->rx_tsklet, smcd_cdc_rx_tsklet, (unsigned long)conn);
+}
+
 /***************************** init, exit, misc ******************************/
 
 static void smc_cdc_rx_handler(struct ib_wc *wc, void *buf)
index f60082fee5b8750c92a6c5f9dfa69d9a2d151ce6..8fbce4fee3e4a106244d6de7f71c7d4b7ec330f5 100644 (file)
@@ -50,6 +50,20 @@ struct smc_cdc_msg {
        u8                              reserved[18];
 } __packed;                                    /* format defined in RFC7609 */
 
+/* CDC message for SMC-D */
+struct smcd_cdc_msg {
+       struct smc_wr_rx_hdr common;    /* Type = 0xFE */
+       u8 res1[7];
+       u16 prod_wrap;
+       u32 prod_count;
+       u8 res2[2];
+       u16 cons_wrap;
+       u32 cons_count;
+       struct smc_cdc_producer_flags   prod_flags;
+       struct smc_cdc_conn_state_flags conn_state_flags;
+       u8 res3[8];
+} __packed;
+
 static inline bool smc_cdc_rxed_any_close(struct smc_connection *conn)
 {
        return conn->local_rx_ctrl.conn_state_flags.peer_conn_abort ||
@@ -204,9 +218,9 @@ static inline void smc_cdc_cursor_to_host(union smc_host_cursor *local,
        smc_curs_write(local, smc_curs_read(&temp, conn), conn);
 }
 
-static inline void smc_cdc_msg_to_host(struct smc_host_cdc_msg *local,
-                                      struct smc_cdc_msg *peer,
-                                      struct smc_connection *conn)
+static inline void smcr_cdc_msg_to_host(struct smc_host_cdc_msg *local,
+                                       struct smc_cdc_msg *peer,
+                                       struct smc_connection *conn)
 {
        local->common.type = peer->common.type;
        local->len = peer->len;
@@ -218,6 +232,27 @@ static inline void smc_cdc_msg_to_host(struct smc_host_cdc_msg *local,
        local->conn_state_flags = peer->conn_state_flags;
 }
 
+static inline void smcd_cdc_msg_to_host(struct smc_host_cdc_msg *local,
+                                       struct smcd_cdc_msg *peer)
+{
+       local->prod.wrap = peer->prod_wrap;
+       local->prod.count = peer->prod_count;
+       local->cons.wrap = peer->cons_wrap;
+       local->cons.count = peer->cons_count;
+       local->prod_flags = peer->prod_flags;
+       local->conn_state_flags = peer->conn_state_flags;
+}
+
+static inline void smc_cdc_msg_to_host(struct smc_host_cdc_msg *local,
+                                      struct smc_cdc_msg *peer,
+                                      struct smc_connection *conn)
+{
+       if (conn->lgr->is_smcd)
+               smcd_cdc_msg_to_host(local, (struct smcd_cdc_msg *)peer);
+       else
+               smcr_cdc_msg_to_host(local, peer, conn);
+}
+
 struct smc_cdc_tx_pend;
 
 int smc_cdc_get_free_slot(struct smc_connection *conn,
@@ -227,6 +262,8 @@ void smc_cdc_tx_dismiss_slots(struct smc_connection *conn);
 int smc_cdc_msg_send(struct smc_connection *conn, struct smc_wr_buf *wr_buf,
                     struct smc_cdc_tx_pend *pend);
 int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn);
+int smcd_cdc_msg_send(struct smc_connection *conn);
 int smc_cdc_init(void) __init;
+void smcd_cdc_rx_init(struct smc_connection *conn);
 
 #endif /* SMC_CDC_H */
index daa88db1841ae55676b774f1b94e756485d44909..434c028162a4c33b9f9dfcc594f0b8f4a3c4d30d 100644 (file)
@@ -281,10 +281,12 @@ void smc_conn_free(struct smc_connection *conn)
 {
        if (!conn->lgr)
                return;
-       if (conn->lgr->is_smcd)
+       if (conn->lgr->is_smcd) {
                smc_ism_unset_conn(conn);
-       else
+               tasklet_kill(&conn->rx_tsklet);
+       } else {
                smc_cdc_tx_dismiss_slots(conn);
+       }
        smc_lgr_unregister_conn(conn);
        smc_buf_unuse(conn);
 }
@@ -324,10 +326,13 @@ static void smcr_buf_free(struct smc_link_group *lgr, bool is_rmb,
 static void smcd_buf_free(struct smc_link_group *lgr, bool is_dmb,
                          struct smc_buf_desc *buf_desc)
 {
-       if (is_dmb)
+       if (is_dmb) {
+               /* restore original buf len */
+               buf_desc->len += sizeof(struct smcd_cdc_msg);
                smc_ism_unregister_dmb(lgr->smcd, buf_desc);
-       else
+       } else {
                kfree(buf_desc->cpu_addr);
+       }
        kfree(buf_desc);
 }
 
@@ -632,6 +637,10 @@ create:
        conn->local_tx_ctrl.common.type = SMC_CDC_MSG_TYPE;
        conn->local_tx_ctrl.len = SMC_WR_TX_SIZE;
        conn->urg_state = SMC_URG_READ;
+       if (is_smcd) {
+               conn->rx_off = sizeof(struct smcd_cdc_msg);
+               smcd_cdc_rx_init(conn); /* init tasklet for this conn */
+       }
 #ifndef KERNEL_HAS_ATOMIC64
        spin_lock_init(&conn->acurs_lock);
 #endif
@@ -776,8 +785,9 @@ static struct smc_buf_desc *smcd_new_buf_create(struct smc_link_group *lgr,
                        kfree(buf_desc);
                        return ERR_PTR(-EAGAIN);
                }
-               memset(buf_desc->cpu_addr, 0, bufsize);
-               buf_desc->len = bufsize;
+               buf_desc->pages = virt_to_page(buf_desc->cpu_addr);
+               /* CDC header stored in buf. So, pretend it was smaller */
+               buf_desc->len = bufsize - sizeof(struct smcd_cdc_msg);
        } else {
                buf_desc->cpu_addr = kzalloc(bufsize, GFP_KERNEL |
                                             __GFP_NOWARN | __GFP_NORETRY |
@@ -854,7 +864,8 @@ static int __smc_buf_create(struct smc_sock *smc, bool is_smcd, bool is_rmb)
                conn->rmbe_size_short = bufsize_short;
                smc->sk.sk_rcvbuf = bufsize * 2;
                atomic_set(&conn->bytes_to_rcv, 0);
-               conn->rmbe_update_limit = smc_rmb_wnd_update_limit(bufsize);
+               conn->rmbe_update_limit =
+                       smc_rmb_wnd_update_limit(buf_desc->len);
                if (is_smcd)
                        smc_ism_set_conn(conn); /* map RMB/smcd_dev to conn */
        } else {
index f44e4dff244ae168de344da660300f968254164c..cfade7fdcc6d3115d18e9da7b0bfd51b247ed7f7 100644 (file)
@@ -302,5 +302,13 @@ EXPORT_SYMBOL_GPL(smcd_handle_event);
  */
 void smcd_handle_irq(struct smcd_dev *smcd, unsigned int dmbno)
 {
+       struct smc_connection *conn = NULL;
+       unsigned long flags;
+
+       spin_lock_irqsave(&smcd->lock, flags);
+       conn = smcd->conn[dmbno];
+       if (conn)
+               tasklet_schedule(&conn->rx_tsklet);
+       spin_unlock_irqrestore(&smcd->lock, flags);
 }
 EXPORT_SYMBOL_GPL(smcd_handle_irq);
index 3d77b383cccd97f7770580f3512e642aeae24d6b..b329803c8339dab65ab2112cb7a1971931286075 100644 (file)
@@ -305,7 +305,7 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
        target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
 
        /* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
-       rcvbuf_base = conn->rmb_desc->cpu_addr;
+       rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr;
 
        do { /* while (read_remaining) */
                if (read_done >= target || (pipe && read_done))
index f82886b7d1d8394adada4998159a708c3c897a82..142bcb134dd64879fcade28dc35993886519a04c 100644 (file)
@@ -24,6 +24,7 @@
 #include "smc.h"
 #include "smc_wr.h"
 #include "smc_cdc.h"
+#include "smc_ism.h"
 #include "smc_tx.h"
 
 #define SMC_TX_WORK_DELAY      HZ
@@ -250,6 +251,24 @@ out_err:
 
 /***************************** sndbuf consumer *******************************/
 
+/* sndbuf consumer: actual data transfer of one target chunk with ISM write */
+int smcd_tx_ism_write(struct smc_connection *conn, void *data, size_t len,
+                     u32 offset, int signal)
+{
+       struct smc_ism_position pos;
+       int rc;
+
+       memset(&pos, 0, sizeof(pos));
+       pos.token = conn->peer_token;
+       pos.index = conn->peer_rmbe_idx;
+       pos.offset = conn->tx_off + offset;
+       pos.signal = signal;
+       rc = smc_ism_write(conn->lgr->smcd, &pos, data, len);
+       if (rc)
+               conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
+       return rc;
+}
+
 /* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
 static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
                             int num_sges, struct ib_sge sges[])
@@ -297,21 +316,104 @@ static inline void smc_tx_advance_cursors(struct smc_connection *conn,
        smc_curs_add(conn->sndbuf_desc->len, sent, len);
 }
 
+/* SMC-R helper for smc_tx_rdma_writes() */
+static int smcr_tx_rdma_writes(struct smc_connection *conn, size_t len,
+                              size_t src_off, size_t src_len,
+                              size_t dst_off, size_t dst_len)
+{
+       dma_addr_t dma_addr =
+               sg_dma_address(conn->sndbuf_desc->sgt[SMC_SINGLE_LINK].sgl);
+       struct smc_link *link = &conn->lgr->lnk[SMC_SINGLE_LINK];
+       int src_len_sum = src_len, dst_len_sum = dst_len;
+       struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
+       int sent_count = src_off;
+       int srcchunk, dstchunk;
+       int num_sges;
+       int rc;
+
+       for (dstchunk = 0; dstchunk < 2; dstchunk++) {
+               num_sges = 0;
+               for (srcchunk = 0; srcchunk < 2; srcchunk++) {
+                       sges[srcchunk].addr = dma_addr + src_off;
+                       sges[srcchunk].length = src_len;
+                       sges[srcchunk].lkey = link->roce_pd->local_dma_lkey;
+                       num_sges++;
+
+                       src_off += src_len;
+                       if (src_off >= conn->sndbuf_desc->len)
+                               src_off -= conn->sndbuf_desc->len;
+                                               /* modulo in send ring */
+                       if (src_len_sum == dst_len)
+                               break; /* either on 1st or 2nd iteration */
+                       /* prepare next (== 2nd) iteration */
+                       src_len = dst_len - src_len; /* remainder */
+                       src_len_sum += src_len;
+               }
+               rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
+               if (rc)
+                       return rc;
+               if (dst_len_sum == len)
+                       break; /* either on 1st or 2nd iteration */
+               /* prepare next (== 2nd) iteration */
+               dst_off = 0; /* modulo offset in RMBE ring buffer */
+               dst_len = len - dst_len; /* remainder */
+               dst_len_sum += dst_len;
+               src_len = min_t(int, dst_len, conn->sndbuf_desc->len -
+                               sent_count);
+               src_len_sum = src_len;
+       }
+       return 0;
+}
+
+/* SMC-D helper for smc_tx_rdma_writes() */
+static int smcd_tx_rdma_writes(struct smc_connection *conn, size_t len,
+                              size_t src_off, size_t src_len,
+                              size_t dst_off, size_t dst_len)
+{
+       int src_len_sum = src_len, dst_len_sum = dst_len;
+       int srcchunk, dstchunk;
+       int rc;
+
+       for (dstchunk = 0; dstchunk < 2; dstchunk++) {
+               for (srcchunk = 0; srcchunk < 2; srcchunk++) {
+                       void *data = conn->sndbuf_desc->cpu_addr + src_off;
+
+                       rc = smcd_tx_ism_write(conn, data, src_len, dst_off +
+                                              sizeof(struct smcd_cdc_msg), 0);
+                       if (rc)
+                               return rc;
+                       dst_off += src_len;
+                       src_off += src_len;
+                       if (src_off >= conn->sndbuf_desc->len)
+                               src_off -= conn->sndbuf_desc->len;
+                                               /* modulo in send ring */
+                       if (src_len_sum == dst_len)
+                               break; /* either on 1st or 2nd iteration */
+                       /* prepare next (== 2nd) iteration */
+                       src_len = dst_len - src_len; /* remainder */
+                       src_len_sum += src_len;
+               }
+               if (dst_len_sum == len)
+                       break; /* either on 1st or 2nd iteration */
+               /* prepare next (== 2nd) iteration */
+               dst_off = 0; /* modulo offset in RMBE ring buffer */
+               dst_len = len - dst_len; /* remainder */
+               dst_len_sum += dst_len;
+               src_len = min_t(int, dst_len, conn->sndbuf_desc->len - src_off);
+               src_len_sum = src_len;
+       }
+       return 0;
+}
+
 /* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit;
  * usable snd_wnd as max transmit
  */
 static int smc_tx_rdma_writes(struct smc_connection *conn)
 {
-       size_t src_off, src_len, dst_off, dst_len; /* current chunk values */
-       size_t len, dst_len_sum, src_len_sum, dstchunk, srcchunk;
+       size_t len, src_len, dst_off, dst_len; /* current chunk values */
        union smc_host_cursor sent, prep, prod, cons;
-       struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
-       struct smc_link_group *lgr = conn->lgr;
        struct smc_cdc_producer_flags *pflags;
        int to_send, rmbespace;
-       struct smc_link *link;
-       dma_addr_t dma_addr;
-       int num_sges;
        int rc;
 
        /* source: sndbuf */
@@ -341,7 +443,6 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
        len = min(to_send, rmbespace);
 
        /* initialize variables for first iteration of subsequent nested loop */
-       link = &lgr->lnk[SMC_SINGLE_LINK];
        dst_off = prod.count;
        if (prod.wrap == cons.wrap) {
                /* the filled destination area is unwrapped,
@@ -358,8 +459,6 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
                 */
                dst_len = len;
        }
-       dst_len_sum = dst_len;
-       src_off = sent.count;
        /* dst_len determines the maximum src_len */
        if (sent.count + dst_len <= conn->sndbuf_desc->len) {
                /* unwrapped src case: single chunk of entire dst_len */
@@ -368,38 +467,15 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
                /* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
                src_len = conn->sndbuf_desc->len - sent.count;
        }
-       src_len_sum = src_len;
-       dma_addr = sg_dma_address(conn->sndbuf_desc->sgt[SMC_SINGLE_LINK].sgl);
-       for (dstchunk = 0; dstchunk < 2; dstchunk++) {
-               num_sges = 0;
-               for (srcchunk = 0; srcchunk < 2; srcchunk++) {
-                       sges[srcchunk].addr = dma_addr + src_off;
-                       sges[srcchunk].length = src_len;
-                       sges[srcchunk].lkey = link->roce_pd->local_dma_lkey;
-                       num_sges++;
-                       src_off += src_len;
-                       if (src_off >= conn->sndbuf_desc->len)
-                               src_off -= conn->sndbuf_desc->len;
-                                               /* modulo in send ring */
-                       if (src_len_sum == dst_len)
-                               break; /* either on 1st or 2nd iteration */
-                       /* prepare next (== 2nd) iteration */
-                       src_len = dst_len - src_len; /* remainder */
-                       src_len_sum += src_len;
-               }
-               rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
-               if (rc)
-                       return rc;
-               if (dst_len_sum == len)
-                       break; /* either on 1st or 2nd iteration */
-               /* prepare next (== 2nd) iteration */
-               dst_off = 0; /* modulo offset in RMBE ring buffer */
-               dst_len = len - dst_len; /* remainder */
-               dst_len_sum += dst_len;
-               src_len = min_t(int,
-                               dst_len, conn->sndbuf_desc->len - sent.count);
-               src_len_sum = src_len;
-       }
+
+       if (conn->lgr->is_smcd)
+               rc = smcd_tx_rdma_writes(conn, len, sent.count, src_len,
+                                        dst_off, dst_len);
+       else
+               rc = smcr_tx_rdma_writes(conn, len, sent.count, src_len,
+                                        dst_off, dst_len);
+       if (rc)
+               return rc;
 
        if (conn->urg_tx_pend && len == to_send)
                pflags->urg_data_present = 1;
@@ -420,7 +496,7 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
 /* Wakeup sndbuf consumers from any context (IRQ or process)
  * since there is more data to transmit; usable snd_wnd as max transmit
  */
-int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
+static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
 {
        struct smc_cdc_producer_flags *pflags;
        struct smc_cdc_tx_pend *pend;
@@ -467,6 +543,37 @@ out_unlock:
        return rc;
 }
 
+static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn)
+{
+       struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
+       int rc = 0;
+
+       spin_lock_bh(&conn->send_lock);
+       if (!pflags->urg_data_present)
+               rc = smc_tx_rdma_writes(conn);
+       if (!rc)
+               rc = smcd_cdc_msg_send(conn);
+
+       if (!rc && pflags->urg_data_present) {
+               pflags->urg_data_pending = 0;
+               pflags->urg_data_present = 0;
+       }
+       spin_unlock_bh(&conn->send_lock);
+       return rc;
+}
+
+int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
+{
+       int rc;
+
+       if (conn->lgr->is_smcd)
+               rc = smcd_tx_sndbuf_nonempty(conn);
+       else
+               rc = smcr_tx_sndbuf_nonempty(conn);
+
+       return rc;
+}
+
 /* Wakeup sndbuf consumers from process context
  * since there is more data to transmit
  */
index 9d2238909fa08d72e63537607132dd3ac6a4e93f..b22bdc5694c4da4ca41c0c166b2862153efa8257 100644 (file)
@@ -33,5 +33,7 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len);
 int smc_tx_sndbuf_nonempty(struct smc_connection *conn);
 void smc_tx_sndbuf_nonfull(struct smc_sock *smc);
 void smc_tx_consumer_update(struct smc_connection *conn, bool force);
+int smcd_tx_ism_write(struct smc_connection *conn, void *data, size_t len,
+                     u32 offset, int signal);
 
 #endif /* SMC_TX_H */