Merge tag 'pinctrl-v5.1-1' of git://git.kernel.org/pub/scm/linux/kernel/git/linusw...
[sfrench/cifs-2.6.git] / drivers / block / rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/striper.h>
36 #include <linux/ceph/decode.h>
37 #include <linux/parser.h>
38 #include <linux/bsearch.h>
39
40 #include <linux/kernel.h>
41 #include <linux/device.h>
42 #include <linux/module.h>
43 #include <linux/blk-mq.h>
44 #include <linux/fs.h>
45 #include <linux/blkdev.h>
46 #include <linux/slab.h>
47 #include <linux/idr.h>
48 #include <linux/workqueue.h>
49
50 #include "rbd_types.h"
51
52 #define RBD_DEBUG       /* Activate rbd_assert() calls */
53
54 /*
55  * Increment the given counter and return its updated value.
56  * If the counter is already 0 it will not be incremented.
57  * If the counter is already at its maximum value returns
58  * -EINVAL without updating it.
59  */
60 static int atomic_inc_return_safe(atomic_t *v)
61 {
62         unsigned int counter;
63
64         counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
65         if (counter <= (unsigned int)INT_MAX)
66                 return (int)counter;
67
68         atomic_dec(v);
69
70         return -EINVAL;
71 }
72
73 /* Decrement the counter.  Return the resulting value, or -EINVAL */
74 static int atomic_dec_return_safe(atomic_t *v)
75 {
76         int counter;
77
78         counter = atomic_dec_return(v);
79         if (counter >= 0)
80                 return counter;
81
82         atomic_inc(v);
83
84         return -EINVAL;
85 }
86
87 #define RBD_DRV_NAME "rbd"
88
89 #define RBD_MINORS_PER_MAJOR            256
90 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
91
92 #define RBD_MAX_PARENT_CHAIN_LEN        16
93
94 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
95 #define RBD_MAX_SNAP_NAME_LEN   \
96                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
97
98 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
99
100 #define RBD_SNAP_HEAD_NAME      "-"
101
102 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
103
104 /* This allows a single page to hold an image name sent by OSD */
105 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
106 #define RBD_IMAGE_ID_LEN_MAX    64
107
108 #define RBD_OBJ_PREFIX_LEN_MAX  64
109
110 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
111 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
112
113 /* Feature bits */
114
115 #define RBD_FEATURE_LAYERING            (1ULL<<0)
116 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
117 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
118 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
119 #define RBD_FEATURE_OPERATIONS          (1ULL<<8)
120
121 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
122                                  RBD_FEATURE_STRIPINGV2 |       \
123                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
124                                  RBD_FEATURE_DATA_POOL |        \
125                                  RBD_FEATURE_OPERATIONS)
126
127 /* Features supported by this (client software) implementation. */
128
129 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
130
131 /*
132  * An RBD device name will be "rbd#", where the "rbd" comes from
133  * RBD_DRV_NAME above, and # is a unique integer identifier.
134  */
135 #define DEV_NAME_LEN            32
136
137 /*
138  * block device image metadata (in-memory version)
139  */
140 struct rbd_image_header {
141         /* These six fields never change for a given rbd image */
142         char *object_prefix;
143         __u8 obj_order;
144         u64 stripe_unit;
145         u64 stripe_count;
146         s64 data_pool_id;
147         u64 features;           /* Might be changeable someday? */
148
149         /* The remaining fields need to be updated occasionally */
150         u64 image_size;
151         struct ceph_snap_context *snapc;
152         char *snap_names;       /* format 1 only */
153         u64 *snap_sizes;        /* format 1 only */
154 };
155
156 /*
157  * An rbd image specification.
158  *
159  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
160  * identify an image.  Each rbd_dev structure includes a pointer to
161  * an rbd_spec structure that encapsulates this identity.
162  *
163  * Each of the id's in an rbd_spec has an associated name.  For a
164  * user-mapped image, the names are supplied and the id's associated
165  * with them are looked up.  For a layered image, a parent image is
166  * defined by the tuple, and the names are looked up.
167  *
168  * An rbd_dev structure contains a parent_spec pointer which is
169  * non-null if the image it represents is a child in a layered
170  * image.  This pointer will refer to the rbd_spec structure used
171  * by the parent rbd_dev for its own identity (i.e., the structure
172  * is shared between the parent and child).
173  *
174  * Since these structures are populated once, during the discovery
175  * phase of image construction, they are effectively immutable so
176  * we make no effort to synchronize access to them.
177  *
178  * Note that code herein does not assume the image name is known (it
179  * could be a null pointer).
180  */
181 struct rbd_spec {
182         u64             pool_id;
183         const char      *pool_name;
184         const char      *pool_ns;       /* NULL if default, never "" */
185
186         const char      *image_id;
187         const char      *image_name;
188
189         u64             snap_id;
190         const char      *snap_name;
191
192         struct kref     kref;
193 };
194
195 /*
196  * an instance of the client.  multiple devices may share an rbd client.
197  */
198 struct rbd_client {
199         struct ceph_client      *client;
200         struct kref             kref;
201         struct list_head        node;
202 };
203
204 struct rbd_img_request;
205
206 enum obj_request_type {
207         OBJ_REQUEST_NODATA = 1,
208         OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
209         OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
210         OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
211 };
212
213 enum obj_operation_type {
214         OBJ_OP_READ = 1,
215         OBJ_OP_WRITE,
216         OBJ_OP_DISCARD,
217 };
218
219 /*
220  * Writes go through the following state machine to deal with
221  * layering:
222  *
223  *                       need copyup
224  * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
225  *        |     ^                              |
226  *        v     \------------------------------/
227  *      done
228  *        ^
229  *        |
230  * RBD_OBJ_WRITE_FLAT
231  *
232  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
233  * there is a parent or not.
234  */
235 enum rbd_obj_write_state {
236         RBD_OBJ_WRITE_FLAT = 1,
237         RBD_OBJ_WRITE_GUARD,
238         RBD_OBJ_WRITE_COPYUP,
239 };
240
241 struct rbd_obj_request {
242         struct ceph_object_extent ex;
243         union {
244                 bool                    tried_parent;   /* for reads */
245                 enum rbd_obj_write_state write_state;   /* for writes */
246         };
247
248         struct rbd_img_request  *img_request;
249         struct ceph_file_extent *img_extents;
250         u32                     num_img_extents;
251
252         union {
253                 struct ceph_bio_iter    bio_pos;
254                 struct {
255                         struct ceph_bvec_iter   bvec_pos;
256                         u32                     bvec_count;
257                         u32                     bvec_idx;
258                 };
259         };
260         struct bio_vec          *copyup_bvecs;
261         u32                     copyup_bvec_count;
262
263         struct ceph_osd_request *osd_req;
264
265         u64                     xferred;        /* bytes transferred */
266         int                     result;
267
268         struct kref             kref;
269 };
270
271 enum img_req_flags {
272         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
273         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
274 };
275
276 struct rbd_img_request {
277         struct rbd_device       *rbd_dev;
278         enum obj_operation_type op_type;
279         enum obj_request_type   data_type;
280         unsigned long           flags;
281         union {
282                 u64                     snap_id;        /* for reads */
283                 struct ceph_snap_context *snapc;        /* for writes */
284         };
285         union {
286                 struct request          *rq;            /* block request */
287                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
288         };
289         spinlock_t              completion_lock;
290         u64                     xferred;/* aggregate bytes transferred */
291         int                     result; /* first nonzero obj_request result */
292
293         struct list_head        object_extents; /* obj_req.ex structs */
294         u32                     obj_request_count;
295         u32                     pending_count;
296
297         struct kref             kref;
298 };
299
300 #define for_each_obj_request(ireq, oreq) \
301         list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
302 #define for_each_obj_request_safe(ireq, oreq, n) \
303         list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
304
305 enum rbd_watch_state {
306         RBD_WATCH_STATE_UNREGISTERED,
307         RBD_WATCH_STATE_REGISTERED,
308         RBD_WATCH_STATE_ERROR,
309 };
310
311 enum rbd_lock_state {
312         RBD_LOCK_STATE_UNLOCKED,
313         RBD_LOCK_STATE_LOCKED,
314         RBD_LOCK_STATE_RELEASING,
315 };
316
317 /* WatchNotify::ClientId */
318 struct rbd_client_id {
319         u64 gid;
320         u64 handle;
321 };
322
323 struct rbd_mapping {
324         u64                     size;
325         u64                     features;
326 };
327
328 /*
329  * a single device
330  */
331 struct rbd_device {
332         int                     dev_id;         /* blkdev unique id */
333
334         int                     major;          /* blkdev assigned major */
335         int                     minor;
336         struct gendisk          *disk;          /* blkdev's gendisk and rq */
337
338         u32                     image_format;   /* Either 1 or 2 */
339         struct rbd_client       *rbd_client;
340
341         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
342
343         spinlock_t              lock;           /* queue, flags, open_count */
344
345         struct rbd_image_header header;
346         unsigned long           flags;          /* possibly lock protected */
347         struct rbd_spec         *spec;
348         struct rbd_options      *opts;
349         char                    *config_info;   /* add{,_single_major} string */
350
351         struct ceph_object_id   header_oid;
352         struct ceph_object_locator header_oloc;
353
354         struct ceph_file_layout layout;         /* used for all rbd requests */
355
356         struct mutex            watch_mutex;
357         enum rbd_watch_state    watch_state;
358         struct ceph_osd_linger_request *watch_handle;
359         u64                     watch_cookie;
360         struct delayed_work     watch_dwork;
361
362         struct rw_semaphore     lock_rwsem;
363         enum rbd_lock_state     lock_state;
364         char                    lock_cookie[32];
365         struct rbd_client_id    owner_cid;
366         struct work_struct      acquired_lock_work;
367         struct work_struct      released_lock_work;
368         struct delayed_work     lock_dwork;
369         struct work_struct      unlock_work;
370         wait_queue_head_t       lock_waitq;
371
372         struct workqueue_struct *task_wq;
373
374         struct rbd_spec         *parent_spec;
375         u64                     parent_overlap;
376         atomic_t                parent_ref;
377         struct rbd_device       *parent;
378
379         /* Block layer tags. */
380         struct blk_mq_tag_set   tag_set;
381
382         /* protects updating the header */
383         struct rw_semaphore     header_rwsem;
384
385         struct rbd_mapping      mapping;
386
387         struct list_head        node;
388
389         /* sysfs related */
390         struct device           dev;
391         unsigned long           open_count;     /* protected by lock */
392 };
393
394 /*
395  * Flag bits for rbd_dev->flags:
396  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
397  *   by rbd_dev->lock
398  * - BLACKLISTED is protected by rbd_dev->lock_rwsem
399  */
400 enum rbd_dev_flags {
401         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
402         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
403         RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
404 };
405
406 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
407
408 static LIST_HEAD(rbd_dev_list);    /* devices */
409 static DEFINE_SPINLOCK(rbd_dev_list_lock);
410
411 static LIST_HEAD(rbd_client_list);              /* clients */
412 static DEFINE_SPINLOCK(rbd_client_list_lock);
413
414 /* Slab caches for frequently-allocated structures */
415
416 static struct kmem_cache        *rbd_img_request_cache;
417 static struct kmem_cache        *rbd_obj_request_cache;
418
419 static int rbd_major;
420 static DEFINE_IDA(rbd_dev_id_ida);
421
422 static struct workqueue_struct *rbd_wq;
423
424 /*
425  * single-major requires >= 0.75 version of userspace rbd utility.
426  */
427 static bool single_major = true;
428 module_param(single_major, bool, 0444);
429 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
430
431 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
432 static ssize_t remove_store(struct bus_type *bus, const char *buf,
433                             size_t count);
434 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
435                                       size_t count);
436 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
437                                          size_t count);
438 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
439
440 static int rbd_dev_id_to_minor(int dev_id)
441 {
442         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
443 }
444
445 static int minor_to_rbd_dev_id(int minor)
446 {
447         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
448 }
449
450 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
451 {
452         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
453                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
454 }
455
456 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
457 {
458         bool is_lock_owner;
459
460         down_read(&rbd_dev->lock_rwsem);
461         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
462         up_read(&rbd_dev->lock_rwsem);
463         return is_lock_owner;
464 }
465
466 static ssize_t supported_features_show(struct bus_type *bus, char *buf)
467 {
468         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
469 }
470
471 static BUS_ATTR_WO(add);
472 static BUS_ATTR_WO(remove);
473 static BUS_ATTR_WO(add_single_major);
474 static BUS_ATTR_WO(remove_single_major);
475 static BUS_ATTR_RO(supported_features);
476
477 static struct attribute *rbd_bus_attrs[] = {
478         &bus_attr_add.attr,
479         &bus_attr_remove.attr,
480         &bus_attr_add_single_major.attr,
481         &bus_attr_remove_single_major.attr,
482         &bus_attr_supported_features.attr,
483         NULL,
484 };
485
486 static umode_t rbd_bus_is_visible(struct kobject *kobj,
487                                   struct attribute *attr, int index)
488 {
489         if (!single_major &&
490             (attr == &bus_attr_add_single_major.attr ||
491              attr == &bus_attr_remove_single_major.attr))
492                 return 0;
493
494         return attr->mode;
495 }
496
497 static const struct attribute_group rbd_bus_group = {
498         .attrs = rbd_bus_attrs,
499         .is_visible = rbd_bus_is_visible,
500 };
501 __ATTRIBUTE_GROUPS(rbd_bus);
502
503 static struct bus_type rbd_bus_type = {
504         .name           = "rbd",
505         .bus_groups     = rbd_bus_groups,
506 };
507
508 static void rbd_root_dev_release(struct device *dev)
509 {
510 }
511
512 static struct device rbd_root_dev = {
513         .init_name =    "rbd",
514         .release =      rbd_root_dev_release,
515 };
516
517 static __printf(2, 3)
518 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
519 {
520         struct va_format vaf;
521         va_list args;
522
523         va_start(args, fmt);
524         vaf.fmt = fmt;
525         vaf.va = &args;
526
527         if (!rbd_dev)
528                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
529         else if (rbd_dev->disk)
530                 printk(KERN_WARNING "%s: %s: %pV\n",
531                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
532         else if (rbd_dev->spec && rbd_dev->spec->image_name)
533                 printk(KERN_WARNING "%s: image %s: %pV\n",
534                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
535         else if (rbd_dev->spec && rbd_dev->spec->image_id)
536                 printk(KERN_WARNING "%s: id %s: %pV\n",
537                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
538         else    /* punt */
539                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
540                         RBD_DRV_NAME, rbd_dev, &vaf);
541         va_end(args);
542 }
543
544 #ifdef RBD_DEBUG
545 #define rbd_assert(expr)                                                \
546                 if (unlikely(!(expr))) {                                \
547                         printk(KERN_ERR "\nAssertion failure in %s() "  \
548                                                 "at line %d:\n\n"       \
549                                         "\trbd_assert(%s);\n\n",        \
550                                         __func__, __LINE__, #expr);     \
551                         BUG();                                          \
552                 }
553 #else /* !RBD_DEBUG */
554 #  define rbd_assert(expr)      ((void) 0)
555 #endif /* !RBD_DEBUG */
556
557 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
558
559 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
560 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
561 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
562 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
563 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
564                                         u64 snap_id);
565 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
566                                 u8 *order, u64 *snap_size);
567 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
568                 u64 *snap_features);
569
570 static int rbd_open(struct block_device *bdev, fmode_t mode)
571 {
572         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
573         bool removing = false;
574
575         spin_lock_irq(&rbd_dev->lock);
576         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
577                 removing = true;
578         else
579                 rbd_dev->open_count++;
580         spin_unlock_irq(&rbd_dev->lock);
581         if (removing)
582                 return -ENOENT;
583
584         (void) get_device(&rbd_dev->dev);
585
586         return 0;
587 }
588
589 static void rbd_release(struct gendisk *disk, fmode_t mode)
590 {
591         struct rbd_device *rbd_dev = disk->private_data;
592         unsigned long open_count_before;
593
594         spin_lock_irq(&rbd_dev->lock);
595         open_count_before = rbd_dev->open_count--;
596         spin_unlock_irq(&rbd_dev->lock);
597         rbd_assert(open_count_before > 0);
598
599         put_device(&rbd_dev->dev);
600 }
601
602 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
603 {
604         int ro;
605
606         if (get_user(ro, (int __user *)arg))
607                 return -EFAULT;
608
609         /* Snapshots can't be marked read-write */
610         if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
611                 return -EROFS;
612
613         /* Let blkdev_roset() handle it */
614         return -ENOTTY;
615 }
616
617 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
618                         unsigned int cmd, unsigned long arg)
619 {
620         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
621         int ret;
622
623         switch (cmd) {
624         case BLKROSET:
625                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
626                 break;
627         default:
628                 ret = -ENOTTY;
629         }
630
631         return ret;
632 }
633
634 #ifdef CONFIG_COMPAT
635 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
636                                 unsigned int cmd, unsigned long arg)
637 {
638         return rbd_ioctl(bdev, mode, cmd, arg);
639 }
640 #endif /* CONFIG_COMPAT */
641
642 static const struct block_device_operations rbd_bd_ops = {
643         .owner                  = THIS_MODULE,
644         .open                   = rbd_open,
645         .release                = rbd_release,
646         .ioctl                  = rbd_ioctl,
647 #ifdef CONFIG_COMPAT
648         .compat_ioctl           = rbd_compat_ioctl,
649 #endif
650 };
651
652 /*
653  * Initialize an rbd client instance.  Success or not, this function
654  * consumes ceph_opts.  Caller holds client_mutex.
655  */
656 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
657 {
658         struct rbd_client *rbdc;
659         int ret = -ENOMEM;
660
661         dout("%s:\n", __func__);
662         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
663         if (!rbdc)
664                 goto out_opt;
665
666         kref_init(&rbdc->kref);
667         INIT_LIST_HEAD(&rbdc->node);
668
669         rbdc->client = ceph_create_client(ceph_opts, rbdc);
670         if (IS_ERR(rbdc->client))
671                 goto out_rbdc;
672         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
673
674         ret = ceph_open_session(rbdc->client);
675         if (ret < 0)
676                 goto out_client;
677
678         spin_lock(&rbd_client_list_lock);
679         list_add_tail(&rbdc->node, &rbd_client_list);
680         spin_unlock(&rbd_client_list_lock);
681
682         dout("%s: rbdc %p\n", __func__, rbdc);
683
684         return rbdc;
685 out_client:
686         ceph_destroy_client(rbdc->client);
687 out_rbdc:
688         kfree(rbdc);
689 out_opt:
690         if (ceph_opts)
691                 ceph_destroy_options(ceph_opts);
692         dout("%s: error %d\n", __func__, ret);
693
694         return ERR_PTR(ret);
695 }
696
697 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
698 {
699         kref_get(&rbdc->kref);
700
701         return rbdc;
702 }
703
704 /*
705  * Find a ceph client with specific addr and configuration.  If
706  * found, bump its reference count.
707  */
708 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
709 {
710         struct rbd_client *client_node;
711         bool found = false;
712
713         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
714                 return NULL;
715
716         spin_lock(&rbd_client_list_lock);
717         list_for_each_entry(client_node, &rbd_client_list, node) {
718                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
719                         __rbd_get_client(client_node);
720
721                         found = true;
722                         break;
723                 }
724         }
725         spin_unlock(&rbd_client_list_lock);
726
727         return found ? client_node : NULL;
728 }
729
730 /*
731  * (Per device) rbd map options
732  */
733 enum {
734         Opt_queue_depth,
735         Opt_lock_timeout,
736         Opt_last_int,
737         /* int args above */
738         Opt_pool_ns,
739         Opt_last_string,
740         /* string args above */
741         Opt_read_only,
742         Opt_read_write,
743         Opt_lock_on_read,
744         Opt_exclusive,
745         Opt_notrim,
746         Opt_err
747 };
748
749 static match_table_t rbd_opts_tokens = {
750         {Opt_queue_depth, "queue_depth=%d"},
751         {Opt_lock_timeout, "lock_timeout=%d"},
752         /* int args above */
753         {Opt_pool_ns, "_pool_ns=%s"},
754         /* string args above */
755         {Opt_read_only, "read_only"},
756         {Opt_read_only, "ro"},          /* Alternate spelling */
757         {Opt_read_write, "read_write"},
758         {Opt_read_write, "rw"},         /* Alternate spelling */
759         {Opt_lock_on_read, "lock_on_read"},
760         {Opt_exclusive, "exclusive"},
761         {Opt_notrim, "notrim"},
762         {Opt_err, NULL}
763 };
764
765 struct rbd_options {
766         int     queue_depth;
767         unsigned long   lock_timeout;
768         bool    read_only;
769         bool    lock_on_read;
770         bool    exclusive;
771         bool    trim;
772 };
773
774 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
775 #define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
776 #define RBD_READ_ONLY_DEFAULT   false
777 #define RBD_LOCK_ON_READ_DEFAULT false
778 #define RBD_EXCLUSIVE_DEFAULT   false
779 #define RBD_TRIM_DEFAULT        true
780
781 struct parse_rbd_opts_ctx {
782         struct rbd_spec         *spec;
783         struct rbd_options      *opts;
784 };
785
786 static int parse_rbd_opts_token(char *c, void *private)
787 {
788         struct parse_rbd_opts_ctx *pctx = private;
789         substring_t argstr[MAX_OPT_ARGS];
790         int token, intval, ret;
791
792         token = match_token(c, rbd_opts_tokens, argstr);
793         if (token < Opt_last_int) {
794                 ret = match_int(&argstr[0], &intval);
795                 if (ret < 0) {
796                         pr_err("bad option arg (not int) at '%s'\n", c);
797                         return ret;
798                 }
799                 dout("got int token %d val %d\n", token, intval);
800         } else if (token > Opt_last_int && token < Opt_last_string) {
801                 dout("got string token %d val %s\n", token, argstr[0].from);
802         } else {
803                 dout("got token %d\n", token);
804         }
805
806         switch (token) {
807         case Opt_queue_depth:
808                 if (intval < 1) {
809                         pr_err("queue_depth out of range\n");
810                         return -EINVAL;
811                 }
812                 pctx->opts->queue_depth = intval;
813                 break;
814         case Opt_lock_timeout:
815                 /* 0 is "wait forever" (i.e. infinite timeout) */
816                 if (intval < 0 || intval > INT_MAX / 1000) {
817                         pr_err("lock_timeout out of range\n");
818                         return -EINVAL;
819                 }
820                 pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
821                 break;
822         case Opt_pool_ns:
823                 kfree(pctx->spec->pool_ns);
824                 pctx->spec->pool_ns = match_strdup(argstr);
825                 if (!pctx->spec->pool_ns)
826                         return -ENOMEM;
827                 break;
828         case Opt_read_only:
829                 pctx->opts->read_only = true;
830                 break;
831         case Opt_read_write:
832                 pctx->opts->read_only = false;
833                 break;
834         case Opt_lock_on_read:
835                 pctx->opts->lock_on_read = true;
836                 break;
837         case Opt_exclusive:
838                 pctx->opts->exclusive = true;
839                 break;
840         case Opt_notrim:
841                 pctx->opts->trim = false;
842                 break;
843         default:
844                 /* libceph prints "bad option" msg */
845                 return -EINVAL;
846         }
847
848         return 0;
849 }
850
851 static char* obj_op_name(enum obj_operation_type op_type)
852 {
853         switch (op_type) {
854         case OBJ_OP_READ:
855                 return "read";
856         case OBJ_OP_WRITE:
857                 return "write";
858         case OBJ_OP_DISCARD:
859                 return "discard";
860         default:
861                 return "???";
862         }
863 }
864
865 /*
866  * Destroy ceph client
867  *
868  * Caller must hold rbd_client_list_lock.
869  */
870 static void rbd_client_release(struct kref *kref)
871 {
872         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
873
874         dout("%s: rbdc %p\n", __func__, rbdc);
875         spin_lock(&rbd_client_list_lock);
876         list_del(&rbdc->node);
877         spin_unlock(&rbd_client_list_lock);
878
879         ceph_destroy_client(rbdc->client);
880         kfree(rbdc);
881 }
882
883 /*
884  * Drop reference to ceph client node. If it's not referenced anymore, release
885  * it.
886  */
887 static void rbd_put_client(struct rbd_client *rbdc)
888 {
889         if (rbdc)
890                 kref_put(&rbdc->kref, rbd_client_release);
891 }
892
893 static int wait_for_latest_osdmap(struct ceph_client *client)
894 {
895         u64 newest_epoch;
896         int ret;
897
898         ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
899         if (ret)
900                 return ret;
901
902         if (client->osdc.osdmap->epoch >= newest_epoch)
903                 return 0;
904
905         ceph_osdc_maybe_request_map(&client->osdc);
906         return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
907                                      client->options->mount_timeout);
908 }
909
910 /*
911  * Get a ceph client with specific addr and configuration, if one does
912  * not exist create it.  Either way, ceph_opts is consumed by this
913  * function.
914  */
915 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
916 {
917         struct rbd_client *rbdc;
918         int ret;
919
920         mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
921         rbdc = rbd_client_find(ceph_opts);
922         if (rbdc) {
923                 ceph_destroy_options(ceph_opts);
924
925                 /*
926                  * Using an existing client.  Make sure ->pg_pools is up to
927                  * date before we look up the pool id in do_rbd_add().
928                  */
929                 ret = wait_for_latest_osdmap(rbdc->client);
930                 if (ret) {
931                         rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
932                         rbd_put_client(rbdc);
933                         rbdc = ERR_PTR(ret);
934                 }
935         } else {
936                 rbdc = rbd_client_create(ceph_opts);
937         }
938         mutex_unlock(&client_mutex);
939
940         return rbdc;
941 }
942
943 static bool rbd_image_format_valid(u32 image_format)
944 {
945         return image_format == 1 || image_format == 2;
946 }
947
948 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
949 {
950         size_t size;
951         u32 snap_count;
952
953         /* The header has to start with the magic rbd header text */
954         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
955                 return false;
956
957         /* The bio layer requires at least sector-sized I/O */
958
959         if (ondisk->options.order < SECTOR_SHIFT)
960                 return false;
961
962         /* If we use u64 in a few spots we may be able to loosen this */
963
964         if (ondisk->options.order > 8 * sizeof (int) - 1)
965                 return false;
966
967         /*
968          * The size of a snapshot header has to fit in a size_t, and
969          * that limits the number of snapshots.
970          */
971         snap_count = le32_to_cpu(ondisk->snap_count);
972         size = SIZE_MAX - sizeof (struct ceph_snap_context);
973         if (snap_count > size / sizeof (__le64))
974                 return false;
975
976         /*
977          * Not only that, but the size of the entire the snapshot
978          * header must also be representable in a size_t.
979          */
980         size -= snap_count * sizeof (__le64);
981         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
982                 return false;
983
984         return true;
985 }
986
987 /*
988  * returns the size of an object in the image
989  */
990 static u32 rbd_obj_bytes(struct rbd_image_header *header)
991 {
992         return 1U << header->obj_order;
993 }
994
995 static void rbd_init_layout(struct rbd_device *rbd_dev)
996 {
997         if (rbd_dev->header.stripe_unit == 0 ||
998             rbd_dev->header.stripe_count == 0) {
999                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
1000                 rbd_dev->header.stripe_count = 1;
1001         }
1002
1003         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1004         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1005         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1006         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1007                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1008         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1009 }
1010
1011 /*
1012  * Fill an rbd image header with information from the given format 1
1013  * on-disk header.
1014  */
1015 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1016                                  struct rbd_image_header_ondisk *ondisk)
1017 {
1018         struct rbd_image_header *header = &rbd_dev->header;
1019         bool first_time = header->object_prefix == NULL;
1020         struct ceph_snap_context *snapc;
1021         char *object_prefix = NULL;
1022         char *snap_names = NULL;
1023         u64 *snap_sizes = NULL;
1024         u32 snap_count;
1025         int ret = -ENOMEM;
1026         u32 i;
1027
1028         /* Allocate this now to avoid having to handle failure below */
1029
1030         if (first_time) {
1031                 object_prefix = kstrndup(ondisk->object_prefix,
1032                                          sizeof(ondisk->object_prefix),
1033                                          GFP_KERNEL);
1034                 if (!object_prefix)
1035                         return -ENOMEM;
1036         }
1037
1038         /* Allocate the snapshot context and fill it in */
1039
1040         snap_count = le32_to_cpu(ondisk->snap_count);
1041         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1042         if (!snapc)
1043                 goto out_err;
1044         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1045         if (snap_count) {
1046                 struct rbd_image_snap_ondisk *snaps;
1047                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1048
1049                 /* We'll keep a copy of the snapshot names... */
1050
1051                 if (snap_names_len > (u64)SIZE_MAX)
1052                         goto out_2big;
1053                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1054                 if (!snap_names)
1055                         goto out_err;
1056
1057                 /* ...as well as the array of their sizes. */
1058                 snap_sizes = kmalloc_array(snap_count,
1059                                            sizeof(*header->snap_sizes),
1060                                            GFP_KERNEL);
1061                 if (!snap_sizes)
1062                         goto out_err;
1063
1064                 /*
1065                  * Copy the names, and fill in each snapshot's id
1066                  * and size.
1067                  *
1068                  * Note that rbd_dev_v1_header_info() guarantees the
1069                  * ondisk buffer we're working with has
1070                  * snap_names_len bytes beyond the end of the
1071                  * snapshot id array, this memcpy() is safe.
1072                  */
1073                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1074                 snaps = ondisk->snaps;
1075                 for (i = 0; i < snap_count; i++) {
1076                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1077                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1078                 }
1079         }
1080
1081         /* We won't fail any more, fill in the header */
1082
1083         if (first_time) {
1084                 header->object_prefix = object_prefix;
1085                 header->obj_order = ondisk->options.order;
1086                 rbd_init_layout(rbd_dev);
1087         } else {
1088                 ceph_put_snap_context(header->snapc);
1089                 kfree(header->snap_names);
1090                 kfree(header->snap_sizes);
1091         }
1092
1093         /* The remaining fields always get updated (when we refresh) */
1094
1095         header->image_size = le64_to_cpu(ondisk->image_size);
1096         header->snapc = snapc;
1097         header->snap_names = snap_names;
1098         header->snap_sizes = snap_sizes;
1099
1100         return 0;
1101 out_2big:
1102         ret = -EIO;
1103 out_err:
1104         kfree(snap_sizes);
1105         kfree(snap_names);
1106         ceph_put_snap_context(snapc);
1107         kfree(object_prefix);
1108
1109         return ret;
1110 }
1111
1112 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1113 {
1114         const char *snap_name;
1115
1116         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1117
1118         /* Skip over names until we find the one we are looking for */
1119
1120         snap_name = rbd_dev->header.snap_names;
1121         while (which--)
1122                 snap_name += strlen(snap_name) + 1;
1123
1124         return kstrdup(snap_name, GFP_KERNEL);
1125 }
1126
1127 /*
1128  * Snapshot id comparison function for use with qsort()/bsearch().
1129  * Note that result is for snapshots in *descending* order.
1130  */
1131 static int snapid_compare_reverse(const void *s1, const void *s2)
1132 {
1133         u64 snap_id1 = *(u64 *)s1;
1134         u64 snap_id2 = *(u64 *)s2;
1135
1136         if (snap_id1 < snap_id2)
1137                 return 1;
1138         return snap_id1 == snap_id2 ? 0 : -1;
1139 }
1140
1141 /*
1142  * Search a snapshot context to see if the given snapshot id is
1143  * present.
1144  *
1145  * Returns the position of the snapshot id in the array if it's found,
1146  * or BAD_SNAP_INDEX otherwise.
1147  *
1148  * Note: The snapshot array is in kept sorted (by the osd) in
1149  * reverse order, highest snapshot id first.
1150  */
1151 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1152 {
1153         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1154         u64 *found;
1155
1156         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1157                                 sizeof (snap_id), snapid_compare_reverse);
1158
1159         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1160 }
1161
1162 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1163                                         u64 snap_id)
1164 {
1165         u32 which;
1166         const char *snap_name;
1167
1168         which = rbd_dev_snap_index(rbd_dev, snap_id);
1169         if (which == BAD_SNAP_INDEX)
1170                 return ERR_PTR(-ENOENT);
1171
1172         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1173         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1174 }
1175
1176 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1177 {
1178         if (snap_id == CEPH_NOSNAP)
1179                 return RBD_SNAP_HEAD_NAME;
1180
1181         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1182         if (rbd_dev->image_format == 1)
1183                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1184
1185         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1186 }
1187
1188 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1189                                 u64 *snap_size)
1190 {
1191         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1192         if (snap_id == CEPH_NOSNAP) {
1193                 *snap_size = rbd_dev->header.image_size;
1194         } else if (rbd_dev->image_format == 1) {
1195                 u32 which;
1196
1197                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1198                 if (which == BAD_SNAP_INDEX)
1199                         return -ENOENT;
1200
1201                 *snap_size = rbd_dev->header.snap_sizes[which];
1202         } else {
1203                 u64 size = 0;
1204                 int ret;
1205
1206                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1207                 if (ret)
1208                         return ret;
1209
1210                 *snap_size = size;
1211         }
1212         return 0;
1213 }
1214
1215 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1216                         u64 *snap_features)
1217 {
1218         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1219         if (snap_id == CEPH_NOSNAP) {
1220                 *snap_features = rbd_dev->header.features;
1221         } else if (rbd_dev->image_format == 1) {
1222                 *snap_features = 0;     /* No features for format 1 */
1223         } else {
1224                 u64 features = 0;
1225                 int ret;
1226
1227                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1228                 if (ret)
1229                         return ret;
1230
1231                 *snap_features = features;
1232         }
1233         return 0;
1234 }
1235
1236 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1237 {
1238         u64 snap_id = rbd_dev->spec->snap_id;
1239         u64 size = 0;
1240         u64 features = 0;
1241         int ret;
1242
1243         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1244         if (ret)
1245                 return ret;
1246         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1247         if (ret)
1248                 return ret;
1249
1250         rbd_dev->mapping.size = size;
1251         rbd_dev->mapping.features = features;
1252
1253         return 0;
1254 }
1255
1256 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1257 {
1258         rbd_dev->mapping.size = 0;
1259         rbd_dev->mapping.features = 0;
1260 }
1261
1262 static void zero_bvec(struct bio_vec *bv)
1263 {
1264         void *buf;
1265         unsigned long flags;
1266
1267         buf = bvec_kmap_irq(bv, &flags);
1268         memset(buf, 0, bv->bv_len);
1269         flush_dcache_page(bv->bv_page);
1270         bvec_kunmap_irq(buf, &flags);
1271 }
1272
1273 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1274 {
1275         struct ceph_bio_iter it = *bio_pos;
1276
1277         ceph_bio_iter_advance(&it, off);
1278         ceph_bio_iter_advance_step(&it, bytes, ({
1279                 zero_bvec(&bv);
1280         }));
1281 }
1282
1283 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1284 {
1285         struct ceph_bvec_iter it = *bvec_pos;
1286
1287         ceph_bvec_iter_advance(&it, off);
1288         ceph_bvec_iter_advance_step(&it, bytes, ({
1289                 zero_bvec(&bv);
1290         }));
1291 }
1292
1293 /*
1294  * Zero a range in @obj_req data buffer defined by a bio (list) or
1295  * (private) bio_vec array.
1296  *
1297  * @off is relative to the start of the data buffer.
1298  */
1299 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1300                                u32 bytes)
1301 {
1302         switch (obj_req->img_request->data_type) {
1303         case OBJ_REQUEST_BIO:
1304                 zero_bios(&obj_req->bio_pos, off, bytes);
1305                 break;
1306         case OBJ_REQUEST_BVECS:
1307         case OBJ_REQUEST_OWN_BVECS:
1308                 zero_bvecs(&obj_req->bvec_pos, off, bytes);
1309                 break;
1310         default:
1311                 rbd_assert(0);
1312         }
1313 }
1314
1315 static void rbd_obj_request_destroy(struct kref *kref);
1316 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1317 {
1318         rbd_assert(obj_request != NULL);
1319         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1320                 kref_read(&obj_request->kref));
1321         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1322 }
1323
1324 static void rbd_img_request_get(struct rbd_img_request *img_request)
1325 {
1326         dout("%s: img %p (was %d)\n", __func__, img_request,
1327              kref_read(&img_request->kref));
1328         kref_get(&img_request->kref);
1329 }
1330
1331 static void rbd_img_request_destroy(struct kref *kref);
1332 static void rbd_img_request_put(struct rbd_img_request *img_request)
1333 {
1334         rbd_assert(img_request != NULL);
1335         dout("%s: img %p (was %d)\n", __func__, img_request,
1336                 kref_read(&img_request->kref));
1337         kref_put(&img_request->kref, rbd_img_request_destroy);
1338 }
1339
1340 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1341                                         struct rbd_obj_request *obj_request)
1342 {
1343         rbd_assert(obj_request->img_request == NULL);
1344
1345         /* Image request now owns object's original reference */
1346         obj_request->img_request = img_request;
1347         img_request->obj_request_count++;
1348         img_request->pending_count++;
1349         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1350 }
1351
1352 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1353                                         struct rbd_obj_request *obj_request)
1354 {
1355         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1356         list_del(&obj_request->ex.oe_item);
1357         rbd_assert(img_request->obj_request_count > 0);
1358         img_request->obj_request_count--;
1359         rbd_assert(obj_request->img_request == img_request);
1360         rbd_obj_request_put(obj_request);
1361 }
1362
1363 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1364 {
1365         struct ceph_osd_request *osd_req = obj_request->osd_req;
1366
1367         dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1368              obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
1369              obj_request->ex.oe_len, osd_req);
1370         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1371 }
1372
1373 /*
1374  * The default/initial value for all image request flags is 0.  Each
1375  * is conditionally set to 1 at image request initialization time
1376  * and currently never change thereafter.
1377  */
1378 static void img_request_layered_set(struct rbd_img_request *img_request)
1379 {
1380         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1381         smp_mb();
1382 }
1383
1384 static void img_request_layered_clear(struct rbd_img_request *img_request)
1385 {
1386         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1387         smp_mb();
1388 }
1389
1390 static bool img_request_layered_test(struct rbd_img_request *img_request)
1391 {
1392         smp_mb();
1393         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1394 }
1395
1396 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1397 {
1398         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1399
1400         return !obj_req->ex.oe_off &&
1401                obj_req->ex.oe_len == rbd_dev->layout.object_size;
1402 }
1403
1404 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1405 {
1406         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1407
1408         return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1409                                         rbd_dev->layout.object_size;
1410 }
1411
1412 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1413 {
1414         return ceph_file_extents_bytes(obj_req->img_extents,
1415                                        obj_req->num_img_extents);
1416 }
1417
1418 static bool rbd_img_is_write(struct rbd_img_request *img_req)
1419 {
1420         switch (img_req->op_type) {
1421         case OBJ_OP_READ:
1422                 return false;
1423         case OBJ_OP_WRITE:
1424         case OBJ_OP_DISCARD:
1425                 return true;
1426         default:
1427                 BUG();
1428         }
1429 }
1430
1431 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
1432
1433 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1434 {
1435         struct rbd_obj_request *obj_req = osd_req->r_priv;
1436
1437         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1438              osd_req->r_result, obj_req);
1439         rbd_assert(osd_req == obj_req->osd_req);
1440
1441         obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
1442         if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
1443                 obj_req->xferred = osd_req->r_result;
1444         else
1445                 /*
1446                  * Writes aren't allowed to return a data payload.  In some
1447                  * guarded write cases (e.g. stat + zero on an empty object)
1448                  * a stat response makes it through, but we don't care.
1449                  */
1450                 obj_req->xferred = 0;
1451
1452         rbd_obj_handle_request(obj_req);
1453 }
1454
1455 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1456 {
1457         struct ceph_osd_request *osd_req = obj_request->osd_req;
1458
1459         osd_req->r_flags = CEPH_OSD_FLAG_READ;
1460         osd_req->r_snapid = obj_request->img_request->snap_id;
1461 }
1462
1463 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1464 {
1465         struct ceph_osd_request *osd_req = obj_request->osd_req;
1466
1467         osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1468         ktime_get_real_ts64(&osd_req->r_mtime);
1469         osd_req->r_data_offset = obj_request->ex.oe_off;
1470 }
1471
1472 static struct ceph_osd_request *
1473 rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
1474 {
1475         struct rbd_img_request *img_req = obj_req->img_request;
1476         struct rbd_device *rbd_dev = img_req->rbd_dev;
1477         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1478         struct ceph_osd_request *req;
1479         const char *name_format = rbd_dev->image_format == 1 ?
1480                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1481
1482         req = ceph_osdc_alloc_request(osdc,
1483                         (rbd_img_is_write(img_req) ? img_req->snapc : NULL),
1484                         num_ops, false, GFP_NOIO);
1485         if (!req)
1486                 return NULL;
1487
1488         req->r_callback = rbd_osd_req_callback;
1489         req->r_priv = obj_req;
1490
1491         /*
1492          * Data objects may be stored in a separate pool, but always in
1493          * the same namespace in that pool as the header in its pool.
1494          */
1495         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1496         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1497
1498         if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1499                         rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
1500                 goto err_req;
1501
1502         return req;
1503
1504 err_req:
1505         ceph_osdc_put_request(req);
1506         return NULL;
1507 }
1508
1509 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1510 {
1511         ceph_osdc_put_request(osd_req);
1512 }
1513
1514 static struct rbd_obj_request *rbd_obj_request_create(void)
1515 {
1516         struct rbd_obj_request *obj_request;
1517
1518         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1519         if (!obj_request)
1520                 return NULL;
1521
1522         ceph_object_extent_init(&obj_request->ex);
1523         kref_init(&obj_request->kref);
1524
1525         dout("%s %p\n", __func__, obj_request);
1526         return obj_request;
1527 }
1528
1529 static void rbd_obj_request_destroy(struct kref *kref)
1530 {
1531         struct rbd_obj_request *obj_request;
1532         u32 i;
1533
1534         obj_request = container_of(kref, struct rbd_obj_request, kref);
1535
1536         dout("%s: obj %p\n", __func__, obj_request);
1537
1538         if (obj_request->osd_req)
1539                 rbd_osd_req_destroy(obj_request->osd_req);
1540
1541         switch (obj_request->img_request->data_type) {
1542         case OBJ_REQUEST_NODATA:
1543         case OBJ_REQUEST_BIO:
1544         case OBJ_REQUEST_BVECS:
1545                 break;          /* Nothing to do */
1546         case OBJ_REQUEST_OWN_BVECS:
1547                 kfree(obj_request->bvec_pos.bvecs);
1548                 break;
1549         default:
1550                 rbd_assert(0);
1551         }
1552
1553         kfree(obj_request->img_extents);
1554         if (obj_request->copyup_bvecs) {
1555                 for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1556                         if (obj_request->copyup_bvecs[i].bv_page)
1557                                 __free_page(obj_request->copyup_bvecs[i].bv_page);
1558                 }
1559                 kfree(obj_request->copyup_bvecs);
1560         }
1561
1562         kmem_cache_free(rbd_obj_request_cache, obj_request);
1563 }
1564
1565 /* It's OK to call this for a device with no parent */
1566
1567 static void rbd_spec_put(struct rbd_spec *spec);
1568 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1569 {
1570         rbd_dev_remove_parent(rbd_dev);
1571         rbd_spec_put(rbd_dev->parent_spec);
1572         rbd_dev->parent_spec = NULL;
1573         rbd_dev->parent_overlap = 0;
1574 }
1575
1576 /*
1577  * Parent image reference counting is used to determine when an
1578  * image's parent fields can be safely torn down--after there are no
1579  * more in-flight requests to the parent image.  When the last
1580  * reference is dropped, cleaning them up is safe.
1581  */
1582 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1583 {
1584         int counter;
1585
1586         if (!rbd_dev->parent_spec)
1587                 return;
1588
1589         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1590         if (counter > 0)
1591                 return;
1592
1593         /* Last reference; clean up parent data structures */
1594
1595         if (!counter)
1596                 rbd_dev_unparent(rbd_dev);
1597         else
1598                 rbd_warn(rbd_dev, "parent reference underflow");
1599 }
1600
1601 /*
1602  * If an image has a non-zero parent overlap, get a reference to its
1603  * parent.
1604  *
1605  * Returns true if the rbd device has a parent with a non-zero
1606  * overlap and a reference for it was successfully taken, or
1607  * false otherwise.
1608  */
1609 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1610 {
1611         int counter = 0;
1612
1613         if (!rbd_dev->parent_spec)
1614                 return false;
1615
1616         down_read(&rbd_dev->header_rwsem);
1617         if (rbd_dev->parent_overlap)
1618                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1619         up_read(&rbd_dev->header_rwsem);
1620
1621         if (counter < 0)
1622                 rbd_warn(rbd_dev, "parent reference overflow");
1623
1624         return counter > 0;
1625 }
1626
1627 /*
1628  * Caller is responsible for filling in the list of object requests
1629  * that comprises the image request, and the Linux request pointer
1630  * (if there is one).
1631  */
1632 static struct rbd_img_request *rbd_img_request_create(
1633                                         struct rbd_device *rbd_dev,
1634                                         enum obj_operation_type op_type,
1635                                         struct ceph_snap_context *snapc)
1636 {
1637         struct rbd_img_request *img_request;
1638
1639         img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
1640         if (!img_request)
1641                 return NULL;
1642
1643         img_request->rbd_dev = rbd_dev;
1644         img_request->op_type = op_type;
1645         if (!rbd_img_is_write(img_request))
1646                 img_request->snap_id = rbd_dev->spec->snap_id;
1647         else
1648                 img_request->snapc = snapc;
1649
1650         if (rbd_dev_parent_get(rbd_dev))
1651                 img_request_layered_set(img_request);
1652
1653         spin_lock_init(&img_request->completion_lock);
1654         INIT_LIST_HEAD(&img_request->object_extents);
1655         kref_init(&img_request->kref);
1656
1657         dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
1658              obj_op_name(op_type), img_request);
1659         return img_request;
1660 }
1661
1662 static void rbd_img_request_destroy(struct kref *kref)
1663 {
1664         struct rbd_img_request *img_request;
1665         struct rbd_obj_request *obj_request;
1666         struct rbd_obj_request *next_obj_request;
1667
1668         img_request = container_of(kref, struct rbd_img_request, kref);
1669
1670         dout("%s: img %p\n", __func__, img_request);
1671
1672         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1673                 rbd_img_obj_request_del(img_request, obj_request);
1674         rbd_assert(img_request->obj_request_count == 0);
1675
1676         if (img_request_layered_test(img_request)) {
1677                 img_request_layered_clear(img_request);
1678                 rbd_dev_parent_put(img_request->rbd_dev);
1679         }
1680
1681         if (rbd_img_is_write(img_request))
1682                 ceph_put_snap_context(img_request->snapc);
1683
1684         kmem_cache_free(rbd_img_request_cache, img_request);
1685 }
1686
1687 static void prune_extents(struct ceph_file_extent *img_extents,
1688                           u32 *num_img_extents, u64 overlap)
1689 {
1690         u32 cnt = *num_img_extents;
1691
1692         /* drop extents completely beyond the overlap */
1693         while (cnt && img_extents[cnt - 1].fe_off >= overlap)
1694                 cnt--;
1695
1696         if (cnt) {
1697                 struct ceph_file_extent *ex = &img_extents[cnt - 1];
1698
1699                 /* trim final overlapping extent */
1700                 if (ex->fe_off + ex->fe_len > overlap)
1701                         ex->fe_len = overlap - ex->fe_off;
1702         }
1703
1704         *num_img_extents = cnt;
1705 }
1706
1707 /*
1708  * Determine the byte range(s) covered by either just the object extent
1709  * or the entire object in the parent image.
1710  */
1711 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
1712                                     bool entire)
1713 {
1714         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1715         int ret;
1716
1717         if (!rbd_dev->parent_overlap)
1718                 return 0;
1719
1720         ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
1721                                   entire ? 0 : obj_req->ex.oe_off,
1722                                   entire ? rbd_dev->layout.object_size :
1723                                                         obj_req->ex.oe_len,
1724                                   &obj_req->img_extents,
1725                                   &obj_req->num_img_extents);
1726         if (ret)
1727                 return ret;
1728
1729         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
1730                       rbd_dev->parent_overlap);
1731         return 0;
1732 }
1733
1734 static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
1735 {
1736         switch (obj_req->img_request->data_type) {
1737         case OBJ_REQUEST_BIO:
1738                 osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
1739                                                &obj_req->bio_pos,
1740                                                obj_req->ex.oe_len);
1741                 break;
1742         case OBJ_REQUEST_BVECS:
1743         case OBJ_REQUEST_OWN_BVECS:
1744                 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
1745                                                         obj_req->ex.oe_len);
1746                 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
1747                 osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
1748                                                     &obj_req->bvec_pos);
1749                 break;
1750         default:
1751                 rbd_assert(0);
1752         }
1753 }
1754
1755 static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
1756 {
1757         obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
1758         if (!obj_req->osd_req)
1759                 return -ENOMEM;
1760
1761         osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
1762                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1763         rbd_osd_req_setup_data(obj_req, 0);
1764
1765         rbd_osd_req_format_read(obj_req);
1766         return 0;
1767 }
1768
1769 static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
1770                                 unsigned int which)
1771 {
1772         struct page **pages;
1773
1774         /*
1775          * The response data for a STAT call consists of:
1776          *     le64 length;
1777          *     struct {
1778          *         le32 tv_sec;
1779          *         le32 tv_nsec;
1780          *     } mtime;
1781          */
1782         pages = ceph_alloc_page_vector(1, GFP_NOIO);
1783         if (IS_ERR(pages))
1784                 return PTR_ERR(pages);
1785
1786         osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
1787         osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
1788                                      8 + sizeof(struct ceph_timespec),
1789                                      0, false, true);
1790         return 0;
1791 }
1792
1793 static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
1794                                   unsigned int which)
1795 {
1796         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1797         u16 opcode;
1798
1799         osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
1800                                    rbd_dev->layout.object_size,
1801                                    rbd_dev->layout.object_size);
1802
1803         if (rbd_obj_is_entire(obj_req))
1804                 opcode = CEPH_OSD_OP_WRITEFULL;
1805         else
1806                 opcode = CEPH_OSD_OP_WRITE;
1807
1808         osd_req_op_extent_init(obj_req->osd_req, which, opcode,
1809                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1810         rbd_osd_req_setup_data(obj_req, which++);
1811
1812         rbd_assert(which == obj_req->osd_req->r_num_ops);
1813         rbd_osd_req_format_write(obj_req);
1814 }
1815
1816 static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
1817 {
1818         unsigned int num_osd_ops, which = 0;
1819         int ret;
1820
1821         /* reverse map the entire object onto the parent */
1822         ret = rbd_obj_calc_img_extents(obj_req, true);
1823         if (ret)
1824                 return ret;
1825
1826         if (obj_req->num_img_extents) {
1827                 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1828                 num_osd_ops = 3; /* stat + setallochint + write/writefull */
1829         } else {
1830                 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1831                 num_osd_ops = 2; /* setallochint + write/writefull */
1832         }
1833
1834         obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1835         if (!obj_req->osd_req)
1836                 return -ENOMEM;
1837
1838         if (obj_req->num_img_extents) {
1839                 ret = __rbd_obj_setup_stat(obj_req, which++);
1840                 if (ret)
1841                         return ret;
1842         }
1843
1844         __rbd_obj_setup_write(obj_req, which);
1845         return 0;
1846 }
1847
1848 static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
1849                                     unsigned int which)
1850 {
1851         u16 opcode;
1852
1853         if (rbd_obj_is_entire(obj_req)) {
1854                 if (obj_req->num_img_extents) {
1855                         osd_req_op_init(obj_req->osd_req, which++,
1856                                         CEPH_OSD_OP_CREATE, 0);
1857                         opcode = CEPH_OSD_OP_TRUNCATE;
1858                 } else {
1859                         osd_req_op_init(obj_req->osd_req, which++,
1860                                         CEPH_OSD_OP_DELETE, 0);
1861                         opcode = 0;
1862                 }
1863         } else if (rbd_obj_is_tail(obj_req)) {
1864                 opcode = CEPH_OSD_OP_TRUNCATE;
1865         } else {
1866                 opcode = CEPH_OSD_OP_ZERO;
1867         }
1868
1869         if (opcode)
1870                 osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
1871                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
1872                                        0, 0);
1873
1874         rbd_assert(which == obj_req->osd_req->r_num_ops);
1875         rbd_osd_req_format_write(obj_req);
1876 }
1877
1878 static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
1879 {
1880         unsigned int num_osd_ops, which = 0;
1881         int ret;
1882
1883         /* reverse map the entire object onto the parent */
1884         ret = rbd_obj_calc_img_extents(obj_req, true);
1885         if (ret)
1886                 return ret;
1887
1888         if (rbd_obj_is_entire(obj_req)) {
1889                 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1890                 if (obj_req->num_img_extents)
1891                         num_osd_ops = 2; /* create + truncate */
1892                 else
1893                         num_osd_ops = 1; /* delete */
1894         } else {
1895                 if (obj_req->num_img_extents) {
1896                         obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1897                         num_osd_ops = 2; /* stat + truncate/zero */
1898                 } else {
1899                         obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1900                         num_osd_ops = 1; /* truncate/zero */
1901                 }
1902         }
1903
1904         obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1905         if (!obj_req->osd_req)
1906                 return -ENOMEM;
1907
1908         if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
1909                 ret = __rbd_obj_setup_stat(obj_req, which++);
1910                 if (ret)
1911                         return ret;
1912         }
1913
1914         __rbd_obj_setup_discard(obj_req, which);
1915         return 0;
1916 }
1917
1918 /*
1919  * For each object request in @img_req, allocate an OSD request, add
1920  * individual OSD ops and prepare them for submission.  The number of
1921  * OSD ops depends on op_type and the overlap point (if any).
1922  */
1923 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
1924 {
1925         struct rbd_obj_request *obj_req;
1926         int ret;
1927
1928         for_each_obj_request(img_req, obj_req) {
1929                 switch (img_req->op_type) {
1930                 case OBJ_OP_READ:
1931                         ret = rbd_obj_setup_read(obj_req);
1932                         break;
1933                 case OBJ_OP_WRITE:
1934                         ret = rbd_obj_setup_write(obj_req);
1935                         break;
1936                 case OBJ_OP_DISCARD:
1937                         ret = rbd_obj_setup_discard(obj_req);
1938                         break;
1939                 default:
1940                         rbd_assert(0);
1941                 }
1942                 if (ret)
1943                         return ret;
1944
1945                 ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
1946                 if (ret)
1947                         return ret;
1948         }
1949
1950         return 0;
1951 }
1952
1953 union rbd_img_fill_iter {
1954         struct ceph_bio_iter    bio_iter;
1955         struct ceph_bvec_iter   bvec_iter;
1956 };
1957
1958 struct rbd_img_fill_ctx {
1959         enum obj_request_type   pos_type;
1960         union rbd_img_fill_iter *pos;
1961         union rbd_img_fill_iter iter;
1962         ceph_object_extent_fn_t set_pos_fn;
1963         ceph_object_extent_fn_t count_fn;
1964         ceph_object_extent_fn_t copy_fn;
1965 };
1966
1967 static struct ceph_object_extent *alloc_object_extent(void *arg)
1968 {
1969         struct rbd_img_request *img_req = arg;
1970         struct rbd_obj_request *obj_req;
1971
1972         obj_req = rbd_obj_request_create();
1973         if (!obj_req)
1974                 return NULL;
1975
1976         rbd_img_obj_request_add(img_req, obj_req);
1977         return &obj_req->ex;
1978 }
1979
1980 /*
1981  * While su != os && sc == 1 is technically not fancy (it's the same
1982  * layout as su == os && sc == 1), we can't use the nocopy path for it
1983  * because ->set_pos_fn() should be called only once per object.
1984  * ceph_file_to_extents() invokes action_fn once per stripe unit, so
1985  * treat su != os && sc == 1 as fancy.
1986  */
1987 static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
1988 {
1989         return l->stripe_unit != l->object_size;
1990 }
1991
1992 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
1993                                        struct ceph_file_extent *img_extents,
1994                                        u32 num_img_extents,
1995                                        struct rbd_img_fill_ctx *fctx)
1996 {
1997         u32 i;
1998         int ret;
1999
2000         img_req->data_type = fctx->pos_type;
2001
2002         /*
2003          * Create object requests and set each object request's starting
2004          * position in the provided bio (list) or bio_vec array.
2005          */
2006         fctx->iter = *fctx->pos;
2007         for (i = 0; i < num_img_extents; i++) {
2008                 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2009                                            img_extents[i].fe_off,
2010                                            img_extents[i].fe_len,
2011                                            &img_req->object_extents,
2012                                            alloc_object_extent, img_req,
2013                                            fctx->set_pos_fn, &fctx->iter);
2014                 if (ret)
2015                         return ret;
2016         }
2017
2018         return __rbd_img_fill_request(img_req);
2019 }
2020
2021 /*
2022  * Map a list of image extents to a list of object extents, create the
2023  * corresponding object requests (normally each to a different object,
2024  * but not always) and add them to @img_req.  For each object request,
2025  * set up its data descriptor to point to the corresponding chunk(s) of
2026  * @fctx->pos data buffer.
2027  *
2028  * Because ceph_file_to_extents() will merge adjacent object extents
2029  * together, each object request's data descriptor may point to multiple
2030  * different chunks of @fctx->pos data buffer.
2031  *
2032  * @fctx->pos data buffer is assumed to be large enough.
2033  */
2034 static int rbd_img_fill_request(struct rbd_img_request *img_req,
2035                                 struct ceph_file_extent *img_extents,
2036                                 u32 num_img_extents,
2037                                 struct rbd_img_fill_ctx *fctx)
2038 {
2039         struct rbd_device *rbd_dev = img_req->rbd_dev;
2040         struct rbd_obj_request *obj_req;
2041         u32 i;
2042         int ret;
2043
2044         if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2045             !rbd_layout_is_fancy(&rbd_dev->layout))
2046                 return rbd_img_fill_request_nocopy(img_req, img_extents,
2047                                                    num_img_extents, fctx);
2048
2049         img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2050
2051         /*
2052          * Create object requests and determine ->bvec_count for each object
2053          * request.  Note that ->bvec_count sum over all object requests may
2054          * be greater than the number of bio_vecs in the provided bio (list)
2055          * or bio_vec array because when mapped, those bio_vecs can straddle
2056          * stripe unit boundaries.
2057          */
2058         fctx->iter = *fctx->pos;
2059         for (i = 0; i < num_img_extents; i++) {
2060                 ret = ceph_file_to_extents(&rbd_dev->layout,
2061                                            img_extents[i].fe_off,
2062                                            img_extents[i].fe_len,
2063                                            &img_req->object_extents,
2064                                            alloc_object_extent, img_req,
2065                                            fctx->count_fn, &fctx->iter);
2066                 if (ret)
2067                         return ret;
2068         }
2069
2070         for_each_obj_request(img_req, obj_req) {
2071                 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2072                                               sizeof(*obj_req->bvec_pos.bvecs),
2073                                               GFP_NOIO);
2074                 if (!obj_req->bvec_pos.bvecs)
2075                         return -ENOMEM;
2076         }
2077
2078         /*
2079          * Fill in each object request's private bio_vec array, splitting and
2080          * rearranging the provided bio_vecs in stripe unit chunks as needed.
2081          */
2082         fctx->iter = *fctx->pos;
2083         for (i = 0; i < num_img_extents; i++) {
2084                 ret = ceph_iterate_extents(&rbd_dev->layout,
2085                                            img_extents[i].fe_off,
2086                                            img_extents[i].fe_len,
2087                                            &img_req->object_extents,
2088                                            fctx->copy_fn, &fctx->iter);
2089                 if (ret)
2090                         return ret;
2091         }
2092
2093         return __rbd_img_fill_request(img_req);
2094 }
2095
2096 static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2097                                u64 off, u64 len)
2098 {
2099         struct ceph_file_extent ex = { off, len };
2100         union rbd_img_fill_iter dummy;
2101         struct rbd_img_fill_ctx fctx = {
2102                 .pos_type = OBJ_REQUEST_NODATA,
2103                 .pos = &dummy,
2104         };
2105
2106         return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2107 }
2108
2109 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2110 {
2111         struct rbd_obj_request *obj_req =
2112             container_of(ex, struct rbd_obj_request, ex);
2113         struct ceph_bio_iter *it = arg;
2114
2115         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2116         obj_req->bio_pos = *it;
2117         ceph_bio_iter_advance(it, bytes);
2118 }
2119
2120 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2121 {
2122         struct rbd_obj_request *obj_req =
2123             container_of(ex, struct rbd_obj_request, ex);
2124         struct ceph_bio_iter *it = arg;
2125
2126         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2127         ceph_bio_iter_advance_step(it, bytes, ({
2128                 obj_req->bvec_count++;
2129         }));
2130
2131 }
2132
2133 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2134 {
2135         struct rbd_obj_request *obj_req =
2136             container_of(ex, struct rbd_obj_request, ex);
2137         struct ceph_bio_iter *it = arg;
2138
2139         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2140         ceph_bio_iter_advance_step(it, bytes, ({
2141                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2142                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2143         }));
2144 }
2145
2146 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2147                                    struct ceph_file_extent *img_extents,
2148                                    u32 num_img_extents,
2149                                    struct ceph_bio_iter *bio_pos)
2150 {
2151         struct rbd_img_fill_ctx fctx = {
2152                 .pos_type = OBJ_REQUEST_BIO,
2153                 .pos = (union rbd_img_fill_iter *)bio_pos,
2154                 .set_pos_fn = set_bio_pos,
2155                 .count_fn = count_bio_bvecs,
2156                 .copy_fn = copy_bio_bvecs,
2157         };
2158
2159         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2160                                     &fctx);
2161 }
2162
2163 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2164                                  u64 off, u64 len, struct bio *bio)
2165 {
2166         struct ceph_file_extent ex = { off, len };
2167         struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2168
2169         return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2170 }
2171
2172 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2173 {
2174         struct rbd_obj_request *obj_req =
2175             container_of(ex, struct rbd_obj_request, ex);
2176         struct ceph_bvec_iter *it = arg;
2177
2178         obj_req->bvec_pos = *it;
2179         ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2180         ceph_bvec_iter_advance(it, bytes);
2181 }
2182
2183 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2184 {
2185         struct rbd_obj_request *obj_req =
2186             container_of(ex, struct rbd_obj_request, ex);
2187         struct ceph_bvec_iter *it = arg;
2188
2189         ceph_bvec_iter_advance_step(it, bytes, ({
2190                 obj_req->bvec_count++;
2191         }));
2192 }
2193
2194 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2195 {
2196         struct rbd_obj_request *obj_req =
2197             container_of(ex, struct rbd_obj_request, ex);
2198         struct ceph_bvec_iter *it = arg;
2199
2200         ceph_bvec_iter_advance_step(it, bytes, ({
2201                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2202                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2203         }));
2204 }
2205
2206 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2207                                      struct ceph_file_extent *img_extents,
2208                                      u32 num_img_extents,
2209                                      struct ceph_bvec_iter *bvec_pos)
2210 {
2211         struct rbd_img_fill_ctx fctx = {
2212                 .pos_type = OBJ_REQUEST_BVECS,
2213                 .pos = (union rbd_img_fill_iter *)bvec_pos,
2214                 .set_pos_fn = set_bvec_pos,
2215                 .count_fn = count_bvecs,
2216                 .copy_fn = copy_bvecs,
2217         };
2218
2219         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2220                                     &fctx);
2221 }
2222
2223 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2224                                    struct ceph_file_extent *img_extents,
2225                                    u32 num_img_extents,
2226                                    struct bio_vec *bvecs)
2227 {
2228         struct ceph_bvec_iter it = {
2229                 .bvecs = bvecs,
2230                 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2231                                                              num_img_extents) },
2232         };
2233
2234         return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2235                                          &it);
2236 }
2237
2238 static void rbd_img_request_submit(struct rbd_img_request *img_request)
2239 {
2240         struct rbd_obj_request *obj_request;
2241
2242         dout("%s: img %p\n", __func__, img_request);
2243
2244         rbd_img_request_get(img_request);
2245         for_each_obj_request(img_request, obj_request)
2246                 rbd_obj_request_submit(obj_request);
2247
2248         rbd_img_request_put(img_request);
2249 }
2250
2251 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2252 {
2253         struct rbd_img_request *img_req = obj_req->img_request;
2254         struct rbd_img_request *child_img_req;
2255         int ret;
2256
2257         child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2258                                                OBJ_OP_READ, NULL);
2259         if (!child_img_req)
2260                 return -ENOMEM;
2261
2262         __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2263         child_img_req->obj_request = obj_req;
2264
2265         if (!rbd_img_is_write(img_req)) {
2266                 switch (img_req->data_type) {
2267                 case OBJ_REQUEST_BIO:
2268                         ret = __rbd_img_fill_from_bio(child_img_req,
2269                                                       obj_req->img_extents,
2270                                                       obj_req->num_img_extents,
2271                                                       &obj_req->bio_pos);
2272                         break;
2273                 case OBJ_REQUEST_BVECS:
2274                 case OBJ_REQUEST_OWN_BVECS:
2275                         ret = __rbd_img_fill_from_bvecs(child_img_req,
2276                                                       obj_req->img_extents,
2277                                                       obj_req->num_img_extents,
2278                                                       &obj_req->bvec_pos);
2279                         break;
2280                 default:
2281                         rbd_assert(0);
2282                 }
2283         } else {
2284                 ret = rbd_img_fill_from_bvecs(child_img_req,
2285                                               obj_req->img_extents,
2286                                               obj_req->num_img_extents,
2287                                               obj_req->copyup_bvecs);
2288         }
2289         if (ret) {
2290                 rbd_img_request_put(child_img_req);
2291                 return ret;
2292         }
2293
2294         rbd_img_request_submit(child_img_req);
2295         return 0;
2296 }
2297
2298 static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
2299 {
2300         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2301         int ret;
2302
2303         if (obj_req->result == -ENOENT &&
2304             rbd_dev->parent_overlap && !obj_req->tried_parent) {
2305                 /* reverse map this object extent onto the parent */
2306                 ret = rbd_obj_calc_img_extents(obj_req, false);
2307                 if (ret) {
2308                         obj_req->result = ret;
2309                         return true;
2310                 }
2311
2312                 if (obj_req->num_img_extents) {
2313                         obj_req->tried_parent = true;
2314                         ret = rbd_obj_read_from_parent(obj_req);
2315                         if (ret) {
2316                                 obj_req->result = ret;
2317                                 return true;
2318                         }
2319                         return false;
2320                 }
2321         }
2322
2323         /*
2324          * -ENOENT means a hole in the image -- zero-fill the entire
2325          * length of the request.  A short read also implies zero-fill
2326          * to the end of the request.  In both cases we update xferred
2327          * count to indicate the whole request was satisfied.
2328          */
2329         if (obj_req->result == -ENOENT ||
2330             (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
2331                 rbd_assert(!obj_req->xferred || !obj_req->result);
2332                 rbd_obj_zero_range(obj_req, obj_req->xferred,
2333                                    obj_req->ex.oe_len - obj_req->xferred);
2334                 obj_req->result = 0;
2335                 obj_req->xferred = obj_req->ex.oe_len;
2336         }
2337
2338         return true;
2339 }
2340
2341 /*
2342  * copyup_bvecs pages are never highmem pages
2343  */
2344 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2345 {
2346         struct ceph_bvec_iter it = {
2347                 .bvecs = bvecs,
2348                 .iter = { .bi_size = bytes },
2349         };
2350
2351         ceph_bvec_iter_advance_step(&it, bytes, ({
2352                 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
2353                                bv.bv_len))
2354                         return false;
2355         }));
2356         return true;
2357 }
2358
2359 static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
2360 {
2361         unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
2362         int ret;
2363
2364         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
2365         rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
2366         rbd_osd_req_destroy(obj_req->osd_req);
2367
2368         /*
2369          * Create a copyup request with the same number of OSD ops as
2370          * the original request.  The original request was stat + op(s),
2371          * the new copyup request will be copyup + the same op(s).
2372          */
2373         obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
2374         if (!obj_req->osd_req)
2375                 return -ENOMEM;
2376
2377         ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup");
2378         if (ret)
2379                 return ret;
2380
2381         /*
2382          * Only send non-zero copyup data to save some I/O and network
2383          * bandwidth -- zero copyup data is equivalent to the object not
2384          * existing.
2385          */
2386         if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
2387                 dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
2388                 bytes = 0;
2389         }
2390         osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
2391                                           obj_req->copyup_bvecs,
2392                                           obj_req->copyup_bvec_count,
2393                                           bytes);
2394
2395         switch (obj_req->img_request->op_type) {
2396         case OBJ_OP_WRITE:
2397                 __rbd_obj_setup_write(obj_req, 1);
2398                 break;
2399         case OBJ_OP_DISCARD:
2400                 rbd_assert(!rbd_obj_is_entire(obj_req));
2401                 __rbd_obj_setup_discard(obj_req, 1);
2402                 break;
2403         default:
2404                 rbd_assert(0);
2405         }
2406
2407         ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
2408         if (ret)
2409                 return ret;
2410
2411         rbd_obj_request_submit(obj_req);
2412         return 0;
2413 }
2414
2415 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
2416 {
2417         u32 i;
2418
2419         rbd_assert(!obj_req->copyup_bvecs);
2420         obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
2421         obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
2422                                         sizeof(*obj_req->copyup_bvecs),
2423                                         GFP_NOIO);
2424         if (!obj_req->copyup_bvecs)
2425                 return -ENOMEM;
2426
2427         for (i = 0; i < obj_req->copyup_bvec_count; i++) {
2428                 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
2429
2430                 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
2431                 if (!obj_req->copyup_bvecs[i].bv_page)
2432                         return -ENOMEM;
2433
2434                 obj_req->copyup_bvecs[i].bv_offset = 0;
2435                 obj_req->copyup_bvecs[i].bv_len = len;
2436                 obj_overlap -= len;
2437         }
2438
2439         rbd_assert(!obj_overlap);
2440         return 0;
2441 }
2442
2443 static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
2444 {
2445         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2446         int ret;
2447
2448         rbd_assert(obj_req->num_img_extents);
2449         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2450                       rbd_dev->parent_overlap);
2451         if (!obj_req->num_img_extents) {
2452                 /*
2453                  * The overlap has become 0 (most likely because the
2454                  * image has been flattened).  Use rbd_obj_issue_copyup()
2455                  * to re-submit the original write request -- the copyup
2456                  * operation itself will be a no-op, since someone must
2457                  * have populated the child object while we weren't
2458                  * looking.  Move to WRITE_FLAT state as we'll be done
2459                  * with the operation once the null copyup completes.
2460                  */
2461                 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2462                 return rbd_obj_issue_copyup(obj_req, 0);
2463         }
2464
2465         ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
2466         if (ret)
2467                 return ret;
2468
2469         obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
2470         return rbd_obj_read_from_parent(obj_req);
2471 }
2472
2473 static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
2474 {
2475         int ret;
2476
2477 again:
2478         switch (obj_req->write_state) {
2479         case RBD_OBJ_WRITE_GUARD:
2480                 rbd_assert(!obj_req->xferred);
2481                 if (obj_req->result == -ENOENT) {
2482                         /*
2483                          * The target object doesn't exist.  Read the data for
2484                          * the entire target object up to the overlap point (if
2485                          * any) from the parent, so we can use it for a copyup.
2486                          */
2487                         ret = rbd_obj_handle_write_guard(obj_req);
2488                         if (ret) {
2489                                 obj_req->result = ret;
2490                                 return true;
2491                         }
2492                         return false;
2493                 }
2494                 /* fall through */
2495         case RBD_OBJ_WRITE_FLAT:
2496                 if (!obj_req->result)
2497                         /*
2498                          * There is no such thing as a successful short
2499                          * write -- indicate the whole request was satisfied.
2500                          */
2501                         obj_req->xferred = obj_req->ex.oe_len;
2502                 return true;
2503         case RBD_OBJ_WRITE_COPYUP:
2504                 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2505                 if (obj_req->result)
2506                         goto again;
2507
2508                 rbd_assert(obj_req->xferred);
2509                 ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
2510                 if (ret) {
2511                         obj_req->result = ret;
2512                         return true;
2513                 }
2514                 return false;
2515         default:
2516                 BUG();
2517         }
2518 }
2519
2520 /*
2521  * Returns true if @obj_req is completed, or false otherwise.
2522  */
2523 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2524 {
2525         switch (obj_req->img_request->op_type) {
2526         case OBJ_OP_READ:
2527                 return rbd_obj_handle_read(obj_req);
2528         case OBJ_OP_WRITE:
2529                 return rbd_obj_handle_write(obj_req);
2530         case OBJ_OP_DISCARD:
2531                 if (rbd_obj_handle_write(obj_req)) {
2532                         /*
2533                          * Hide -ENOENT from delete/truncate/zero -- discarding
2534                          * a non-existent object is not a problem.
2535                          */
2536                         if (obj_req->result == -ENOENT) {
2537                                 obj_req->result = 0;
2538                                 obj_req->xferred = obj_req->ex.oe_len;
2539                         }
2540                         return true;
2541                 }
2542                 return false;
2543         default:
2544                 BUG();
2545         }
2546 }
2547
2548 static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
2549 {
2550         struct rbd_img_request *img_req = obj_req->img_request;
2551
2552         rbd_assert((!obj_req->result &&
2553                     obj_req->xferred == obj_req->ex.oe_len) ||
2554                    (obj_req->result < 0 && !obj_req->xferred));
2555         if (!obj_req->result) {
2556                 img_req->xferred += obj_req->xferred;
2557                 return;
2558         }
2559
2560         rbd_warn(img_req->rbd_dev,
2561                  "%s at objno %llu %llu~%llu result %d xferred %llu",
2562                  obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
2563                  obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
2564                  obj_req->xferred);
2565         if (!img_req->result) {
2566                 img_req->result = obj_req->result;
2567                 img_req->xferred = 0;
2568         }
2569 }
2570
2571 static void rbd_img_end_child_request(struct rbd_img_request *img_req)
2572 {
2573         struct rbd_obj_request *obj_req = img_req->obj_request;
2574
2575         rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
2576         rbd_assert((!img_req->result &&
2577                     img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
2578                    (img_req->result < 0 && !img_req->xferred));
2579
2580         obj_req->result = img_req->result;
2581         obj_req->xferred = img_req->xferred;
2582         rbd_img_request_put(img_req);
2583 }
2584
2585 static void rbd_img_end_request(struct rbd_img_request *img_req)
2586 {
2587         rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
2588         rbd_assert((!img_req->result &&
2589                     img_req->xferred == blk_rq_bytes(img_req->rq)) ||
2590                    (img_req->result < 0 && !img_req->xferred));
2591
2592         blk_mq_end_request(img_req->rq,
2593                            errno_to_blk_status(img_req->result));
2594         rbd_img_request_put(img_req);
2595 }
2596
2597 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2598 {
2599         struct rbd_img_request *img_req;
2600
2601 again:
2602         if (!__rbd_obj_handle_request(obj_req))
2603                 return;
2604
2605         img_req = obj_req->img_request;
2606         spin_lock(&img_req->completion_lock);
2607         rbd_obj_end_request(obj_req);
2608         rbd_assert(img_req->pending_count);
2609         if (--img_req->pending_count) {
2610                 spin_unlock(&img_req->completion_lock);
2611                 return;
2612         }
2613
2614         spin_unlock(&img_req->completion_lock);
2615         if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
2616                 obj_req = img_req->obj_request;
2617                 rbd_img_end_child_request(img_req);
2618                 goto again;
2619         }
2620         rbd_img_end_request(img_req);
2621 }
2622
2623 static const struct rbd_client_id rbd_empty_cid;
2624
2625 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
2626                           const struct rbd_client_id *rhs)
2627 {
2628         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
2629 }
2630
2631 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
2632 {
2633         struct rbd_client_id cid;
2634
2635         mutex_lock(&rbd_dev->watch_mutex);
2636         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
2637         cid.handle = rbd_dev->watch_cookie;
2638         mutex_unlock(&rbd_dev->watch_mutex);
2639         return cid;
2640 }
2641
2642 /*
2643  * lock_rwsem must be held for write
2644  */
2645 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
2646                               const struct rbd_client_id *cid)
2647 {
2648         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
2649              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
2650              cid->gid, cid->handle);
2651         rbd_dev->owner_cid = *cid; /* struct */
2652 }
2653
2654 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
2655 {
2656         mutex_lock(&rbd_dev->watch_mutex);
2657         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
2658         mutex_unlock(&rbd_dev->watch_mutex);
2659 }
2660
2661 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
2662 {
2663         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2664
2665         strcpy(rbd_dev->lock_cookie, cookie);
2666         rbd_set_owner_cid(rbd_dev, &cid);
2667         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
2668 }
2669
2670 /*
2671  * lock_rwsem must be held for write
2672  */
2673 static int rbd_lock(struct rbd_device *rbd_dev)
2674 {
2675         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2676         char cookie[32];
2677         int ret;
2678
2679         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
2680                 rbd_dev->lock_cookie[0] != '\0');
2681
2682         format_lock_cookie(rbd_dev, cookie);
2683         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2684                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
2685                             RBD_LOCK_TAG, "", 0);
2686         if (ret)
2687                 return ret;
2688
2689         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
2690         __rbd_lock(rbd_dev, cookie);
2691         return 0;
2692 }
2693
2694 /*
2695  * lock_rwsem must be held for write
2696  */
2697 static void rbd_unlock(struct rbd_device *rbd_dev)
2698 {
2699         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2700         int ret;
2701
2702         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
2703                 rbd_dev->lock_cookie[0] == '\0');
2704
2705         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2706                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
2707         if (ret && ret != -ENOENT)
2708                 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
2709
2710         /* treat errors as the image is unlocked */
2711         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
2712         rbd_dev->lock_cookie[0] = '\0';
2713         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
2714         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
2715 }
2716
2717 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
2718                                 enum rbd_notify_op notify_op,
2719                                 struct page ***preply_pages,
2720                                 size_t *preply_len)
2721 {
2722         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2723         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2724         char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
2725         int buf_size = sizeof(buf);
2726         void *p = buf;
2727
2728         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
2729
2730         /* encode *LockPayload NotifyMessage (op + ClientId) */
2731         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
2732         ceph_encode_32(&p, notify_op);
2733         ceph_encode_64(&p, cid.gid);
2734         ceph_encode_64(&p, cid.handle);
2735
2736         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
2737                                 &rbd_dev->header_oloc, buf, buf_size,
2738                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
2739 }
2740
2741 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
2742                                enum rbd_notify_op notify_op)
2743 {
2744         struct page **reply_pages;
2745         size_t reply_len;
2746
2747         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
2748         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2749 }
2750
2751 static void rbd_notify_acquired_lock(struct work_struct *work)
2752 {
2753         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2754                                                   acquired_lock_work);
2755
2756         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
2757 }
2758
2759 static void rbd_notify_released_lock(struct work_struct *work)
2760 {
2761         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2762                                                   released_lock_work);
2763
2764         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
2765 }
2766
2767 static int rbd_request_lock(struct rbd_device *rbd_dev)
2768 {
2769         struct page **reply_pages;
2770         size_t reply_len;
2771         bool lock_owner_responded = false;
2772         int ret;
2773
2774         dout("%s rbd_dev %p\n", __func__, rbd_dev);
2775
2776         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
2777                                    &reply_pages, &reply_len);
2778         if (ret && ret != -ETIMEDOUT) {
2779                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
2780                 goto out;
2781         }
2782
2783         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
2784                 void *p = page_address(reply_pages[0]);
2785                 void *const end = p + reply_len;
2786                 u32 n;
2787
2788                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
2789                 while (n--) {
2790                         u8 struct_v;
2791                         u32 len;
2792
2793                         ceph_decode_need(&p, end, 8 + 8, e_inval);
2794                         p += 8 + 8; /* skip gid and cookie */
2795
2796                         ceph_decode_32_safe(&p, end, len, e_inval);
2797                         if (!len)
2798                                 continue;
2799
2800                         if (lock_owner_responded) {
2801                                 rbd_warn(rbd_dev,
2802                                          "duplicate lock owners detected");
2803                                 ret = -EIO;
2804                                 goto out;
2805                         }
2806
2807                         lock_owner_responded = true;
2808                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
2809                                                   &struct_v, &len);
2810                         if (ret) {
2811                                 rbd_warn(rbd_dev,
2812                                          "failed to decode ResponseMessage: %d",
2813                                          ret);
2814                                 goto e_inval;
2815                         }
2816
2817                         ret = ceph_decode_32(&p);
2818                 }
2819         }
2820
2821         if (!lock_owner_responded) {
2822                 rbd_warn(rbd_dev, "no lock owners detected");
2823                 ret = -ETIMEDOUT;
2824         }
2825
2826 out:
2827         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2828         return ret;
2829
2830 e_inval:
2831         ret = -EINVAL;
2832         goto out;
2833 }
2834
2835 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
2836 {
2837         dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
2838
2839         cancel_delayed_work(&rbd_dev->lock_dwork);
2840         if (wake_all)
2841                 wake_up_all(&rbd_dev->lock_waitq);
2842         else
2843                 wake_up(&rbd_dev->lock_waitq);
2844 }
2845
2846 static int get_lock_owner_info(struct rbd_device *rbd_dev,
2847                                struct ceph_locker **lockers, u32 *num_lockers)
2848 {
2849         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2850         u8 lock_type;
2851         char *lock_tag;
2852         int ret;
2853
2854         dout("%s rbd_dev %p\n", __func__, rbd_dev);
2855
2856         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
2857                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
2858                                  &lock_type, &lock_tag, lockers, num_lockers);
2859         if (ret)
2860                 return ret;
2861
2862         if (*num_lockers == 0) {
2863                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
2864                 goto out;
2865         }
2866
2867         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
2868                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
2869                          lock_tag);
2870                 ret = -EBUSY;
2871                 goto out;
2872         }
2873
2874         if (lock_type == CEPH_CLS_LOCK_SHARED) {
2875                 rbd_warn(rbd_dev, "shared lock type detected");
2876                 ret = -EBUSY;
2877                 goto out;
2878         }
2879
2880         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
2881                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
2882                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
2883                          (*lockers)[0].id.cookie);
2884                 ret = -EBUSY;
2885                 goto out;
2886         }
2887
2888 out:
2889         kfree(lock_tag);
2890         return ret;
2891 }
2892
2893 static int find_watcher(struct rbd_device *rbd_dev,
2894                         const struct ceph_locker *locker)
2895 {
2896         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2897         struct ceph_watch_item *watchers;
2898         u32 num_watchers;
2899         u64 cookie;
2900         int i;
2901         int ret;
2902
2903         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
2904                                       &rbd_dev->header_oloc, &watchers,
2905                                       &num_watchers);
2906         if (ret)
2907                 return ret;
2908
2909         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
2910         for (i = 0; i < num_watchers; i++) {
2911                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
2912                             sizeof(locker->info.addr)) &&
2913                     watchers[i].cookie == cookie) {
2914                         struct rbd_client_id cid = {
2915                                 .gid = le64_to_cpu(watchers[i].name.num),
2916                                 .handle = cookie,
2917                         };
2918
2919                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
2920                              rbd_dev, cid.gid, cid.handle);
2921                         rbd_set_owner_cid(rbd_dev, &cid);
2922                         ret = 1;
2923                         goto out;
2924                 }
2925         }
2926
2927         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
2928         ret = 0;
2929 out:
2930         kfree(watchers);
2931         return ret;
2932 }
2933
2934 /*
2935  * lock_rwsem must be held for write
2936  */
2937 static int rbd_try_lock(struct rbd_device *rbd_dev)
2938 {
2939         struct ceph_client *client = rbd_dev->rbd_client->client;
2940         struct ceph_locker *lockers;
2941         u32 num_lockers;
2942         int ret;
2943
2944         for (;;) {
2945                 ret = rbd_lock(rbd_dev);
2946                 if (ret != -EBUSY)
2947                         return ret;
2948
2949                 /* determine if the current lock holder is still alive */
2950                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
2951                 if (ret)
2952                         return ret;
2953
2954                 if (num_lockers == 0)
2955                         goto again;
2956
2957                 ret = find_watcher(rbd_dev, lockers);
2958                 if (ret) {
2959                         if (ret > 0)
2960                                 ret = 0; /* have to request lock */
2961                         goto out;
2962                 }
2963
2964                 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
2965                          ENTITY_NAME(lockers[0].id.name));
2966
2967                 ret = ceph_monc_blacklist_add(&client->monc,
2968                                               &lockers[0].info.addr);
2969                 if (ret) {
2970                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
2971                                  ENTITY_NAME(lockers[0].id.name), ret);
2972                         goto out;
2973                 }
2974
2975                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
2976                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
2977                                           lockers[0].id.cookie,
2978                                           &lockers[0].id.name);
2979                 if (ret && ret != -ENOENT)
2980                         goto out;
2981
2982 again:
2983                 ceph_free_lockers(lockers, num_lockers);
2984         }
2985
2986 out:
2987         ceph_free_lockers(lockers, num_lockers);
2988         return ret;
2989 }
2990
2991 /*
2992  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
2993  */
2994 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
2995                                                 int *pret)
2996 {
2997         enum rbd_lock_state lock_state;
2998
2999         down_read(&rbd_dev->lock_rwsem);
3000         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3001              rbd_dev->lock_state);
3002         if (__rbd_is_lock_owner(rbd_dev)) {
3003                 lock_state = rbd_dev->lock_state;
3004                 up_read(&rbd_dev->lock_rwsem);
3005                 return lock_state;
3006         }
3007
3008         up_read(&rbd_dev->lock_rwsem);
3009         down_write(&rbd_dev->lock_rwsem);
3010         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3011              rbd_dev->lock_state);
3012         if (!__rbd_is_lock_owner(rbd_dev)) {
3013                 *pret = rbd_try_lock(rbd_dev);
3014                 if (*pret)
3015                         rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3016         }
3017
3018         lock_state = rbd_dev->lock_state;
3019         up_write(&rbd_dev->lock_rwsem);
3020         return lock_state;
3021 }
3022
3023 static void rbd_acquire_lock(struct work_struct *work)
3024 {
3025         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3026                                             struct rbd_device, lock_dwork);
3027         enum rbd_lock_state lock_state;
3028         int ret = 0;
3029
3030         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3031 again:
3032         lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3033         if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3034                 if (lock_state == RBD_LOCK_STATE_LOCKED)
3035                         wake_requests(rbd_dev, true);
3036                 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3037                      rbd_dev, lock_state, ret);
3038                 return;
3039         }
3040
3041         ret = rbd_request_lock(rbd_dev);
3042         if (ret == -ETIMEDOUT) {
3043                 goto again; /* treat this as a dead client */
3044         } else if (ret == -EROFS) {
3045                 rbd_warn(rbd_dev, "peer will not release lock");
3046                 /*
3047                  * If this is rbd_add_acquire_lock(), we want to fail
3048                  * immediately -- reuse BLACKLISTED flag.  Otherwise we
3049                  * want to block.
3050                  */
3051                 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3052                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3053                         /* wake "rbd map --exclusive" process */
3054                         wake_requests(rbd_dev, false);
3055                 }
3056         } else if (ret < 0) {
3057                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3058                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3059                                  RBD_RETRY_DELAY);
3060         } else {
3061                 /*
3062                  * lock owner acked, but resend if we don't see them
3063                  * release the lock
3064                  */
3065                 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3066                      rbd_dev);
3067                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3068                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3069         }
3070 }
3071
3072 /*
3073  * lock_rwsem must be held for write
3074  */
3075 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3076 {
3077         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3078              rbd_dev->lock_state);
3079         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3080                 return false;
3081
3082         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3083         downgrade_write(&rbd_dev->lock_rwsem);
3084         /*
3085          * Ensure that all in-flight IO is flushed.
3086          *
3087          * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3088          * may be shared with other devices.
3089          */
3090         ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3091         up_read(&rbd_dev->lock_rwsem);
3092
3093         down_write(&rbd_dev->lock_rwsem);
3094         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3095              rbd_dev->lock_state);
3096         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3097                 return false;
3098
3099         rbd_unlock(rbd_dev);
3100         /*
3101          * Give others a chance to grab the lock - we would re-acquire
3102          * almost immediately if we got new IO during ceph_osdc_sync()
3103          * otherwise.  We need to ack our own notifications, so this
3104          * lock_dwork will be requeued from rbd_wait_state_locked()
3105          * after wake_requests() in rbd_handle_released_lock().
3106          */
3107         cancel_delayed_work(&rbd_dev->lock_dwork);
3108         return true;
3109 }
3110
3111 static void rbd_release_lock_work(struct work_struct *work)
3112 {
3113         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3114                                                   unlock_work);
3115
3116         down_write(&rbd_dev->lock_rwsem);
3117         rbd_release_lock(rbd_dev);
3118         up_write(&rbd_dev->lock_rwsem);
3119 }
3120
3121 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3122                                      void **p)
3123 {
3124         struct rbd_client_id cid = { 0 };
3125
3126         if (struct_v >= 2) {
3127                 cid.gid = ceph_decode_64(p);
3128                 cid.handle = ceph_decode_64(p);
3129         }
3130
3131         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3132              cid.handle);
3133         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3134                 down_write(&rbd_dev->lock_rwsem);
3135                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3136                         /*
3137                          * we already know that the remote client is
3138                          * the owner
3139                          */
3140                         up_write(&rbd_dev->lock_rwsem);
3141                         return;
3142                 }
3143
3144                 rbd_set_owner_cid(rbd_dev, &cid);
3145                 downgrade_write(&rbd_dev->lock_rwsem);
3146         } else {
3147                 down_read(&rbd_dev->lock_rwsem);
3148         }
3149
3150         if (!__rbd_is_lock_owner(rbd_dev))
3151                 wake_requests(rbd_dev, false);
3152         up_read(&rbd_dev->lock_rwsem);
3153 }
3154
3155 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3156                                      void **p)
3157 {
3158         struct rbd_client_id cid = { 0 };
3159
3160         if (struct_v >= 2) {
3161                 cid.gid = ceph_decode_64(p);
3162                 cid.handle = ceph_decode_64(p);
3163         }
3164
3165         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3166              cid.handle);
3167         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3168                 down_write(&rbd_dev->lock_rwsem);
3169                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3170                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3171                              __func__, rbd_dev, cid.gid, cid.handle,
3172                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3173                         up_write(&rbd_dev->lock_rwsem);
3174                         return;
3175                 }
3176
3177                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3178                 downgrade_write(&rbd_dev->lock_rwsem);
3179         } else {
3180                 down_read(&rbd_dev->lock_rwsem);
3181         }
3182
3183         if (!__rbd_is_lock_owner(rbd_dev))
3184                 wake_requests(rbd_dev, false);
3185         up_read(&rbd_dev->lock_rwsem);
3186 }
3187
3188 /*
3189  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3190  * ResponseMessage is needed.
3191  */
3192 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3193                                    void **p)
3194 {
3195         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3196         struct rbd_client_id cid = { 0 };
3197         int result = 1;
3198
3199         if (struct_v >= 2) {
3200                 cid.gid = ceph_decode_64(p);
3201                 cid.handle = ceph_decode_64(p);
3202         }
3203
3204         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3205              cid.handle);
3206         if (rbd_cid_equal(&cid, &my_cid))
3207                 return result;
3208
3209         down_read(&rbd_dev->lock_rwsem);
3210         if (__rbd_is_lock_owner(rbd_dev)) {
3211                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3212                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3213                         goto out_unlock;
3214
3215                 /*
3216                  * encode ResponseMessage(0) so the peer can detect
3217                  * a missing owner
3218                  */
3219                 result = 0;
3220
3221                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3222                         if (!rbd_dev->opts->exclusive) {
3223                                 dout("%s rbd_dev %p queueing unlock_work\n",
3224                                      __func__, rbd_dev);
3225                                 queue_work(rbd_dev->task_wq,
3226                                            &rbd_dev->unlock_work);
3227                         } else {
3228                                 /* refuse to release the lock */
3229                                 result = -EROFS;
3230                         }
3231                 }
3232         }
3233
3234 out_unlock:
3235         up_read(&rbd_dev->lock_rwsem);
3236         return result;
3237 }
3238
3239 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3240                                      u64 notify_id, u64 cookie, s32 *result)
3241 {
3242         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3243         char buf[4 + CEPH_ENCODING_START_BLK_LEN];
3244         int buf_size = sizeof(buf);
3245         int ret;
3246
3247         if (result) {
3248                 void *p = buf;
3249
3250                 /* encode ResponseMessage */
3251                 ceph_start_encoding(&p, 1, 1,
3252                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
3253                 ceph_encode_32(&p, *result);
3254         } else {
3255                 buf_size = 0;
3256         }
3257
3258         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3259                                    &rbd_dev->header_oloc, notify_id, cookie,
3260                                    buf, buf_size);
3261         if (ret)
3262                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3263 }
3264
3265 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3266                                    u64 cookie)
3267 {
3268         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3269         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3270 }
3271
3272 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3273                                           u64 notify_id, u64 cookie, s32 result)
3274 {
3275         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3276         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3277 }
3278
3279 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3280                          u64 notifier_id, void *data, size_t data_len)
3281 {
3282         struct rbd_device *rbd_dev = arg;
3283         void *p = data;
3284         void *const end = p + data_len;
3285         u8 struct_v = 0;
3286         u32 len;
3287         u32 notify_op;
3288         int ret;
3289
3290         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3291              __func__, rbd_dev, cookie, notify_id, data_len);
3292         if (data_len) {
3293                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3294                                           &struct_v, &len);
3295                 if (ret) {
3296                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3297                                  ret);
3298                         return;
3299                 }
3300
3301                 notify_op = ceph_decode_32(&p);
3302         } else {
3303                 /* legacy notification for header updates */
3304                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3305                 len = 0;
3306         }
3307
3308         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3309         switch (notify_op) {
3310         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3311                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3312                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3313                 break;
3314         case RBD_NOTIFY_OP_RELEASED_LOCK:
3315                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3316                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3317                 break;
3318         case RBD_NOTIFY_OP_REQUEST_LOCK:
3319                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3320                 if (ret <= 0)
3321                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3322                                                       cookie, ret);
3323                 else
3324                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3325                 break;
3326         case RBD_NOTIFY_OP_HEADER_UPDATE:
3327                 ret = rbd_dev_refresh(rbd_dev);
3328                 if (ret)
3329                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
3330
3331                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3332                 break;
3333         default:
3334                 if (rbd_is_lock_owner(rbd_dev))
3335                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3336                                                       cookie, -EOPNOTSUPP);
3337                 else
3338                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3339                 break;
3340         }
3341 }
3342
3343 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3344
3345 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3346 {
3347         struct rbd_device *rbd_dev = arg;
3348
3349         rbd_warn(rbd_dev, "encountered watch error: %d", err);
3350
3351         down_write(&rbd_dev->lock_rwsem);
3352         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3353         up_write(&rbd_dev->lock_rwsem);
3354
3355         mutex_lock(&rbd_dev->watch_mutex);
3356         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3357                 __rbd_unregister_watch(rbd_dev);
3358                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3359
3360                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3361         }
3362         mutex_unlock(&rbd_dev->watch_mutex);
3363 }
3364
3365 /*
3366  * watch_mutex must be locked
3367  */
3368 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3369 {
3370         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3371         struct ceph_osd_linger_request *handle;
3372
3373         rbd_assert(!rbd_dev->watch_handle);
3374         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3375
3376         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3377                                  &rbd_dev->header_oloc, rbd_watch_cb,
3378                                  rbd_watch_errcb, rbd_dev);
3379         if (IS_ERR(handle))
3380                 return PTR_ERR(handle);
3381
3382         rbd_dev->watch_handle = handle;
3383         return 0;
3384 }
3385
3386 /*
3387  * watch_mutex must be locked
3388  */
3389 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3390 {
3391         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3392         int ret;
3393
3394         rbd_assert(rbd_dev->watch_handle);
3395         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3396
3397         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3398         if (ret)
3399                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3400
3401         rbd_dev->watch_handle = NULL;
3402 }
3403
3404 static int rbd_register_watch(struct rbd_device *rbd_dev)
3405 {
3406         int ret;
3407
3408         mutex_lock(&rbd_dev->watch_mutex);
3409         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3410         ret = __rbd_register_watch(rbd_dev);
3411         if (ret)
3412                 goto out;
3413
3414         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3415         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3416
3417 out:
3418         mutex_unlock(&rbd_dev->watch_mutex);
3419         return ret;
3420 }
3421
3422 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3423 {
3424         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3425
3426         cancel_work_sync(&rbd_dev->acquired_lock_work);
3427         cancel_work_sync(&rbd_dev->released_lock_work);
3428         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3429         cancel_work_sync(&rbd_dev->unlock_work);
3430 }
3431
3432 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3433 {
3434         WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3435         cancel_tasks_sync(rbd_dev);
3436
3437         mutex_lock(&rbd_dev->watch_mutex);
3438         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3439                 __rbd_unregister_watch(rbd_dev);
3440         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3441         mutex_unlock(&rbd_dev->watch_mutex);
3442
3443         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3444         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3445 }
3446
3447 /*
3448  * lock_rwsem must be held for write
3449  */
3450 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3451 {
3452         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3453         char cookie[32];
3454         int ret;
3455
3456         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3457
3458         format_lock_cookie(rbd_dev, cookie);
3459         ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3460                                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
3461                                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3462                                   RBD_LOCK_TAG, cookie);
3463         if (ret) {
3464                 if (ret != -EOPNOTSUPP)
3465                         rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3466                                  ret);
3467
3468                 /*
3469                  * Lock cookie cannot be updated on older OSDs, so do
3470                  * a manual release and queue an acquire.
3471                  */
3472                 if (rbd_release_lock(rbd_dev))
3473                         queue_delayed_work(rbd_dev->task_wq,
3474                                            &rbd_dev->lock_dwork, 0);
3475         } else {
3476                 __rbd_lock(rbd_dev, cookie);
3477         }
3478 }
3479
3480 static void rbd_reregister_watch(struct work_struct *work)
3481 {
3482         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3483                                             struct rbd_device, watch_dwork);
3484         int ret;
3485
3486         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3487
3488         mutex_lock(&rbd_dev->watch_mutex);
3489         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3490                 mutex_unlock(&rbd_dev->watch_mutex);
3491                 return;
3492         }
3493
3494         ret = __rbd_register_watch(rbd_dev);
3495         if (ret) {
3496                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3497                 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3498                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3499                         wake_requests(rbd_dev, true);
3500                 } else {
3501                         queue_delayed_work(rbd_dev->task_wq,
3502                                            &rbd_dev->watch_dwork,
3503                                            RBD_RETRY_DELAY);
3504                 }
3505                 mutex_unlock(&rbd_dev->watch_mutex);
3506                 return;
3507         }
3508
3509         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3510         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3511         mutex_unlock(&rbd_dev->watch_mutex);
3512
3513         down_write(&rbd_dev->lock_rwsem);
3514         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3515                 rbd_reacquire_lock(rbd_dev);
3516         up_write(&rbd_dev->lock_rwsem);
3517
3518         ret = rbd_dev_refresh(rbd_dev);
3519         if (ret)
3520                 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3521 }
3522
3523 /*
3524  * Synchronous osd object method call.  Returns the number of bytes
3525  * returned in the outbound buffer, or a negative error code.
3526  */
3527 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3528                              struct ceph_object_id *oid,
3529                              struct ceph_object_locator *oloc,
3530                              const char *method_name,
3531                              const void *outbound,
3532                              size_t outbound_size,
3533                              void *inbound,
3534                              size_t inbound_size)
3535 {
3536         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3537         struct page *req_page = NULL;
3538         struct page *reply_page;
3539         int ret;
3540
3541         /*
3542          * Method calls are ultimately read operations.  The result
3543          * should placed into the inbound buffer provided.  They
3544          * also supply outbound data--parameters for the object
3545          * method.  Currently if this is present it will be a
3546          * snapshot id.
3547          */
3548         if (outbound) {
3549                 if (outbound_size > PAGE_SIZE)
3550                         return -E2BIG;
3551
3552                 req_page = alloc_page(GFP_KERNEL);
3553                 if (!req_page)
3554                         return -ENOMEM;
3555
3556                 memcpy(page_address(req_page), outbound, outbound_size);
3557         }
3558
3559         reply_page = alloc_page(GFP_KERNEL);
3560         if (!reply_page) {
3561                 if (req_page)
3562                         __free_page(req_page);
3563                 return -ENOMEM;
3564         }
3565
3566         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3567                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
3568                              reply_page, &inbound_size);
3569         if (!ret) {
3570                 memcpy(inbound, page_address(reply_page), inbound_size);
3571                 ret = inbound_size;
3572         }
3573
3574         if (req_page)
3575                 __free_page(req_page);
3576         __free_page(reply_page);
3577         return ret;
3578 }
3579
3580 /*
3581  * lock_rwsem must be held for read
3582  */
3583 static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire)
3584 {
3585         DEFINE_WAIT(wait);
3586         unsigned long timeout;
3587         int ret = 0;
3588
3589         if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
3590                 return -EBLACKLISTED;
3591
3592         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3593                 return 0;
3594
3595         if (!may_acquire) {
3596                 rbd_warn(rbd_dev, "exclusive lock required");
3597                 return -EROFS;
3598         }
3599
3600         do {
3601                 /*
3602                  * Note the use of mod_delayed_work() in rbd_acquire_lock()
3603                  * and cancel_delayed_work() in wake_requests().
3604                  */
3605                 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3606                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3607                 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
3608                                           TASK_UNINTERRUPTIBLE);
3609                 up_read(&rbd_dev->lock_rwsem);
3610                 timeout = schedule_timeout(ceph_timeout_jiffies(
3611                                                 rbd_dev->opts->lock_timeout));
3612                 down_read(&rbd_dev->lock_rwsem);
3613                 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
3614                         ret = -EBLACKLISTED;
3615                         break;
3616                 }
3617                 if (!timeout) {
3618                         rbd_warn(rbd_dev, "timed out waiting for lock");
3619                         ret = -ETIMEDOUT;
3620                         break;
3621                 }
3622         } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3623
3624         finish_wait(&rbd_dev->lock_waitq, &wait);
3625         return ret;
3626 }
3627
3628 static void rbd_queue_workfn(struct work_struct *work)
3629 {
3630         struct request *rq = blk_mq_rq_from_pdu(work);
3631         struct rbd_device *rbd_dev = rq->q->queuedata;
3632         struct rbd_img_request *img_request;
3633         struct ceph_snap_context *snapc = NULL;
3634         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3635         u64 length = blk_rq_bytes(rq);
3636         enum obj_operation_type op_type;
3637         u64 mapping_size;
3638         bool must_be_locked;
3639         int result;
3640
3641         switch (req_op(rq)) {
3642         case REQ_OP_DISCARD:
3643         case REQ_OP_WRITE_ZEROES:
3644                 op_type = OBJ_OP_DISCARD;
3645                 break;
3646         case REQ_OP_WRITE:
3647                 op_type = OBJ_OP_WRITE;
3648                 break;
3649         case REQ_OP_READ:
3650                 op_type = OBJ_OP_READ;
3651                 break;
3652         default:
3653                 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
3654                 result = -EIO;
3655                 goto err;
3656         }
3657
3658         /* Ignore/skip any zero-length requests */
3659
3660         if (!length) {
3661                 dout("%s: zero-length request\n", __func__);
3662                 result = 0;
3663                 goto err_rq;
3664         }
3665
3666         rbd_assert(op_type == OBJ_OP_READ ||
3667                    rbd_dev->spec->snap_id == CEPH_NOSNAP);
3668
3669         /*
3670          * Quit early if the mapped snapshot no longer exists.  It's
3671          * still possible the snapshot will have disappeared by the
3672          * time our request arrives at the osd, but there's no sense in
3673          * sending it if we already know.
3674          */
3675         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3676                 dout("request for non-existent snapshot");
3677                 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3678                 result = -ENXIO;
3679                 goto err_rq;
3680         }
3681
3682         if (offset && length > U64_MAX - offset + 1) {
3683                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3684                          length);
3685                 result = -EINVAL;
3686                 goto err_rq;    /* Shouldn't happen */
3687         }
3688
3689         blk_mq_start_request(rq);
3690
3691         down_read(&rbd_dev->header_rwsem);
3692         mapping_size = rbd_dev->mapping.size;
3693         if (op_type != OBJ_OP_READ) {
3694                 snapc = rbd_dev->header.snapc;
3695                 ceph_get_snap_context(snapc);
3696         }
3697         up_read(&rbd_dev->header_rwsem);
3698
3699         if (offset + length > mapping_size) {
3700                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
3701                          length, mapping_size);
3702                 result = -EIO;
3703                 goto err_rq;
3704         }
3705
3706         must_be_locked =
3707             (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
3708             (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
3709         if (must_be_locked) {
3710                 down_read(&rbd_dev->lock_rwsem);
3711                 result = rbd_wait_state_locked(rbd_dev,
3712                                                !rbd_dev->opts->exclusive);
3713                 if (result)
3714                         goto err_unlock;
3715         }
3716
3717         img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
3718         if (!img_request) {
3719                 result = -ENOMEM;
3720                 goto err_unlock;
3721         }
3722         img_request->rq = rq;
3723         snapc = NULL; /* img_request consumes a ref */
3724
3725         if (op_type == OBJ_OP_DISCARD)
3726                 result = rbd_img_fill_nodata(img_request, offset, length);
3727         else
3728                 result = rbd_img_fill_from_bio(img_request, offset, length,
3729                                                rq->bio);
3730         if (result)
3731                 goto err_img_request;
3732
3733         rbd_img_request_submit(img_request);
3734         if (must_be_locked)
3735                 up_read(&rbd_dev->lock_rwsem);
3736         return;
3737
3738 err_img_request:
3739         rbd_img_request_put(img_request);
3740 err_unlock:
3741         if (must_be_locked)
3742                 up_read(&rbd_dev->lock_rwsem);
3743 err_rq:
3744         if (result)
3745                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
3746                          obj_op_name(op_type), length, offset, result);
3747         ceph_put_snap_context(snapc);
3748 err:
3749         blk_mq_end_request(rq, errno_to_blk_status(result));
3750 }
3751
3752 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
3753                 const struct blk_mq_queue_data *bd)
3754 {
3755         struct request *rq = bd->rq;
3756         struct work_struct *work = blk_mq_rq_to_pdu(rq);
3757
3758         queue_work(rbd_wq, work);
3759         return BLK_STS_OK;
3760 }
3761
3762 static void rbd_free_disk(struct rbd_device *rbd_dev)
3763 {
3764         blk_cleanup_queue(rbd_dev->disk->queue);
3765         blk_mq_free_tag_set(&rbd_dev->tag_set);
3766         put_disk(rbd_dev->disk);
3767         rbd_dev->disk = NULL;
3768 }
3769
3770 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3771                              struct ceph_object_id *oid,
3772                              struct ceph_object_locator *oloc,
3773                              void *buf, int buf_len)
3774
3775 {
3776         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3777         struct ceph_osd_request *req;
3778         struct page **pages;
3779         int num_pages = calc_pages_for(0, buf_len);
3780         int ret;
3781
3782         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
3783         if (!req)
3784                 return -ENOMEM;
3785
3786         ceph_oid_copy(&req->r_base_oid, oid);
3787         ceph_oloc_copy(&req->r_base_oloc, oloc);
3788         req->r_flags = CEPH_OSD_FLAG_READ;
3789
3790         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
3791         if (IS_ERR(pages)) {
3792                 ret = PTR_ERR(pages);
3793                 goto out_req;
3794         }
3795
3796         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
3797         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
3798                                          true);
3799
3800         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
3801         if (ret)
3802                 goto out_req;
3803
3804         ceph_osdc_start_request(osdc, req, false);
3805         ret = ceph_osdc_wait_request(osdc, req);
3806         if (ret >= 0)
3807                 ceph_copy_from_page_vector(pages, buf, 0, ret);
3808
3809 out_req:
3810         ceph_osdc_put_request(req);
3811         return ret;
3812 }
3813
3814 /*
3815  * Read the complete header for the given rbd device.  On successful
3816  * return, the rbd_dev->header field will contain up-to-date
3817  * information about the image.
3818  */
3819 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3820 {
3821         struct rbd_image_header_ondisk *ondisk = NULL;
3822         u32 snap_count = 0;
3823         u64 names_size = 0;
3824         u32 want_count;
3825         int ret;
3826
3827         /*
3828          * The complete header will include an array of its 64-bit
3829          * snapshot ids, followed by the names of those snapshots as
3830          * a contiguous block of NUL-terminated strings.  Note that
3831          * the number of snapshots could change by the time we read
3832          * it in, in which case we re-read it.
3833          */
3834         do {
3835                 size_t size;
3836
3837                 kfree(ondisk);
3838
3839                 size = sizeof (*ondisk);
3840                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3841                 size += names_size;
3842                 ondisk = kmalloc(size, GFP_KERNEL);
3843                 if (!ondisk)
3844                         return -ENOMEM;
3845
3846                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
3847                                         &rbd_dev->header_oloc, ondisk, size);
3848                 if (ret < 0)
3849                         goto out;
3850                 if ((size_t)ret < size) {
3851                         ret = -ENXIO;
3852                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3853                                 size, ret);
3854                         goto out;
3855                 }
3856                 if (!rbd_dev_ondisk_valid(ondisk)) {
3857                         ret = -ENXIO;
3858                         rbd_warn(rbd_dev, "invalid header");
3859                         goto out;
3860                 }
3861
3862                 names_size = le64_to_cpu(ondisk->snap_names_len);
3863                 want_count = snap_count;
3864                 snap_count = le32_to_cpu(ondisk->snap_count);
3865         } while (snap_count != want_count);
3866
3867         ret = rbd_header_from_disk(rbd_dev, ondisk);
3868 out:
3869         kfree(ondisk);
3870
3871         return ret;
3872 }
3873
3874 /*
3875  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3876  * has disappeared from the (just updated) snapshot context.
3877  */
3878 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3879 {
3880         u64 snap_id;
3881
3882         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3883                 return;
3884
3885         snap_id = rbd_dev->spec->snap_id;
3886         if (snap_id == CEPH_NOSNAP)
3887                 return;
3888
3889         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3890                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3891 }
3892
3893 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3894 {
3895         sector_t size;
3896
3897         /*
3898          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
3899          * try to update its size.  If REMOVING is set, updating size
3900          * is just useless work since the device can't be opened.
3901          */
3902         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
3903             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
3904                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3905                 dout("setting size to %llu sectors", (unsigned long long)size);
3906                 set_capacity(rbd_dev->disk, size);
3907                 revalidate_disk(rbd_dev->disk);
3908         }
3909 }
3910
3911 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3912 {
3913         u64 mapping_size;
3914         int ret;
3915
3916         down_write(&rbd_dev->header_rwsem);
3917         mapping_size = rbd_dev->mapping.size;
3918
3919         ret = rbd_dev_header_info(rbd_dev);
3920         if (ret)
3921                 goto out;
3922
3923         /*
3924          * If there is a parent, see if it has disappeared due to the
3925          * mapped image getting flattened.
3926          */
3927         if (rbd_dev->parent) {
3928                 ret = rbd_dev_v2_parent_info(rbd_dev);
3929                 if (ret)
3930                         goto out;
3931         }
3932
3933         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3934                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3935         } else {
3936                 /* validate mapped snapshot's EXISTS flag */
3937                 rbd_exists_validate(rbd_dev);
3938         }
3939
3940 out:
3941         up_write(&rbd_dev->header_rwsem);
3942         if (!ret && mapping_size != rbd_dev->mapping.size)
3943                 rbd_dev_update_size(rbd_dev);
3944
3945         return ret;
3946 }
3947
3948 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
3949                 unsigned int hctx_idx, unsigned int numa_node)
3950 {
3951         struct work_struct *work = blk_mq_rq_to_pdu(rq);
3952
3953         INIT_WORK(work, rbd_queue_workfn);
3954         return 0;
3955 }
3956
3957 static const struct blk_mq_ops rbd_mq_ops = {
3958         .queue_rq       = rbd_queue_rq,
3959         .init_request   = rbd_init_request,
3960 };
3961
3962 static int rbd_init_disk(struct rbd_device *rbd_dev)
3963 {
3964         struct gendisk *disk;
3965         struct request_queue *q;
3966         unsigned int objset_bytes =
3967             rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
3968         int err;
3969
3970         /* create gendisk info */
3971         disk = alloc_disk(single_major ?
3972                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3973                           RBD_MINORS_PER_MAJOR);
3974         if (!disk)
3975                 return -ENOMEM;
3976
3977         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3978                  rbd_dev->dev_id);
3979         disk->major = rbd_dev->major;
3980         disk->first_minor = rbd_dev->minor;
3981         if (single_major)
3982                 disk->flags |= GENHD_FL_EXT_DEVT;
3983         disk->fops = &rbd_bd_ops;
3984         disk->private_data = rbd_dev;
3985
3986         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
3987         rbd_dev->tag_set.ops = &rbd_mq_ops;
3988         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
3989         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
3990         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
3991         rbd_dev->tag_set.nr_hw_queues = 1;
3992         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
3993
3994         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
3995         if (err)
3996                 goto out_disk;
3997
3998         q = blk_mq_init_queue(&rbd_dev->tag_set);
3999         if (IS_ERR(q)) {
4000                 err = PTR_ERR(q);
4001                 goto out_tag_set;
4002         }
4003
4004         blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
4005         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4006
4007         blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
4008         q->limits.max_sectors = queue_max_hw_sectors(q);
4009         blk_queue_max_segments(q, USHRT_MAX);
4010         blk_queue_max_segment_size(q, UINT_MAX);
4011         blk_queue_io_min(q, objset_bytes);
4012         blk_queue_io_opt(q, objset_bytes);
4013
4014         if (rbd_dev->opts->trim) {
4015                 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
4016                 q->limits.discard_granularity = objset_bytes;
4017                 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
4018                 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
4019         }
4020
4021         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4022                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4023
4024         /*
4025          * disk_release() expects a queue ref from add_disk() and will
4026          * put it.  Hold an extra ref until add_disk() is called.
4027          */
4028         WARN_ON(!blk_get_queue(q));
4029         disk->queue = q;
4030         q->queuedata = rbd_dev;
4031
4032         rbd_dev->disk = disk;
4033
4034         return 0;
4035 out_tag_set:
4036         blk_mq_free_tag_set(&rbd_dev->tag_set);
4037 out_disk:
4038         put_disk(disk);
4039         return err;
4040 }
4041
4042 /*
4043   sysfs
4044 */
4045
4046 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4047 {
4048         return container_of(dev, struct rbd_device, dev);
4049 }
4050
4051 static ssize_t rbd_size_show(struct device *dev,
4052                              struct device_attribute *attr, char *buf)
4053 {
4054         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4055
4056         return sprintf(buf, "%llu\n",
4057                 (unsigned long long)rbd_dev->mapping.size);
4058 }
4059
4060 /*
4061  * Note this shows the features for whatever's mapped, which is not
4062  * necessarily the base image.
4063  */
4064 static ssize_t rbd_features_show(struct device *dev,
4065                              struct device_attribute *attr, char *buf)
4066 {
4067         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4068
4069         return sprintf(buf, "0x%016llx\n",
4070                         (unsigned long long)rbd_dev->mapping.features);
4071 }
4072
4073 static ssize_t rbd_major_show(struct device *dev,
4074                               struct device_attribute *attr, char *buf)
4075 {
4076         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4077
4078         if (rbd_dev->major)
4079                 return sprintf(buf, "%d\n", rbd_dev->major);
4080
4081         return sprintf(buf, "(none)\n");
4082 }
4083
4084 static ssize_t rbd_minor_show(struct device *dev,
4085                               struct device_attribute *attr, char *buf)
4086 {
4087         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4088
4089         return sprintf(buf, "%d\n", rbd_dev->minor);
4090 }
4091
4092 static ssize_t rbd_client_addr_show(struct device *dev,
4093                                     struct device_attribute *attr, char *buf)
4094 {
4095         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4096         struct ceph_entity_addr *client_addr =
4097             ceph_client_addr(rbd_dev->rbd_client->client);
4098
4099         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4100                        le32_to_cpu(client_addr->nonce));
4101 }
4102
4103 static ssize_t rbd_client_id_show(struct device *dev,
4104                                   struct device_attribute *attr, char *buf)
4105 {
4106         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4107
4108         return sprintf(buf, "client%lld\n",
4109                        ceph_client_gid(rbd_dev->rbd_client->client));
4110 }
4111
4112 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4113                                      struct device_attribute *attr, char *buf)
4114 {
4115         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4116
4117         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4118 }
4119
4120 static ssize_t rbd_config_info_show(struct device *dev,
4121                                     struct device_attribute *attr, char *buf)
4122 {
4123         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4124
4125         return sprintf(buf, "%s\n", rbd_dev->config_info);
4126 }
4127
4128 static ssize_t rbd_pool_show(struct device *dev,
4129                              struct device_attribute *attr, char *buf)
4130 {
4131         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4132
4133         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4134 }
4135
4136 static ssize_t rbd_pool_id_show(struct device *dev,
4137                              struct device_attribute *attr, char *buf)
4138 {
4139         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4140
4141         return sprintf(buf, "%llu\n",
4142                         (unsigned long long) rbd_dev->spec->pool_id);
4143 }
4144
4145 static ssize_t rbd_pool_ns_show(struct device *dev,
4146                                 struct device_attribute *attr, char *buf)
4147 {
4148         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4149
4150         return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
4151 }
4152
4153 static ssize_t rbd_name_show(struct device *dev,
4154                              struct device_attribute *attr, char *buf)
4155 {
4156         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4157
4158         if (rbd_dev->spec->image_name)
4159                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4160
4161         return sprintf(buf, "(unknown)\n");
4162 }
4163
4164 static ssize_t rbd_image_id_show(struct device *dev,
4165                              struct device_attribute *attr, char *buf)
4166 {
4167         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4168
4169         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4170 }
4171
4172 /*
4173  * Shows the name of the currently-mapped snapshot (or
4174  * RBD_SNAP_HEAD_NAME for the base image).
4175  */
4176 static ssize_t rbd_snap_show(struct device *dev,
4177                              struct device_attribute *attr,
4178                              char *buf)
4179 {
4180         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4181
4182         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4183 }
4184
4185 static ssize_t rbd_snap_id_show(struct device *dev,
4186                                 struct device_attribute *attr, char *buf)
4187 {
4188         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4189
4190         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4191 }
4192
4193 /*
4194  * For a v2 image, shows the chain of parent images, separated by empty
4195  * lines.  For v1 images or if there is no parent, shows "(no parent
4196  * image)".
4197  */
4198 static ssize_t rbd_parent_show(struct device *dev,
4199                                struct device_attribute *attr,
4200                                char *buf)
4201 {
4202         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4203         ssize_t count = 0;
4204
4205         if (!rbd_dev->parent)
4206                 return sprintf(buf, "(no parent image)\n");
4207
4208         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4209                 struct rbd_spec *spec = rbd_dev->parent_spec;
4210
4211                 count += sprintf(&buf[count], "%s"
4212                             "pool_id %llu\npool_name %s\n"
4213                             "pool_ns %s\n"
4214                             "image_id %s\nimage_name %s\n"
4215                             "snap_id %llu\nsnap_name %s\n"
4216                             "overlap %llu\n",
4217                             !count ? "" : "\n", /* first? */
4218                             spec->pool_id, spec->pool_name,
4219                             spec->pool_ns ?: "",
4220                             spec->image_id, spec->image_name ?: "(unknown)",
4221                             spec->snap_id, spec->snap_name,
4222                             rbd_dev->parent_overlap);
4223         }
4224
4225         return count;
4226 }
4227
4228 static ssize_t rbd_image_refresh(struct device *dev,
4229                                  struct device_attribute *attr,
4230                                  const char *buf,
4231                                  size_t size)
4232 {
4233         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4234         int ret;
4235
4236         ret = rbd_dev_refresh(rbd_dev);
4237         if (ret)
4238                 return ret;
4239
4240         return size;
4241 }
4242
4243 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
4244 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
4245 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
4246 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
4247 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
4248 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
4249 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
4250 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
4251 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
4252 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
4253 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
4254 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
4255 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
4256 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
4257 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
4258 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
4259 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
4260
4261 static struct attribute *rbd_attrs[] = {
4262         &dev_attr_size.attr,
4263         &dev_attr_features.attr,
4264         &dev_attr_major.attr,
4265         &dev_attr_minor.attr,
4266         &dev_attr_client_addr.attr,
4267         &dev_attr_client_id.attr,
4268         &dev_attr_cluster_fsid.attr,
4269         &dev_attr_config_info.attr,
4270         &dev_attr_pool.attr,
4271         &dev_attr_pool_id.attr,
4272         &dev_attr_pool_ns.attr,
4273         &dev_attr_name.attr,
4274         &dev_attr_image_id.attr,
4275         &dev_attr_current_snap.attr,
4276         &dev_attr_snap_id.attr,
4277         &dev_attr_parent.attr,
4278         &dev_attr_refresh.attr,
4279         NULL
4280 };
4281
4282 static struct attribute_group rbd_attr_group = {
4283         .attrs = rbd_attrs,
4284 };
4285
4286 static const struct attribute_group *rbd_attr_groups[] = {
4287         &rbd_attr_group,
4288         NULL
4289 };
4290
4291 static void rbd_dev_release(struct device *dev);
4292
4293 static const struct device_type rbd_device_type = {
4294         .name           = "rbd",
4295         .groups         = rbd_attr_groups,
4296         .release        = rbd_dev_release,
4297 };
4298
4299 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4300 {
4301         kref_get(&spec->kref);
4302
4303         return spec;
4304 }
4305
4306 static void rbd_spec_free(struct kref *kref);
4307 static void rbd_spec_put(struct rbd_spec *spec)
4308 {
4309         if (spec)
4310                 kref_put(&spec->kref, rbd_spec_free);
4311 }
4312
4313 static struct rbd_spec *rbd_spec_alloc(void)
4314 {
4315         struct rbd_spec *spec;
4316
4317         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4318         if (!spec)
4319                 return NULL;
4320
4321         spec->pool_id = CEPH_NOPOOL;
4322         spec->snap_id = CEPH_NOSNAP;
4323         kref_init(&spec->kref);
4324
4325         return spec;
4326 }
4327
4328 static void rbd_spec_free(struct kref *kref)
4329 {
4330         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4331
4332         kfree(spec->pool_name);
4333         kfree(spec->pool_ns);
4334         kfree(spec->image_id);
4335         kfree(spec->image_name);
4336         kfree(spec->snap_name);
4337         kfree(spec);
4338 }
4339
4340 static void rbd_dev_free(struct rbd_device *rbd_dev)
4341 {
4342         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4343         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4344
4345         ceph_oid_destroy(&rbd_dev->header_oid);
4346         ceph_oloc_destroy(&rbd_dev->header_oloc);
4347         kfree(rbd_dev->config_info);
4348
4349         rbd_put_client(rbd_dev->rbd_client);
4350         rbd_spec_put(rbd_dev->spec);
4351         kfree(rbd_dev->opts);
4352         kfree(rbd_dev);
4353 }
4354
4355 static void rbd_dev_release(struct device *dev)
4356 {
4357         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4358         bool need_put = !!rbd_dev->opts;
4359
4360         if (need_put) {
4361                 destroy_workqueue(rbd_dev->task_wq);
4362                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4363         }
4364
4365         rbd_dev_free(rbd_dev);
4366
4367         /*
4368          * This is racy, but way better than putting module outside of
4369          * the release callback.  The race window is pretty small, so
4370          * doing something similar to dm (dm-builtin.c) is overkill.
4371          */
4372         if (need_put)
4373                 module_put(THIS_MODULE);
4374 }
4375
4376 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4377                                            struct rbd_spec *spec)
4378 {
4379         struct rbd_device *rbd_dev;
4380
4381         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4382         if (!rbd_dev)
4383                 return NULL;
4384
4385         spin_lock_init(&rbd_dev->lock);
4386         INIT_LIST_HEAD(&rbd_dev->node);
4387         init_rwsem(&rbd_dev->header_rwsem);
4388
4389         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4390         ceph_oid_init(&rbd_dev->header_oid);
4391         rbd_dev->header_oloc.pool = spec->pool_id;
4392         if (spec->pool_ns) {
4393                 WARN_ON(!*spec->pool_ns);
4394                 rbd_dev->header_oloc.pool_ns =
4395                     ceph_find_or_create_string(spec->pool_ns,
4396                                                strlen(spec->pool_ns));
4397         }
4398
4399         mutex_init(&rbd_dev->watch_mutex);
4400         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4401         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4402
4403         init_rwsem(&rbd_dev->lock_rwsem);
4404         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4405         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4406         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4407         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4408         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4409         init_waitqueue_head(&rbd_dev->lock_waitq);
4410
4411         rbd_dev->dev.bus = &rbd_bus_type;
4412         rbd_dev->dev.type = &rbd_device_type;
4413         rbd_dev->dev.parent = &rbd_root_dev;
4414         device_initialize(&rbd_dev->dev);
4415
4416         rbd_dev->rbd_client = rbdc;
4417         rbd_dev->spec = spec;
4418
4419         return rbd_dev;
4420 }
4421
4422 /*
4423  * Create a mapping rbd_dev.
4424  */
4425 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4426                                          struct rbd_spec *spec,
4427                                          struct rbd_options *opts)
4428 {
4429         struct rbd_device *rbd_dev;
4430
4431         rbd_dev = __rbd_dev_create(rbdc, spec);
4432         if (!rbd_dev)
4433                 return NULL;
4434
4435         rbd_dev->opts = opts;
4436
4437         /* get an id and fill in device name */
4438         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4439                                          minor_to_rbd_dev_id(1 << MINORBITS),
4440                                          GFP_KERNEL);
4441         if (rbd_dev->dev_id < 0)
4442                 goto fail_rbd_dev;
4443
4444         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4445         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4446                                                    rbd_dev->name);
4447         if (!rbd_dev->task_wq)
4448                 goto fail_dev_id;
4449
4450         /* we have a ref from do_rbd_add() */
4451         __module_get(THIS_MODULE);
4452
4453         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4454         return rbd_dev;
4455
4456 fail_dev_id:
4457         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4458 fail_rbd_dev:
4459         rbd_dev_free(rbd_dev);
4460         return NULL;
4461 }
4462
4463 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4464 {
4465         if (rbd_dev)
4466                 put_device(&rbd_dev->dev);
4467 }
4468
4469 /*
4470  * Get the size and object order for an image snapshot, or if
4471  * snap_id is CEPH_NOSNAP, gets this information for the base
4472  * image.
4473  */
4474 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4475                                 u8 *order, u64 *snap_size)
4476 {
4477         __le64 snapid = cpu_to_le64(snap_id);
4478         int ret;
4479         struct {
4480                 u8 order;
4481                 __le64 size;
4482         } __attribute__ ((packed)) size_buf = { 0 };
4483
4484         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4485                                   &rbd_dev->header_oloc, "get_size",
4486                                   &snapid, sizeof(snapid),
4487                                   &size_buf, sizeof(size_buf));
4488         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4489         if (ret < 0)
4490                 return ret;
4491         if (ret < sizeof (size_buf))
4492                 return -ERANGE;
4493
4494         if (order) {
4495                 *order = size_buf.order;
4496                 dout("  order %u", (unsigned int)*order);
4497         }
4498         *snap_size = le64_to_cpu(size_buf.size);
4499
4500         dout("  snap_id 0x%016llx snap_size = %llu\n",
4501                 (unsigned long long)snap_id,
4502                 (unsigned long long)*snap_size);
4503
4504         return 0;
4505 }
4506
4507 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4508 {
4509         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4510                                         &rbd_dev->header.obj_order,
4511                                         &rbd_dev->header.image_size);
4512 }
4513
4514 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4515 {
4516         void *reply_buf;
4517         int ret;
4518         void *p;
4519
4520         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4521         if (!reply_buf)
4522                 return -ENOMEM;
4523
4524         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4525                                   &rbd_dev->header_oloc, "get_object_prefix",
4526                                   NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4527         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4528         if (ret < 0)
4529                 goto out;
4530
4531         p = reply_buf;
4532         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4533                                                 p + ret, NULL, GFP_NOIO);
4534         ret = 0;
4535
4536         if (IS_ERR(rbd_dev->header.object_prefix)) {
4537                 ret = PTR_ERR(rbd_dev->header.object_prefix);
4538                 rbd_dev->header.object_prefix = NULL;
4539         } else {
4540                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4541         }
4542 out:
4543         kfree(reply_buf);
4544
4545         return ret;
4546 }
4547
4548 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4549                 u64 *snap_features)
4550 {
4551         __le64 snapid = cpu_to_le64(snap_id);
4552         struct {
4553                 __le64 features;
4554                 __le64 incompat;
4555         } __attribute__ ((packed)) features_buf = { 0 };
4556         u64 unsup;
4557         int ret;
4558
4559         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4560                                   &rbd_dev->header_oloc, "get_features",
4561                                   &snapid, sizeof(snapid),
4562                                   &features_buf, sizeof(features_buf));
4563         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4564         if (ret < 0)
4565                 return ret;
4566         if (ret < sizeof (features_buf))
4567                 return -ERANGE;
4568
4569         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4570         if (unsup) {
4571                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4572                          unsup);
4573                 return -ENXIO;
4574         }
4575
4576         *snap_features = le64_to_cpu(features_buf.features);
4577
4578         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4579                 (unsigned long long)snap_id,
4580                 (unsigned long long)*snap_features,
4581                 (unsigned long long)le64_to_cpu(features_buf.incompat));
4582
4583         return 0;
4584 }
4585
4586 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4587 {
4588         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4589                                                 &rbd_dev->header.features);
4590 }
4591
4592 struct parent_image_info {
4593         u64             pool_id;
4594         const char      *pool_ns;
4595         const char      *image_id;
4596         u64             snap_id;
4597
4598         bool            has_overlap;
4599         u64             overlap;
4600 };
4601
4602 /*
4603  * The caller is responsible for @pii.
4604  */
4605 static int decode_parent_image_spec(void **p, void *end,
4606                                     struct parent_image_info *pii)
4607 {
4608         u8 struct_v;
4609         u32 struct_len;
4610         int ret;
4611
4612         ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
4613                                   &struct_v, &struct_len);
4614         if (ret)
4615                 return ret;
4616
4617         ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
4618         pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
4619         if (IS_ERR(pii->pool_ns)) {
4620                 ret = PTR_ERR(pii->pool_ns);
4621                 pii->pool_ns = NULL;
4622                 return ret;
4623         }
4624         pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
4625         if (IS_ERR(pii->image_id)) {
4626                 ret = PTR_ERR(pii->image_id);
4627                 pii->image_id = NULL;
4628                 return ret;
4629         }
4630         ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
4631         return 0;
4632
4633 e_inval:
4634         return -EINVAL;
4635 }
4636
4637 static int __get_parent_info(struct rbd_device *rbd_dev,
4638                              struct page *req_page,
4639                              struct page *reply_page,
4640                              struct parent_image_info *pii)
4641 {
4642         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4643         size_t reply_len = PAGE_SIZE;
4644         void *p, *end;
4645         int ret;
4646
4647         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
4648                              "rbd", "parent_get", CEPH_OSD_FLAG_READ,
4649                              req_page, sizeof(u64), reply_page, &reply_len);
4650         if (ret)
4651                 return ret == -EOPNOTSUPP ? 1 : ret;
4652
4653         p = page_address(reply_page);
4654         end = p + reply_len;
4655         ret = decode_parent_image_spec(&p, end, pii);
4656         if (ret)
4657                 return ret;
4658
4659         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
4660                              "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
4661                              req_page, sizeof(u64), reply_page, &reply_len);
4662         if (ret)
4663                 return ret;
4664
4665         p = page_address(reply_page);
4666         end = p + reply_len;
4667         ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
4668         if (pii->has_overlap)
4669                 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
4670
4671         return 0;
4672
4673 e_inval:
4674         return -EINVAL;
4675 }
4676
4677 /*
4678  * The caller is responsible for @pii.
4679  */
4680 static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
4681                                     struct page *req_page,
4682                                     struct page *reply_page,
4683                                     struct parent_image_info *pii)
4684 {
4685         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4686         size_t reply_len = PAGE_SIZE;
4687         void *p, *end;
4688         int ret;
4689
4690         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
4691                              "rbd", "get_parent", CEPH_OSD_FLAG_READ,
4692                              req_page, sizeof(u64), reply_page, &reply_len);
4693         if (ret)
4694                 return ret;
4695
4696         p = page_address(reply_page);
4697         end = p + reply_len;
4698         ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
4699         pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4700         if (IS_ERR(pii->image_id)) {
4701                 ret = PTR_ERR(pii->image_id);
4702                 pii->image_id = NULL;
4703                 return ret;
4704         }
4705         ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
4706         pii->has_overlap = true;
4707         ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
4708
4709         return 0;
4710
4711 e_inval:
4712         return -EINVAL;
4713 }
4714
4715 static int get_parent_info(struct rbd_device *rbd_dev,
4716                            struct parent_image_info *pii)
4717 {
4718         struct page *req_page, *reply_page;
4719         void *p;
4720         int ret;
4721
4722         req_page = alloc_page(GFP_KERNEL);
4723         if (!req_page)
4724                 return -ENOMEM;
4725
4726         reply_page = alloc_page(GFP_KERNEL);
4727         if (!reply_page) {
4728                 __free_page(req_page);
4729                 return -ENOMEM;
4730         }
4731
4732         p = page_address(req_page);
4733         ceph_encode_64(&p, rbd_dev->spec->snap_id);
4734         ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
4735         if (ret > 0)
4736                 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
4737                                                pii);
4738
4739         __free_page(req_page);
4740         __free_page(reply_page);
4741         return ret;
4742 }
4743
4744 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4745 {
4746         struct rbd_spec *parent_spec;
4747         struct parent_image_info pii = { 0 };
4748         int ret;
4749
4750         parent_spec = rbd_spec_alloc();
4751         if (!parent_spec)
4752                 return -ENOMEM;
4753
4754         ret = get_parent_info(rbd_dev, &pii);
4755         if (ret)
4756                 goto out_err;
4757
4758         dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
4759              __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
4760              pii.has_overlap, pii.overlap);
4761
4762         if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
4763                 /*
4764                  * Either the parent never existed, or we have
4765                  * record of it but the image got flattened so it no
4766                  * longer has a parent.  When the parent of a
4767                  * layered image disappears we immediately set the
4768                  * overlap to 0.  The effect of this is that all new
4769                  * requests will be treated as if the image had no
4770                  * parent.
4771                  *
4772                  * If !pii.has_overlap, the parent image spec is not
4773                  * applicable.  It's there to avoid duplication in each
4774                  * snapshot record.
4775                  */
4776                 if (rbd_dev->parent_overlap) {
4777                         rbd_dev->parent_overlap = 0;
4778                         rbd_dev_parent_put(rbd_dev);
4779                         pr_info("%s: clone image has been flattened\n",
4780                                 rbd_dev->disk->disk_name);
4781                 }
4782
4783                 goto out;       /* No parent?  No problem. */
4784         }
4785
4786         /* The ceph file layout needs to fit pool id in 32 bits */
4787
4788         ret = -EIO;
4789         if (pii.pool_id > (u64)U32_MAX) {
4790                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
4791                         (unsigned long long)pii.pool_id, U32_MAX);
4792                 goto out_err;
4793         }
4794
4795         /*
4796          * The parent won't change (except when the clone is
4797          * flattened, already handled that).  So we only need to
4798          * record the parent spec we have not already done so.
4799          */
4800         if (!rbd_dev->parent_spec) {
4801                 parent_spec->pool_id = pii.pool_id;
4802                 if (pii.pool_ns && *pii.pool_ns) {
4803                         parent_spec->pool_ns = pii.pool_ns;
4804                         pii.pool_ns = NULL;
4805                 }
4806                 parent_spec->image_id = pii.image_id;
4807                 pii.image_id = NULL;
4808                 parent_spec->snap_id = pii.snap_id;
4809
4810                 rbd_dev->parent_spec = parent_spec;
4811                 parent_spec = NULL;     /* rbd_dev now owns this */
4812         }
4813
4814         /*
4815          * We always update the parent overlap.  If it's zero we issue
4816          * a warning, as we will proceed as if there was no parent.
4817          */
4818         if (!pii.overlap) {
4819                 if (parent_spec) {
4820                         /* refresh, careful to warn just once */
4821                         if (rbd_dev->parent_overlap)
4822                                 rbd_warn(rbd_dev,
4823                                     "clone now standalone (overlap became 0)");
4824                 } else {
4825                         /* initial probe */
4826                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
4827                 }
4828         }
4829         rbd_dev->parent_overlap = pii.overlap;
4830
4831 out:
4832         ret = 0;
4833 out_err:
4834         kfree(pii.pool_ns);
4835         kfree(pii.image_id);
4836         rbd_spec_put(parent_spec);
4837         return ret;
4838 }
4839
4840 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4841 {
4842         struct {
4843                 __le64 stripe_unit;
4844                 __le64 stripe_count;
4845         } __attribute__ ((packed)) striping_info_buf = { 0 };
4846         size_t size = sizeof (striping_info_buf);
4847         void *p;
4848         int ret;
4849
4850         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4851                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
4852                                 NULL, 0, &striping_info_buf, size);
4853         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4854         if (ret < 0)
4855                 return ret;
4856         if (ret < size)
4857                 return -ERANGE;
4858
4859         p = &striping_info_buf;
4860         rbd_dev->header.stripe_unit = ceph_decode_64(&p);
4861         rbd_dev->header.stripe_count = ceph_decode_64(&p);
4862         return 0;
4863 }
4864
4865 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
4866 {
4867         __le64 data_pool_id;
4868         int ret;
4869
4870         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4871                                   &rbd_dev->header_oloc, "get_data_pool",
4872                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
4873         if (ret < 0)
4874                 return ret;
4875         if (ret < sizeof(data_pool_id))
4876                 return -EBADMSG;
4877
4878         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
4879         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
4880         return 0;
4881 }
4882
4883 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4884 {
4885         CEPH_DEFINE_OID_ONSTACK(oid);
4886         size_t image_id_size;
4887         char *image_id;
4888         void *p;
4889         void *end;
4890         size_t size;
4891         void *reply_buf = NULL;
4892         size_t len = 0;
4893         char *image_name = NULL;
4894         int ret;
4895
4896         rbd_assert(!rbd_dev->spec->image_name);
4897
4898         len = strlen(rbd_dev->spec->image_id);
4899         image_id_size = sizeof (__le32) + len;
4900         image_id = kmalloc(image_id_size, GFP_KERNEL);
4901         if (!image_id)
4902                 return NULL;
4903
4904         p = image_id;
4905         end = image_id + image_id_size;
4906         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4907
4908         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4909         reply_buf = kmalloc(size, GFP_KERNEL);
4910         if (!reply_buf)
4911                 goto out;
4912
4913         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
4914         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
4915                                   "dir_get_name", image_id, image_id_size,
4916                                   reply_buf, size);
4917         if (ret < 0)
4918                 goto out;
4919         p = reply_buf;
4920         end = reply_buf + ret;
4921
4922         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4923         if (IS_ERR(image_name))
4924                 image_name = NULL;
4925         else
4926                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4927 out:
4928         kfree(reply_buf);
4929         kfree(image_id);
4930
4931         return image_name;
4932 }
4933
4934 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4935 {
4936         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4937         const char *snap_name;
4938         u32 which = 0;
4939
4940         /* Skip over names until we find the one we are looking for */
4941
4942         snap_name = rbd_dev->header.snap_names;
4943         while (which < snapc->num_snaps) {
4944                 if (!strcmp(name, snap_name))
4945                         return snapc->snaps[which];
4946                 snap_name += strlen(snap_name) + 1;
4947                 which++;
4948         }
4949         return CEPH_NOSNAP;
4950 }
4951
4952 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4953 {
4954         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4955         u32 which;
4956         bool found = false;
4957         u64 snap_id;
4958
4959         for (which = 0; !found && which < snapc->num_snaps; which++) {
4960                 const char *snap_name;
4961
4962                 snap_id = snapc->snaps[which];
4963                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4964                 if (IS_ERR(snap_name)) {
4965                         /* ignore no-longer existing snapshots */
4966                         if (PTR_ERR(snap_name) == -ENOENT)
4967                                 continue;
4968                         else
4969                                 break;
4970                 }
4971                 found = !strcmp(name, snap_name);
4972                 kfree(snap_name);
4973         }
4974         return found ? snap_id : CEPH_NOSNAP;
4975 }
4976
4977 /*
4978  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4979  * no snapshot by that name is found, or if an error occurs.
4980  */
4981 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4982 {
4983         if (rbd_dev->image_format == 1)
4984                 return rbd_v1_snap_id_by_name(rbd_dev, name);
4985
4986         return rbd_v2_snap_id_by_name(rbd_dev, name);
4987 }
4988
4989 /*
4990  * An image being mapped will have everything but the snap id.
4991  */
4992 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4993 {
4994         struct rbd_spec *spec = rbd_dev->spec;
4995
4996         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4997         rbd_assert(spec->image_id && spec->image_name);
4998         rbd_assert(spec->snap_name);
4999
5000         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5001                 u64 snap_id;
5002
5003                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5004                 if (snap_id == CEPH_NOSNAP)
5005                         return -ENOENT;
5006
5007                 spec->snap_id = snap_id;
5008         } else {
5009                 spec->snap_id = CEPH_NOSNAP;
5010         }
5011
5012         return 0;
5013 }
5014
5015 /*
5016  * A parent image will have all ids but none of the names.
5017  *
5018  * All names in an rbd spec are dynamically allocated.  It's OK if we
5019  * can't figure out the name for an image id.
5020  */
5021 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5022 {
5023         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5024         struct rbd_spec *spec = rbd_dev->spec;
5025         const char *pool_name;
5026         const char *image_name;
5027         const char *snap_name;
5028         int ret;
5029
5030         rbd_assert(spec->pool_id != CEPH_NOPOOL);
5031         rbd_assert(spec->image_id);
5032         rbd_assert(spec->snap_id != CEPH_NOSNAP);
5033
5034         /* Get the pool name; we have to make our own copy of this */
5035
5036         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5037         if (!pool_name) {
5038                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5039                 return -EIO;
5040         }
5041         pool_name = kstrdup(pool_name, GFP_KERNEL);
5042         if (!pool_name)
5043                 return -ENOMEM;
5044
5045         /* Fetch the image name; tolerate failure here */
5046
5047         image_name = rbd_dev_image_name(rbd_dev);
5048         if (!image_name)
5049                 rbd_warn(rbd_dev, "unable to get image name");
5050
5051         /* Fetch the snapshot name */
5052
5053         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5054         if (IS_ERR(snap_name)) {
5055                 ret = PTR_ERR(snap_name);
5056                 goto out_err;
5057         }
5058
5059         spec->pool_name = pool_name;
5060         spec->image_name = image_name;
5061         spec->snap_name = snap_name;
5062
5063         return 0;
5064
5065 out_err:
5066         kfree(image_name);
5067         kfree(pool_name);
5068         return ret;
5069 }
5070
5071 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5072 {
5073         size_t size;
5074         int ret;
5075         void *reply_buf;
5076         void *p;
5077         void *end;
5078         u64 seq;
5079         u32 snap_count;
5080         struct ceph_snap_context *snapc;
5081         u32 i;
5082
5083         /*
5084          * We'll need room for the seq value (maximum snapshot id),
5085          * snapshot count, and array of that many snapshot ids.
5086          * For now we have a fixed upper limit on the number we're
5087          * prepared to receive.
5088          */
5089         size = sizeof (__le64) + sizeof (__le32) +
5090                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
5091         reply_buf = kzalloc(size, GFP_KERNEL);
5092         if (!reply_buf)
5093                 return -ENOMEM;
5094
5095         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5096                                   &rbd_dev->header_oloc, "get_snapcontext",
5097                                   NULL, 0, reply_buf, size);
5098         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5099         if (ret < 0)
5100                 goto out;
5101
5102         p = reply_buf;
5103         end = reply_buf + ret;
5104         ret = -ERANGE;
5105         ceph_decode_64_safe(&p, end, seq, out);
5106         ceph_decode_32_safe(&p, end, snap_count, out);
5107
5108         /*
5109          * Make sure the reported number of snapshot ids wouldn't go
5110          * beyond the end of our buffer.  But before checking that,
5111          * make sure the computed size of the snapshot context we
5112          * allocate is representable in a size_t.
5113          */
5114         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5115                                  / sizeof (u64)) {
5116                 ret = -EINVAL;
5117                 goto out;
5118         }
5119         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5120                 goto out;
5121         ret = 0;
5122
5123         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5124         if (!snapc) {
5125                 ret = -ENOMEM;
5126                 goto out;
5127         }
5128         snapc->seq = seq;
5129         for (i = 0; i < snap_count; i++)
5130                 snapc->snaps[i] = ceph_decode_64(&p);
5131
5132         ceph_put_snap_context(rbd_dev->header.snapc);
5133         rbd_dev->header.snapc = snapc;
5134
5135         dout("  snap context seq = %llu, snap_count = %u\n",
5136                 (unsigned long long)seq, (unsigned int)snap_count);
5137 out:
5138         kfree(reply_buf);
5139
5140         return ret;
5141 }
5142
5143 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5144                                         u64 snap_id)
5145 {
5146         size_t size;
5147         void *reply_buf;
5148         __le64 snapid;
5149         int ret;
5150         void *p;
5151         void *end;
5152         char *snap_name;
5153
5154         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5155         reply_buf = kmalloc(size, GFP_KERNEL);
5156         if (!reply_buf)
5157                 return ERR_PTR(-ENOMEM);
5158
5159         snapid = cpu_to_le64(snap_id);
5160         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5161                                   &rbd_dev->header_oloc, "get_snapshot_name",
5162                                   &snapid, sizeof(snapid), reply_buf, size);
5163         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5164         if (ret < 0) {
5165                 snap_name = ERR_PTR(ret);
5166                 goto out;
5167         }
5168
5169         p = reply_buf;
5170         end = reply_buf + ret;
5171         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5172         if (IS_ERR(snap_name))
5173                 goto out;
5174
5175         dout("  snap_id 0x%016llx snap_name = %s\n",
5176                 (unsigned long long)snap_id, snap_name);
5177 out:
5178         kfree(reply_buf);
5179
5180         return snap_name;
5181 }
5182
5183 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5184 {
5185         bool first_time = rbd_dev->header.object_prefix == NULL;
5186         int ret;
5187
5188         ret = rbd_dev_v2_image_size(rbd_dev);
5189         if (ret)
5190                 return ret;
5191
5192         if (first_time) {
5193                 ret = rbd_dev_v2_header_onetime(rbd_dev);
5194                 if (ret)
5195                         return ret;
5196         }
5197
5198         ret = rbd_dev_v2_snap_context(rbd_dev);
5199         if (ret && first_time) {
5200                 kfree(rbd_dev->header.object_prefix);
5201                 rbd_dev->header.object_prefix = NULL;
5202         }
5203
5204         return ret;
5205 }
5206
5207 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5208 {
5209         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5210
5211         if (rbd_dev->image_format == 1)
5212                 return rbd_dev_v1_header_info(rbd_dev);
5213
5214         return rbd_dev_v2_header_info(rbd_dev);
5215 }
5216
5217 /*
5218  * Skips over white space at *buf, and updates *buf to point to the
5219  * first found non-space character (if any). Returns the length of
5220  * the token (string of non-white space characters) found.  Note
5221  * that *buf must be terminated with '\0'.
5222  */
5223 static inline size_t next_token(const char **buf)
5224 {
5225         /*
5226         * These are the characters that produce nonzero for
5227         * isspace() in the "C" and "POSIX" locales.
5228         */
5229         const char *spaces = " \f\n\r\t\v";
5230
5231         *buf += strspn(*buf, spaces);   /* Find start of token */
5232
5233         return strcspn(*buf, spaces);   /* Return token length */
5234 }
5235
5236 /*
5237  * Finds the next token in *buf, dynamically allocates a buffer big
5238  * enough to hold a copy of it, and copies the token into the new
5239  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
5240  * that a duplicate buffer is created even for a zero-length token.
5241  *
5242  * Returns a pointer to the newly-allocated duplicate, or a null
5243  * pointer if memory for the duplicate was not available.  If
5244  * the lenp argument is a non-null pointer, the length of the token
5245  * (not including the '\0') is returned in *lenp.
5246  *
5247  * If successful, the *buf pointer will be updated to point beyond
5248  * the end of the found token.
5249  *
5250  * Note: uses GFP_KERNEL for allocation.
5251  */
5252 static inline char *dup_token(const char **buf, size_t *lenp)
5253 {
5254         char *dup;
5255         size_t len;
5256
5257         len = next_token(buf);
5258         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5259         if (!dup)
5260                 return NULL;
5261         *(dup + len) = '\0';
5262         *buf += len;
5263
5264         if (lenp)
5265                 *lenp = len;
5266
5267         return dup;
5268 }
5269
5270 /*
5271  * Parse the options provided for an "rbd add" (i.e., rbd image
5272  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
5273  * and the data written is passed here via a NUL-terminated buffer.
5274  * Returns 0 if successful or an error code otherwise.
5275  *
5276  * The information extracted from these options is recorded in
5277  * the other parameters which return dynamically-allocated
5278  * structures:
5279  *  ceph_opts
5280  *      The address of a pointer that will refer to a ceph options
5281  *      structure.  Caller must release the returned pointer using
5282  *      ceph_destroy_options() when it is no longer needed.
5283  *  rbd_opts
5284  *      Address of an rbd options pointer.  Fully initialized by
5285  *      this function; caller must release with kfree().
5286  *  spec
5287  *      Address of an rbd image specification pointer.  Fully
5288  *      initialized by this function based on parsed options.
5289  *      Caller must release with rbd_spec_put().
5290  *
5291  * The options passed take this form:
5292  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
5293  * where:
5294  *  <mon_addrs>
5295  *      A comma-separated list of one or more monitor addresses.
5296  *      A monitor address is an ip address, optionally followed
5297  *      by a port number (separated by a colon).
5298  *        I.e.:  ip1[:port1][,ip2[:port2]...]
5299  *  <options>
5300  *      A comma-separated list of ceph and/or rbd options.
5301  *  <pool_name>
5302  *      The name of the rados pool containing the rbd image.
5303  *  <image_name>
5304  *      The name of the image in that pool to map.
5305  *  <snap_id>
5306  *      An optional snapshot id.  If provided, the mapping will
5307  *      present data from the image at the time that snapshot was
5308  *      created.  The image head is used if no snapshot id is
5309  *      provided.  Snapshot mappings are always read-only.
5310  */
5311 static int rbd_add_parse_args(const char *buf,
5312                                 struct ceph_options **ceph_opts,
5313                                 struct rbd_options **opts,
5314                                 struct rbd_spec **rbd_spec)
5315 {
5316         size_t len;
5317         char *options;
5318         const char *mon_addrs;
5319         char *snap_name;
5320         size_t mon_addrs_size;
5321         struct parse_rbd_opts_ctx pctx = { 0 };
5322         struct ceph_options *copts;
5323         int ret;
5324
5325         /* The first four tokens are required */
5326
5327         len = next_token(&buf);
5328         if (!len) {
5329                 rbd_warn(NULL, "no monitor address(es) provided");
5330                 return -EINVAL;
5331         }
5332         mon_addrs = buf;
5333         mon_addrs_size = len + 1;
5334         buf += len;
5335
5336         ret = -EINVAL;
5337         options = dup_token(&buf, NULL);
5338         if (!options)
5339                 return -ENOMEM;
5340         if (!*options) {
5341                 rbd_warn(NULL, "no options provided");
5342                 goto out_err;
5343         }
5344
5345         pctx.spec = rbd_spec_alloc();
5346         if (!pctx.spec)
5347                 goto out_mem;
5348
5349         pctx.spec->pool_name = dup_token(&buf, NULL);
5350         if (!pctx.spec->pool_name)
5351                 goto out_mem;
5352         if (!*pctx.spec->pool_name) {
5353                 rbd_warn(NULL, "no pool name provided");
5354                 goto out_err;
5355         }
5356
5357         pctx.spec->image_name = dup_token(&buf, NULL);
5358         if (!pctx.spec->image_name)
5359                 goto out_mem;
5360         if (!*pctx.spec->image_name) {
5361                 rbd_warn(NULL, "no image name provided");
5362                 goto out_err;
5363         }
5364
5365         /*
5366          * Snapshot name is optional; default is to use "-"
5367          * (indicating the head/no snapshot).
5368          */
5369         len = next_token(&buf);
5370         if (!len) {
5371                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5372                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5373         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5374                 ret = -ENAMETOOLONG;
5375                 goto out_err;
5376         }
5377         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5378         if (!snap_name)
5379                 goto out_mem;
5380         *(snap_name + len) = '\0';
5381         pctx.spec->snap_name = snap_name;
5382
5383         /* Initialize all rbd options to the defaults */
5384
5385         pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
5386         if (!pctx.opts)
5387                 goto out_mem;
5388
5389         pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
5390         pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5391         pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
5392         pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5393         pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5394         pctx.opts->trim = RBD_TRIM_DEFAULT;
5395
5396         copts = ceph_parse_options(options, mon_addrs,
5397                                    mon_addrs + mon_addrs_size - 1,
5398                                    parse_rbd_opts_token, &pctx);
5399         if (IS_ERR(copts)) {
5400                 ret = PTR_ERR(copts);
5401                 goto out_err;
5402         }
5403         kfree(options);
5404
5405         *ceph_opts = copts;
5406         *opts = pctx.opts;
5407         *rbd_spec = pctx.spec;
5408
5409         return 0;
5410 out_mem:
5411         ret = -ENOMEM;
5412 out_err:
5413         kfree(pctx.opts);
5414         rbd_spec_put(pctx.spec);
5415         kfree(options);
5416
5417         return ret;
5418 }
5419
5420 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5421 {
5422         down_write(&rbd_dev->lock_rwsem);
5423         if (__rbd_is_lock_owner(rbd_dev))
5424                 rbd_unlock(rbd_dev);
5425         up_write(&rbd_dev->lock_rwsem);
5426 }
5427
5428 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5429 {
5430         int ret;
5431
5432         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5433                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5434                 return -EINVAL;
5435         }
5436
5437         /* FIXME: "rbd map --exclusive" should be in interruptible */
5438         down_read(&rbd_dev->lock_rwsem);
5439         ret = rbd_wait_state_locked(rbd_dev, true);
5440         up_read(&rbd_dev->lock_rwsem);
5441         if (ret) {
5442                 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5443                 return -EROFS;
5444         }
5445
5446         return 0;
5447 }
5448
5449 /*
5450  * An rbd format 2 image has a unique identifier, distinct from the
5451  * name given to it by the user.  Internally, that identifier is
5452  * what's used to specify the names of objects related to the image.
5453  *
5454  * A special "rbd id" object is used to map an rbd image name to its
5455  * id.  If that object doesn't exist, then there is no v2 rbd image
5456  * with the supplied name.
5457  *
5458  * This function will record the given rbd_dev's image_id field if
5459  * it can be determined, and in that case will return 0.  If any
5460  * errors occur a negative errno will be returned and the rbd_dev's
5461  * image_id field will be unchanged (and should be NULL).
5462  */
5463 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5464 {
5465         int ret;
5466         size_t size;
5467         CEPH_DEFINE_OID_ONSTACK(oid);
5468         void *response;
5469         char *image_id;
5470
5471         /*
5472          * When probing a parent image, the image id is already
5473          * known (and the image name likely is not).  There's no
5474          * need to fetch the image id again in this case.  We
5475          * do still need to set the image format though.
5476          */
5477         if (rbd_dev->spec->image_id) {
5478                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5479
5480                 return 0;
5481         }
5482
5483         /*
5484          * First, see if the format 2 image id file exists, and if
5485          * so, get the image's persistent id from it.
5486          */
5487         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5488                                rbd_dev->spec->image_name);
5489         if (ret)
5490                 return ret;
5491
5492         dout("rbd id object name is %s\n", oid.name);
5493
5494         /* Response will be an encoded string, which includes a length */
5495
5496         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5497         response = kzalloc(size, GFP_NOIO);
5498         if (!response) {
5499                 ret = -ENOMEM;
5500                 goto out;
5501         }
5502
5503         /* If it doesn't exist we'll assume it's a format 1 image */
5504
5505         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5506                                   "get_id", NULL, 0,
5507                                   response, RBD_IMAGE_ID_LEN_MAX);
5508         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5509         if (ret == -ENOENT) {
5510                 image_id = kstrdup("", GFP_KERNEL);
5511                 ret = image_id ? 0 : -ENOMEM;
5512                 if (!ret)
5513                         rbd_dev->image_format = 1;
5514         } else if (ret >= 0) {
5515                 void *p = response;
5516
5517                 image_id = ceph_extract_encoded_string(&p, p + ret,
5518                                                 NULL, GFP_NOIO);
5519                 ret = PTR_ERR_OR_ZERO(image_id);
5520                 if (!ret)
5521                         rbd_dev->image_format = 2;
5522         }
5523
5524         if (!ret) {
5525                 rbd_dev->spec->image_id = image_id;
5526                 dout("image_id is %s\n", image_id);
5527         }
5528 out:
5529         kfree(response);
5530         ceph_oid_destroy(&oid);
5531         return ret;
5532 }
5533
5534 /*
5535  * Undo whatever state changes are made by v1 or v2 header info
5536  * call.
5537  */
5538 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5539 {
5540         struct rbd_image_header *header;
5541
5542         rbd_dev_parent_put(rbd_dev);
5543
5544         /* Free dynamic fields from the header, then zero it out */
5545
5546         header = &rbd_dev->header;
5547         ceph_put_snap_context(header->snapc);
5548         kfree(header->snap_sizes);
5549         kfree(header->snap_names);
5550         kfree(header->object_prefix);
5551         memset(header, 0, sizeof (*header));
5552 }
5553
5554 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5555 {
5556         int ret;
5557
5558         ret = rbd_dev_v2_object_prefix(rbd_dev);
5559         if (ret)
5560                 goto out_err;
5561
5562         /*
5563          * Get the and check features for the image.  Currently the
5564          * features are assumed to never change.
5565          */
5566         ret = rbd_dev_v2_features(rbd_dev);
5567         if (ret)
5568                 goto out_err;
5569
5570         /* If the image supports fancy striping, get its parameters */
5571
5572         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5573                 ret = rbd_dev_v2_striping_info(rbd_dev);
5574                 if (ret < 0)
5575                         goto out_err;
5576         }
5577
5578         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5579                 ret = rbd_dev_v2_data_pool(rbd_dev);
5580                 if (ret)
5581                         goto out_err;
5582         }
5583
5584         rbd_init_layout(rbd_dev);
5585         return 0;
5586
5587 out_err:
5588         rbd_dev->header.features = 0;
5589         kfree(rbd_dev->header.object_prefix);
5590         rbd_dev->header.object_prefix = NULL;
5591         return ret;
5592 }
5593
5594 /*
5595  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5596  * rbd_dev_image_probe() recursion depth, which means it's also the
5597  * length of the already discovered part of the parent chain.
5598  */
5599 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5600 {
5601         struct rbd_device *parent = NULL;
5602         int ret;
5603
5604         if (!rbd_dev->parent_spec)
5605                 return 0;
5606
5607         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5608                 pr_info("parent chain is too long (%d)\n", depth);
5609                 ret = -EINVAL;
5610                 goto out_err;
5611         }
5612
5613         parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5614         if (!parent) {
5615                 ret = -ENOMEM;
5616                 goto out_err;
5617         }
5618
5619         /*
5620          * Images related by parent/child relationships always share
5621          * rbd_client and spec/parent_spec, so bump their refcounts.
5622          */
5623         __rbd_get_client(rbd_dev->rbd_client);
5624         rbd_spec_get(rbd_dev->parent_spec);
5625
5626         ret = rbd_dev_image_probe(parent, depth);
5627         if (ret < 0)
5628                 goto out_err;
5629
5630         rbd_dev->parent = parent;
5631         atomic_set(&rbd_dev->parent_ref, 1);
5632         return 0;
5633
5634 out_err:
5635         rbd_dev_unparent(rbd_dev);
5636         rbd_dev_destroy(parent);
5637         return ret;
5638 }
5639
5640 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5641 {
5642         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5643         rbd_dev_mapping_clear(rbd_dev);
5644         rbd_free_disk(rbd_dev);
5645         if (!single_major)
5646                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5647 }
5648
5649 /*
5650  * rbd_dev->header_rwsem must be locked for write and will be unlocked
5651  * upon return.
5652  */
5653 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5654 {
5655         int ret;
5656
5657         /* Record our major and minor device numbers. */
5658
5659         if (!single_major) {
5660                 ret = register_blkdev(0, rbd_dev->name);
5661                 if (ret < 0)
5662                         goto err_out_unlock;
5663
5664                 rbd_dev->major = ret;
5665                 rbd_dev->minor = 0;
5666         } else {
5667                 rbd_dev->major = rbd_major;
5668                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5669         }
5670
5671         /* Set up the blkdev mapping. */
5672
5673         ret = rbd_init_disk(rbd_dev);
5674         if (ret)
5675                 goto err_out_blkdev;
5676
5677         ret = rbd_dev_mapping_set(rbd_dev);
5678         if (ret)
5679                 goto err_out_disk;
5680
5681         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5682         set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
5683
5684         ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
5685         if (ret)
5686                 goto err_out_mapping;
5687
5688         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5689         up_write(&rbd_dev->header_rwsem);
5690         return 0;
5691
5692 err_out_mapping:
5693         rbd_dev_mapping_clear(rbd_dev);
5694 err_out_disk:
5695         rbd_free_disk(rbd_dev);
5696 err_out_blkdev:
5697         if (!single_major)
5698                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5699 err_out_unlock:
5700         up_write(&rbd_dev->header_rwsem);
5701         return ret;
5702 }
5703
5704 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5705 {
5706         struct rbd_spec *spec = rbd_dev->spec;
5707         int ret;
5708
5709         /* Record the header object name for this rbd image. */
5710
5711         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5712         if (rbd_dev->image_format == 1)
5713                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5714                                        spec->image_name, RBD_SUFFIX);
5715         else
5716                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5717                                        RBD_HEADER_PREFIX, spec->image_id);
5718
5719         return ret;
5720 }
5721
5722 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5723 {
5724         rbd_dev_unprobe(rbd_dev);
5725         if (rbd_dev->opts)
5726                 rbd_unregister_watch(rbd_dev);
5727         rbd_dev->image_format = 0;
5728         kfree(rbd_dev->spec->image_id);
5729         rbd_dev->spec->image_id = NULL;
5730 }
5731
5732 /*
5733  * Probe for the existence of the header object for the given rbd
5734  * device.  If this image is the one being mapped (i.e., not a
5735  * parent), initiate a watch on its header object before using that
5736  * object to get detailed information about the rbd image.
5737  */
5738 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
5739 {
5740         int ret;
5741
5742         /*
5743          * Get the id from the image id object.  Unless there's an
5744          * error, rbd_dev->spec->image_id will be filled in with
5745          * a dynamically-allocated string, and rbd_dev->image_format
5746          * will be set to either 1 or 2.
5747          */
5748         ret = rbd_dev_image_id(rbd_dev);
5749         if (ret)
5750                 return ret;
5751
5752         ret = rbd_dev_header_name(rbd_dev);
5753         if (ret)
5754                 goto err_out_format;
5755
5756         if (!depth) {
5757                 ret = rbd_register_watch(rbd_dev);
5758                 if (ret) {
5759                         if (ret == -ENOENT)
5760                                 pr_info("image %s/%s%s%s does not exist\n",
5761                                         rbd_dev->spec->pool_name,
5762                                         rbd_dev->spec->pool_ns ?: "",
5763                                         rbd_dev->spec->pool_ns ? "/" : "",
5764                                         rbd_dev->spec->image_name);
5765                         goto err_out_format;
5766                 }
5767         }
5768
5769         ret = rbd_dev_header_info(rbd_dev);
5770         if (ret)
5771                 goto err_out_watch;
5772
5773         /*
5774          * If this image is the one being mapped, we have pool name and
5775          * id, image name and id, and snap name - need to fill snap id.
5776          * Otherwise this is a parent image, identified by pool, image
5777          * and snap ids - need to fill in names for those ids.
5778          */
5779         if (!depth)
5780                 ret = rbd_spec_fill_snap_id(rbd_dev);
5781         else
5782                 ret = rbd_spec_fill_names(rbd_dev);
5783         if (ret) {
5784                 if (ret == -ENOENT)
5785                         pr_info("snap %s/%s%s%s@%s does not exist\n",
5786                                 rbd_dev->spec->pool_name,
5787                                 rbd_dev->spec->pool_ns ?: "",
5788                                 rbd_dev->spec->pool_ns ? "/" : "",
5789                                 rbd_dev->spec->image_name,
5790                                 rbd_dev->spec->snap_name);
5791                 goto err_out_probe;
5792         }
5793
5794         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5795                 ret = rbd_dev_v2_parent_info(rbd_dev);
5796                 if (ret)
5797                         goto err_out_probe;
5798
5799                 /*
5800                  * Need to warn users if this image is the one being
5801                  * mapped and has a parent.
5802                  */
5803                 if (!depth && rbd_dev->parent_spec)
5804                         rbd_warn(rbd_dev,
5805                                  "WARNING: kernel layering is EXPERIMENTAL!");
5806         }
5807
5808         ret = rbd_dev_probe_parent(rbd_dev, depth);
5809         if (ret)
5810                 goto err_out_probe;
5811
5812         dout("discovered format %u image, header name is %s\n",
5813                 rbd_dev->image_format, rbd_dev->header_oid.name);
5814         return 0;
5815
5816 err_out_probe:
5817         rbd_dev_unprobe(rbd_dev);
5818 err_out_watch:
5819         if (!depth)
5820                 rbd_unregister_watch(rbd_dev);
5821 err_out_format:
5822         rbd_dev->image_format = 0;
5823         kfree(rbd_dev->spec->image_id);
5824         rbd_dev->spec->image_id = NULL;
5825         return ret;
5826 }
5827
5828 static ssize_t do_rbd_add(struct bus_type *bus,
5829                           const char *buf,
5830                           size_t count)
5831 {
5832         struct rbd_device *rbd_dev = NULL;
5833         struct ceph_options *ceph_opts = NULL;
5834         struct rbd_options *rbd_opts = NULL;
5835         struct rbd_spec *spec = NULL;
5836         struct rbd_client *rbdc;
5837         int rc;
5838
5839         if (!try_module_get(THIS_MODULE))
5840                 return -ENODEV;
5841
5842         /* parse add command */
5843         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5844         if (rc < 0)
5845                 goto out;
5846
5847         rbdc = rbd_get_client(ceph_opts);
5848         if (IS_ERR(rbdc)) {
5849                 rc = PTR_ERR(rbdc);
5850                 goto err_out_args;
5851         }
5852
5853         /* pick the pool */
5854         rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
5855         if (rc < 0) {
5856                 if (rc == -ENOENT)
5857                         pr_info("pool %s does not exist\n", spec->pool_name);
5858                 goto err_out_client;
5859         }
5860         spec->pool_id = (u64)rc;
5861
5862         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
5863         if (!rbd_dev) {
5864                 rc = -ENOMEM;
5865                 goto err_out_client;
5866         }
5867         rbdc = NULL;            /* rbd_dev now owns this */
5868         spec = NULL;            /* rbd_dev now owns this */
5869         rbd_opts = NULL;        /* rbd_dev now owns this */
5870
5871         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
5872         if (!rbd_dev->config_info) {
5873                 rc = -ENOMEM;
5874                 goto err_out_rbd_dev;
5875         }
5876
5877         down_write(&rbd_dev->header_rwsem);
5878         rc = rbd_dev_image_probe(rbd_dev, 0);
5879         if (rc < 0) {
5880                 up_write(&rbd_dev->header_rwsem);
5881                 goto err_out_rbd_dev;
5882         }
5883
5884         /* If we are mapping a snapshot it must be marked read-only */
5885         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5886                 rbd_dev->opts->read_only = true;
5887
5888         rc = rbd_dev_device_setup(rbd_dev);
5889         if (rc)
5890                 goto err_out_image_probe;
5891
5892         if (rbd_dev->opts->exclusive) {
5893                 rc = rbd_add_acquire_lock(rbd_dev);
5894                 if (rc)
5895                         goto err_out_device_setup;
5896         }
5897
5898         /* Everything's ready.  Announce the disk to the world. */
5899
5900         rc = device_add(&rbd_dev->dev);
5901         if (rc)
5902                 goto err_out_image_lock;
5903
5904         add_disk(rbd_dev->disk);
5905         /* see rbd_init_disk() */
5906         blk_put_queue(rbd_dev->disk->queue);
5907
5908         spin_lock(&rbd_dev_list_lock);
5909         list_add_tail(&rbd_dev->node, &rbd_dev_list);
5910         spin_unlock(&rbd_dev_list_lock);
5911
5912         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
5913                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
5914                 rbd_dev->header.features);
5915         rc = count;
5916 out:
5917         module_put(THIS_MODULE);
5918         return rc;
5919
5920 err_out_image_lock:
5921         rbd_dev_image_unlock(rbd_dev);
5922 err_out_device_setup:
5923         rbd_dev_device_release(rbd_dev);
5924 err_out_image_probe:
5925         rbd_dev_image_release(rbd_dev);
5926 err_out_rbd_dev:
5927         rbd_dev_destroy(rbd_dev);
5928 err_out_client:
5929         rbd_put_client(rbdc);
5930 err_out_args:
5931         rbd_spec_put(spec);
5932         kfree(rbd_opts);
5933         goto out;
5934 }
5935
5936 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
5937 {
5938         if (single_major)
5939                 return -EINVAL;
5940
5941         return do_rbd_add(bus, buf, count);
5942 }
5943
5944 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
5945                                       size_t count)
5946 {
5947         return do_rbd_add(bus, buf, count);
5948 }
5949
5950 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5951 {
5952         while (rbd_dev->parent) {
5953                 struct rbd_device *first = rbd_dev;
5954                 struct rbd_device *second = first->parent;
5955                 struct rbd_device *third;
5956
5957                 /*
5958                  * Follow to the parent with no grandparent and
5959                  * remove it.
5960                  */
5961                 while (second && (third = second->parent)) {
5962                         first = second;
5963                         second = third;
5964                 }
5965                 rbd_assert(second);
5966                 rbd_dev_image_release(second);
5967                 rbd_dev_destroy(second);
5968                 first->parent = NULL;
5969                 first->parent_overlap = 0;
5970
5971                 rbd_assert(first->parent_spec);
5972                 rbd_spec_put(first->parent_spec);
5973                 first->parent_spec = NULL;
5974         }
5975 }
5976
5977 static ssize_t do_rbd_remove(struct bus_type *bus,
5978                              const char *buf,
5979                              size_t count)
5980 {
5981         struct rbd_device *rbd_dev = NULL;
5982         struct list_head *tmp;
5983         int dev_id;
5984         char opt_buf[6];
5985         bool force = false;
5986         int ret;
5987
5988         dev_id = -1;
5989         opt_buf[0] = '\0';
5990         sscanf(buf, "%d %5s", &dev_id, opt_buf);
5991         if (dev_id < 0) {
5992                 pr_err("dev_id out of range\n");
5993                 return -EINVAL;
5994         }
5995         if (opt_buf[0] != '\0') {
5996                 if (!strcmp(opt_buf, "force")) {
5997                         force = true;
5998                 } else {
5999                         pr_err("bad remove option at '%s'\n", opt_buf);
6000                         return -EINVAL;
6001                 }
6002         }
6003
6004         ret = -ENOENT;
6005         spin_lock(&rbd_dev_list_lock);
6006         list_for_each(tmp, &rbd_dev_list) {
6007                 rbd_dev = list_entry(tmp, struct rbd_device, node);
6008                 if (rbd_dev->dev_id == dev_id) {
6009                         ret = 0;
6010                         break;
6011                 }
6012         }
6013         if (!ret) {
6014                 spin_lock_irq(&rbd_dev->lock);
6015                 if (rbd_dev->open_count && !force)
6016                         ret = -EBUSY;
6017                 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6018                                           &rbd_dev->flags))
6019                         ret = -EINPROGRESS;
6020                 spin_unlock_irq(&rbd_dev->lock);
6021         }
6022         spin_unlock(&rbd_dev_list_lock);
6023         if (ret)
6024                 return ret;
6025
6026         if (force) {
6027                 /*
6028                  * Prevent new IO from being queued and wait for existing
6029                  * IO to complete/fail.
6030                  */
6031                 blk_mq_freeze_queue(rbd_dev->disk->queue);
6032                 blk_set_queue_dying(rbd_dev->disk->queue);
6033         }
6034
6035         del_gendisk(rbd_dev->disk);
6036         spin_lock(&rbd_dev_list_lock);
6037         list_del_init(&rbd_dev->node);
6038         spin_unlock(&rbd_dev_list_lock);
6039         device_del(&rbd_dev->dev);
6040
6041         rbd_dev_image_unlock(rbd_dev);
6042         rbd_dev_device_release(rbd_dev);
6043         rbd_dev_image_release(rbd_dev);
6044         rbd_dev_destroy(rbd_dev);
6045         return count;
6046 }
6047
6048 static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
6049 {
6050         if (single_major)
6051                 return -EINVAL;
6052
6053         return do_rbd_remove(bus, buf, count);
6054 }
6055
6056 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
6057                                          size_t count)
6058 {
6059         return do_rbd_remove(bus, buf, count);
6060 }
6061
6062 /*
6063  * create control files in sysfs
6064  * /sys/bus/rbd/...
6065  */
6066 static int __init rbd_sysfs_init(void)
6067 {
6068         int ret;
6069
6070         ret = device_register(&rbd_root_dev);
6071         if (ret < 0)
6072                 return ret;
6073
6074         ret = bus_register(&rbd_bus_type);
6075         if (ret < 0)
6076                 device_unregister(&rbd_root_dev);
6077
6078         return ret;
6079 }
6080
6081 static void __exit rbd_sysfs_cleanup(void)
6082 {
6083         bus_unregister(&rbd_bus_type);
6084         device_unregister(&rbd_root_dev);
6085 }
6086
6087 static int __init rbd_slab_init(void)
6088 {
6089         rbd_assert(!rbd_img_request_cache);
6090         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6091         if (!rbd_img_request_cache)
6092                 return -ENOMEM;
6093
6094         rbd_assert(!rbd_obj_request_cache);
6095         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6096         if (!rbd_obj_request_cache)
6097                 goto out_err;
6098
6099         return 0;
6100
6101 out_err:
6102         kmem_cache_destroy(rbd_img_request_cache);
6103         rbd_img_request_cache = NULL;
6104         return -ENOMEM;
6105 }
6106
6107 static void rbd_slab_exit(void)
6108 {
6109         rbd_assert(rbd_obj_request_cache);
6110         kmem_cache_destroy(rbd_obj_request_cache);
6111         rbd_obj_request_cache = NULL;
6112
6113         rbd_assert(rbd_img_request_cache);
6114         kmem_cache_destroy(rbd_img_request_cache);
6115         rbd_img_request_cache = NULL;
6116 }
6117
6118 static int __init rbd_init(void)
6119 {
6120         int rc;
6121
6122         if (!libceph_compatible(NULL)) {
6123                 rbd_warn(NULL, "libceph incompatibility (quitting)");
6124                 return -EINVAL;
6125         }
6126
6127         rc = rbd_slab_init();
6128         if (rc)
6129                 return rc;
6130
6131         /*
6132          * The number of active work items is limited by the number of
6133          * rbd devices * queue depth, so leave @max_active at default.
6134          */
6135         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6136         if (!rbd_wq) {
6137                 rc = -ENOMEM;
6138                 goto err_out_slab;
6139         }
6140
6141         if (single_major) {
6142                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6143                 if (rbd_major < 0) {
6144                         rc = rbd_major;
6145                         goto err_out_wq;
6146                 }
6147         }
6148
6149         rc = rbd_sysfs_init();
6150         if (rc)
6151                 goto err_out_blkdev;
6152
6153         if (single_major)
6154                 pr_info("loaded (major %d)\n", rbd_major);
6155         else
6156                 pr_info("loaded\n");
6157
6158         return 0;
6159
6160 err_out_blkdev:
6161         if (single_major)
6162                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6163 err_out_wq:
6164         destroy_workqueue(rbd_wq);
6165 err_out_slab:
6166         rbd_slab_exit();
6167         return rc;
6168 }
6169
6170 static void __exit rbd_exit(void)
6171 {
6172         ida_destroy(&rbd_dev_id_ida);
6173         rbd_sysfs_cleanup();
6174         if (single_major)
6175                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6176         destroy_workqueue(rbd_wq);
6177         rbd_slab_exit();
6178 }
6179
6180 module_init(rbd_init);
6181 module_exit(rbd_exit);
6182
6183 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6184 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6185 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6186 /* following authorship retained from original osdblk.c */
6187 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6188
6189 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6190 MODULE_LICENSE("GPL");