1 /*
2  *   Copyright (C) 2017, Microsoft Corporation.
3  *
4  *   Author(s): Long Li <longli@microsoft.com>
5  *
6  *   This program is free software;  you can redistribute it and/or modify
7  *   it under the terms of the GNU General Public License as published by
8  *   the Free Software Foundation; either version 2 of the License, or
9  *   (at your option) any later version.
10  *
11  *   This program is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY;  without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See
14  *   the GNU General Public License for more details.
15  */
16 #include <linux/module.h>
17 #include <linux/highmem.h>
18 #include "smbdirect.h"
19 #include "cifs_debug.h"
20
21 static struct smbd_response *get_empty_queue_buffer(
22                 struct smbd_connection *info);
23 static struct smbd_response *get_receive_buffer(
24                 struct smbd_connection *info);
25 static void put_receive_buffer(
26                 struct smbd_connection *info,
27                 struct smbd_response *response);
28 static int allocate_receive_buffers(struct smbd_connection *info, int num_buf);
29 static void destroy_receive_buffers(struct smbd_connection *info);
30
31 static void put_empty_packet(
32                 struct smbd_connection *info, struct smbd_response *response);
33 static void enqueue_reassembly(
34                 struct smbd_connection *info,
35                 struct smbd_response *response, int data_length);
36 static struct smbd_response *_get_first_reassembly(
37                 struct smbd_connection *info);
38
39 static int smbd_post_recv(
40                 struct smbd_connection *info,
41                 struct smbd_response *response);
42
43 static int smbd_post_send_empty(struct smbd_connection *info);
44 static int smbd_post_send_data(
45                 struct smbd_connection *info,
46                 struct kvec *iov, int n_vec, int remaining_data_length);
47 static int smbd_post_send_page(struct smbd_connection *info,
48                 struct page *page, unsigned long offset,
49                 size_t size, int remaining_data_length);
50
51 static void destroy_mr_list(struct smbd_connection *info);
52 static int allocate_mr_list(struct smbd_connection *info);
53
54 /* SMBD version number */
55 #define SMBD_V1 0x0100
56
57 /* Port numbers for SMBD transport */
58 #define SMB_PORT        445
59 #define SMBD_PORT       5445
60
61 /* Address lookup and resolve timeout in ms */
62 #define RDMA_RESOLVE_TIMEOUT    5000
63
64 /* SMBD negotiation timeout in seconds */
65 #define SMBD_NEGOTIATE_TIMEOUT  120
66
67 /* SMBD minimum receive size and fragmented size defined in [MS-SMBD] */
68 #define SMBD_MIN_RECEIVE_SIZE           128
69 #define SMBD_MIN_FRAGMENTED_SIZE        131072
70
71 /*
72  * Default maximum number of RDMA read/write outstanding on this connection
73  * This value may be decreased during QP creation due to hardware limits
74  */
75 #define SMBD_CM_RESPONDER_RESOURCES     32
76
77 /* Maximum number of retries on data transfer operations */
78 #define SMBD_CM_RETRY                   6
79 /* No need to retry on Receiver Not Ready since SMBD manages credits */
80 #define SMBD_CM_RNR_RETRY               0
81
82 /*
83  * User configurable initial values per SMBD transport connection
84  * as defined in [MS-SMBD] 3.1.1.1
85  * Those may change after a SMBD negotiation
86  */
87 /* The local peer's maximum number of credits to grant to the peer */
88 int smbd_receive_credit_max = 255;
89
90 /* The remote peer's credit request of local peer */
91 int smbd_send_credit_target = 255;
92
93 /* The maximum single message size that can be sent to the remote peer */
94 int smbd_max_send_size = 1364;
95
96 /*  The maximum fragmented upper-layer payload receive size supported */
97 int smbd_max_fragmented_recv_size = 1024 * 1024;
98
99 /*  The maximum single-message size which can be received */
100 int smbd_max_receive_size = 8192;
101
102 /* The timeout to initiate send of a keepalive message on idle */
103 int smbd_keep_alive_interval = 120;
104
105 /*
106  * User configurable initial values for RDMA transport
107  * The actual values used may be lower and are limited to hardware capabilities
108  */
109 /* Default maximum number of SGEs in a RDMA write/read */
110 int smbd_max_frmr_depth = 2048;
111
112 /* If payload is less than this many bytes, use RDMA send/recv not read/write */
113 int rdma_readwrite_threshold = 4096;
114
115 /* Transport logging functions
116  * Logging is defined as classes. They can be OR'ed to define the actual
117  * logging level via module parameter smbd_logging_class
118  * e.g. cifs.smbd_logging_class=0xa0 will log all log_rdma_recv() and
119  * log_rdma_event()
120  */
121 #define LOG_OUTGOING                    0x1
122 #define LOG_INCOMING                    0x2
123 #define LOG_READ                        0x4
124 #define LOG_WRITE                       0x8
125 #define LOG_RDMA_SEND                   0x10
126 #define LOG_RDMA_RECV                   0x20
127 #define LOG_KEEP_ALIVE                  0x40
128 #define LOG_RDMA_EVENT                  0x80
129 #define LOG_RDMA_MR                     0x100
130 static unsigned int smbd_logging_class;
131 module_param(smbd_logging_class, uint, 0644);
132 MODULE_PARM_DESC(smbd_logging_class,
133         "Logging class for SMBD transport 0x0 to 0x100");
134
135 #define ERR             0x0
136 #define INFO            0x1
137 static unsigned int smbd_logging_level = ERR;
138 module_param(smbd_logging_level, uint, 0644);
139 MODULE_PARM_DESC(smbd_logging_level,
140         "Logging level for SMBD transport, 0 (default): error, 1: info");
141
142 #define log_rdma(level, class, fmt, args...)                            \
143 do {                                                                    \
144         if (level <= smbd_logging_level || class & smbd_logging_class)  \
145                 cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
146 } while (0)
147
148 #define log_outgoing(level, fmt, args...) \
149                 log_rdma(level, LOG_OUTGOING, fmt, ##args)
150 #define log_incoming(level, fmt, args...) \
151                 log_rdma(level, LOG_INCOMING, fmt, ##args)
152 #define log_read(level, fmt, args...)   log_rdma(level, LOG_READ, fmt, ##args)
153 #define log_write(level, fmt, args...)  log_rdma(level, LOG_WRITE, fmt, ##args)
154 #define log_rdma_send(level, fmt, args...) \
155                 log_rdma(level, LOG_RDMA_SEND, fmt, ##args)
156 #define log_rdma_recv(level, fmt, args...) \
157                 log_rdma(level, LOG_RDMA_RECV, fmt, ##args)
158 #define log_keep_alive(level, fmt, args...) \
159                 log_rdma(level, LOG_KEEP_ALIVE, fmt, ##args)
160 #define log_rdma_event(level, fmt, args...) \
161                 log_rdma(level, LOG_RDMA_EVENT, fmt, ##args)
162 #define log_rdma_mr(level, fmt, args...) \
163                 log_rdma(level, LOG_RDMA_MR, fmt, ##args)
164
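/*
 * Both parameters above are registered with mode 0644, so (assuming this
 * file is built into cifs.ko, as the example in the comment above implies)
 * the logging class and level can be set at load time or adjusted later
 * through sysfs. A rough illustration, not an exhaustive reference:
 *
 *   modprobe cifs smbd_logging_class=0xa0 smbd_logging_level=1
 *   echo 0x30 > /sys/module/cifs/parameters/smbd_logging_class
 */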
165 /*
166  * Destroy the transport and related RDMA and memory resources
167  * Need to go through all the pending counters and make sure no one is using
168  * the transport while it is destroyed
169  */
170 static void smbd_destroy_rdma_work(struct work_struct *work)
171 {
172         struct smbd_response *response;
173         struct smbd_connection *info =
174                 container_of(work, struct smbd_connection, destroy_work);
175         unsigned long flags;
176
177         log_rdma_event(INFO, "destroying qp\n");
178         ib_drain_qp(info->id->qp);
179         rdma_destroy_qp(info->id);
180
181         /* Unblock all I/O waiting on the send queue */
182         wake_up_interruptible_all(&info->wait_send_queue);
183
184         log_rdma_event(INFO, "cancelling idle timer\n");
185         cancel_delayed_work_sync(&info->idle_timer_work);
186         log_rdma_event(INFO, "cancelling send immediate work\n");
187         cancel_delayed_work_sync(&info->send_immediate_work);
188
189         log_rdma_event(INFO, "wait for all send to finish\n");
190         wait_event(info->wait_smbd_send_pending,
191                 info->smbd_send_pending == 0);
192
193         log_rdma_event(INFO, "wait for all recv to finish\n");
194         wake_up_interruptible(&info->wait_reassembly_queue);
195         wait_event(info->wait_smbd_recv_pending,
196                 info->smbd_recv_pending == 0);
197
198         log_rdma_event(INFO, "wait for all send posted to IB to finish\n");
199         wait_event(info->wait_send_pending,
200                 atomic_read(&info->send_pending) == 0);
201         wait_event(info->wait_send_payload_pending,
202                 atomic_read(&info->send_payload_pending) == 0);
203
204         log_rdma_event(INFO, "freeing mr list\n");
205         wake_up_interruptible_all(&info->wait_mr);
206         wait_event(info->wait_for_mr_cleanup,
207                 atomic_read(&info->mr_used_count) == 0);
208         destroy_mr_list(info);
209
210         /* It's not possible for upper layer to get to reassembly */
211         log_rdma_event(INFO, "drain the reassembly queue\n");
212         do {
213                 spin_lock_irqsave(&info->reassembly_queue_lock, flags);
214                 response = _get_first_reassembly(info);
215                 if (response) {
216                         list_del(&response->list);
217                         spin_unlock_irqrestore(
218                                 &info->reassembly_queue_lock, flags);
219                         put_receive_buffer(info, response);
220                 }
221         } while (response);
222         spin_unlock_irqrestore(&info->reassembly_queue_lock, flags);
223         info->reassembly_data_length = 0;
224
225         log_rdma_event(INFO, "free receive buffers\n");
226         wait_event(info->wait_receive_queues,
227                 info->count_receive_queue + info->count_empty_packet_queue
228                         == info->receive_credit_max);
229         destroy_receive_buffers(info);
230
231         ib_free_cq(info->send_cq);
232         ib_free_cq(info->recv_cq);
233         ib_dealloc_pd(info->pd);
234         rdma_destroy_id(info->id);
235
236         /* free mempools */
237         mempool_destroy(info->request_mempool);
238         kmem_cache_destroy(info->request_cache);
239
240         mempool_destroy(info->response_mempool);
241         kmem_cache_destroy(info->response_cache);
242
243         info->transport_status = SMBD_DESTROYED;
244         wake_up_all(&info->wait_destroy);
245 }
246
247 static int smbd_process_disconnected(struct smbd_connection *info)
248 {
249         schedule_work(&info->destroy_work);
250         return 0;
251 }
252
253 static void smbd_disconnect_rdma_work(struct work_struct *work)
254 {
255         struct smbd_connection *info =
256                 container_of(work, struct smbd_connection, disconnect_work);
257
258         if (info->transport_status == SMBD_CONNECTED) {
259                 info->transport_status = SMBD_DISCONNECTING;
260                 rdma_disconnect(info->id);
261         }
262 }
263
264 static void smbd_disconnect_rdma_connection(struct smbd_connection *info)
265 {
266         queue_work(info->workqueue, &info->disconnect_work);
267 }
268
269 /* Upcall from RDMA CM */
270 static int smbd_conn_upcall(
271                 struct rdma_cm_id *id, struct rdma_cm_event *event)
272 {
273         struct smbd_connection *info = id->context;
274
275         log_rdma_event(INFO, "event=%d status=%d\n",
276                 event->event, event->status);
277
278         switch (event->event) {
279         case RDMA_CM_EVENT_ADDR_RESOLVED:
280         case RDMA_CM_EVENT_ROUTE_RESOLVED:
281                 info->ri_rc = 0;
282                 complete(&info->ri_done);
283                 break;
284
285         case RDMA_CM_EVENT_ADDR_ERROR:
286                 info->ri_rc = -EHOSTUNREACH;
287                 complete(&info->ri_done);
288                 break;
289
290         case RDMA_CM_EVENT_ROUTE_ERROR:
291                 info->ri_rc = -ENETUNREACH;
292                 complete(&info->ri_done);
293                 break;
294
295         case RDMA_CM_EVENT_ESTABLISHED:
296                 log_rdma_event(INFO, "connected event=%d\n", event->event);
297                 info->transport_status = SMBD_CONNECTED;
298                 wake_up_interruptible(&info->conn_wait);
299                 break;
300
301         case RDMA_CM_EVENT_CONNECT_ERROR:
302         case RDMA_CM_EVENT_UNREACHABLE:
303         case RDMA_CM_EVENT_REJECTED:
304                 log_rdma_event(INFO, "connecting failed event=%d\n", event->event);
305                 info->transport_status = SMBD_DISCONNECTED;
306                 wake_up_interruptible(&info->conn_wait);
307                 break;
308
309         case RDMA_CM_EVENT_DEVICE_REMOVAL:
310         case RDMA_CM_EVENT_DISCONNECTED:
311                 /* This happens when we fail the negotiation */
312                 if (info->transport_status == SMBD_NEGOTIATE_FAILED) {
313                         info->transport_status = SMBD_DISCONNECTED;
314                         wake_up(&info->conn_wait);
315                         break;
316                 }
317
318                 info->transport_status = SMBD_DISCONNECTED;
319                 smbd_process_disconnected(info);
320                 break;
321
322         default:
323                 break;
324         }
325
326         return 0;
327 }
328
329 /* Upcall from RDMA QP */
330 static void
331 smbd_qp_async_error_upcall(struct ib_event *event, void *context)
332 {
333         struct smbd_connection *info = context;
334
335         log_rdma_event(ERR, "%s on device %s info %p\n",
336                 ib_event_msg(event->event), event->device->name, info);
337
338         switch (event->event) {
339         case IB_EVENT_CQ_ERR:
340         case IB_EVENT_QP_FATAL:
341                 smbd_disconnect_rdma_connection(info);
342
343         default:
344                 break;
345         }
346 }
347
348 static inline void *smbd_request_payload(struct smbd_request *request)
349 {
350         return (void *)request->packet;
351 }
352
353 static inline void *smbd_response_payload(struct smbd_response *response)
354 {
355         return (void *)response->packet;
356 }
357
358 /* Called when a RDMA send is done */
359 static void send_done(struct ib_cq *cq, struct ib_wc *wc)
360 {
361         int i;
362         struct smbd_request *request =
363                 container_of(wc->wr_cqe, struct smbd_request, cqe);
364
365         log_rdma_send(INFO, "smbd_request %p completed wc->status=%d\n",
366                 request, wc->status);
367
368         if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
369                 log_rdma_send(ERR, "wc->status=%d wc->opcode=%d\n",
370                         wc->status, wc->opcode);
371                 smbd_disconnect_rdma_connection(request->info);
372         }
373
374         for (i = 0; i < request->num_sge; i++)
375                 ib_dma_unmap_single(request->info->id->device,
376                         request->sge[i].addr,
377                         request->sge[i].length,
378                         DMA_TO_DEVICE);
379
380         if (request->has_payload) {
381                 if (atomic_dec_and_test(&request->info->send_payload_pending))
382                         wake_up(&request->info->wait_send_payload_pending);
383         } else {
384                 if (atomic_dec_and_test(&request->info->send_pending))
385                         wake_up(&request->info->wait_send_pending);
386         }
387
388         mempool_free(request, request->info->request_mempool);
389 }
390
391 static void dump_smbd_negotiate_resp(struct smbd_negotiate_resp *resp)
392 {
393         log_rdma_event(INFO, "resp message min_version %u max_version %u "
394                 "negotiated_version %u credits_requested %u "
395                 "credits_granted %u status %u max_readwrite_size %u "
396                 "preferred_send_size %u max_receive_size %u "
397                 "max_fragmented_size %u\n",
398                 resp->min_version, resp->max_version, resp->negotiated_version,
399                 resp->credits_requested, resp->credits_granted, resp->status,
400                 resp->max_readwrite_size, resp->preferred_send_size,
401                 resp->max_receive_size, resp->max_fragmented_size);
402 }
403
404 /*
405  * Process a negotiation response message, according to [MS-SMBD] 3.1.5.7
406  * response, packet_length: the negotiation response message
407  * return value: true if negotiation is a success, false if failed
408  */
409 static bool process_negotiation_response(
410                 struct smbd_response *response, int packet_length)
411 {
412         struct smbd_connection *info = response->info;
413         struct smbd_negotiate_resp *packet = smbd_response_payload(response);
414
415         if (packet_length < sizeof(struct smbd_negotiate_resp)) {
416                 log_rdma_event(ERR,
417                         "error: packet_length=%d\n", packet_length);
418                 return false;
419         }
420
421         if (le16_to_cpu(packet->negotiated_version) != SMBD_V1) {
422                 log_rdma_event(ERR, "error: negotiated_version=%x\n",
423                         le16_to_cpu(packet->negotiated_version));
424                 return false;
425         }
426         info->protocol = le16_to_cpu(packet->negotiated_version);
427
428         if (packet->credits_requested == 0) {
429                 log_rdma_event(ERR, "error: credits_requested==0\n");
430                 return false;
431         }
432         info->receive_credit_target = le16_to_cpu(packet->credits_requested);
433
434         if (packet->credits_granted == 0) {
435                 log_rdma_event(ERR, "error: credits_granted==0\n");
436                 return false;
437         }
438         atomic_set(&info->send_credits, le16_to_cpu(packet->credits_granted));
439
440         atomic_set(&info->receive_credits, 0);
441
442         if (le32_to_cpu(packet->preferred_send_size) > info->max_receive_size) {
443                 log_rdma_event(ERR, "error: preferred_send_size=%d\n",
444                         le32_to_cpu(packet->preferred_send_size));
445                 return false;
446         }
447         info->max_receive_size = le32_to_cpu(packet->preferred_send_size);
448
449         if (le32_to_cpu(packet->max_receive_size) < SMBD_MIN_RECEIVE_SIZE) {
450                 log_rdma_event(ERR, "error: max_receive_size=%d\n",
451                         le32_to_cpu(packet->max_receive_size));
452                 return false;
453         }
454         info->max_send_size = min_t(int, info->max_send_size,
455                                         le32_to_cpu(packet->max_receive_size));
456
457         if (le32_to_cpu(packet->max_fragmented_size) <
458                         SMBD_MIN_FRAGMENTED_SIZE) {
459                 log_rdma_event(ERR, "error: max_fragmented_size=%d\n",
460                         le32_to_cpu(packet->max_fragmented_size));
461                 return false;
462         }
463         info->max_fragmented_send_size =
464                 le32_to_cpu(packet->max_fragmented_size);
465         info->rdma_readwrite_threshold =
466                 rdma_readwrite_threshold > info->max_fragmented_send_size ?
467                 info->max_fragmented_send_size :
468                 rdma_readwrite_threshold;
469
470
471         info->max_readwrite_size = min_t(u32,
472                         le32_to_cpu(packet->max_readwrite_size),
473                         info->max_frmr_depth * PAGE_SIZE);
474         info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
475
476         return true;
477 }
478
479 /*
480  * Check and schedule to send an immediate packet
481  * This is used to extend credits to remote peer to keep the transport busy
482  */
483 static void check_and_send_immediate(struct smbd_connection *info)
484 {
485         if (info->transport_status != SMBD_CONNECTED)
486                 return;
487
488         info->send_immediate = true;
489
490         /*
491          * Promptly send a packet if our peer is running low on receive
492          * credits
493          */
494         if (atomic_read(&info->receive_credits) <
495                 info->receive_credit_target - 1)
496                 queue_delayed_work(
497                         info->workqueue, &info->send_immediate_work, 0);
498 }
499
500 static void smbd_post_send_credits(struct work_struct *work)
501 {
502         int ret = 0;
503         int use_receive_queue = 1;
504         int rc;
505         struct smbd_response *response;
506         struct smbd_connection *info =
507                 container_of(work, struct smbd_connection,
508                         post_send_credits_work);
509
510         if (info->transport_status != SMBD_CONNECTED) {
511                 wake_up(&info->wait_receive_queues);
512                 return;
513         }
514
515         if (info->receive_credit_target >
516                 atomic_read(&info->receive_credits)) {
517                 while (true) {
518                         if (use_receive_queue)
519                                 response = get_receive_buffer(info);
520                         else
521                                 response = get_empty_queue_buffer(info);
522                         if (!response) {
523                                 /* now switch to empty packet queue */
524                                 if (use_receive_queue) {
525                                         use_receive_queue = 0;
526                                         continue;
527                                 } else
528                                         break;
529                         }
530
531                         response->type = SMBD_TRANSFER_DATA;
532                         response->first_segment = false;
533                         rc = smbd_post_recv(info, response);
534                         if (rc) {
535                                 log_rdma_recv(ERR,
536                                         "post_recv failed rc=%d\n", rc);
537                                 put_receive_buffer(info, response);
538                                 break;
539                         }
540
541                         ret++;
542                 }
543         }
544
545         spin_lock(&info->lock_new_credits_offered);
546         info->new_credits_offered += ret;
547         spin_unlock(&info->lock_new_credits_offered);
548
549         atomic_add(ret, &info->receive_credits);
550
551         /* Check if we can post new receive and grant credits to peer */
552         check_and_send_immediate(info);
553 }
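/*
 * A descriptive sketch of the receive-credit flow implemented by this
 * function together with manage_credits_prior_sending() and recv_done()
 * below (a rough summary of the code in this file, not of [MS-SMBD]):
 *
 *   post a receive buffer      -> new_credits_offered++, receive_credits++
 *   build an outgoing header   -> credits_granted takes new_credits_offered,
 *                                 which is then reset to 0
 *   a message is received      -> receive_credits--, and the peer's
 *                                 credits_requested becomes the new
 *                                 receive_credit_target
 */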
554
555 static void smbd_recv_done_work(struct work_struct *work)
556 {
557         struct smbd_connection *info =
558                 container_of(work, struct smbd_connection, recv_done_work);
559
560         /*
561          * We may have new send credits granted from remote peer
562          * If any sender is blocked on lack of credits, unblock it
563          */
564         if (atomic_read(&info->send_credits))
565                 wake_up_interruptible(&info->wait_send_queue);
566
567         /*
568          * Check if we need to send something to remote peer to
569          * grant more credits or respond to KEEP_ALIVE packet
570          */
571         check_and_send_immediate(info);
572 }
573
574 /* Called from softirq, when recv is done */
575 static void recv_done(struct ib_cq *cq, struct ib_wc *wc)
576 {
577         struct smbd_data_transfer *data_transfer;
578         struct smbd_response *response =
579                 container_of(wc->wr_cqe, struct smbd_response, cqe);
580         struct smbd_connection *info = response->info;
581         int data_length = 0;
582
583         log_rdma_recv(INFO, "response=%p type=%d wc status=%d wc opcode %d "
584                       "byte_len=%d pkey_index=%x\n",
585                 response, response->type, wc->status, wc->opcode,
586                 wc->byte_len, wc->pkey_index);
587
588         if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_RECV) {
589                 log_rdma_recv(INFO, "wc->status=%d opcode=%d\n",
590                         wc->status, wc->opcode);
591                 smbd_disconnect_rdma_connection(info);
592                 goto error;
593         }
594
595         ib_dma_sync_single_for_cpu(
596                 wc->qp->device,
597                 response->sge.addr,
598                 response->sge.length,
599                 DMA_FROM_DEVICE);
600
601         switch (response->type) {
602         /* SMBD negotiation response */
603         case SMBD_NEGOTIATE_RESP:
604                 dump_smbd_negotiate_resp(smbd_response_payload(response));
605                 info->full_packet_received = true;
606                 info->negotiate_done =
607                         process_negotiation_response(response, wc->byte_len);
608                 complete(&info->negotiate_completion);
609                 break;
610
611         /* SMBD data transfer packet */
612         case SMBD_TRANSFER_DATA:
613                 data_transfer = smbd_response_payload(response);
614                 data_length = le32_to_cpu(data_transfer->data_length);
615
616                 /*
617                  * If this is a packet with data payload, place the data in
618                  * reassembly queue and wake up the reading thread
619                  */
620                 if (data_length) {
621                         if (info->full_packet_received)
622                                 response->first_segment = true;
623
624                         if (le32_to_cpu(data_transfer->remaining_data_length))
625                                 info->full_packet_received = false;
626                         else
627                                 info->full_packet_received = true;
628
629                         enqueue_reassembly(
630                                 info,
631                                 response,
632                                 data_length);
633                 } else
634                         put_empty_packet(info, response);
635
636                 if (data_length)
637                         wake_up_interruptible(&info->wait_reassembly_queue);
638
639                 atomic_dec(&info->receive_credits);
640                 info->receive_credit_target =
641                         le16_to_cpu(data_transfer->credits_requested);
642                 atomic_add(le16_to_cpu(data_transfer->credits_granted),
643                         &info->send_credits);
644
645                 log_incoming(INFO, "data flags %d data_offset %d "
646                         "data_length %d remaining_data_length %d\n",
647                         le16_to_cpu(data_transfer->flags),
648                         le32_to_cpu(data_transfer->data_offset),
649                         le32_to_cpu(data_transfer->data_length),
650                         le32_to_cpu(data_transfer->remaining_data_length));
651
652                 /* Send a KEEP_ALIVE response right away if requested */
653                 info->keep_alive_requested = KEEP_ALIVE_NONE;
654                 if (le16_to_cpu(data_transfer->flags) &
655                                 SMB_DIRECT_RESPONSE_REQUESTED) {
656                         info->keep_alive_requested = KEEP_ALIVE_PENDING;
657                 }
658
659                 queue_work(info->workqueue, &info->recv_done_work);
660                 return;
661
662         default:
663                 log_rdma_recv(ERR,
664                         "unexpected response type=%d\n", response->type);
665         }
666
667 error:
668         put_receive_buffer(info, response);
669 }
670
671 static struct rdma_cm_id *smbd_create_id(
672                 struct smbd_connection *info,
673                 struct sockaddr *dstaddr, int port)
674 {
675         struct rdma_cm_id *id;
676         int rc;
677         __be16 *sport;
678
679         id = rdma_create_id(&init_net, smbd_conn_upcall, info,
680                 RDMA_PS_TCP, IB_QPT_RC);
681         if (IS_ERR(id)) {
682                 rc = PTR_ERR(id);
683                 log_rdma_event(ERR, "rdma_create_id() failed %i\n", rc);
684                 return id;
685         }
686
687         if (dstaddr->sa_family == AF_INET6)
688                 sport = &((struct sockaddr_in6 *)dstaddr)->sin6_port;
689         else
690                 sport = &((struct sockaddr_in *)dstaddr)->sin_port;
691
692         *sport = htons(port);
693
694         init_completion(&info->ri_done);
695         info->ri_rc = -ETIMEDOUT;
696
697         rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)dstaddr,
698                 RDMA_RESOLVE_TIMEOUT);
699         if (rc) {
700                 log_rdma_event(ERR, "rdma_resolve_addr() failed %i\n", rc);
701                 goto out;
702         }
703         wait_for_completion_interruptible_timeout(
704                 &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
705         rc = info->ri_rc;
706         if (rc) {
707                 log_rdma_event(ERR, "rdma_resolve_addr() completed %i\n", rc);
708                 goto out;
709         }
710
711         info->ri_rc = -ETIMEDOUT;
712         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
713         if (rc) {
714                 log_rdma_event(ERR, "rdma_resolve_route() failed %i\n", rc);
715                 goto out;
716         }
717         wait_for_completion_interruptible_timeout(
718                 &info->ri_done, msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT));
719         rc = info->ri_rc;
720         if (rc) {
721                 log_rdma_event(ERR, "rdma_resolve_route() completed %i\n", rc);
722                 goto out;
723         }
724
725         return id;
726
727 out:
728         rdma_destroy_id(id);
729         return ERR_PTR(rc);
730 }
731
732 /*
733  * Test if FRWR (Fast Registration Work Requests) is supported on the device
734  * This implementation requires FRWR on RDMA read/write
735  * return value: true if it is supported
736  */
737 static bool frwr_is_supported(struct ib_device_attr *attrs)
738 {
739         if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
740                 return false;
741         if (attrs->max_fast_reg_page_list_len == 0)
742                 return false;
743         return true;
744 }
745
746 static int smbd_ia_open(
747                 struct smbd_connection *info,
748                 struct sockaddr *dstaddr, int port)
749 {
750         int rc;
751
752         info->id = smbd_create_id(info, dstaddr, port);
753         if (IS_ERR(info->id)) {
754                 rc = PTR_ERR(info->id);
755                 goto out1;
756         }
757
758         if (!frwr_is_supported(&info->id->device->attrs)) {
759                 log_rdma_event(ERR,
760                         "Fast Registration Work Requests "
761                         "(FRWR) is not supported\n");
762                 log_rdma_event(ERR,
763                         "Device capability flags = %llx "
764                         "max_fast_reg_page_list_len = %u\n",
765                         info->id->device->attrs.device_cap_flags,
766                         info->id->device->attrs.max_fast_reg_page_list_len);
767                 rc = -EPROTONOSUPPORT;
768                 goto out2;
769         }
770         info->max_frmr_depth = min_t(int,
771                 smbd_max_frmr_depth,
772                 info->id->device->attrs.max_fast_reg_page_list_len);
773         info->mr_type = IB_MR_TYPE_MEM_REG;
774         if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
775                 info->mr_type = IB_MR_TYPE_SG_GAPS;
776
777         info->pd = ib_alloc_pd(info->id->device, 0);
778         if (IS_ERR(info->pd)) {
779                 rc = PTR_ERR(info->pd);
780                 log_rdma_event(ERR, "ib_alloc_pd() returned %d\n", rc);
781                 goto out2;
782         }
783
784         return 0;
785
786 out2:
787         rdma_destroy_id(info->id);
788         info->id = NULL;
789
790 out1:
791         return rc;
792 }
793
794 /*
795  * Send a negotiation request message to the peer
796  * The negotiation procedure is in [MS-SMBD] 3.1.5.2 and 3.1.5.3
797  * After negotiation, the transport is connected and ready for
798  * carrying upper layer SMB payload
799  */
800 static int smbd_post_send_negotiate_req(struct smbd_connection *info)
801 {
802         struct ib_send_wr send_wr, *send_wr_fail;
803         int rc = -ENOMEM;
804         struct smbd_request *request;
805         struct smbd_negotiate_req *packet;
806
807         request = mempool_alloc(info->request_mempool, GFP_KERNEL);
808         if (!request)
809                 return rc;
810
811         request->info = info;
812
813         packet = smbd_request_payload(request);
814         packet->min_version = cpu_to_le16(SMBD_V1);
815         packet->max_version = cpu_to_le16(SMBD_V1);
816         packet->reserved = 0;
817         packet->credits_requested = cpu_to_le16(info->send_credit_target);
818         packet->preferred_send_size = cpu_to_le32(info->max_send_size);
819         packet->max_receive_size = cpu_to_le32(info->max_receive_size);
820         packet->max_fragmented_size =
821                 cpu_to_le32(info->max_fragmented_recv_size);
822
823         request->num_sge = 1;
824         request->sge[0].addr = ib_dma_map_single(
825                                 info->id->device, (void *)packet,
826                                 sizeof(*packet), DMA_TO_DEVICE);
827         if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
828                 rc = -EIO;
829                 goto dma_mapping_failed;
830         }
831
832         request->sge[0].length = sizeof(*packet);
833         request->sge[0].lkey = info->pd->local_dma_lkey;
834
835         ib_dma_sync_single_for_device(
836                 info->id->device, request->sge[0].addr,
837                 request->sge[0].length, DMA_TO_DEVICE);
838
839         request->cqe.done = send_done;
840
841         send_wr.next = NULL;
842         send_wr.wr_cqe = &request->cqe;
843         send_wr.sg_list = request->sge;
844         send_wr.num_sge = request->num_sge;
845         send_wr.opcode = IB_WR_SEND;
846         send_wr.send_flags = IB_SEND_SIGNALED;
847
848         log_rdma_send(INFO, "sge addr=%llx length=%x lkey=%x\n",
849                 request->sge[0].addr,
850                 request->sge[0].length, request->sge[0].lkey);
851
852         request->has_payload = false;
853         atomic_inc(&info->send_pending);
854         rc = ib_post_send(info->id->qp, &send_wr, &send_wr_fail);
855         if (!rc)
856                 return 0;
857
858         /* if we reach here, post send failed */
859         log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
860         atomic_dec(&info->send_pending);
861         ib_dma_unmap_single(info->id->device, request->sge[0].addr,
862                 request->sge[0].length, DMA_TO_DEVICE);
863
864 dma_mapping_failed:
865         mempool_free(request, info->request_mempool);
866         return rc;
867 }
868
869 /*
870  * Extend the credits to remote peer
871  * This implements [MS-SMBD] 3.1.5.9
872  * The idea is that we should extend credits to remote peer as quickly as
873  * it's allowed, to maintain data flow. We allocate as many receive
874  * buffers as possible, and extend the receive credits to remote peer
875  * return value: the new credits being granted.
876  */
877 static int manage_credits_prior_sending(struct smbd_connection *info)
878 {
879         int new_credits;
880
881         spin_lock(&info->lock_new_credits_offered);
882         new_credits = info->new_credits_offered;
883         info->new_credits_offered = 0;
884         spin_unlock(&info->lock_new_credits_offered);
885
886         return new_credits;
887 }
888
889 /*
890  * Check if we need to send a KEEP_ALIVE message
891  * The idle connection timer triggers a KEEP_ALIVE message when it expires
892  * SMB_DIRECT_RESPONSE_REQUESTED is set in the message flag to have peer send
893  * back a response.
894  * return value:
895  * 1 if SMB_DIRECT_RESPONSE_REQUESTED needs to be set
896  * 0: otherwise
897  */
898 static int manage_keep_alive_before_sending(struct smbd_connection *info)
899 {
900         if (info->keep_alive_requested == KEEP_ALIVE_PENDING) {
901                 info->keep_alive_requested = KEEP_ALIVE_SENT;
902                 return 1;
903         }
904         return 0;
905 }
906
907 /*
908  * Build and prepare the SMBD packet header
909  * This function waits for available send credits and builds a SMBD packet
910  * header. The caller can then optionally append a payload to the packet after
911  * the header
912  * input values
913  * size: the size of the payload
914  * remaining_data_length: remaining data to send if this is part of a
915  * fragmented packet
916  * output values
917  * request_out: the request allocated from this function
918  * return values: 0 on success, otherwise actual error code returned
919  */
920 static int smbd_create_header(struct smbd_connection *info,
921                 int size, int remaining_data_length,
922                 struct smbd_request **request_out)
923 {
924         struct smbd_request *request;
925         struct smbd_data_transfer *packet;
926         int header_length;
927         int rc;
928
929         /* Wait for send credits. A SMBD packet needs one credit */
930         rc = wait_event_interruptible(info->wait_send_queue,
931                 atomic_read(&info->send_credits) > 0 ||
932                 info->transport_status != SMBD_CONNECTED);
933         if (rc)
934                 return rc;
935
936         if (info->transport_status != SMBD_CONNECTED) {
937                 log_outgoing(ERR, "disconnected not sending\n");
938                 return -ENOENT;
939         }
940         atomic_dec(&info->send_credits);
941
942         request = mempool_alloc(info->request_mempool, GFP_KERNEL);
943         if (!request) {
944                 rc = -ENOMEM;
945                 goto err;
946         }
947
948         request->info = info;
949
950         /* Fill in the packet header */
951         packet = smbd_request_payload(request);
952         packet->credits_requested = cpu_to_le16(info->send_credit_target);
953         packet->credits_granted =
954                 cpu_to_le16(manage_credits_prior_sending(info));
955         info->send_immediate = false;
956
957         packet->flags = 0;
958         if (manage_keep_alive_before_sending(info))
959                 packet->flags |= cpu_to_le16(SMB_DIRECT_RESPONSE_REQUESTED);
960
961         packet->reserved = 0;
962         if (!size)
963                 packet->data_offset = 0;
964         else
965                 packet->data_offset = cpu_to_le32(24);
966         packet->data_length = cpu_to_le32(size);
967         packet->remaining_data_length = cpu_to_le32(remaining_data_length);
968         packet->padding = 0;
969
970         log_outgoing(INFO, "credits_requested=%d credits_granted=%d "
971                 "data_offset=%d data_length=%d remaining_data_length=%d\n",
972                 le16_to_cpu(packet->credits_requested),
973                 le16_to_cpu(packet->credits_granted),
974                 le32_to_cpu(packet->data_offset),
975                 le32_to_cpu(packet->data_length),
976                 le32_to_cpu(packet->remaining_data_length));
977
978         /* Map the packet to DMA */
979         header_length = sizeof(struct smbd_data_transfer);
980         /* If this is a packet without payload, don't send padding */
981         if (!size)
982                 header_length = offsetof(struct smbd_data_transfer, padding);
983
984         request->num_sge = 1;
985         request->sge[0].addr = ib_dma_map_single(info->id->device,
986                                                  (void *)packet,
987                                                  header_length,
988                                                  DMA_BIDIRECTIONAL);
989         if (ib_dma_mapping_error(info->id->device, request->sge[0].addr)) {
990                 mempool_free(request, info->request_mempool);
991                 rc = -EIO;
992                 goto err;
993         }
994
995         request->sge[0].length = header_length;
996         request->sge[0].lkey = info->pd->local_dma_lkey;
997
998         *request_out = request;
999         return 0;
1000
1001 err:
1002         atomic_inc(&info->send_credits);
1003         return rc;
1004 }
1005
1006 static void smbd_destroy_header(struct smbd_connection *info,
1007                 struct smbd_request *request)
1008 {
1009
1010         ib_dma_unmap_single(info->id->device,
1011                             request->sge[0].addr,
1012                             request->sge[0].length,
1013                             DMA_TO_DEVICE);
1014         mempool_free(request, info->request_mempool);
1015         atomic_inc(&info->send_credits);
1016 }
1017
1018 /* Post the send request */
1019 static int smbd_post_send(struct smbd_connection *info,
1020                 struct smbd_request *request, bool has_payload)
1021 {
1022         struct ib_send_wr send_wr, *send_wr_fail;
1023         int rc, i;
1024
1025         for (i = 0; i < request->num_sge; i++) {
1026                 log_rdma_send(INFO,
1027                         "rdma_request sge[%d] addr=%llu length=%u\n",
1028                         i, request->sge[i].addr, request->sge[i].length);
1029                 ib_dma_sync_single_for_device(
1030                         info->id->device,
1031                         request->sge[i].addr,
1032                         request->sge[i].length,
1033                         DMA_TO_DEVICE);
1034         }
1035
1036         request->cqe.done = send_done;
1037
1038         send_wr.next = NULL;
1039         send_wr.wr_cqe = &request->cqe;
1040         send_wr.sg_list = request->sge;
1041         send_wr.num_sge = request->num_sge;
1042         send_wr.opcode = IB_WR_SEND;
1043         send_wr.send_flags = IB_SEND_SIGNALED;
1044
1045         if (has_payload) {
1046                 request->has_payload = true;
1047                 atomic_inc(&info->send_payload_pending);
1048         } else {
1049                 request->has_payload = false;
1050                 atomic_inc(&info->send_pending);
1051         }
1052
1053         rc = ib_post_send(info->id->qp, &send_wr, &send_wr_fail);
1054         if (rc) {
1055                 log_rdma_send(ERR, "ib_post_send failed rc=%d\n", rc);
1056                 if (has_payload) {
1057                         if (atomic_dec_and_test(&info->send_payload_pending))
1058                                 wake_up(&info->wait_send_payload_pending);
1059                 } else {
1060                         if (atomic_dec_and_test(&info->send_pending))
1061                                 wake_up(&info->wait_send_pending);
1062                 }
1063         } else
1064                 /* Reset timer for idle connection after packet is sent */
1065                 mod_delayed_work(info->workqueue, &info->idle_timer_work,
1066                         info->keep_alive_interval*HZ);
1067
1068         return rc;
1069 }
1070
1071 static int smbd_post_send_sgl(struct smbd_connection *info,
1072         struct scatterlist *sgl, int data_length, int remaining_data_length)
1073 {
1074         int num_sgs;
1075         int i, rc;
1076         struct smbd_request *request;
1077         struct scatterlist *sg;
1078
1079         rc = smbd_create_header(
1080                 info, data_length, remaining_data_length, &request);
1081         if (rc)
1082                 return rc;
1083
1084         num_sgs = sgl ? sg_nents(sgl) : 0;
1085         for_each_sg(sgl, sg, num_sgs, i) {
1086                 request->sge[i+1].addr =
1087                         ib_dma_map_page(info->id->device, sg_page(sg),
1088                                sg->offset, sg->length, DMA_BIDIRECTIONAL);
1089                 if (ib_dma_mapping_error(
1090                                 info->id->device, request->sge[i+1].addr)) {
1091                         rc = -EIO;
1092                         request->sge[i+1].addr = 0;
1093                         goto dma_mapping_failure;
1094                 }
1095                 request->sge[i+1].length = sg->length;
1096                 request->sge[i+1].lkey = info->pd->local_dma_lkey;
1097                 request->num_sge++;
1098         }
1099
1100         rc = smbd_post_send(info, request, data_length);
1101         if (!rc)
1102                 return 0;
1103
1104 dma_mapping_failure:
1105         for (i = 1; i < request->num_sge; i++)
1106                 if (request->sge[i].addr)
1107                         ib_dma_unmap_single(info->id->device,
1108                                             request->sge[i].addr,
1109                                             request->sge[i].length,
1110                                             DMA_TO_DEVICE);
1111         smbd_destroy_header(info, request);
1112         return rc;
1113 }
1114
1115 /*
1116  * Send a page
1117  * page: the page to send
1118  * offset: offset in the page to send
1119  * size: length in the page to send
1120  * remaining_data_length: remaining data to send in this payload
1121  */
1122 static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
1123                 unsigned long offset, size_t size, int remaining_data_length)
1124 {
1125         struct scatterlist sgl;
1126
1127         sg_init_table(&sgl, 1);
1128         sg_set_page(&sgl, page, size, offset);
1129
1130         return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
1131 }
1132
1133 /*
1134  * Send an empty message
1135  * An empty message is used to extend credits to the peer for keep-alive
1136  * while there is no upper layer payload to send at the time
1137  */
1138 static int smbd_post_send_empty(struct smbd_connection *info)
1139 {
1140         info->count_send_empty++;
1141         return smbd_post_send_sgl(info, NULL, 0, 0);
1142 }
1143
1144 /*
1145  * Send a data buffer
1146  * iov: the iov array describing the data buffers
1147  * n_vec: number of entries in the iov array
1148  * remaining_data_length: remaining data to send following this packet
1149  * in segmented SMBD packet
1150  */
1151 static int smbd_post_send_data(
1152         struct smbd_connection *info, struct kvec *iov, int n_vec,
1153         int remaining_data_length)
1154 {
1155         int i;
1156         u32 data_length = 0;
1157         struct scatterlist sgl[SMBDIRECT_MAX_SGE];
1158
1159         if (n_vec > SMBDIRECT_MAX_SGE) {
1160                 cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
1161                 return -ENOMEM;
1162         }
1163
1164         sg_init_table(sgl, n_vec);
1165         for (i = 0; i < n_vec; i++) {
1166                 data_length += iov[i].iov_len;
1167                 sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
1168         }
1169
1170         return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
1171 }
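/*
 * A minimal caller sketch (illustrative only; "hdr", "body" and "remaining"
 * are assumed names, not taken from this file). An upper layer that already
 * has its message in a kvec array could hand it to the transport as:
 *
 *   struct kvec iov[2] = {
 *           { .iov_base = hdr,  .iov_len = hdr_len  },
 *           { .iov_base = body, .iov_len = body_len },
 *   };
 *   rc = smbd_post_send_data(info, iov, 2, remaining);
 *
 * where "remaining" is the number of bytes still to follow in later
 * segments of a fragmented SMBD payload, or 0 if this is the only segment.
 */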
1172
1173 /*
1174  * Post a receive request to the transport
1175  * The remote peer can only send data when a receive request is posted
1176  * The interaction is controlled by the send/receive credit system
1177  */
1178 static int smbd_post_recv(
1179                 struct smbd_connection *info, struct smbd_response *response)
1180 {
1181         struct ib_recv_wr recv_wr, *recv_wr_fail = NULL;
1182         int rc = -EIO;
1183
1184         response->sge.addr = ib_dma_map_single(
1185                                 info->id->device, response->packet,
1186                                 info->max_receive_size, DMA_FROM_DEVICE);
1187         if (ib_dma_mapping_error(info->id->device, response->sge.addr))
1188                 return rc;
1189
1190         response->sge.length = info->max_receive_size;
1191         response->sge.lkey = info->pd->local_dma_lkey;
1192
1193         response->cqe.done = recv_done;
1194
1195         recv_wr.wr_cqe = &response->cqe;
1196         recv_wr.next = NULL;
1197         recv_wr.sg_list = &response->sge;
1198         recv_wr.num_sge = 1;
1199
1200         rc = ib_post_recv(info->id->qp, &recv_wr, &recv_wr_fail);
1201         if (rc) {
1202                 ib_dma_unmap_single(info->id->device, response->sge.addr,
1203                                     response->sge.length, DMA_FROM_DEVICE);
1204
1205                 log_rdma_recv(ERR, "ib_post_recv failed rc=%d\n", rc);
1206         }
1207
1208         return rc;
1209 }
1210
1211 /* Perform SMBD negotiate according to [MS-SMBD] 3.1.5.2 */
1212 static int smbd_negotiate(struct smbd_connection *info)
1213 {
1214         int rc;
1215         struct smbd_response *response = get_receive_buffer(info);
1216
1217         response->type = SMBD_NEGOTIATE_RESP;
1218         rc = smbd_post_recv(info, response);
1219         log_rdma_event(INFO,
1220                 "smbd_post_recv rc=%d iov.addr=%llx iov.length=%x "
1221                 "iov.lkey=%x\n",
1222                 rc, response->sge.addr,
1223                 response->sge.length, response->sge.lkey);
1224         if (rc)
1225                 return rc;
1226
1227         init_completion(&info->negotiate_completion);
1228         info->negotiate_done = false;
1229         rc = smbd_post_send_negotiate_req(info);
1230         if (rc)
1231                 return rc;
1232
1233         rc = wait_for_completion_interruptible_timeout(
1234                 &info->negotiate_completion, SMBD_NEGOTIATE_TIMEOUT * HZ);
1235         log_rdma_event(INFO, "wait_for_completion_timeout rc=%d\n", rc);
1236
1237         if (info->negotiate_done)
1238                 return 0;
1239
1240         if (rc == 0)
1241                 rc = -ETIMEDOUT;
1242         else if (rc == -ERESTARTSYS)
1243                 rc = -EINTR;
1244         else
1245                 rc = -ENOTCONN;
1246
1247         return rc;
1248 }
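/*
 * In terms of the functions in this file, the negotiate exchange above
 * roughly proceeds as follows (a descriptive sketch, not a restatement of
 * [MS-SMBD]):
 *
 *   smbd_negotiate()
 *     -> smbd_post_recv()                post a buffer for the response
 *     -> smbd_post_send_negotiate_req()  send the SMBD_V1 request
 *     ... the receive completion runs recv_done(), which calls ...
 *     -> process_negotiation_response()  validate and adopt peer limits
 *     -> complete(&info->negotiate_completion)
 */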
1249
1250 static void put_empty_packet(
1251                 struct smbd_connection *info, struct smbd_response *response)
1252 {
1253         spin_lock(&info->empty_packet_queue_lock);
1254         list_add_tail(&response->list, &info->empty_packet_queue);
1255         info->count_empty_packet_queue++;
1256         spin_unlock(&info->empty_packet_queue_lock);
1257
1258         queue_work(info->workqueue, &info->post_send_credits_work);
1259 }
1260
1261 /*
1262  * Implement Connection.FragmentReassemblyBuffer defined in [MS-SMBD] 3.1.1.1
1263  * This is a queue for reassembling upper layer payload and present to upper
1264  * layer. All incoming payloads go to the reassembly queue, regardless of
1265  * whether reassembly is required. The upper layer reads from the queue for all
1266  * incoming payloads.
1267  * Put a received packet to the reassembly queue
1268  * response: the packet received
1269  * data_length: the size of payload in this packet
1270  */
1271 static void enqueue_reassembly(
1272         struct smbd_connection *info,
1273         struct smbd_response *response,
1274         int data_length)
1275 {
1276         spin_lock(&info->reassembly_queue_lock);
1277         list_add_tail(&response->list, &info->reassembly_queue);
1278         info->reassembly_queue_length++;
1279         /*
1280          * Make sure reassembly_data_length is updated after list and
1281          * reassembly_queue_length are updated. On the dequeue side
1282          * reassembly_data_length is checked without a lock to determine
1283          * if reassembly_queue_length and the list are up to date
1284          */
1285         virt_wmb();
1286         info->reassembly_data_length += data_length;
1287         spin_unlock(&info->reassembly_queue_lock);
1288         info->count_reassembly_queue++;
1289         info->count_enqueue_reassembly_queue++;
1290 }
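/*
 * The matching consumer is not part of this section; a minimal sketch of
 * the barrier pairing, assuming a reader in the smbd_recv path, is that the
 * reader checks reassembly_data_length locklessly and then issues the
 * paired read barrier before walking the queue:
 *
 *   if (info->reassembly_data_length >= size) {
 *           virt_rmb();    // pairs with virt_wmb() in enqueue_reassembly()
 *           queue_length = info->reassembly_queue_length;
 *           response = _get_first_reassembly(info);
 *           ...
 *   }
 */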
1291
1292 /*
1293  * Get the first entry at the front of reassembly queue
1294  * Caller is responsible for locking
1295  * return value: the first entry if any, NULL if queue is empty
1296  */
1297 static struct smbd_response *_get_first_reassembly(struct smbd_connection *info)
1298 {
1299         struct smbd_response *ret = NULL;
1300
1301         if (!list_empty(&info->reassembly_queue)) {
1302                 ret = list_first_entry(
1303                         &info->reassembly_queue,
1304                         struct smbd_response, list);
1305         }
1306         return ret;
1307 }
1308
1309 static struct smbd_response *get_empty_queue_buffer(
1310                 struct smbd_connection *info)
1311 {
1312         struct smbd_response *ret = NULL;
1313         unsigned long flags;
1314
1315         spin_lock_irqsave(&info->empty_packet_queue_lock, flags);
1316         if (!list_empty(&info->empty_packet_queue)) {
1317                 ret = list_first_entry(
1318                         &info->empty_packet_queue,
1319                         struct smbd_response, list);
1320                 list_del(&ret->list);
1321                 info->count_empty_packet_queue--;
1322         }
1323         spin_unlock_irqrestore(&info->empty_packet_queue_lock, flags);
1324
1325         return ret;
1326 }
1327
1328 /*
1329  * Get a receive buffer
1330  * For each remote send, we need to post a receive. The receive buffers are
1331  * pre-allocated.
1332  * return value: the receive buffer, NULL if none is available
1333  */
1334 static struct smbd_response *get_receive_buffer(struct smbd_connection *info)
1335 {
1336         struct smbd_response *ret = NULL;
1337         unsigned long flags;
1338
1339         spin_lock_irqsave(&info->receive_queue_lock, flags);
1340         if (!list_empty(&info->receive_queue)) {
1341                 ret = list_first_entry(
1342                         &info->receive_queue,
1343                         struct smbd_response, list);
1344                 list_del(&ret->list);
1345                 info->count_receive_queue--;
1346                 info->count_get_receive_buffer++;
1347         }
1348         spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1349
1350         return ret;
1351 }
1352
1353 /*
1354  * Return a receive buffer
1355  * Upon return of a receive buffer, we can post a new receive and extend
1356  * more receive credits to remote peer. This is done immediately after a
1357  * receive buffer is returned.
1358  */
1359 static void put_receive_buffer(
1360         struct smbd_connection *info, struct smbd_response *response)
1361 {
1362         unsigned long flags;
1363
1364         ib_dma_unmap_single(info->id->device, response->sge.addr,
1365                 response->sge.length, DMA_FROM_DEVICE);
1366
1367         spin_lock_irqsave(&info->receive_queue_lock, flags);
1368         list_add_tail(&response->list, &info->receive_queue);
1369         info->count_receive_queue++;
1370         info->count_put_receive_buffer++;
1371         spin_unlock_irqrestore(&info->receive_queue_lock, flags);
1372
1373         queue_work(info->workqueue, &info->post_send_credits_work);
1374 }
1375
1376 /* Preallocate all receive buffers at transport establishment */
1377 static int allocate_receive_buffers(struct smbd_connection *info, int num_buf)
1378 {
1379         int i;
1380         struct smbd_response *response;
1381
1382         INIT_LIST_HEAD(&info->reassembly_queue);
1383         spin_lock_init(&info->reassembly_queue_lock);
1384         info->reassembly_data_length = 0;
1385         info->reassembly_queue_length = 0;
1386
1387         INIT_LIST_HEAD(&info->receive_queue);
1388         spin_lock_init(&info->receive_queue_lock);
1389         info->count_receive_queue = 0;
1390
1391         INIT_LIST_HEAD(&info->empty_packet_queue);
1392         spin_lock_init(&info->empty_packet_queue_lock);
1393         info->count_empty_packet_queue = 0;
1394
1395         init_waitqueue_head(&info->wait_receive_queues);
1396
1397         for (i = 0; i < num_buf; i++) {
1398                 response = mempool_alloc(info->response_mempool, GFP_KERNEL);
1399                 if (!response)
1400                         goto allocate_failed;
1401
1402                 response->info = info;
1403                 list_add_tail(&response->list, &info->receive_queue);
1404                 info->count_receive_queue++;
1405         }
1406
1407         return 0;
1408
1409 allocate_failed:
1410         while (!list_empty(&info->receive_queue)) {
1411                 response = list_first_entry(
1412                                 &info->receive_queue,
1413                                 struct smbd_response, list);
1414                 list_del(&response->list);
1415                 info->count_receive_queue--;
1416
1417                 mempool_free(response, info->response_mempool);
1418         }
1419         return -ENOMEM;
1420 }
1421
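     /*
      * Free all preallocated receive buffers, draining both the receive
      * queue and the empty packet queue back into the response mempool
      */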
1422 static void destroy_receive_buffers(struct smbd_connection *info)
1423 {
1424         struct smbd_response *response;
1425
1426         while ((response = get_receive_buffer(info)))
1427                 mempool_free(response, info->response_mempool);
1428
1429         while ((response = get_empty_queue_buffer(info)))
1430                 mempool_free(response, info->response_mempool);
1431 }
1432
1433 /*
1434  * Check and send an immediate or keep alive packet
1435  * The conditions for sending those packets are defined in [MS-SMBD] 3.1.1.1
1436  * Connection.KeepaliveRequested and Connection.SendImmediate
1437  * The idea is to extend credits to the server as soon as they become available
1438  */
1439 static void send_immediate_work(struct work_struct *work)
1440 {
1441         struct smbd_connection *info = container_of(
1442                                         work, struct smbd_connection,
1443                                         send_immediate_work.work);
1444
1445         if (info->keep_alive_requested == KEEP_ALIVE_PENDING ||
1446             info->send_immediate) {
1447                 log_keep_alive(INFO, "send an empty message\n");
1448                 smbd_post_send_empty(info);
1449         }
1450 }
1451
1452 /* Implement idle connection timer [MS-SMBD] 3.1.6.2 */
1453 static void idle_connection_timer(struct work_struct *work)
1454 {
1455         struct smbd_connection *info = container_of(
1456                                         work, struct smbd_connection,
1457                                         idle_timer_work.work);
1458
1459         if (info->keep_alive_requested != KEEP_ALIVE_NONE) {
1460                 log_keep_alive(ERR,
1461                         "error status info->keep_alive_requested=%d\n",
1462                         info->keep_alive_requested);
1463                 smbd_disconnect_rdma_connection(info);
1464                 return;
1465         }
1466
1467         log_keep_alive(INFO, "about to send an empty idle message\n");
1468         smbd_post_send_empty(info);
1469
1470         /* Set up the next idle timeout work */
1471         queue_delayed_work(info->workqueue, &info->idle_timer_work,
1472                         info->keep_alive_interval*HZ);
1473 }
1474
1475 /* Destroy this SMBD connection, called from upper layer */
1476 void smbd_destroy(struct smbd_connection *info)
1477 {
1478         log_rdma_event(INFO, "destroying rdma session\n");
1479
1480         /* Kick off the disconnection process */
1481         smbd_disconnect_rdma_connection(info);
1482
1483         log_rdma_event(INFO, "wait for transport to be destroyed\n");
1484         wait_event(info->wait_destroy,
1485                 info->transport_status == SMBD_DESTROYED);
1486
1487         destroy_workqueue(info->workqueue);
1488         kfree(info);
1489 }
1490
1491 /*
1492  * Reconnect this SMBD connection, called from upper layer
1493  * return value: 0 on success, or actual error code
1494  */
1495 int smbd_reconnect(struct TCP_Server_Info *server)
1496 {
1497         log_rdma_event(INFO, "reconnecting rdma session\n");
1498
1499         if (!server->smbd_conn) {
1500                 log_rdma_event(ERR, "rdma session already destroyed\n");
1501                 return -EINVAL;
1502         }
1503
1504         /*
1505          * This is possible if the transport is disconnected and we haven't
1506          * received a notification from RDMA, but the upper layer has detected a timeout
1507          */
1508         if (server->smbd_conn->transport_status == SMBD_CONNECTED) {
1509                 log_rdma_event(INFO, "disconnecting transport\n");
1510                 smbd_disconnect_rdma_connection(server->smbd_conn);
1511         }
1512
1513         /* wait until the transport is destroyed */
1514         wait_event(server->smbd_conn->wait_destroy,
1515                 server->smbd_conn->transport_status == SMBD_DESTROYED);
1516
1517         destroy_workqueue(server->smbd_conn->workqueue);
1518         kfree(server->smbd_conn);
1519
1520         log_rdma_event(INFO, "creating rdma session\n");
1521         server->smbd_conn = smbd_get_connection(
1522                 server, (struct sockaddr *) &server->dstaddr);
1523
1524         return server->smbd_conn ? 0 : -ENOENT;
1525 }
1526
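     /*
      * Tear down the per-connection receive buffers, workqueue, mempools
      * and slab caches created by allocate_caches_and_workqueue()
      */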
1527 static void destroy_caches_and_workqueue(struct smbd_connection *info)
1528 {
1529         destroy_receive_buffers(info);
1530         destroy_workqueue(info->workqueue);
1531         mempool_destroy(info->response_mempool);
1532         kmem_cache_destroy(info->response_cache);
1533         mempool_destroy(info->request_mempool);
1534         kmem_cache_destroy(info->request_cache);
1535 }
1536
1537 #define MAX_NAME_LEN    80
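     /*
      * Create the slab caches and mempools for send requests and receive
      * responses, plus the per-connection workqueue, then preallocate the
      * receive buffers. Sizes follow the configured credit and receive
      * buffer limits.
      */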
1538 static int allocate_caches_and_workqueue(struct smbd_connection *info)
1539 {
1540         char name[MAX_NAME_LEN];
1541         int rc;
1542
1543         snprintf(name, MAX_NAME_LEN, "smbd_request_%p", info);
1544         info->request_cache =
1545                 kmem_cache_create(
1546                         name,
1547                         sizeof(struct smbd_request) +
1548                                 sizeof(struct smbd_data_transfer),
1549                         0, SLAB_HWCACHE_ALIGN, NULL);
1550         if (!info->request_cache)
1551                 return -ENOMEM;
1552
1553         info->request_mempool =
1554                 mempool_create(info->send_credit_target, mempool_alloc_slab,
1555                         mempool_free_slab, info->request_cache);
1556         if (!info->request_mempool)
1557                 goto out1;
1558
1559         snprintf(name, MAX_NAME_LEN, "smbd_response_%p", info);
1560         info->response_cache =
1561                 kmem_cache_create(
1562                         name,
1563                         sizeof(struct smbd_response) +
1564                                 info->max_receive_size,
1565                         0, SLAB_HWCACHE_ALIGN, NULL);
1566         if (!info->response_cache)
1567                 goto out2;
1568
1569         info->response_mempool =
1570                 mempool_create(info->receive_credit_max, mempool_alloc_slab,
1571                        mempool_free_slab, info->response_cache);
1572         if (!info->response_mempool)
1573                 goto out3;
1574
1575         snprintf(name, MAX_NAME_LEN, "smbd_%p", info);
1576         info->workqueue = create_workqueue(name);
1577         if (!info->workqueue)
1578                 goto out4;
1579
1580         rc = allocate_receive_buffers(info, info->receive_credit_max);
1581         if (rc) {
1582                 log_rdma_event(ERR, "failed to allocate receive buffers\n");
1583                 goto out5;
1584         }
1585
1586         return 0;
1587
1588 out5:
1589         destroy_workqueue(info->workqueue);
1590 out4:
1591         mempool_destroy(info->response_mempool);
1592 out3:
1593         kmem_cache_destroy(info->response_cache);
1594 out2:
1595         mempool_destroy(info->request_mempool);
1596 out1:
1597         kmem_cache_destroy(info->request_cache);
1598         return -ENOMEM;
1599 }
1600
1601 /* Create a SMBD connection, called by upper layer */
1602 static struct smbd_connection *_smbd_get_connection(
1603         struct TCP_Server_Info *server, struct sockaddr *dstaddr, int port)
1604 {
1605         int rc;
1606         struct smbd_connection *info;
1607         struct rdma_conn_param conn_param;
1608         struct ib_qp_init_attr qp_attr;
1609         struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
1610         struct ib_port_immutable port_immutable;
1611         u32 ird_ord_hdr[2];
1612
1613         info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
1614         if (!info)
1615                 return NULL;
1616
1617         info->transport_status = SMBD_CONNECTING;
1618         rc = smbd_ia_open(info, dstaddr, port);
1619         if (rc) {
1620                 log_rdma_event(INFO, "smbd_ia_open rc=%d\n", rc);
1621                 goto create_id_failed;
1622         }
1623
1624         if (smbd_send_credit_target > info->id->device->attrs.max_cqe ||
1625             smbd_send_credit_target > info->id->device->attrs.max_qp_wr) {
1626                 log_rdma_event(ERR,
1627                         "consider lowering send_credit_target = %d. "
1628                         "Possible CQE overrun, device "
1629                         "reporting max_cqe %d max_qp_wr %d\n",
1630                         smbd_send_credit_target,
1631                         info->id->device->attrs.max_cqe,
1632                         info->id->device->attrs.max_qp_wr);
1633                 goto config_failed;
1634         }
1635
1636         if (smbd_receive_credit_max > info->id->device->attrs.max_cqe ||
1637             smbd_receive_credit_max > info->id->device->attrs.max_qp_wr) {
1638                 log_rdma_event(ERR,
1639                         "consider lowering receive_credit_max = %d. "
1640                         "Possible CQE overrun, device "
1641                         "reporting max_cqe %d max_qp_wr %d\n",
1642                         smbd_receive_credit_max,
1643                         info->id->device->attrs.max_cqe,
1644                         info->id->device->attrs.max_qp_wr);
1645                 goto config_failed;
1646         }
1647
1648         info->receive_credit_max = smbd_receive_credit_max;
1649         info->send_credit_target = smbd_send_credit_target;
1650         info->max_send_size = smbd_max_send_size;
1651         info->max_fragmented_recv_size = smbd_max_fragmented_recv_size;
1652         info->max_receive_size = smbd_max_receive_size;
1653         info->keep_alive_interval = smbd_keep_alive_interval;
1654
1655         if (info->id->device->attrs.max_sge < SMBDIRECT_MAX_SGE) {
1656                 log_rdma_event(ERR, "warning: device max_sge = %d too small\n",
1657                         info->id->device->attrs.max_sge);
1658                 log_rdma_event(ERR, "Queue Pair creation may fail\n");
1659         }
1660
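             /*
              * Allocate completion queues sized to the send/receive credit
              * limits, polled in softirq context
              */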
1661         info->send_cq = NULL;
1662         info->recv_cq = NULL;
1663         info->send_cq = ib_alloc_cq(info->id->device, info,
1664                         info->send_credit_target, 0, IB_POLL_SOFTIRQ);
1665         if (IS_ERR(info->send_cq)) {
1666                 info->send_cq = NULL;
1667                 goto alloc_cq_failed;
1668         }
1669
1670         info->recv_cq = ib_alloc_cq(info->id->device, info,
1671                         info->receive_credit_max, 0, IB_POLL_SOFTIRQ);
1672         if (IS_ERR(info->recv_cq)) {
1673                 info->recv_cq = NULL;
1674                 goto alloc_cq_failed;
1675         }
1676
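             /* Create a reliable connected (RC) queue pair on top of the two CQs */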
1677         memset(&qp_attr, 0, sizeof(qp_attr));
1678         qp_attr.event_handler = smbd_qp_async_error_upcall;
1679         qp_attr.qp_context = info;
1680         qp_attr.cap.max_send_wr = info->send_credit_target;
1681         qp_attr.cap.max_recv_wr = info->receive_credit_max;
1682         qp_attr.cap.max_send_sge = SMBDIRECT_MAX_SGE;
1683         qp_attr.cap.max_recv_sge = SMBDIRECT_MAX_SGE;
1684         qp_attr.cap.max_inline_data = 0;
1685         qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
1686         qp_attr.qp_type = IB_QPT_RC;
1687         qp_attr.send_cq = info->send_cq;
1688         qp_attr.recv_cq = info->recv_cq;
1689         qp_attr.port_num = ~0;
1690
1691         rc = rdma_create_qp(info->id, info->pd, &qp_attr);
1692         if (rc) {
1693                 log_rdma_event(ERR, "rdma_create_qp failed %i\n", rc);
1694                 goto create_qp_failed;
1695         }
1696
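             /*
              * Build the connection parameters; responder_resources is capped
              * at the device's max_qp_rd_atom
              */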
1697         memset(&conn_param, 0, sizeof(conn_param));
1698         conn_param.initiator_depth = 0;
1699
1700         conn_param.responder_resources =
1701                 info->id->device->attrs.max_qp_rd_atom
1702                         < SMBD_CM_RESPONDER_RESOURCES ?
1703                 info->id->device->attrs.max_qp_rd_atom :
1704                 SMBD_CM_RESPONDER_RESOURCES;
1705         info->responder_resources = conn_param.responder_resources;
1706         log_rdma_mr(INFO, "responder_resources=%d\n",
1707                 info->responder_resources);
1708
1709         /* Need to send IRD/ORD in private data for iWARP */
1710         info->id->device->get_port_immutable(
1711                 info->id->device, info->id->port_num, &port_immutable);
1712         if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
1713                 ird_ord_hdr[0] = info->responder_resources;
1714                 ird_ord_hdr[1] = 1;
1715                 conn_param.private_data = ird_ord_hdr;
1716                 conn_param.private_data_len = sizeof(ird_ord_hdr);
1717         } else {
1718                 conn_param.private_data = NULL;
1719                 conn_param.private_data_len = 0;
1720         }
1721
1722         conn_param.retry_count = SMBD_CM_RETRY;
1723         conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
1724         conn_param.flow_control = 0;
1725         init_waitqueue_head(&info->wait_destroy);
1726
1727         log_rdma_event(INFO, "connecting to IP %pI4 port %d\n",
1728                 &addr_in->sin_addr, port);
1729
1730         init_waitqueue_head(&info->conn_wait);
1731         rc = rdma_connect(info->id, &conn_param);
1732         if (rc) {
1733                 log_rdma_event(ERR, "rdma_connect() failed with %i\n", rc);
1734                 goto rdma_connect_failed;
1735         }
1736
1737         wait_event_interruptible(
1738                 info->conn_wait, info->transport_status != SMBD_CONNECTING);
1739
1740         if (info->transport_status != SMBD_CONNECTED) {
1741                 log_rdma_event(ERR, "rdma_connect failed port=%d\n", port);
1742                 goto rdma_connect_failed;
1743         }
1744
1745         log_rdma_event(INFO, "rdma_connect connected\n");
1746
1747         rc = allocate_caches_and_workqueue(info);
1748         if (rc) {
1749                 log_rdma_event(ERR, "cache allocation failed\n");
1750                 goto allocate_cache_failed;
1751         }
1752
1753         init_waitqueue_head(&info->wait_send_queue);
1754         init_waitqueue_head(&info->wait_reassembly_queue);
1755
1756         INIT_DELAYED_WORK(&info->idle_timer_work, idle_connection_timer);
1757         INIT_DELAYED_WORK(&info->send_immediate_work, send_immediate_work);
1758         queue_delayed_work(info->workqueue, &info->idle_timer_work,
1759                 info->keep_alive_interval*HZ);
1760
1761         init_waitqueue_head(&info->wait_smbd_send_pending);
1762         info->smbd_send_pending = 0;
1763
1764         init_waitqueue_head(&info->wait_smbd_recv_pending);
1765         info->smbd_recv_pending = 0;
1766
1767         init_waitqueue_head(&info->wait_send_pending);
1768         atomic_set(&info->send_pending, 0);
1769
1770         init_waitqueue_head(&info->wait_send_payload_pending);
1771         atomic_set(&info->send_payload_pending, 0);
1772
1773         INIT_WORK(&info->disconnect_work, smbd_disconnect_rdma_work);
1774         INIT_WORK(&info->destroy_work, smbd_destroy_rdma_work);
1775         INIT_WORK(&info->recv_done_work, smbd_recv_done_work);
1776         INIT_WORK(&info->post_send_credits_work, smbd_post_send_credits);
1777         info->new_credits_offered = 0;
1778         spin_lock_init(&info->lock_new_credits_offered);
1779
1780         rc = smbd_negotiate(info);
1781         if (rc) {
1782                 log_rdma_event(ERR, "smbd_negotiate rc=%d\n", rc);
1783                 goto negotiation_failed;
1784         }
1785
1786         rc = allocate_mr_list(info);
1787         if (rc) {
1788                 log_rdma_mr(ERR, "memory registration allocation failed\n");
1789                 goto allocate_mr_failed;
1790         }
1791
1792         return info;
1793
1794 allocate_mr_failed:
1795         /* At this point, need to do a full transport shutdown */
1796         smbd_destroy(info);
1797         return NULL;
1798
1799 negotiation_failed:
1800         cancel_delayed_work_sync(&info->idle_timer_work);
1801         destroy_caches_and_workqueue(info);
1802         info->transport_status = SMBD_NEGOTIATE_FAILED;
1803         init_waitqueue_head(&info->conn_wait);
1804         rdma_disconnect(info->id);
1805         wait_event(info->conn_wait,
1806                 info->transport_status == SMBD_DISCONNECTED);
1807
1808 allocate_cache_failed:
1809 rdma_connect_failed:
1810         rdma_destroy_qp(info->id);
1811
1812 create_qp_failed:
1813 alloc_cq_failed:
1814         if (info->send_cq)
1815                 ib_free_cq(info->send_cq);
1816         if (info->recv_cq)
1817                 ib_free_cq(info->recv_cq);
1818
1819 config_failed:
1820         ib_dealloc_pd(info->pd);
1821         rdma_destroy_id(info->id);
1822
1823 create_id_failed:
1824         kfree(info);
1825         return NULL;
1826 }
1827
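     /*
      * Establish a SMBD connection to the server, trying the SMB Direct
      * port first and falling back to the standard SMB port
      */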
1828 struct smbd_connection *smbd_get_connection(
1829         struct TCP_Server_Info *server, struct sockaddr *dstaddr)
1830 {
1831         struct smbd_connection *ret;
1832         int port = SMBD_PORT;
1833
1834 try_again:
1835         ret = _smbd_get_connection(server, dstaddr, port);
1836
1837         /* Try SMB_PORT if SMBD_PORT doesn't work */
1838         if (!ret && port == SMBD_PORT) {
1839                 port = SMB_PORT;
1840                 goto try_again;
1841         }
1842         return ret;
1843 }
1844
1845 /*
1846  * Receive data from receive reassembly queue
1847  * All the incoming data packets are placed in the reassembly queue
1848  * buf: the buffer to read data into
1849  * size: the length of data to read
1850  * return value: actual data read
1851  * Note: this implementation copies the data from the reassembly queue to the
1852  * receive buffers used by the upper layer. This is not the optimal code path.
1853  * A better way is to not have the upper layer allocate its receive buffers but
1854  * rather borrow the buffer from the reassembly queue, and return it after the
1855  * data is consumed. But this requires more changes to upper layer code, and
1856  * also needs to consider packet boundaries while they are still being reassembled.
1857  */
1858 static int smbd_recv_buf(struct smbd_connection *info, char *buf,
1859                 unsigned int size)
1860 {
1861         struct smbd_response *response;
1862         struct smbd_data_transfer *data_transfer;
1863         int to_copy, to_read, data_read, offset;
1864         u32 data_length, remaining_data_length, data_offset;
1865         int rc;
1866
1867 again:
1868         if (info->transport_status != SMBD_CONNECTED) {
1869                 log_read(ERR, "disconnected\n");
1870                 return -ENODEV;
1871         }
1872
1873         /*
1874          * No need to hold the reassembly queue lock all the time as we are
1875          * the only one reading from the front of the queue. The transport
1876          * may add more entries to the back of the queue at the same time
1877          */
1878         log_read(INFO, "size=%d info->reassembly_data_length=%d\n", size,
1879                 info->reassembly_data_length);
1880         if (info->reassembly_data_length >= size) {
1881                 int queue_length;
1882                 int queue_removed = 0;
1883
1884                 /*
1885                  * Need to make sure reassembly_data_length is read before
1886                  * reading reassembly_queue_length and calling
1887                  * _get_first_reassembly. This call is lock-free
1888                  * as we never read the end of the queue, which is being
1889                  * updated in SOFTIRQ context as more data is received
1890                  */
1891                 virt_rmb();
1892                 queue_length = info->reassembly_queue_length;
1893                 data_read = 0;
1894                 to_read = size;
1895                 offset = info->first_entry_offset;
1896                 while (data_read < size) {
1897                         response = _get_first_reassembly(info);
1898                         data_transfer = smbd_response_payload(response);
1899                         data_length = le32_to_cpu(data_transfer->data_length);
1900                         remaining_data_length =
1901                                 le32_to_cpu(
1902                                         data_transfer->remaining_data_length);
1903                         data_offset = le32_to_cpu(data_transfer->data_offset);
1904
1905                         /*
1906                          * The upper layer expects the RFC1002 length at the
1907                          * beginning of the payload. Return it to indicate
1908                          * the total length of the packet. This minimizes the
1909                          * change to upper layer packet processing logic. It
1910                          * will eventually be removed when an intermediate
1911                          * transport layer is added
1912                          */
1913                         if (response->first_segment && size == 4) {
1914                                 unsigned int rfc1002_len =
1915                                         data_length + remaining_data_length;
1916                                 *((__be32 *)buf) = cpu_to_be32(rfc1002_len);
1917                                 data_read = 4;
1918                                 response->first_segment = false;
1919                                 log_read(INFO, "returning rfc1002 length %d\n",
1920                                         rfc1002_len);
1921                                 goto read_rfc1002_done;
1922                         }
1923
1924                         to_copy = min_t(int, data_length - offset, to_read);
1925                         memcpy(
1926                                 buf + data_read,
1927                                 (char *)data_transfer + data_offset + offset,
1928                                 to_copy);
1929
1930                         /* move on to the next buffer? */
1931                         if (to_copy == data_length - offset) {
1932                                 queue_length--;
1933                                 /*
1934                                  * No need to lock if we are not at the
1935                                  * end of the queue
1936                                  */
1937                                 if (!queue_length)
1938                                         spin_lock_irq(
1939                                                 &info->reassembly_queue_lock);
1940                                 list_del(&response->list);
1941                                 queue_removed++;
1942                                 if (!queue_length)
1943                                         spin_unlock_irq(
1944                                                 &info->reassembly_queue_lock);
1945
1946                                 info->count_reassembly_queue--;
1947                                 info->count_dequeue_reassembly_queue++;
1948                                 put_receive_buffer(info, response);
1949                                 offset = 0;
1950                                 log_read(INFO, "put_receive_buffer offset=0\n");
1951                         } else
1952                                 offset += to_copy;
1953
1954                         to_read -= to_copy;
1955                         data_read += to_copy;
1956
1957                         log_read(INFO, "_get_first_reassembly memcpy %d bytes "
1958                                 "data_transfer_length-offset=%d after that "
1959                                 "to_read=%d data_read=%d offset=%d\n",
1960                                 to_copy, data_length - offset,
1961                                 to_read, data_read, offset);
1962                 }
1963
1964                 spin_lock_irq(&info->reassembly_queue_lock);
1965                 info->reassembly_data_length -= data_read;
1966                 info->reassembly_queue_length -= queue_removed;
1967                 spin_unlock_irq(&info->reassembly_queue_lock);
1968
1969                 info->first_entry_offset = offset;
1970                 log_read(INFO, "returning to thread data_read=%d "
1971                         "reassembly_data_length=%d first_entry_offset=%d\n",
1972                         data_read, info->reassembly_data_length,
1973                         info->first_entry_offset);
1974 read_rfc1002_done:
1975                 return data_read;
1976         }
1977
1978         log_read(INFO, "wait_event on more data\n");
1979         rc = wait_event_interruptible(
1980                 info->wait_reassembly_queue,
1981                 info->reassembly_data_length >= size ||
1982                         info->transport_status != SMBD_CONNECTED);
1983         /* Don't return any data if interrupted */
1984         if (rc)
1985                 return -ENODEV;
1986
1987         goto again;
1988 }
1989
1990 /*
1991  * Receive a page from receive reassembly queue
1992  * page: the page to read data into
1993  * to_read: the length of data to read
1994  * return value: actual data read
1995  */
1996 static int smbd_recv_page(struct smbd_connection *info,
1997                 struct page *page, unsigned int to_read)
1998 {
1999         int ret;
2000         char *to_address;
2001
2002         /* make sure we have the page ready for read */
2003         ret = wait_event_interruptible(
2004                 info->wait_reassembly_queue,
2005                 info->reassembly_data_length >= to_read ||
2006                         info->transport_status != SMBD_CONNECTED);
2007         if (ret)
2008                 return 0;
2009
2010         /* now we can read from reassembly queue and not sleep */
2011         to_address = kmap_atomic(page);
2012
2013         log_read(INFO, "reading from page=%p address=%p to_read=%d\n",
2014                 page, to_address, to_read);
2015
2016         ret = smbd_recv_buf(info, to_address, to_read);
2017         kunmap_atomic(to_address);
2018
2019         return ret;
2020 }
2021
2022 /*
2023  * Receive data from transport
2024  * msg: a msghdr pointing to the buffer; can be ITER_KVEC or ITER_BVEC
2025  * return: total bytes read, or 0. SMB Direct will not do partial read.
2026  */
2027 int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
2028 {
2029         char *buf;
2030         struct page *page;
2031         unsigned int to_read;
2032         int rc;
2033
2034         info->smbd_recv_pending++;
2035
2036         switch (msg->msg_iter.type) {
2037         case READ | ITER_KVEC:
2038                 buf = msg->msg_iter.kvec->iov_base;
2039                 to_read = msg->msg_iter.kvec->iov_len;
2040                 rc = smbd_recv_buf(info, buf, to_read);
2041                 break;
2042
2043         case READ | ITER_BVEC:
2044                 page = msg->msg_iter.bvec->bv_page;
2045                 to_read = msg->msg_iter.bvec->bv_len;
2046                 rc = smbd_recv_page(info, page, to_read);
2047                 break;
2048
2049         default:
2050                 /* It's a bug in the upper layer to get here */
2051                 cifs_dbg(VFS, "CIFS: invalid msg type %d\n",
2052                         msg->msg_iter.type);
2053                 rc = -EIO;
2054         }
2055
2056         info->smbd_recv_pending--;
2057         wake_up(&info->wait_smbd_recv_pending);
2058
2059         /* SMBDirect will read it all or nothing */
2060         if (rc > 0)
2061                 msg->msg_iter.count = 0;
2062         return rc;
2063 }
2064
2065 /*
2066  * Send data to transport
2067  * Each rqst is transported as an SMBDirect payload
2068  * rqst: the data to write
2069  * return value: 0 if the write is successful, otherwise error code
2070  */
2071 int smbd_send(struct smbd_connection *info, struct smb_rqst *rqst)
2072 {
2073         struct kvec vec;
2074         int nvecs;
2075         int size;
2076         int buflen = 0, remaining_data_length;
2077         int start, i, j;
2078         int max_iov_size =
2079                 info->max_send_size - sizeof(struct smbd_data_transfer);
2080         struct kvec iov[SMBDIRECT_MAX_SGE];
2081         int rc;
2082
2083         info->smbd_send_pending++;
2084         if (info->transport_status != SMBD_CONNECTED) {
2085                 rc = -ENODEV;
2086                 goto done;
2087         }
2088
2089         /*
2090          * This usually means a configuration error.
2091          * We use RDMA read/write for packet sizes > rdma_readwrite_threshold;
2092          * as long as it's properly configured we should never get into this
2093          * situation
2094          */
2095         if (rqst->rq_nvec + rqst->rq_npages > SMBDIRECT_MAX_SGE) {
2096                 log_write(ERR, "maximum send segment %x exceeding %x\n",
2097                          rqst->rq_nvec + rqst->rq_npages, SMBDIRECT_MAX_SGE);
2098                 rc = -EINVAL;
2099                 goto done;
2100         }
2101
2102         /*
2103          * Remove the RFC1002 length defined in MS-SMB2 section 2.1
2104          * It is used only for the TCP transport
2105          * In the future we may want to add a transport layer under the protocol
2106          * layer so this is only issued for the TCP transport
2107          */
2108         iov[0].iov_base = (char *)rqst->rq_iov[0].iov_base + 4;
2109         iov[0].iov_len = rqst->rq_iov[0].iov_len - 4;
2110         buflen += iov[0].iov_len;
2111
2112         /* total up iov array first */
2113         for (i = 1; i < rqst->rq_nvec; i++) {
2114                 iov[i].iov_base = rqst->rq_iov[i].iov_base;
2115                 iov[i].iov_len = rqst->rq_iov[i].iov_len;
2116                 buflen += iov[i].iov_len;
2117         }
2118
2119         /* add in the page array if there is one */
2120         if (rqst->rq_npages) {
2121                 buflen += rqst->rq_pagesz * (rqst->rq_npages - 1);
2122                 buflen += rqst->rq_tailsz;
2123         }
2124
2125         if (buflen + sizeof(struct smbd_data_transfer) >
2126                 info->max_fragmented_send_size) {
2127                 log_write(ERR, "payload size %d > max size %d\n",
2128                         buflen, info->max_fragmented_send_size);
2129                 rc = -EINVAL;
2130                 goto done;
2131         }
2132
2133         remaining_data_length = buflen;
2134
2135         log_write(INFO, "rqst->rq_nvec=%d rqst->rq_npages=%d rq_pagesz=%d "
2136                 "rq_tailsz=%d buflen=%d\n",
2137                 rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
2138                 rqst->rq_tailsz, buflen);
2139
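             /*
              * Walk the iov array and batch consecutive vectors into sends of
              * at most max_iov_size bytes; a single vector larger than
              * max_iov_size is split across several sends
              */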
2140         start = i = iov[0].iov_len ? 0 : 1;
2141         buflen = 0;
2142         while (true) {
2143                 buflen += iov[i].iov_len;
2144                 if (buflen > max_iov_size) {
2145                         if (i > start) {
2146                                 remaining_data_length -=
2147                                         (buflen-iov[i].iov_len);
2148                                 log_write(INFO, "sending iov[] from start=%d "
2149                                         "i=%d nvecs=%d "
2150                                         "remaining_data_length=%d\n",
2151                                         start, i, i-start,
2152                                         remaining_data_length);
2153                                 rc = smbd_post_send_data(
2154                                         info, &iov[start], i-start,
2155                                         remaining_data_length);
2156                                 if (rc)
2157                                         goto done;
2158                         } else {
2159                                 /* iov[start] is too big, break it */
2160                                 nvecs = (buflen+max_iov_size-1)/max_iov_size;
2161                                 log_write(INFO, "iov[%d] iov_base=%p buflen=%d"
2162                                         " break to %d vectors\n",
2163                                         start, iov[start].iov_base,
2164                                         buflen, nvecs);
2165                                 for (j = 0; j < nvecs; j++) {
2166                                         vec.iov_base =
2167                                                 (char *)iov[start].iov_base +
2168                                                 j*max_iov_size;
2169                                         vec.iov_len = max_iov_size;
2170                                         if (j == nvecs-1)
2171                                                 vec.iov_len =
2172                                                         buflen -
2173                                                         max_iov_size*(nvecs-1);
2174                                         remaining_data_length -= vec.iov_len;
2175                                         log_write(INFO,
2176                                                 "sending vec j=%d iov_base=%p"
2177                                                 " iov_len=%zu "
2178                                                 "remaining_data_length=%d\n",
2179                                                 j, vec.iov_base, vec.iov_len,
2180                                                 remaining_data_length);
2181                                         rc = smbd_post_send_data(
2182                                                 info, &vec, 1,
2183                                                 remaining_data_length);
2184                                         if (rc)
2185                                                 goto done;
2186                                 }
2187                                 i++;
2188                         }
2189                         start = i;
2190                         buflen = 0;
2191                 } else {
2192                         i++;
2193                         if (i == rqst->rq_nvec) {
2194                                 /* send out all remaining vecs */
2195                                 remaining_data_length -= buflen;
2196                                 log_write(INFO,
2197                                         "sending iov[] from start=%d i=%d "
2198                                         "nvecs=%d remaining_data_length=%d\n",
2199                                         start, i, i-start,
2200                                         remaining_data_length);
2201                                 rc = smbd_post_send_data(info, &iov[start],
2202                                         i-start, remaining_data_length);
2203                                 if (rc)
2204                                         goto done;
2205                                 break;
2206                         }
2207                 }
2208                 log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
2209         }
2210
2211         /* now sending pages if there are any */
2212         for (i = 0; i < rqst->rq_npages; i++) {
2213                 buflen = (i == rqst->rq_npages-1) ?
2214                         rqst->rq_tailsz : rqst->rq_pagesz;
2215                 nvecs = (buflen + max_iov_size - 1) / max_iov_size;
2216                 log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
2217                         buflen, nvecs);
2218                 for (j = 0; j < nvecs; j++) {
2219                         size = max_iov_size;
2220                         if (j == nvecs-1)
2221                                 size = buflen - j*max_iov_size;
2222                         remaining_data_length -= size;
2223                         log_write(INFO, "sending pages i=%d offset=%d size=%d"
2224                                 " remaining_data_length=%d\n",
2225                                 i, j*max_iov_size, size, remaining_data_length);
2226                         rc = smbd_post_send_page(
2227                                 info, rqst->rq_pages[i], j*max_iov_size,
2228                                 size, remaining_data_length);
2229                         if (rc)
2230                                 goto done;
2231                 }
2232         }
2233
2234 done:
2235         /*
2236          * As an optimization, we don't wait for individual I/O to finish
2237          * before sending the next one.
2238          * Send them all and wait for the pending send count to get to 0,
2239          * which means all the I/Os have gone out and we are good to return
2240          */
2241
2242         wait_event(info->wait_send_payload_pending,
2243                 atomic_read(&info->send_payload_pending) == 0);
2244
2245         info->smbd_send_pending--;
2246         wake_up(&info->wait_smbd_send_pending);
2247
2248         return rc;
2249 }
2250
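     /*
      * Completion handler for the IB_WR_REG_MR work request posted in
      * smbd_register_mr(); a failed registration tears down the transport
      */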
2251 static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
2252 {
2253         struct smbd_mr *mr;
2254         struct ib_cqe *cqe;
2255
2256         if (wc->status) {
2257                 log_rdma_mr(ERR, "status=%d\n", wc->status);
2258                 cqe = wc->wr_cqe;
2259                 mr = container_of(cqe, struct smbd_mr, cqe);
2260                 smbd_disconnect_rdma_connection(mr->conn);
2261         }
2262 }
2263
2264 /*
2265  * The work queue function that recovers MRs
2266  * We need to call ib_dereg_mr() and ib_alloc_mr() before this MR can be used
2267  * again. Both calls are slow, so finish them in a workqueue. This does not
2268  * block the I/O path.
2269  * There is one workqueue that recovers MRs; there is no need to lock as the
2270  * I/O requests calling smbd_register_mr never update the links in the
2271  * mr_list.
2272  */
2273 static void smbd_mr_recovery_work(struct work_struct *work)
2274 {
2275         struct smbd_connection *info =
2276                 container_of(work, struct smbd_connection, mr_recovery_work);
2277         struct smbd_mr *smbdirect_mr;
2278         int rc;
2279
2280         list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
2281                 if (smbdirect_mr->state == MR_INVALIDATED ||
2282                         smbdirect_mr->state == MR_ERROR) {
2283
2284                         if (smbdirect_mr->state == MR_INVALIDATED) {
2285                                 ib_dma_unmap_sg(
2286                                         info->id->device, smbdirect_mr->sgl,
2287                                         smbdirect_mr->sgl_count,
2288                                         smbdirect_mr->dir);
2289                                 smbdirect_mr->state = MR_READY;
2290                         } else if (smbdirect_mr->state == MR_ERROR) {
2291
2292                                 /* recover this MR entry */
2293                                 rc = ib_dereg_mr(smbdirect_mr->mr);
2294                                 if (rc) {
2295                                         log_rdma_mr(ERR,
2296                                                 "ib_dereg_mr failed rc=%x\n",
2297                                                 rc);
2298                                         smbd_disconnect_rdma_connection(info);
2299                                 }
2300
2301                                 smbdirect_mr->mr = ib_alloc_mr(
2302                                         info->pd, info->mr_type,
2303                                         info->max_frmr_depth);
2304                                 if (IS_ERR(smbdirect_mr->mr)) {
2305                                         log_rdma_mr(ERR,
2306                                                 "ib_alloc_mr failed mr_type=%x "
2307                                                 "max_frmr_depth=%x\n",
2308                                                 info->mr_type,
2309                                                 info->max_frmr_depth);
2310                                         smbd_disconnect_rdma_connection(info);
2311                                 }
2312
2313                                 smbdirect_mr->state = MR_READY;
2314                         }
2315                         /* smbdirect_mr->state is updated by this function
2316                          * and is read and updated by I/O issuing CPUs trying
2317                          * to get an MR. The call to atomic_inc_return
2318                          * implies a memory barrier and guarantees this
2319                          * value is updated before waking up any calls to
2320                          * get_mr() from the I/O issuing CPUs
2321                          */
2322                         if (atomic_inc_return(&info->mr_ready_count) == 1)
2323                                 wake_up_interruptible(&info->wait_mr);
2324                 }
2325         }
2326 }
2327
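     /*
      * Release all MRs on transport teardown; MRs still in MR_INVALIDATED
      * state have their sg lists DMA-unmapped before being freed
      */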
2328 static void destroy_mr_list(struct smbd_connection *info)
2329 {
2330         struct smbd_mr *mr, *tmp;
2331
2332         cancel_work_sync(&info->mr_recovery_work);
2333         list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
2334                 if (mr->state == MR_INVALIDATED)
2335                         ib_dma_unmap_sg(info->id->device, mr->sgl,
2336                                 mr->sgl_count, mr->dir);
2337                 ib_dereg_mr(mr->mr);
2338                 kfree(mr->sgl);
2339                 kfree(mr);
2340         }
2341 }
2342
2343 /*
2344  * Allocate MRs used for RDMA read/write
2345  * The number of MRs will not exceed hardware capability in responder_resources
2346  * All MRs are kept in mr_list. An MR can be recovered after it's used.
2347  * Recovery is done in smbd_mr_recovery_work. The content of a list entry changes
2348  * as MRs are used and recovered for I/O, but the list links do not change
2349  */
2350 static int allocate_mr_list(struct smbd_connection *info)
2351 {
2352         int i;
2353         struct smbd_mr *smbdirect_mr, *tmp;
2354
2355         INIT_LIST_HEAD(&info->mr_list);
2356         init_waitqueue_head(&info->wait_mr);
2357         spin_lock_init(&info->mr_list_lock);
2358         atomic_set(&info->mr_ready_count, 0);
2359         atomic_set(&info->mr_used_count, 0);
2360         init_waitqueue_head(&info->wait_for_mr_cleanup);
2361         /* Allocate more MRs (2x) than hardware responder_resources */
2362         for (i = 0; i < info->responder_resources * 2; i++) {
2363                 smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
2364                 if (!smbdirect_mr)
2365                         goto out;
2366                 smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
2367                                         info->max_frmr_depth);
2368                 if (IS_ERR(smbdirect_mr->mr)) {
2369                         log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x "
2370                                 "max_frmr_depth=%x\n",
2371                                 info->mr_type, info->max_frmr_depth);
2372                         goto out;
2373                 }
2374                 smbdirect_mr->sgl = kcalloc(
2375                                         info->max_frmr_depth,
2376                                         sizeof(struct scatterlist),
2377                                         GFP_KERNEL);
2378                 if (!smbdirect_mr->sgl) {
2379                         log_rdma_mr(ERR, "failed to allocate sgl\n");
2380                         ib_dereg_mr(smbdirect_mr->mr);
2381                         goto out;
2382                 }
2383                 smbdirect_mr->state = MR_READY;
2384                 smbdirect_mr->conn = info;
2385
2386                 list_add_tail(&smbdirect_mr->list, &info->mr_list);
2387                 atomic_inc(&info->mr_ready_count);
2388         }
2389         INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
2390         return 0;
2391
2392 out:
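             /* Free the partially initialized MR (if any), then unwind all MRs already on mr_list */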
2393         kfree(smbdirect_mr);
2394
2395         list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
2396                 ib_dereg_mr(smbdirect_mr->mr);
2397                 kfree(smbdirect_mr->sgl);
2398                 kfree(smbdirect_mr);
2399         }
2400         return -ENOMEM;
2401 }
2402
2403 /*
2404  * Get an MR from mr_list. This function waits until there is at least one
2405  * MR available in the list. It may access the list while
2406  * smbd_mr_recovery_work is recovering the MR list. This doesn't need a lock
2407  * as they never modify the same places. However, there may be several CPUs
2408  * issuing I/O and trying to get an MR at the same time; mr_list_lock is used
2409  * to protect against this.
2410  */
2411 static struct smbd_mr *get_mr(struct smbd_connection *info)
2412 {
2413         struct smbd_mr *ret;
2414         int rc;
2415 again:
2416         rc = wait_event_interruptible(info->wait_mr,
2417                 atomic_read(&info->mr_ready_count) ||
2418                 info->transport_status != SMBD_CONNECTED);
2419         if (rc) {
2420                 log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
2421                 return NULL;
2422         }
2423
2424         if (info->transport_status != SMBD_CONNECTED) {
2425                 log_rdma_mr(ERR, "info->transport_status=%x\n",
2426                         info->transport_status);
2427                 return NULL;
2428         }
2429
2430         spin_lock(&info->mr_list_lock);
2431         list_for_each_entry(ret, &info->mr_list, list) {
2432                 if (ret->state == MR_READY) {
2433                         ret->state = MR_REGISTERED;
2434                         spin_unlock(&info->mr_list_lock);
2435                         atomic_dec(&info->mr_ready_count);
2436                         atomic_inc(&info->mr_used_count);
2437                         return ret;
2438                 }
2439         }
2440
2441         spin_unlock(&info->mr_list_lock);
2442         /*
2443          * It is possible that we could fail to get an MR because other processes
2444          * may be acquiring an MR at the same time. If this is the case, retry.
2445          */
2446         goto again;
2447 }
2448
2449 /*
2450  * Register memory for RDMA read/write
2451  * pages[]: the list of pages to register memory with
2452  * num_pages: the number of pages to register
2453  * tailsz: if non-zero, the bytes to register in the last page
2454  * writing: true if this is a RDMA write (SMB read), false for RDMA read
2455  * need_invalidate: true if this MR needs to be locally invalidated after I/O
2456  * return value: the MR registered, NULL if failed.
2457  */
2458 struct smbd_mr *smbd_register_mr(
2459         struct smbd_connection *info, struct page *pages[], int num_pages,
2460         int tailsz, bool writing, bool need_invalidate)
2461 {
2462         struct smbd_mr *smbdirect_mr;
2463         int rc, i;
2464         enum dma_data_direction dir;
2465         struct ib_reg_wr *reg_wr;
2466         struct ib_send_wr *bad_wr;
2467
2468         if (num_pages > info->max_frmr_depth) {
2469                 log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
2470                         num_pages, info->max_frmr_depth);
2471                 return NULL;
2472         }
2473
2474         smbdirect_mr = get_mr(info);
2475         if (!smbdirect_mr) {
2476                 log_rdma_mr(ERR, "get_mr returning NULL\n");
2477                 return NULL;
2478         }
2479         smbdirect_mr->need_invalidate = need_invalidate;
2480         smbdirect_mr->sgl_count = num_pages;
2481         sg_init_table(smbdirect_mr->sgl, num_pages);
2482
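             /* Map the full pages, then the last page with tailsz bytes (or a full page if tailsz is 0) */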
2483         for (i = 0; i < num_pages - 1; i++)
2484                 sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
2485
2486         sg_set_page(&smbdirect_mr->sgl[i], pages[i],
2487                 tailsz ? tailsz : PAGE_SIZE, 0);
2488
2489         dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
2490         smbdirect_mr->dir = dir;
2491         rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
2492         if (!rc) {
2493                 log_rdma_mr(INFO, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
2494                         num_pages, dir, rc);
2495                 goto dma_map_error;
2496         }
2497
2498         rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
2499                 NULL, PAGE_SIZE);
2500         if (rc != num_pages) {
2501                 log_rdma_mr(INFO,
2502                         "ib_map_mr_sg failed rc = %x num_pages = %x\n",
2503                         rc, num_pages);
2504                 goto map_mr_error;
2505         }
2506
2507         ib_update_fast_reg_key(smbdirect_mr->mr,
2508                 ib_inc_rkey(smbdirect_mr->mr->rkey));
2509         reg_wr = &smbdirect_mr->wr;
2510         reg_wr->wr.opcode = IB_WR_REG_MR;
2511         smbdirect_mr->cqe.done = register_mr_done;
2512         reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
2513         reg_wr->wr.num_sge = 0;
2514         reg_wr->wr.send_flags = IB_SEND_SIGNALED;
2515         reg_wr->mr = smbdirect_mr->mr;
2516         reg_wr->key = smbdirect_mr->mr->rkey;
2517         reg_wr->access = writing ?
2518                         IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
2519                         IB_ACCESS_REMOTE_READ;
2520
2521         /*
2522          * There is no need to wait for completion of ib_post_send
2523          * on IB_WR_REG_MR. Hardware enforces a barrier and order of execution
2524          * on the next ib_post_send when we actually send I/O to the remote peer
2525          */
2526         rc = ib_post_send(info->id->qp, &reg_wr->wr, &bad_wr);
2527         if (!rc)
2528                 return smbdirect_mr;
2529
2530         log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
2531                 rc, reg_wr->key);
2532
2533         /* If all failed, attempt to recover this MR by setting it to MR_ERROR */
2534 map_mr_error:
2535         ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
2536                 smbdirect_mr->sgl_count, smbdirect_mr->dir);
2537
2538 dma_map_error:
2539         smbdirect_mr->state = MR_ERROR;
2540         if (atomic_dec_and_test(&info->mr_used_count))
2541                 wake_up(&info->wait_for_mr_cleanup);
2542
2543         return NULL;
2544 }
2545
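     /*
      * Completion handler for the IB_WR_LOCAL_INV work request posted in
      * smbd_deregister_mr(); marks the MR invalidated (or errored) and
      * completes invalidate_done so the waiter can proceed
      */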
2546 static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
2547 {
2548         struct smbd_mr *smbdirect_mr;
2549         struct ib_cqe *cqe;
2550
2551         cqe = wc->wr_cqe;
2552         smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
2553         smbdirect_mr->state = MR_INVALIDATED;
2554         if (wc->status != IB_WC_SUCCESS) {
2555                 log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
2556                 smbdirect_mr->state = MR_ERROR;
2557         }
2558         complete(&smbdirect_mr->invalidate_done);
2559 }
2560
2561 /*
2562  * Deregister an MR after I/O is done
2563  * This function may wait if remote invalidation is not used
2564  * and we have to locally invalidate the buffer to prevent data from being
2565  * modified by the remote peer after the upper layer consumes it
2566  */
2567 int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
2568 {
2569         struct ib_send_wr *wr, *bad_wr;
2570         struct smbd_connection *info = smbdirect_mr->conn;
2571         int rc = 0;
2572
2573         if (smbdirect_mr->need_invalidate) {
2574                 /* Need to finish local invalidation before returning */
2575                 wr = &smbdirect_mr->inv_wr;
2576                 wr->opcode = IB_WR_LOCAL_INV;
2577                 smbdirect_mr->cqe.done = local_inv_done;
2578                 wr->wr_cqe = &smbdirect_mr->cqe;
2579                 wr->num_sge = 0;
2580                 wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
2581                 wr->send_flags = IB_SEND_SIGNALED;
2582
2583                 init_completion(&smbdirect_mr->invalidate_done);
2584                 rc = ib_post_send(info->id->qp, wr, &bad_wr);
2585                 if (rc) {
2586                         log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
2587                         smbd_disconnect_rdma_connection(info);
2588                         goto done;
2589                 }
2590                 wait_for_completion(&smbdirect_mr->invalidate_done);
2591                 smbdirect_mr->need_invalidate = false;
2592         } else
2593                 /*
2594                  * For remote invalidation, just set it to MR_INVALIDATED
2595                  * and defer to mr_recovery_work to recover the MR for next use
2596                  */
2597                 smbdirect_mr->state = MR_INVALIDATED;
2598
2599         /*
2600          * Schedule the work to do MR recovery for future I/Os
2601          * MR recovery is slow and we don't want it to block the current I/O
2602          */
2603         queue_work(info->workqueue, &info->mr_recovery_work);
2604
2605 done:
2606         if (atomic_dec_and_test(&info->mr_used_count))
2607                 wake_up(&info->wait_for_mr_cleanup);
2608
2609         return rc;
2610 }