drivers/block/rnbd/rnbd-clt.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  * RDMA Network Block Driver
4  *
5  * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6  * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7  * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8  */
9
10 #undef pr_fmt
11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12
13 #include <linux/module.h>
14 #include <linux/blkdev.h>
15 #include <linux/hdreg.h>
16 #include <linux/scatterlist.h>
17 #include <linux/idr.h>
18
19 #include "rnbd-clt.h"
20
21 MODULE_DESCRIPTION("RDMA Network Block Device Client");
22 MODULE_LICENSE("GPL");
23
24 static int rnbd_client_major;
25 static DEFINE_IDA(index_ida);
26 static DEFINE_MUTEX(sess_lock);
27 static LIST_HEAD(sess_list);
28 static struct workqueue_struct *rnbd_clt_wq;
29
30 /*
31  * Maximum number of partitions an instance can have.
32  * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
33  */
34 #define RNBD_PART_BITS          6
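
/*
 * Illustrative note (added annotation, not upstream): rnbd_clt_setup_gen_disk()
 * below sets first_minor = idx << RNBD_PART_BITS and minors = 1 << RNBD_PART_BITS,
 * so device index 0 owns minors 0..63, index 1 owns minors 64..127, and so on;
 * within each range one minor is the whole disk and the rest are partitions.
 */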
35
36 static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
37 {
38         return refcount_inc_not_zero(&sess->refcount);
39 }
40
41 static void free_sess(struct rnbd_clt_session *sess);
42
43 static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
44 {
45         might_sleep();
46
47         if (refcount_dec_and_test(&sess->refcount))
48                 free_sess(sess);
49 }
50
51 static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev)
52 {
53         might_sleep();
54
55         if (!refcount_dec_and_test(&dev->refcount))
56                 return;
57
58         ida_free(&index_ida, dev->clt_device_id);
59         kfree(dev->hw_queues);
60         kfree(dev->pathname);
61         rnbd_clt_put_sess(dev->sess);
62         mutex_destroy(&dev->lock);
63         kfree(dev);
64 }
65
66 static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev)
67 {
68         return refcount_inc_not_zero(&dev->refcount);
69 }
70
71 static void rnbd_clt_change_capacity(struct rnbd_clt_dev *dev,
72                                     sector_t new_nsectors)
73 {
74         if (get_capacity(dev->gd) == new_nsectors)
75                 return;
76
77         /*
78          * If the size changed, we need to revalidate it
79          */
80         rnbd_clt_info(dev, "Device size changed from %llu to %llu sectors\n",
81                       get_capacity(dev->gd), new_nsectors);
82         set_capacity_and_notify(dev->gd, new_nsectors);
83 }
84
85 static int process_msg_open_rsp(struct rnbd_clt_dev *dev,
86                                 struct rnbd_msg_open_rsp *rsp)
87 {
88         struct kobject *gd_kobj;
89         int err = 0;
90
91         mutex_lock(&dev->lock);
92         if (dev->dev_state == DEV_STATE_UNMAPPED) {
93                 rnbd_clt_info(dev,
94                                "Ignoring Open-Response message from server for unmapped device\n");
95                 err = -ENOENT;
96                 goto out;
97         }
98         if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) {
99                 u64 nsectors = le64_to_cpu(rsp->nsectors);
100
101                 rnbd_clt_change_capacity(dev, nsectors);
102                 gd_kobj = &disk_to_dev(dev->gd)->kobj;
103                 kobject_uevent(gd_kobj, KOBJ_ONLINE);
104                 rnbd_clt_info(dev, "Device online, device remapped successfully\n");
105         }
106         if (!rsp->logical_block_size) {
107                 err = -EINVAL;
108                 goto out;
109         }
110         dev->device_id = le32_to_cpu(rsp->device_id);
111         dev->dev_state = DEV_STATE_MAPPED;
112
113 out:
114         mutex_unlock(&dev->lock);
115
116         return err;
117 }
118
119 int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, sector_t newsize)
120 {
121         int ret = 0;
122
123         mutex_lock(&dev->lock);
124         if (dev->dev_state != DEV_STATE_MAPPED) {
125                 pr_err("Failed to set new size of the device, device is not opened\n");
126                 ret = -ENOENT;
127                 goto out;
128         }
129         rnbd_clt_change_capacity(dev, newsize);
130
131 out:
132         mutex_unlock(&dev->lock);
133
134         return ret;
135 }
136
137 static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
138 {
139         if (WARN_ON(!q->hctx))
140                 return;
141
142         /* We can come here from interrupt, thus async=true */
143         blk_mq_run_hw_queue(q->hctx, true);
144 }
145
146 enum {
147         RNBD_DELAY_IFBUSY = -1,
148 };
149
150 /**
151  * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun
152  * @sess:       Session to find a queue for
153  * @cpu:        Cpu to start the search from
154  *
155  * Description:
156  *     Each CPU has a list of HW queues which need to be rerun.  If a list
157  *     is not empty, it is marked with a bit in the bitmap.  This function
158  *     finds the first set bit and returns the corresponding CPU list.
159  */
160 static struct rnbd_cpu_qlist *
161 rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
162 {
163         int bit;
164
165         /* Search from cpu to nr_cpu_ids */
166         bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu);
167         if (bit < nr_cpu_ids) {
168                 return per_cpu_ptr(sess->cpu_queues, bit);
169         } else if (cpu != 0) {
170                 /* Search from 0 to cpu */
171                 bit = find_first_bit(sess->cpu_queues_bm, cpu);
172                 if (bit < cpu)
173                         return per_cpu_ptr(sess->cpu_queues, bit);
174         }
175
176         return NULL;
177 }
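
/*
 * Worked example (added annotation): with nr_cpu_ids == 4 and bits {1, 3} set
 * in cpu_queues_bm, rnbd_get_cpu_qlist(sess, 2) returns the CPU 3 list, while
 * rnbd_get_cpu_qlist(sess, 0) returns the CPU 1 list.  If no bit is set at
 * all, NULL is returned.
 */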
178
179 static inline int nxt_cpu(int cpu)
180 {
181         return (cpu + 1) % nr_cpu_ids;
182 }
183
184 /**
185  * rnbd_rerun_if_needed() - rerun next queue marked as stopped
186  * @sess:       Session to rerun a queue on
187  *
188  * Description:
189  *     Each CPU has its own list of HW queues which should be rerun.
190  *     The function finds such a list of HW queues, takes the list lock,
191  *     picks the first HW queue off the list and requeues it.
192  *
193  * Return:
194  *     True if the queue was requeued, false otherwise.
195  *
196  * Context:
197  *     Does not matter.
198  */
199 static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)
200 {
201         struct rnbd_queue *q = NULL;
202         struct rnbd_cpu_qlist *cpu_q;
203         unsigned long flags;
204         int *cpup;
205
206         /*
207          * To keep fairness and not let other queues starve, we always
208          * try to wake up someone else in a round-robin manner.  That of course
209          * increases latency, but every queue always gets a chance to be executed.
210          */
211         cpup = get_cpu_ptr(sess->cpu_rr);
212         for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
213              cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) {
214                 if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
215                         continue;
216                 if (!test_bit(cpu_q->cpu, sess->cpu_queues_bm))
217                         goto unlock;
218                 q = list_first_entry_or_null(&cpu_q->requeue_list,
219                                              typeof(*q), requeue_list);
220                 if (WARN_ON(!q))
221                         goto clear_bit;
222                 list_del_init(&q->requeue_list);
223                 clear_bit_unlock(0, &q->in_list);
224
225                 if (list_empty(&cpu_q->requeue_list)) {
226                         /* Clear bit if nothing is left */
227 clear_bit:
228                         clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
229                 }
230 unlock:
231                 spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
232
233                 if (q)
234                         break;
235         }
236
237         /*
238          * Saves the CPU that is going to be requeued in the per-cpu var. Just
239          * incrementing it doesn't work because rnbd_get_cpu_qlist() will
240          * always return the first CPU with something on the queue list when the
241          * value stored on the var is greater than the last CPU with something
242          * on the list.
243          */
244         if (cpu_q)
245                 *cpup = cpu_q->cpu;
246         put_cpu_ptr(sess->cpu_rr);
247
248         if (q)
249                 rnbd_clt_dev_requeue(q);
250
251         return q;
252 }
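
/*
 * Annotation (not upstream): a pass starts one CPU after the position last
 * stored in sess->cpu_rr, walks the per-CPU lists until a trylock succeeds and
 * yields a queue, requeues exactly that queue and records its CPU, so the next
 * caller continues from the following CPU.
 */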
253
254 /**
255  * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
256  *                               session is idling (there are no requests
257  *                               in-flight).
258  * @sess:       Session to rerun the queues on
259  *
260  * Description:
261  *     This function tries to rerun all stopped queues if there are no
262  *     requests in-flight anymore.  It addresses an obvious problem: when
263  *     the number of tags is smaller than the number of queues (hctxs) that
264  *     are stopped and put to sleep, and the last permit that has just been
265  *     put does not wake up all remaining queues (hctxs), IO requests hang forever.
266  *
267  *     That can happen when all N permits have been exhausted from one CPU
268  *     and we have many block devices per session, say M.  Each block device
269  *     has its own queue (hctx) for each CPU, so eventually we can put
270  *     M x nr_cpu_ids queues (hctxs) to sleep.  If the number of permits
271  *     N < M x nr_cpu_ids, we will eventually get an IO hang.
272  *
273  *     To avoid this hang, the last caller of rnbd_put_permit() (the one who
274  *     observes sess->busy == 0) must wake up all remaining queues.
275  *
276  * Context:
277  *     Does not matter.
278  */
279 static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
280 {
281         bool requeued;
282
283         do {
284                 requeued = rnbd_rerun_if_needed(sess);
285         } while (atomic_read(&sess->busy) == 0 && requeued);
286 }
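
/*
 * Numeric illustration (added annotation, hypothetical values): with N = 512
 * permits per session, M = 10 mapped devices and nr_cpu_ids = 64, up to
 * M x nr_cpu_ids = 640 hctxs may be asleep while only 512 permit releases
 * occur, so without the wake-all-when-idle rule above some hctxs could never
 * be rerun.
 */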
287
288 static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess,
289                                              enum rtrs_clt_con_type con_type,
290                                              enum wait_type wait)
291 {
292         struct rtrs_permit *permit;
293
294         permit = rtrs_clt_get_permit(sess->rtrs, con_type, wait);
295         if (permit)
296                 /* We have a subtle rare case here, when all permits can be
297                  * consumed before the busy counter is increased.  This is safe,
298                  * because the loser will get NULL as a permit, observe a zero busy
299                  * counter and immediately restart the queue itself.
300                  */
301                 atomic_inc(&sess->busy);
302
303         return permit;
304 }
305
306 static void rnbd_put_permit(struct rnbd_clt_session *sess,
307                              struct rtrs_permit *permit)
308 {
309         rtrs_clt_put_permit(sess->rtrs, permit);
310         atomic_dec(&sess->busy);
311         /* Paired with rnbd_clt_dev_add_to_requeue().  Decrement first
312          * and then check queue bits.
313          */
314         smp_mb__after_atomic();
315         rnbd_rerun_all_if_idle(sess);
316 }
317
318 static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess,
319                                      enum rtrs_clt_con_type con_type,
320                                      enum wait_type wait)
321 {
322         struct rnbd_iu *iu;
323         struct rtrs_permit *permit;
324
325         iu = kzalloc(sizeof(*iu), GFP_KERNEL);
326         if (!iu)
327                 return NULL;
328
329         permit = rnbd_get_permit(sess, con_type, wait);
330         if (!permit) {
331                 kfree(iu);
332                 return NULL;
333         }
334
335         iu->permit = permit;
336         /*
337          * The 1st reference is dropped after the "user" message has been sent,
338          * the 2nd reference is dropped once the confirmation with the response
339          * has been returned.
340          * The two can happen in any order, so the rnbd_iu must be released
341          * (the rtrs_permit returned to rtrs) only after both
342          * have finished.
343          */
344         atomic_set(&iu->refcount, 2);
345         init_waitqueue_head(&iu->comp.wait);
346         iu->comp.errno = INT_MAX;
347
348         if (sg_alloc_table(&iu->sgt, 1, GFP_KERNEL)) {
349                 rnbd_put_permit(sess, permit);
350                 kfree(iu);
351                 return NULL;
352         }
353
354         return iu;
355 }
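
/*
 * Annotation (not upstream): of the two references taken in rnbd_get_iu(), one
 * is dropped by the msg_*_conf() work handler once the server's confirmation
 * arrives, the other by the sender itself after send_usr_msg() returns; the
 * permit goes back to rtrs only when both are gone.
 */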
356
357 static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu)
358 {
359         if (atomic_dec_and_test(&iu->refcount)) {
360                 sg_free_table(&iu->sgt);
361                 rnbd_put_permit(sess, iu->permit);
362                 kfree(iu);
363         }
364 }
365
366 static void rnbd_softirq_done_fn(struct request *rq)
367 {
368         struct rnbd_clt_dev *dev        = rq->q->disk->private_data;
369         struct rnbd_clt_session *sess   = dev->sess;
370         struct rnbd_iu *iu;
371
372         iu = blk_mq_rq_to_pdu(rq);
373         sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
374         rnbd_put_permit(sess, iu->permit);
375         blk_mq_end_request(rq, errno_to_blk_status(iu->errno));
376 }
377
378 static void msg_io_conf(void *priv, int errno)
379 {
380         struct rnbd_iu *iu = priv;
381         struct rnbd_clt_dev *dev = iu->dev;
382         struct request *rq = iu->rq;
383         int rw = rq_data_dir(rq);
384
385         iu->errno = errno;
386
387         blk_mq_complete_request(rq);
388
389         if (errno)
390                 rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n",
391                                  rw == READ ? "read" : "write", errno);
392 }
393
394 static void wake_up_iu_comp(struct rnbd_iu *iu, int errno)
395 {
396         iu->comp.errno = errno;
397         wake_up(&iu->comp.wait);
398 }
399
400 static void msg_conf(void *priv, int errno)
401 {
402         struct rnbd_iu *iu = priv;
403
404         iu->errno = errno;
405         schedule_work(&iu->work);
406 }
407
408 static int send_usr_msg(struct rtrs_clt_sess *rtrs, int dir,
409                         struct rnbd_iu *iu, struct kvec *vec,
410                         size_t len, struct scatterlist *sg, unsigned int sg_len,
411                         void (*conf)(struct work_struct *work),
412                         int *errno, int wait)
413 {
414         int err;
415         struct rtrs_clt_req_ops req_ops;
416
417         INIT_WORK(&iu->work, conf);
418         req_ops = (struct rtrs_clt_req_ops) {
419                 .priv = iu,
420                 .conf_fn = msg_conf,
421         };
422         err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit,
423                                 vec, 1, len, sg, sg_len);
424         if (!err && wait) {
425                 wait_event(iu->comp.wait, iu->comp.errno != INT_MAX);
426                 *errno = iu->comp.errno;
427         } else {
428                 *errno = 0;
429         }
430
431         return err;
432 }
433
434 static void msg_close_conf(struct work_struct *work)
435 {
436         struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
437         struct rnbd_clt_dev *dev = iu->dev;
438
439         wake_up_iu_comp(iu, iu->errno);
440         rnbd_put_iu(dev->sess, iu);
441         rnbd_clt_put_dev(dev);
442 }
443
444 static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id,
445                           enum wait_type wait)
446 {
447         struct rnbd_clt_session *sess = dev->sess;
448         struct rnbd_msg_close msg;
449         struct rnbd_iu *iu;
450         struct kvec vec = {
451                 .iov_base = &msg,
452                 .iov_len  = sizeof(msg)
453         };
454         int err, errno;
455
456         iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
457         if (!iu)
458                 return -ENOMEM;
459
460         iu->buf = NULL;
461         iu->dev = dev;
462
463         msg.hdr.type    = cpu_to_le16(RNBD_MSG_CLOSE);
464         msg.device_id   = cpu_to_le32(device_id);
465
466         WARN_ON(!rnbd_clt_get_dev(dev));
467         err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0,
468                            msg_close_conf, &errno, wait);
469         if (err) {
470                 rnbd_clt_put_dev(dev);
471                 rnbd_put_iu(sess, iu);
472         } else {
473                 err = errno;
474         }
475
476         rnbd_put_iu(sess, iu);
477         return err;
478 }
479
480 static void msg_open_conf(struct work_struct *work)
481 {
482         struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
483         struct rnbd_msg_open_rsp *rsp = iu->buf;
484         struct rnbd_clt_dev *dev = iu->dev;
485         int errno = iu->errno;
486         bool from_map = false;
487
488         /* INIT state is only triggered from rnbd_clt_map_device */
489         if (dev->dev_state == DEV_STATE_INIT)
490                 from_map = true;
491
492         if (errno) {
493                 rnbd_clt_err(dev,
494                               "Opening failed, server responded: %d\n",
495                               errno);
496         } else {
497                 errno = process_msg_open_rsp(dev, rsp);
498                 if (errno) {
499                         u32 device_id = le32_to_cpu(rsp->device_id);
500                         /*
501                          * If the server thinks it's fine, but we fail to process
502                          * the response, then be nice and send a close to the server.
503                          */
504                         send_msg_close(dev, device_id, RTRS_PERMIT_NOWAIT);
505                 }
506         }
507         /* We free rsp in rnbd_clt_map_device for map scenario */
508         if (!from_map)
509                 kfree(rsp);
510         wake_up_iu_comp(iu, errno);
511         rnbd_put_iu(dev->sess, iu);
512         rnbd_clt_put_dev(dev);
513 }
514
515 static void msg_sess_info_conf(struct work_struct *work)
516 {
517         struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
518         struct rnbd_msg_sess_info_rsp *rsp = iu->buf;
519         struct rnbd_clt_session *sess = iu->sess;
520
521         if (!iu->errno)
522                 sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR);
523
524         kfree(rsp);
525         wake_up_iu_comp(iu, iu->errno);
526         rnbd_put_iu(sess, iu);
527         rnbd_clt_put_sess(sess);
528 }
529
530 static int send_msg_open(struct rnbd_clt_dev *dev, enum wait_type wait)
531 {
532         struct rnbd_clt_session *sess = dev->sess;
533         struct rnbd_msg_open_rsp *rsp;
534         struct rnbd_msg_open msg;
535         struct rnbd_iu *iu;
536         struct kvec vec = {
537                 .iov_base = &msg,
538                 .iov_len  = sizeof(msg)
539         };
540         int err, errno;
541
542         rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
543         if (!rsp)
544                 return -ENOMEM;
545
546         iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
547         if (!iu) {
548                 kfree(rsp);
549                 return -ENOMEM;
550         }
551
552         iu->buf = rsp;
553         iu->dev = dev;
554
555         sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
556
557         msg.hdr.type    = cpu_to_le16(RNBD_MSG_OPEN);
558         msg.access_mode = dev->access_mode;
559         strscpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));
560
561         WARN_ON(!rnbd_clt_get_dev(dev));
562         err = send_usr_msg(sess->rtrs, READ, iu,
563                            &vec, sizeof(*rsp), iu->sgt.sgl, 1,
564                            msg_open_conf, &errno, wait);
565         if (err) {
566                 rnbd_clt_put_dev(dev);
567                 rnbd_put_iu(sess, iu);
568                 kfree(rsp);
569         } else {
570                 err = errno;
571         }
572
573         rnbd_put_iu(sess, iu);
574         return err;
575 }
576
577 static int send_msg_sess_info(struct rnbd_clt_session *sess, enum wait_type wait)
578 {
579         struct rnbd_msg_sess_info_rsp *rsp;
580         struct rnbd_msg_sess_info msg;
581         struct rnbd_iu *iu;
582         struct kvec vec = {
583                 .iov_base = &msg,
584                 .iov_len  = sizeof(msg)
585         };
586         int err, errno;
587
588         rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
589         if (!rsp)
590                 return -ENOMEM;
591
592         iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
593         if (!iu) {
594                 kfree(rsp);
595                 return -ENOMEM;
596         }
597
598         iu->buf = rsp;
599         iu->sess = sess;
600         sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
601
602         msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO);
603         msg.ver      = RNBD_PROTO_VER_MAJOR;
604
605         if (!rnbd_clt_get_sess(sess)) {
606                 /*
607                  * That can happen in only one case: RTRS has re-established
608                  * the connection and link_ev() is called, but the session is almost
609                  * dead, the last reference on it has been put and the caller is
610                  * waiting for RTRS to close everything.
611                  */
612                 err = -ENODEV;
613                 goto put_iu;
614         }
615         err = send_usr_msg(sess->rtrs, READ, iu,
616                            &vec, sizeof(*rsp), iu->sgt.sgl, 1,
617                            msg_sess_info_conf, &errno, wait);
618         if (err) {
619                 rnbd_clt_put_sess(sess);
620 put_iu:
621                 rnbd_put_iu(sess, iu);
622                 kfree(rsp);
623         } else {
624                 err = errno;
625         }
626         rnbd_put_iu(sess, iu);
627         return err;
628 }
629
630 static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess)
631 {
632         struct rnbd_clt_dev *dev;
633         struct kobject *gd_kobj;
634
635         mutex_lock(&sess->lock);
636         list_for_each_entry(dev, &sess->devs_list, list) {
637                 rnbd_clt_err(dev, "Device disconnected.\n");
638
639                 mutex_lock(&dev->lock);
640                 if (dev->dev_state == DEV_STATE_MAPPED) {
641                         dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED;
642                         gd_kobj = &disk_to_dev(dev->gd)->kobj;
643                         kobject_uevent(gd_kobj, KOBJ_OFFLINE);
644                 }
645                 mutex_unlock(&dev->lock);
646         }
647         mutex_unlock(&sess->lock);
648 }
649
650 static void remap_devs(struct rnbd_clt_session *sess)
651 {
652         struct rnbd_clt_dev *dev;
653         struct rtrs_attrs attrs;
654         int err;
655
656         /*
657          * Careful here: we are called from RTRS link event directly,
658          * thus we can't send any RTRS request and wait for a response,
659          * or RTRS will not be able to complete the request with a failure
660          * if something goes wrong (failing outstanding requests
661          * happens exactly in the context where we are blocking now).
662          *
663          * So to avoid deadlocks, each usr message sent from here must
664          * be asynchronous.
665          */
666
667         err = send_msg_sess_info(sess, RTRS_PERMIT_NOWAIT);
668         if (err) {
669                 pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err);
670                 return;
671         }
672
673         err = rtrs_clt_query(sess->rtrs, &attrs);
674         if (err) {
675                 pr_err("rtrs_clt_query(\"%s\"): %d\n", sess->sessname, err);
676                 return;
677         }
678         mutex_lock(&sess->lock);
679         sess->max_io_size = attrs.max_io_size;
680
681         list_for_each_entry(dev, &sess->devs_list, list) {
682                 bool skip;
683
684                 mutex_lock(&dev->lock);
685                 skip = (dev->dev_state == DEV_STATE_INIT);
686                 mutex_unlock(&dev->lock);
687                 if (skip)
688                         /*
689                          * When the device is establishing a connection for the
690                          * first time, do not remap: it will be closed soon.
691                          */
692                         continue;
693
694                 rnbd_clt_info(dev, "session reconnected, remapping device\n");
695                 err = send_msg_open(dev, RTRS_PERMIT_NOWAIT);
696                 if (err) {
697                         rnbd_clt_err(dev, "send_msg_open(): %d\n", err);
698                         break;
699                 }
700         }
701         mutex_unlock(&sess->lock);
702 }
703
704 static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev)
705 {
706         struct rnbd_clt_session *sess = priv;
707
708         switch (ev) {
709         case RTRS_CLT_LINK_EV_DISCONNECTED:
710                 set_dev_states_to_disconnected(sess);
711                 break;
712         case RTRS_CLT_LINK_EV_RECONNECTED:
713                 remap_devs(sess);
714                 break;
715         default:
716                 pr_err("Unknown session event received (%d), session: %s\n",
717                        ev, sess->sessname);
718         }
719 }
720
721 static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues)
722 {
723         unsigned int cpu;
724         struct rnbd_cpu_qlist *cpu_q;
725
726         for_each_possible_cpu(cpu) {
727                 cpu_q = per_cpu_ptr(cpu_queues, cpu);
728
729                 cpu_q->cpu = cpu;
730                 INIT_LIST_HEAD(&cpu_q->requeue_list);
731                 spin_lock_init(&cpu_q->requeue_lock);
732         }
733 }
734
735 static void destroy_mq_tags(struct rnbd_clt_session *sess)
736 {
737         if (sess->tag_set.tags)
738                 blk_mq_free_tag_set(&sess->tag_set);
739 }
740
741 static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess)
742 {
743         sess->rtrs_ready = true;
744         wake_up_all(&sess->rtrs_waitq);
745 }
746
747 static void close_rtrs(struct rnbd_clt_session *sess)
748 {
749         might_sleep();
750
751         if (!IS_ERR_OR_NULL(sess->rtrs)) {
752                 rtrs_clt_close(sess->rtrs);
753                 sess->rtrs = NULL;
754                 wake_up_rtrs_waiters(sess);
755         }
756 }
757
758 static void free_sess(struct rnbd_clt_session *sess)
759 {
760         WARN_ON(!list_empty(&sess->devs_list));
761
762         might_sleep();
763
764         close_rtrs(sess);
765         destroy_mq_tags(sess);
766         if (!list_empty(&sess->list)) {
767                 mutex_lock(&sess_lock);
768                 list_del(&sess->list);
769                 mutex_unlock(&sess_lock);
770         }
771         free_percpu(sess->cpu_queues);
772         free_percpu(sess->cpu_rr);
773         mutex_destroy(&sess->lock);
774         kfree(sess);
775 }
776
777 static struct rnbd_clt_session *alloc_sess(const char *sessname)
778 {
779         struct rnbd_clt_session *sess;
780         int err, cpu;
781
782         sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE);
783         if (!sess)
784                 return ERR_PTR(-ENOMEM);
785         strscpy(sess->sessname, sessname, sizeof(sess->sessname));
786         atomic_set(&sess->busy, 0);
787         mutex_init(&sess->lock);
788         INIT_LIST_HEAD(&sess->devs_list);
789         INIT_LIST_HEAD(&sess->list);
790         bitmap_zero(sess->cpu_queues_bm, num_possible_cpus());
791         init_waitqueue_head(&sess->rtrs_waitq);
792         refcount_set(&sess->refcount, 1);
793
794         sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist);
795         if (!sess->cpu_queues) {
796                 err = -ENOMEM;
797                 goto err;
798         }
799         rnbd_init_cpu_qlists(sess->cpu_queues);
800
801         /*
802          * This is a simple percpu variable which stores CPU indices that are
803          * advanced on each access.  We need it for the sake of fairness,
804          * to wake up queues in a round-robin manner.
805          */
806         sess->cpu_rr = alloc_percpu(int);
807         if (!sess->cpu_rr) {
808                 err = -ENOMEM;
809                 goto err;
810         }
811         for_each_possible_cpu(cpu)
812                 * per_cpu_ptr(sess->cpu_rr, cpu) = cpu;
813
814         return sess;
815
816 err:
817         free_sess(sess);
818
819         return ERR_PTR(err);
820 }
821
822 static int wait_for_rtrs_connection(struct rnbd_clt_session *sess)
823 {
824         wait_event(sess->rtrs_waitq, sess->rtrs_ready);
825         if (IS_ERR_OR_NULL(sess->rtrs))
826                 return -ECONNRESET;
827
828         return 0;
829 }
830
831 static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess)
832         __releases(&sess_lock)
833         __acquires(&sess_lock)
834 {
835         DEFINE_WAIT(wait);
836
837         prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE);
838         if (IS_ERR_OR_NULL(sess->rtrs)) {
839                 finish_wait(&sess->rtrs_waitq, &wait);
840                 return;
841         }
842         mutex_unlock(&sess_lock);
843         /* The caller loops, see __find_and_get_sess().
844          * You can't leave the mutex locked and call schedule(): you would catch a
845          * deadlock with a caller of free_sess(), which has just put the last
846          * reference and is about to take the sess_lock in order to delete
847          * the session from the list.
848          */
849         schedule();
850         mutex_lock(&sess_lock);
851 }
852
853 static struct rnbd_clt_session *__find_and_get_sess(const char *sessname)
854         __releases(&sess_lock)
855         __acquires(&sess_lock)
856 {
857         struct rnbd_clt_session *sess, *sn;
858         int err;
859
860 again:
861         list_for_each_entry_safe(sess, sn, &sess_list, list) {
862                 if (strcmp(sessname, sess->sessname))
863                         continue;
864
865                 if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs))
866                         /*
867                          * No RTRS connection, session is dying.
868                          */
869                         continue;
870
871                 if (rnbd_clt_get_sess(sess)) {
872                         /*
873                          * Alive session is found, wait for RTRS connection.
874                          */
875                         mutex_unlock(&sess_lock);
876                         err = wait_for_rtrs_connection(sess);
877                         if (err)
878                                 rnbd_clt_put_sess(sess);
879                         mutex_lock(&sess_lock);
880
881                         if (err)
882                                 /* Session is dying, repeat the loop */
883                                 goto again;
884
885                         return sess;
886                 }
887                 /*
888                  * Ref is 0, session is dying, wait for RTRS disconnect
889                  * in order to avoid session names clashes.
890                  */
891                 wait_for_rtrs_disconnection(sess);
892                 /*
893                  * RTRS is disconnected and soon session will be freed,
894                  * so repeat a loop.
895                  */
896                 goto again;
897         }
898
899         return NULL;
900 }
901
902 /* caller is responsible for initializing 'first' to false */
903 static struct
904 rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first)
905 {
906         struct rnbd_clt_session *sess = NULL;
907
908         mutex_lock(&sess_lock);
909         sess = __find_and_get_sess(sessname);
910         if (!sess) {
911                 sess = alloc_sess(sessname);
912                 if (IS_ERR(sess)) {
913                         mutex_unlock(&sess_lock);
914                         return sess;
915                 }
916                 list_add(&sess->list, &sess_list);
917                 *first = true;
918         }
919         mutex_unlock(&sess_lock);
920
921         return sess;
922 }
923
924 static int rnbd_client_open(struct gendisk *disk, blk_mode_t mode)
925 {
926         struct rnbd_clt_dev *dev = disk->private_data;
927
928         if (get_disk_ro(dev->gd) && (mode & BLK_OPEN_WRITE))
929                 return -EPERM;
930
931         if (dev->dev_state == DEV_STATE_UNMAPPED ||
932             !rnbd_clt_get_dev(dev))
933                 return -EIO;
934
935         return 0;
936 }
937
938 static void rnbd_client_release(struct gendisk *gen)
939 {
940         struct rnbd_clt_dev *dev = gen->private_data;
941
942         rnbd_clt_put_dev(dev);
943 }
944
945 static int rnbd_client_getgeo(struct block_device *block_device,
946                               struct hd_geometry *geo)
947 {
948         u64 size;
949         struct rnbd_clt_dev *dev = block_device->bd_disk->private_data;
950         struct queue_limits *limit = &dev->queue->limits;
951
952         size = dev->size * (limit->logical_block_size / SECTOR_SIZE);
953         geo->cylinders  = size >> 6;    /* size/64 */
954         geo->heads      = 4;
955         geo->sectors    = 16;
956         geo->start      = 0;
957
958         return 0;
959 }
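
/*
 * Annotation (added): the reported geometry is synthetic; with heads = 4 and
 * sectors = 16 per track (4 * 16 = 64), cylinders is simply the size computed
 * above divided by 64, so the three values multiply back to that size.
 */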
960
961 static const struct block_device_operations rnbd_client_ops = {
962         .owner          = THIS_MODULE,
963         .open           = rnbd_client_open,
964         .release        = rnbd_client_release,
965         .getgeo         = rnbd_client_getgeo
966 };
967
968 /* The amount of data that belongs to an I/O and the amount of data that
969  * should be read or written to the disk (bi_size) can differ.
970  *
971  * E.g. When WRITE_SAME is used, only a small amount of data is
972  * transferred that is then written repeatedly over a lot of sectors.
973  *
974  * Get the size of data to be transferred via RTRS by summing up the size
975  * of the scatter-gather list entries.
976  */
977 static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len)
978 {
979         struct scatterlist *sg;
980         size_t tsize = 0;
981         int i;
982
983         for_each_sg(sglist, sg, len, i)
984                 tsize += sg->length;
985         return tsize;
986 }
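
/*
 * Hypothetical example (added annotation): a WRITE_SAME covering 1 MiB of the
 * disk may carry only a single logical block of payload, so bi_size sent in
 * the IO message is 1 MiB while the summed scatter-gather size handed to RTRS
 * is just that one block.
 */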
987
988 static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev,
989                                      struct request *rq,
990                                      struct rnbd_iu *iu)
991 {
992         struct rtrs_clt_sess *rtrs = dev->sess->rtrs;
993         struct rtrs_permit *permit = iu->permit;
994         struct rnbd_msg_io msg;
995         struct rtrs_clt_req_ops req_ops;
996         unsigned int sg_cnt = 0;
997         struct kvec vec;
998         size_t size;
999         int err;
1000
1001         iu->rq          = rq;
1002         iu->dev         = dev;
1003         msg.sector      = cpu_to_le64(blk_rq_pos(rq));
1004         msg.bi_size     = cpu_to_le32(blk_rq_bytes(rq));
1005         msg.rw          = cpu_to_le32(rq_to_rnbd_flags(rq));
1006         msg.prio        = cpu_to_le16(req_get_ioprio(rq));
1007
1008         /*
1009          * We only support discards/WRITE_ZEROES with single segment for now.
1010          * See queue limits.
1011          */
1012         if ((req_op(rq) != REQ_OP_DISCARD) && (req_op(rq) != REQ_OP_WRITE_ZEROES))
1013                 sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sgt.sgl);
1014
1015         if (sg_cnt == 0)
1016                 sg_mark_end(&iu->sgt.sgl[0]);
1017
1018         msg.hdr.type    = cpu_to_le16(RNBD_MSG_IO);
1019         msg.device_id   = cpu_to_le32(dev->device_id);
1020
1021         vec = (struct kvec) {
1022                 .iov_base = &msg,
1023                 .iov_len  = sizeof(msg)
1024         };
1025         size = rnbd_clt_get_sg_size(iu->sgt.sgl, sg_cnt);
1026         req_ops = (struct rtrs_clt_req_ops) {
1027                 .priv = iu,
1028                 .conf_fn = msg_io_conf,
1029         };
1030         err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit,
1031                                &vec, 1, size, iu->sgt.sgl, sg_cnt);
1032         if (err) {
1033                 rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n",
1034                                  err);
1035                 return err;
1036         }
1037
1038         return 0;
1039 }
1040
1041 /**
1042  * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy
1043  * @dev:        Device to be checked
1044  * @q:          Queue to be added to the requeue list if required
1045  *
1046  * Description:
1047  *     If the session is busy, someone will requeue us when resources
1048  *     are freed.  If the session is not doing anything, the device is not
1049  *     added to the list and false is returned.
1050  */
1051 static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev,
1052                                                 struct rnbd_queue *q)
1053 {
1054         struct rnbd_clt_session *sess = dev->sess;
1055         struct rnbd_cpu_qlist *cpu_q;
1056         unsigned long flags;
1057         bool added = true;
1058         bool need_set;
1059
1060         cpu_q = get_cpu_ptr(sess->cpu_queues);
1061         spin_lock_irqsave(&cpu_q->requeue_lock, flags);
1062
1063         if (!test_and_set_bit_lock(0, &q->in_list)) {
1064                 if (WARN_ON(!list_empty(&q->requeue_list)))
1065                         goto unlock;
1066
1067                 need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm);
1068                 if (need_set) {
1069                         set_bit(cpu_q->cpu, sess->cpu_queues_bm);
1070                         /* Paired with rnbd_put_permit(). Set a bit first
1071                          * and then observe the busy counter.
1072                          */
1073                         smp_mb__before_atomic();
1074                 }
1075                 if (atomic_read(&sess->busy)) {
1076                         list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
1077                 } else {
1078                         /* Very unlikely, but possible: busy counter was
1079                          * observed as zero.  Drop all bits and return
1080                          * false to restart the queue by ourselves.
1081                          */
1082                         if (need_set)
1083                                 clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
1084                         clear_bit_unlock(0, &q->in_list);
1085                         added = false;
1086                 }
1087         }
1088 unlock:
1089         spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
1090         put_cpu_ptr(sess->cpu_queues);
1091
1092         return added;
1093 }
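
/*
 * Annotation (not upstream): the barriers above pair with rnbd_put_permit().
 * The queue bit is set before the busy counter is read here, and the busy
 * counter is decremented before the queue bits are checked there, so at least
 * one side always acts: either this path observes busy == 0 and returns false
 * (the caller restarts the queue itself), or a subsequent rnbd_put_permit()
 * observes the set bit and reruns the queue.
 */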
1094
1095 static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev,
1096                                         struct blk_mq_hw_ctx *hctx,
1097                                         int delay)
1098 {
1099         struct rnbd_queue *q = hctx->driver_data;
1100
1101         if (delay != RNBD_DELAY_IFBUSY)
1102                 blk_mq_delay_run_hw_queue(hctx, delay);
1103         else if (!rnbd_clt_dev_add_to_requeue(dev, q))
1104                 /*
1105                  * If session is not busy we have to restart
1106                  * the queue ourselves.
1107                  */
1108                 blk_mq_delay_run_hw_queue(hctx, 10/*ms*/);
1109 }
1110
1111 static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx,
1112                                    const struct blk_mq_queue_data *bd)
1113 {
1114         struct request *rq = bd->rq;
1115         struct rnbd_clt_dev *dev = rq->q->disk->private_data;
1116         struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
1117         int err;
1118         blk_status_t ret = BLK_STS_IOERR;
1119
1120         if (dev->dev_state != DEV_STATE_MAPPED)
1121                 return BLK_STS_IOERR;
1122
1123         iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON,
1124                                       RTRS_PERMIT_NOWAIT);
1125         if (!iu->permit) {
1126                 rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY);
1127                 return BLK_STS_RESOURCE;
1128         }
1129
1130         iu->sgt.sgl = iu->first_sgl;
1131         err = sg_alloc_table_chained(&iu->sgt,
1132                                      /* Even if the request has no segment,
1133                                       * the sglist must have at least one entry.
1134                                       */
1135                                      blk_rq_nr_phys_segments(rq) ? : 1,
1136                                      iu->sgt.sgl,
1137                                      RNBD_INLINE_SG_CNT);
1138         if (err) {
1139                 rnbd_clt_err_rl(dev, "sg_alloc_table_chained ret=%d\n", err);
1140                 rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
1141                 rnbd_put_permit(dev->sess, iu->permit);
1142                 return BLK_STS_RESOURCE;
1143         }
1144
1145         blk_mq_start_request(rq);
1146         err = rnbd_client_xfer_request(dev, rq, iu);
1147         if (err == 0)
1148                 return BLK_STS_OK;
1149         if (err == -EAGAIN || err == -ENOMEM) {
1150                 rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
1151                 ret = BLK_STS_RESOURCE;
1152         }
1153         sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
1154         rnbd_put_permit(dev->sess, iu->permit);
1155         return ret;
1156 }
1157
1158 static int rnbd_rdma_poll(struct blk_mq_hw_ctx *hctx, struct io_comp_batch *iob)
1159 {
1160         struct rnbd_queue *q = hctx->driver_data;
1161         struct rnbd_clt_dev *dev = q->dev;
1162
1163         return rtrs_clt_rdma_cq_direct(dev->sess->rtrs, hctx->queue_num);
1164 }
1165
1166 static void rnbd_rdma_map_queues(struct blk_mq_tag_set *set)
1167 {
1168         struct rnbd_clt_session *sess = set->driver_data;
1169
1170         /* shared read/write queues */
1171         set->map[HCTX_TYPE_DEFAULT].nr_queues = num_online_cpus();
1172         set->map[HCTX_TYPE_DEFAULT].queue_offset = 0;
1173         set->map[HCTX_TYPE_READ].nr_queues = num_online_cpus();
1174         set->map[HCTX_TYPE_READ].queue_offset = 0;
1175         blk_mq_map_queues(&set->map[HCTX_TYPE_DEFAULT]);
1176         blk_mq_map_queues(&set->map[HCTX_TYPE_READ]);
1177
1178         if (sess->nr_poll_queues) {
1179                 /* dedicated queue for poll */
1180                 set->map[HCTX_TYPE_POLL].nr_queues = sess->nr_poll_queues;
1181                 set->map[HCTX_TYPE_POLL].queue_offset = set->map[HCTX_TYPE_READ].queue_offset +
1182                         set->map[HCTX_TYPE_READ].nr_queues;
1183                 blk_mq_map_queues(&set->map[HCTX_TYPE_POLL]);
1184                 pr_info("[session=%s] mapped %d/%d/%d default/read/poll queues.\n",
1185                         sess->sessname,
1186                         set->map[HCTX_TYPE_DEFAULT].nr_queues,
1187                         set->map[HCTX_TYPE_READ].nr_queues,
1188                         set->map[HCTX_TYPE_POLL].nr_queues);
1189         } else {
1190                 pr_info("[session=%s] mapped %d/%d default/read queues.\n",
1191                         sess->sessname,
1192                         set->map[HCTX_TYPE_DEFAULT].nr_queues,
1193                         set->map[HCTX_TYPE_READ].nr_queues);
1194         }
1195 }
1196
1197 static struct blk_mq_ops rnbd_mq_ops = {
1198         .queue_rq       = rnbd_queue_rq,
1199         .complete       = rnbd_softirq_done_fn,
1200         .map_queues     = rnbd_rdma_map_queues,
1201         .poll           = rnbd_rdma_poll,
1202 };
1203
1204 static int setup_mq_tags(struct rnbd_clt_session *sess)
1205 {
1206         struct blk_mq_tag_set *tag_set = &sess->tag_set;
1207
1208         memset(tag_set, 0, sizeof(*tag_set));
1209         tag_set->ops            = &rnbd_mq_ops;
1210         tag_set->queue_depth    = sess->queue_depth;
1211         tag_set->numa_node              = NUMA_NO_NODE;
1212         tag_set->flags          = BLK_MQ_F_SHOULD_MERGE |
1213                                   BLK_MQ_F_TAG_QUEUE_SHARED;
1214         tag_set->cmd_size       = sizeof(struct rnbd_iu) + RNBD_RDMA_SGL_SIZE;
1215
1216         /* for HCTX_TYPE_DEFAULT, HCTX_TYPE_READ, HCTX_TYPE_POLL */
1217         tag_set->nr_maps        = sess->nr_poll_queues ? HCTX_MAX_TYPES : 2;
1218         /*
1219          * HCTX_TYPE_DEFAULT and HCTX_TYPE_READ share one set of queues
1220          * others are for HCTX_TYPE_POLL
1221          */
1222         tag_set->nr_hw_queues   = num_online_cpus() + sess->nr_poll_queues;
1223         tag_set->driver_data    = sess;
1224
1225         return blk_mq_alloc_tag_set(tag_set);
1226 }
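
/*
 * Illustrative numbers (hypothetical): on a host with 8 online CPUs and
 * nr_poll_queues = 2, nr_maps = HCTX_MAX_TYPES and nr_hw_queues = 10; the
 * DEFAULT and READ maps share the first 8 hardware queues and the POLL map
 * gets the remaining 2 (see rnbd_rdma_map_queues() above).
 */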
1227
1228 static struct rnbd_clt_session *
1229 find_and_get_or_create_sess(const char *sessname,
1230                             const struct rtrs_addr *paths,
1231                             size_t path_cnt, u16 port_nr, u32 nr_poll_queues)
1232 {
1233         struct rnbd_clt_session *sess;
1234         struct rtrs_attrs attrs;
1235         int err;
1236         bool first = false;
1237         struct rtrs_clt_ops rtrs_ops;
1238
1239         sess = find_or_create_sess(sessname, &first);
1240         if (sess == ERR_PTR(-ENOMEM)) {
1241                 return ERR_PTR(-ENOMEM);
1242         } else if ((nr_poll_queues && !first) ||  (!nr_poll_queues && sess->nr_poll_queues)) {
1243                 /*
1244                  * A device MUST have its own session to use polling mode.
1245                  * Mapping a new device with the same session must fail.
1246                  */
1247                 err = -EINVAL;
1248                 goto put_sess;
1249         }
1250
1251         if (!first)
1252                 return sess;
1253
1254         if (!path_cnt) {
1255                 pr_err("Session %s not found, and path parameter not given\n", sessname);
1256                 err = -ENXIO;
1257                 goto put_sess;
1258         }
1259
1260         rtrs_ops = (struct rtrs_clt_ops) {
1261                 .priv = sess,
1262                 .link_ev = rnbd_clt_link_ev,
1263         };
1264         /*
1265          * Nothing was found, establish rtrs connection and proceed further.
1266          */
1267         sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
1268                                    paths, path_cnt, port_nr,
1269                                    0, /* Do not use pdu of rtrs */
1270                                    RECONNECT_DELAY,
1271                                    MAX_RECONNECTS, nr_poll_queues);
1272         if (IS_ERR(sess->rtrs)) {
1273                 err = PTR_ERR(sess->rtrs);
1274                 goto wake_up_and_put;
1275         }
1276
1277         err = rtrs_clt_query(sess->rtrs, &attrs);
1278         if (err)
1279                 goto close_rtrs;
1280
1281         sess->max_io_size = attrs.max_io_size;
1282         sess->queue_depth = attrs.queue_depth;
1283         sess->nr_poll_queues = nr_poll_queues;
1284         sess->max_segments = attrs.max_segments;
1285
1286         err = setup_mq_tags(sess);
1287         if (err)
1288                 goto close_rtrs;
1289
1290         err = send_msg_sess_info(sess, RTRS_PERMIT_WAIT);
1291         if (err)
1292                 goto close_rtrs;
1293
1294         wake_up_rtrs_waiters(sess);
1295
1296         return sess;
1297
1298 close_rtrs:
1299         close_rtrs(sess);
1300 put_sess:
1301         rnbd_clt_put_sess(sess);
1302
1303         return ERR_PTR(err);
1304
1305 wake_up_and_put:
1306         wake_up_rtrs_waiters(sess);
1307         goto put_sess;
1308 }
1309
1310 static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev,
1311                                        struct rnbd_queue *q,
1312                                        struct blk_mq_hw_ctx *hctx)
1313 {
1314         INIT_LIST_HEAD(&q->requeue_list);
1315         q->dev  = dev;
1316         q->hctx = hctx;
1317 }
1318
1319 static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev)
1320 {
1321         unsigned long i;
1322         struct blk_mq_hw_ctx *hctx;
1323         struct rnbd_queue *q;
1324
1325         queue_for_each_hw_ctx(dev->queue, hctx, i) {
1326                 q = &dev->hw_queues[i];
1327                 rnbd_init_hw_queue(dev, q, hctx);
1328                 hctx->driver_data = q;
1329         }
1330 }
1331
1332 static void setup_request_queue(struct rnbd_clt_dev *dev,
1333                                 struct rnbd_msg_open_rsp *rsp)
1334 {
1335         blk_queue_logical_block_size(dev->queue,
1336                                      le16_to_cpu(rsp->logical_block_size));
1337         blk_queue_physical_block_size(dev->queue,
1338                                       le16_to_cpu(rsp->physical_block_size));
1339         blk_queue_max_hw_sectors(dev->queue,
1340                                  dev->sess->max_io_size / SECTOR_SIZE);
1341
1342         /*
1343          * we don't support discards to "discontiguous" segments
1344          * in one request
1345          */
1346         blk_queue_max_discard_segments(dev->queue, 1);
1347
1348         blk_queue_max_discard_sectors(dev->queue,
1349                                       le32_to_cpu(rsp->max_discard_sectors));
1350         dev->queue->limits.discard_granularity =
1351                                         le32_to_cpu(rsp->discard_granularity);
1352         dev->queue->limits.discard_alignment =
1353                                         le32_to_cpu(rsp->discard_alignment);
1354         if (le16_to_cpu(rsp->secure_discard))
1355                 blk_queue_max_secure_erase_sectors(dev->queue,
1356                                         le32_to_cpu(rsp->max_discard_sectors));
1357         blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue);
1358         blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue);
1359         blk_queue_max_segments(dev->queue, dev->sess->max_segments);
1360         blk_queue_io_opt(dev->queue, dev->sess->max_io_size);
1361         blk_queue_virt_boundary(dev->queue, SZ_4K - 1);
1362         blk_queue_write_cache(dev->queue,
1363                               !!(rsp->cache_policy & RNBD_WRITEBACK),
1364                               !!(rsp->cache_policy & RNBD_FUA));
1365         blk_queue_max_write_zeroes_sectors(dev->queue,
1366                                            le32_to_cpu(rsp->max_write_zeroes_sectors));
1367 }
1368
1369 static int rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev,
1370                                    struct rnbd_msg_open_rsp *rsp, int idx)
1371 {
1372         int err;
1373
1374         dev->gd->major          = rnbd_client_major;
1375         dev->gd->first_minor    = idx << RNBD_PART_BITS;
1376         dev->gd->minors         = 1 << RNBD_PART_BITS;
1377         dev->gd->fops           = &rnbd_client_ops;
1378         dev->gd->queue          = dev->queue;
1379         dev->gd->private_data   = dev;
1380         snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d",
1381                  idx);
1382         pr_debug("disk_name=%s, capacity=%llu\n",
1383                  dev->gd->disk_name,
1384                  le64_to_cpu(rsp->nsectors) *
1385                  (le16_to_cpu(rsp->logical_block_size) / SECTOR_SIZE));
1386
1387         set_capacity(dev->gd, le64_to_cpu(rsp->nsectors));
1388
1389         if (dev->access_mode == RNBD_ACCESS_RO)
1390                 set_disk_ro(dev->gd, true);
1391
1392         /*
1393          * A network device is not rotational
1394          */
1395         blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue);
1396         err = add_disk(dev->gd);
1397         if (err)
1398                 put_disk(dev->gd);
1399
1400         return err;
1401 }
1402
1403 static int rnbd_client_setup_device(struct rnbd_clt_dev *dev,
1404                                     struct rnbd_msg_open_rsp *rsp)
1405 {
1406         int idx = dev->clt_device_id;
1407
1408         dev->size = le64_to_cpu(rsp->nsectors) *
1409                         le16_to_cpu(rsp->logical_block_size);
1410
1411         dev->gd = blk_mq_alloc_disk(&dev->sess->tag_set, dev);
1412         if (IS_ERR(dev->gd))
1413                 return PTR_ERR(dev->gd);
1414         dev->queue = dev->gd->queue;
1415         rnbd_init_mq_hw_queues(dev);
1416
1417         setup_request_queue(dev, rsp);
1418         return rnbd_clt_setup_gen_disk(dev, rsp, idx);
1419 }
1420
1421 static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess,
1422                                       enum rnbd_access_mode access_mode,
1423                                       const char *pathname,
1424                                       u32 nr_poll_queues)
1425 {
1426         struct rnbd_clt_dev *dev;
1427         int ret;
1428
1429         dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE);
1430         if (!dev)
1431                 return ERR_PTR(-ENOMEM);
1432
1433         /*
1434          * nr_cpu_ids: the number of softirq queues
1435          * nr_poll_queues: the number of polling queues
1436          */
1437         dev->hw_queues = kcalloc(nr_cpu_ids + nr_poll_queues,
1438                                  sizeof(*dev->hw_queues),
1439                                  GFP_KERNEL);
1440         if (!dev->hw_queues) {
1441                 ret = -ENOMEM;
1442                 goto out_alloc;
1443         }
1444
1445         ret = ida_alloc_max(&index_ida, (1 << (MINORBITS - RNBD_PART_BITS)) - 1,
1446                             GFP_KERNEL);
1447         if (ret < 0) {
1448                 pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n",
1449                        pathname, sess->sessname, ret);
1450                 goto out_queues;
1451         }
1452
1453         dev->pathname = kstrdup(pathname, GFP_KERNEL);
1454         if (!dev->pathname) {
1455                 ret = -ENOMEM;
1456                 goto out_queues;
1457         }
1458
1459         dev->clt_device_id      = ret;
1460         dev->sess               = sess;
1461         dev->access_mode        = access_mode;
1462         dev->nr_poll_queues     = nr_poll_queues;
1463         mutex_init(&dev->lock);
1464         refcount_set(&dev->refcount, 1);
1465         dev->dev_state = DEV_STATE_INIT;
1466
1467         /*
1468          * Here we are called from a sysfs entry, thus clt-sysfs is
1469          * responsible for ensuring the session does not disappear.
1470          */
1471         WARN_ON(!rnbd_clt_get_sess(sess));
1472
1473         return dev;
1474
1475 out_queues:
1476         kfree(dev->hw_queues);
1477 out_alloc:
1478         kfree(dev);
1479         return ERR_PTR(ret);
1480 }
1481
1482 static bool __exists_dev(const char *pathname, const char *sessname)
1483 {
1484         struct rnbd_clt_session *sess;
1485         struct rnbd_clt_dev *dev;
1486         bool found = false;
1487
1488         list_for_each_entry(sess, &sess_list, list) {
1489                 if (sessname && strncmp(sess->sessname, sessname,
1490                                         sizeof(sess->sessname)))
1491                         continue;
1492                 mutex_lock(&sess->lock);
1493                 list_for_each_entry(dev, &sess->devs_list, list) {
1494                         if (strlen(dev->pathname) == strlen(pathname) &&
1495                             !strcmp(dev->pathname, pathname)) {
1496                                 found = true;
1497                                 break;
1498                         }
1499                 }
1500                 mutex_unlock(&sess->lock);
1501                 if (found)
1502                         break;
1503         }
1504
1505         return found;
1506 }
1507
1508 static bool exists_devpath(const char *pathname, const char *sessname)
1509 {
1510         bool found;
1511
1512         mutex_lock(&sess_lock);
1513         found = __exists_dev(pathname, sessname);
1514         mutex_unlock(&sess_lock);
1515
1516         return found;
1517 }
1518
1519 static bool insert_dev_if_not_exists_devpath(struct rnbd_clt_dev *dev)
1520 {
1521         bool found;
1522         struct rnbd_clt_session *sess = dev->sess;
1523
1524         mutex_lock(&sess_lock);
1525         found = __exists_dev(dev->pathname, sess->sessname);
1526         if (!found) {
1527                 mutex_lock(&sess->lock);
1528                 list_add_tail(&dev->list, &sess->devs_list);
1529                 mutex_unlock(&sess->lock);
1530         }
1531         mutex_unlock(&sess_lock);
1532
1533         return found;
1534 }
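
     /*
      * The lookup and the insertion above sit in one sess_lock critical
      * section on purpose: doing exists_devpath() and list_add_tail() as
      * separate steps would let two concurrent maps of the same path both
      * pass the duplicate check.
      */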
1535
1536 static void delete_dev(struct rnbd_clt_dev *dev)
1537 {
1538         struct rnbd_clt_session *sess = dev->sess;
1539
1540         mutex_lock(&sess->lock);
1541         list_del(&dev->list);
1542         mutex_unlock(&sess->lock);
1543 }
1544
1545 struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname,
1546                                            struct rtrs_addr *paths,
1547                                            size_t path_cnt, u16 port_nr,
1548                                            const char *pathname,
1549                                            enum rnbd_access_mode access_mode,
1550                                            u32 nr_poll_queues)
1551 {
1552         struct rnbd_clt_session *sess;
1553         struct rnbd_clt_dev *dev;
1554         int ret, errno;
1555         struct rnbd_msg_open_rsp *rsp;
1556         struct rnbd_msg_open msg;
1557         struct rnbd_iu *iu;
1558         struct kvec vec = {
1559                 .iov_base = &msg,
1560                 .iov_len  = sizeof(msg)
1561         };
1562
1563         if (exists_devpath(pathname, sessname))
1564                 return ERR_PTR(-EEXIST);
1565
1566         sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr, nr_poll_queues);
1567         if (IS_ERR(sess))
1568                 return ERR_CAST(sess);
1569
1570         dev = init_dev(sess, access_mode, pathname, nr_poll_queues);
1571         if (IS_ERR(dev)) {
1572                 pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %pe\n",
1573                        pathname, sess->sessname, dev);
1574                 ret = PTR_ERR(dev);
1575                 goto put_sess;
1576         }
1577         if (insert_dev_if_not_exists_devpath(dev)) {
1578                 ret = -EEXIST;
1579                 goto put_dev;
1580         }
1581
1582         rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
1583         if (!rsp) {
1584                 ret = -ENOMEM;
1585                 goto del_dev;
1586         }
1587
1588         iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
1589         if (!iu) {
1590                 ret = -ENOMEM;
1591                 kfree(rsp);
1592                 goto del_dev;
1593         }
1594         iu->buf = rsp;
1595         iu->dev = dev;
1596         sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
1597
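             /*
              * Build the RNBD_MSG_OPEN request.  The server answers with an
              * rnbd_msg_open_rsp carrying the device geometry; it lands in
              * 'rsp' through the scatterlist set up above, and the result of
              * the exchange comes back through 'errno' once msg_open_conf()
              * has run.
              */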
1598         msg.hdr.type    = cpu_to_le16(RNBD_MSG_OPEN);
1599         msg.access_mode = dev->access_mode;
1600         strscpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));
1601
1602         WARN_ON(!rnbd_clt_get_dev(dev));
1603         ret = send_usr_msg(sess->rtrs, READ, iu,
1604                            &vec, sizeof(*rsp), iu->sgt.sgl, 1,
1605                            msg_open_conf, &errno, RTRS_PERMIT_WAIT);
1606         if (ret) {
1607                 rnbd_clt_put_dev(dev);
1608                 rnbd_put_iu(sess, iu);
1609         } else {
1610                 ret = errno;
1611         }
1612         if (ret) {
1613                 rnbd_clt_err(dev,
1614                               "map_device: failed, can't open remote device, err: %d\n",
1615                               ret);
1616                 goto put_iu;
1617         }
1618         mutex_lock(&dev->lock);
1619         pr_debug("Opened remote device: session=%s, path='%s'\n",
1620                  sess->sessname, pathname);
1621         ret = rnbd_client_setup_device(dev, rsp);
1622         if (ret) {
1623                 rnbd_clt_err(dev,
1624                               "map_device: Failed to configure device, err: %d\n",
1625                               ret);
1626                 mutex_unlock(&dev->lock);
1627                 goto send_close;
1628         }
1629
1630         rnbd_clt_info(dev,
1631                        "map_device: Device mapped as %s (nsectors: %llu, logical_block_size: %d, physical_block_size: %d, max_write_zeroes_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, wc: %d, fua: %d)\n",
1632                        dev->gd->disk_name, le64_to_cpu(rsp->nsectors),
1633                        le16_to_cpu(rsp->logical_block_size),
1634                        le16_to_cpu(rsp->physical_block_size),
1635                        le32_to_cpu(rsp->max_write_zeroes_sectors),
1636                        le32_to_cpu(rsp->max_discard_sectors),
1637                        le32_to_cpu(rsp->discard_granularity),
1638                        le32_to_cpu(rsp->discard_alignment),
1639                        le16_to_cpu(rsp->secure_discard),
1640                        sess->max_segments, sess->max_io_size / SECTOR_SIZE,
1641                        !!(rsp->cache_policy & RNBD_WRITEBACK),
1642                        !!(rsp->cache_policy & RNBD_FUA));
1643
1644         mutex_unlock(&dev->lock);
1645         kfree(rsp);
1646         rnbd_put_iu(sess, iu);
1647         rnbd_clt_put_sess(sess);
1648
1649         return dev;
1650
1651 send_close:
1652         send_msg_close(dev, dev->device_id, RTRS_PERMIT_WAIT);
1653 put_iu:
1654         kfree(rsp);
1655         rnbd_put_iu(sess, iu);
1656 del_dev:
1657         delete_dev(dev);
1658 put_dev:
1659         rnbd_clt_put_dev(dev);
1660 put_sess:
1661         rnbd_clt_put_sess(sess);
1662
1663         return ERR_PTR(ret);
1664 }
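
     /*
      * Sketch of a caller, for illustration only: this roughly mirrors what
      * the sysfs map_device handler in rnbd-clt-sysfs.c does once the user's
      * options have been parsed.  The session name, remote device path and
      * the pre-parsed rtrs_addr are placeholders, and all error handling
      * beyond the IS_ERR() check is omitted.
      */
     static __maybe_unused int rnbd_clt_map_device_example(struct rtrs_addr *path,
                                                           u16 port_nr)
     {
             struct rnbd_clt_dev *dev;

             /* one path, read-write access, no dedicated poll queues */
             dev = rnbd_clt_map_device("example_sess", path, 1, port_nr,
                                       "/dev/nullb0", RNBD_ACCESS_RW, 0);
             if (IS_ERR(dev))
                     return PTR_ERR(dev);

             return 0;
     }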
1665
1666 static void destroy_gen_disk(struct rnbd_clt_dev *dev)
1667 {
1668         del_gendisk(dev->gd);
1669         put_disk(dev->gd);
1670 }
1671
1672 static void destroy_sysfs(struct rnbd_clt_dev *dev,
1673                           const struct attribute *sysfs_self)
1674 {
1675         rnbd_clt_remove_dev_symlink(dev);
1676         if (dev->kobj.state_initialized) {
1677                 if (sysfs_self)
1678                         /* Remove the calling attribute first to avoid a deadlock */
1679                         sysfs_remove_file_self(&dev->kobj, sysfs_self);
1680                 kobject_del(&dev->kobj);
1681                 kobject_put(&dev->kobj);
1682         }
1683 }
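
     /*
      * The state_initialized check above keeps destroy_sysfs() safe for
      * devices whose kobject was never registered (e.g. when mapping failed
      * early): kobject_del()/kobject_put() are only issued for a kobject that
      * was actually initialized.
      */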
1684
1685 int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force,
1686                            const struct attribute *sysfs_self)
1687 {
1688         struct rnbd_clt_session *sess = dev->sess;
1689         int refcount, ret = 0;
1690         bool was_mapped;
1691
1692         mutex_lock(&dev->lock);
1693         if (dev->dev_state == DEV_STATE_UNMAPPED) {
1694                 rnbd_clt_info(dev, "Device is already being unmapped\n");
1695                 ret = -EALREADY;
1696                 goto err;
1697         }
1698         refcount = refcount_read(&dev->refcount);
1699         if (!force && refcount > 1) {
1700                 rnbd_clt_err(dev,
1701                               "Closing device failed, device is in use (%d device users)\n",
1702                               refcount - 1);
1703                 ret = -EBUSY;
1704                 goto err;
1705         }
1706         was_mapped = (dev->dev_state == DEV_STATE_MAPPED);
1707         dev->dev_state = DEV_STATE_UNMAPPED;
1708         mutex_unlock(&dev->lock);
1709
1710         delete_dev(dev);
1711         destroy_sysfs(dev, sysfs_self);
1712         destroy_gen_disk(dev);
1713         if (was_mapped && sess->rtrs)
1714                 send_msg_close(dev, dev->device_id, RTRS_PERMIT_WAIT);
1715
1716         rnbd_clt_info(dev, "Device is unmapped\n");
1717
1718         /* Likely last reference put */
1719         rnbd_clt_put_dev(dev);
1720
1721         /*
1722          * From here on, the device and the session may already be gone.
1723          */
1724
1725         return 0;
1726 err:
1727         mutex_unlock(&dev->lock);
1728
1729         return ret;
1730 }
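
     /*
      * sysfs_self, when set, is the sysfs attribute that triggered the unmap;
      * handing it to destroy_sysfs() lets that file remove itself first and
      * avoid deadlocking on its own removal.  Forced unmaps (the rmmod path
      * below passes force=true and a NULL attribute) skip the in-use check
      * above.
      */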
1731
1732 int rnbd_clt_remap_device(struct rnbd_clt_dev *dev)
1733 {
1734         int err;
1735
1736         mutex_lock(&dev->lock);
1737         if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED)
1738                 err = 0;
1739         else if (dev->dev_state == DEV_STATE_UNMAPPED)
1740                 err = -ENODEV;
1741         else if (dev->dev_state == DEV_STATE_MAPPED)
1742                 err = -EALREADY;
1743         else
1744                 err = -EBUSY;
1745         mutex_unlock(&dev->lock);
1746         if (!err) {
1747                 rnbd_clt_info(dev, "Remapping device.\n");
1748                 err = send_msg_open(dev, RTRS_PERMIT_WAIT);
1749                 if (err)
1750                         rnbd_clt_err(dev, "remap_device: %d\n", err);
1751         }
1752
1753         return err;
1754 }
1755
1756 static void unmap_device_work(struct work_struct *work)
1757 {
1758         struct rnbd_clt_dev *dev;
1759
1760         dev = container_of(work, typeof(*dev), unmap_on_rmmod_work);
1761         rnbd_clt_unmap_device(dev, true, NULL);
1762 }
1763
1764 static void rnbd_destroy_sessions(void)
1765 {
1766         struct rnbd_clt_session *sess, *sn;
1767         struct rnbd_clt_dev *dev, *tn;
1768
1769         /* First, forbid any further access through the sysfs interface */
1770         rnbd_clt_destroy_sysfs_files();
1771
1772         /*
1773          * At this point there is no concurrent access to the session list
1774          * or to the per-session device lists:
1775          *   1. No new session or device can be created - the session sysfs
1776          *      files have been removed.
1777          *   2. No device or session can be removed - the module reference is
1778          *      taken into account in the unmap-device sysfs callback.
1779          *   3. No IO requests are in flight - each open of a block device
1780          *      takes a module reference in get_disk().
1781          *
1782          * However, user requests sent by the asynchronous send_msg_*()
1783          * functions can still be in flight, so the RTRS session must be
1784          * explicitly closed before the devices are unmapped.
1785          */
1786
1787         list_for_each_entry_safe(sess, sn, &sess_list, list) {
1788                 if (!rnbd_clt_get_sess(sess))
1789                         continue;
1790                 close_rtrs(sess);
1791                 list_for_each_entry_safe(dev, tn, &sess->devs_list, list) {
1792                         /*
1793                          * Unmapping is done in parallel for one reason only:
1794                          * del_gendisk() takes around half a second, so with a
1795                          * huge number of devices the whole module unload
1796                          * procedure would otherwise take minutes.
1797                          */
1798                         INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work);
1799                         queue_work(rnbd_clt_wq, &dev->unmap_on_rmmod_work);
1800                 }
1801                 rnbd_clt_put_sess(sess);
1802         }
1803         /* Wait for all scheduled unmap work items */
1804         flush_workqueue(rnbd_clt_wq);
1805         WARN_ON(!list_empty(&sess_list));
1806 }
1807
1808 static int __init rnbd_client_init(void)
1809 {
1810         int err = 0;
1811
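             /*
              * The sizes checked below are part of the wire protocol shared
              * with the rnbd server, so any accidental change to a message
              * layout breaks the build instead of the protocol.
              * register_blkdev() is then called with rnbd_client_major still
              * 0, which asks the block layer to assign a dynamic major.
              */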
1812         BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4);
1813         BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36);
1814         BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36);
1815         BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264);
1816         BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8);
1817         BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56);
1818         rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd");
1819         if (rnbd_client_major <= 0) {
1820                 pr_err("Failed to load module, block device registration failed\n");
1821                 return -EBUSY;
1822         }
1823
1824         err = rnbd_clt_create_sysfs_files();
1825         if (err) {
1826                 pr_err("Failed to load module, creating sysfs device files failed, err: %d\n",
1827                        err);
1828                 unregister_blkdev(rnbd_client_major, "rnbd");
1829                 return err;
1830         }
1831         rnbd_clt_wq = alloc_workqueue("rnbd_clt_wq", 0, 0);
1832         if (!rnbd_clt_wq) {
1833                 pr_err("Failed to load module, alloc_workqueue failed\n");
1834                 rnbd_clt_destroy_sysfs_files();
1835                 unregister_blkdev(rnbd_client_major, "rnbd");
1836                 err = -ENOMEM;
1837         }
1838
1839         return err;
1840 }
1841
1842 static void __exit rnbd_client_exit(void)
1843 {
1844         rnbd_destroy_sessions();
1845         unregister_blkdev(rnbd_client_major, "rnbd");
1846         ida_destroy(&index_ida);
1847         destroy_workqueue(rnbd_clt_wq);
1848 }
1849
1850 module_init(rnbd_client_init);
1851 module_exit(rnbd_client_exit);