Merge tag 'sound-4.15-rc1' of ssh://gitolite.kernel.org/pub/scm/linux/kernel/git...
[sfrench/cifs-2.6.git] / drivers / block / rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/decode.h>
36 #include <linux/parser.h>
37 #include <linux/bsearch.h>
38
39 #include <linux/kernel.h>
40 #include <linux/device.h>
41 #include <linux/module.h>
42 #include <linux/blk-mq.h>
43 #include <linux/fs.h>
44 #include <linux/blkdev.h>
45 #include <linux/slab.h>
46 #include <linux/idr.h>
47 #include <linux/workqueue.h>
48
49 #include "rbd_types.h"
50
51 #define RBD_DEBUG       /* Activate rbd_assert() calls */
52
53 /*
54  * The basic unit of block I/O is a sector.  It is interpreted in a
55  * number of contexts in Linux (blk, bio, genhd), but the default is
56  * universally 512 bytes.  These symbols are just slightly more
57  * meaningful than the bare numbers they represent.
58  */
59 #define SECTOR_SHIFT    9
60 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
61
62 /*
63  * Increment the given counter and return its updated value.
64  * If the counter is already 0 it will not be incremented.
65  * If the counter is already at its maximum value returns
66  * -EINVAL without updating it.
67  */
68 static int atomic_inc_return_safe(atomic_t *v)
69 {
70         unsigned int counter;
71
72         counter = (unsigned int)__atomic_add_unless(v, 1, 0);
73         if (counter <= (unsigned int)INT_MAX)
74                 return (int)counter;
75
76         atomic_dec(v);
77
78         return -EINVAL;
79 }
80
81 /* Decrement the counter.  Return the resulting value, or -EINVAL */
82 static int atomic_dec_return_safe(atomic_t *v)
83 {
84         int counter;
85
86         counter = atomic_dec_return(v);
87         if (counter >= 0)
88                 return counter;
89
90         atomic_inc(v);
91
92         return -EINVAL;
93 }
94
95 #define RBD_DRV_NAME "rbd"
96
97 #define RBD_MINORS_PER_MAJOR            256
98 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
99
100 #define RBD_MAX_PARENT_CHAIN_LEN        16
101
102 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
103 #define RBD_MAX_SNAP_NAME_LEN   \
104                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
105
106 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
107
108 #define RBD_SNAP_HEAD_NAME      "-"
109
110 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
111
112 /* This allows a single page to hold an image name sent by OSD */
113 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
114 #define RBD_IMAGE_ID_LEN_MAX    64
115
116 #define RBD_OBJ_PREFIX_LEN_MAX  64
117
118 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
119 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
120
121 /* Feature bits */
122
123 #define RBD_FEATURE_LAYERING            (1ULL<<0)
124 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
125 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
126 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
127
128 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
129                                  RBD_FEATURE_STRIPINGV2 |       \
130                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
131                                  RBD_FEATURE_DATA_POOL)
132
133 /* Features supported by this (client software) implementation. */
134
135 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
136
137 /*
138  * An RBD device name will be "rbd#", where the "rbd" comes from
139  * RBD_DRV_NAME above, and # is a unique integer identifier.
140  */
141 #define DEV_NAME_LEN            32
142
143 /*
144  * block device image metadata (in-memory version)
145  */
146 struct rbd_image_header {
147         /* These six fields never change for a given rbd image */
148         char *object_prefix;
149         __u8 obj_order;
150         u64 stripe_unit;
151         u64 stripe_count;
152         s64 data_pool_id;
153         u64 features;           /* Might be changeable someday? */
154
155         /* The remaining fields need to be updated occasionally */
156         u64 image_size;
157         struct ceph_snap_context *snapc;
158         char *snap_names;       /* format 1 only */
159         u64 *snap_sizes;        /* format 1 only */
160 };
161
162 /*
163  * An rbd image specification.
164  *
165  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
166  * identify an image.  Each rbd_dev structure includes a pointer to
167  * an rbd_spec structure that encapsulates this identity.
168  *
169  * Each of the id's in an rbd_spec has an associated name.  For a
170  * user-mapped image, the names are supplied and the id's associated
171  * with them are looked up.  For a layered image, a parent image is
172  * defined by the tuple, and the names are looked up.
173  *
174  * An rbd_dev structure contains a parent_spec pointer which is
175  * non-null if the image it represents is a child in a layered
176  * image.  This pointer will refer to the rbd_spec structure used
177  * by the parent rbd_dev for its own identity (i.e., the structure
178  * is shared between the parent and child).
179  *
180  * Since these structures are populated once, during the discovery
181  * phase of image construction, they are effectively immutable so
182  * we make no effort to synchronize access to them.
183  *
184  * Note that code herein does not assume the image name is known (it
185  * could be a null pointer).
186  */
187 struct rbd_spec {
188         u64             pool_id;
189         const char      *pool_name;
190
191         const char      *image_id;
192         const char      *image_name;
193
194         u64             snap_id;
195         const char      *snap_name;
196
197         struct kref     kref;
198 };
199
200 /*
201  * an instance of the client.  multiple devices may share an rbd client.
202  */
203 struct rbd_client {
204         struct ceph_client      *client;
205         struct kref             kref;
206         struct list_head        node;
207 };
208
209 struct rbd_img_request;
210 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
211
212 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
213
214 struct rbd_obj_request;
215 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
216
217 enum obj_request_type {
218         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
219 };
220
221 enum obj_operation_type {
222         OBJ_OP_WRITE,
223         OBJ_OP_READ,
224         OBJ_OP_DISCARD,
225 };
226
227 enum obj_req_flags {
228         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
229         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
230         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
231         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
232 };
233
234 struct rbd_obj_request {
235         u64                     object_no;
236         u64                     offset;         /* object start byte */
237         u64                     length;         /* bytes from offset */
238         unsigned long           flags;
239
240         /*
241          * An object request associated with an image will have its
242          * img_data flag set; a standalone object request will not.
243          *
244          * A standalone object request will have which == BAD_WHICH
245          * and a null obj_request pointer.
246          *
247          * An object request initiated in support of a layered image
248          * object (to check for its existence before a write) will
249          * have which == BAD_WHICH and a non-null obj_request pointer.
250          *
251          * Finally, an object request for rbd image data will have
252          * which != BAD_WHICH, and will have a non-null img_request
253          * pointer.  The value of which will be in the range
254          * 0..(img_request->obj_request_count-1).
255          */
256         union {
257                 struct rbd_obj_request  *obj_request;   /* STAT op */
258                 struct {
259                         struct rbd_img_request  *img_request;
260                         u64                     img_offset;
261                         /* links for img_request->obj_requests list */
262                         struct list_head        links;
263                 };
264         };
265         u32                     which;          /* posn image request list */
266
267         enum obj_request_type   type;
268         union {
269                 struct bio      *bio_list;
270                 struct {
271                         struct page     **pages;
272                         u32             page_count;
273                 };
274         };
275         struct page             **copyup_pages;
276         u32                     copyup_page_count;
277
278         struct ceph_osd_request *osd_req;
279
280         u64                     xferred;        /* bytes transferred */
281         int                     result;
282
283         rbd_obj_callback_t      callback;
284         struct completion       completion;
285
286         struct kref             kref;
287 };
288
289 enum img_req_flags {
290         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
291         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
292         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
293         IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
294 };
295
296 struct rbd_img_request {
297         struct rbd_device       *rbd_dev;
298         u64                     offset; /* starting image byte offset */
299         u64                     length; /* byte count from offset */
300         unsigned long           flags;
301         union {
302                 u64                     snap_id;        /* for reads */
303                 struct ceph_snap_context *snapc;        /* for writes */
304         };
305         union {
306                 struct request          *rq;            /* block request */
307                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
308         };
309         struct page             **copyup_pages;
310         u32                     copyup_page_count;
311         spinlock_t              completion_lock;/* protects next_completion */
312         u32                     next_completion;
313         rbd_img_callback_t      callback;
314         u64                     xferred;/* aggregate bytes transferred */
315         int                     result; /* first nonzero obj_request result */
316
317         u32                     obj_request_count;
318         struct list_head        obj_requests;   /* rbd_obj_request structs */
319
320         struct kref             kref;
321 };
322
323 #define for_each_obj_request(ireq, oreq) \
324         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
325 #define for_each_obj_request_from(ireq, oreq) \
326         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
327 #define for_each_obj_request_safe(ireq, oreq, n) \
328         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
329
330 enum rbd_watch_state {
331         RBD_WATCH_STATE_UNREGISTERED,
332         RBD_WATCH_STATE_REGISTERED,
333         RBD_WATCH_STATE_ERROR,
334 };
335
336 enum rbd_lock_state {
337         RBD_LOCK_STATE_UNLOCKED,
338         RBD_LOCK_STATE_LOCKED,
339         RBD_LOCK_STATE_RELEASING,
340 };
341
342 /* WatchNotify::ClientId */
343 struct rbd_client_id {
344         u64 gid;
345         u64 handle;
346 };
347
348 struct rbd_mapping {
349         u64                     size;
350         u64                     features;
351         bool                    read_only;
352 };
353
354 /*
355  * a single device
356  */
357 struct rbd_device {
358         int                     dev_id;         /* blkdev unique id */
359
360         int                     major;          /* blkdev assigned major */
361         int                     minor;
362         struct gendisk          *disk;          /* blkdev's gendisk and rq */
363
364         u32                     image_format;   /* Either 1 or 2 */
365         struct rbd_client       *rbd_client;
366
367         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
368
369         spinlock_t              lock;           /* queue, flags, open_count */
370
371         struct rbd_image_header header;
372         unsigned long           flags;          /* possibly lock protected */
373         struct rbd_spec         *spec;
374         struct rbd_options      *opts;
375         char                    *config_info;   /* add{,_single_major} string */
376
377         struct ceph_object_id   header_oid;
378         struct ceph_object_locator header_oloc;
379
380         struct ceph_file_layout layout;         /* used for all rbd requests */
381
382         struct mutex            watch_mutex;
383         enum rbd_watch_state    watch_state;
384         struct ceph_osd_linger_request *watch_handle;
385         u64                     watch_cookie;
386         struct delayed_work     watch_dwork;
387
388         struct rw_semaphore     lock_rwsem;
389         enum rbd_lock_state     lock_state;
390         char                    lock_cookie[32];
391         struct rbd_client_id    owner_cid;
392         struct work_struct      acquired_lock_work;
393         struct work_struct      released_lock_work;
394         struct delayed_work     lock_dwork;
395         struct work_struct      unlock_work;
396         wait_queue_head_t       lock_waitq;
397
398         struct workqueue_struct *task_wq;
399
400         struct rbd_spec         *parent_spec;
401         u64                     parent_overlap;
402         atomic_t                parent_ref;
403         struct rbd_device       *parent;
404
405         /* Block layer tags. */
406         struct blk_mq_tag_set   tag_set;
407
408         /* protects updating the header */
409         struct rw_semaphore     header_rwsem;
410
411         struct rbd_mapping      mapping;
412
413         struct list_head        node;
414
415         /* sysfs related */
416         struct device           dev;
417         unsigned long           open_count;     /* protected by lock */
418 };
419
420 /*
421  * Flag bits for rbd_dev->flags:
422  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
423  *   by rbd_dev->lock
424  * - BLACKLISTED is protected by rbd_dev->lock_rwsem
425  */
426 enum rbd_dev_flags {
427         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
428         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
429         RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
430 };
431
432 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
433
434 static LIST_HEAD(rbd_dev_list);    /* devices */
435 static DEFINE_SPINLOCK(rbd_dev_list_lock);
436
437 static LIST_HEAD(rbd_client_list);              /* clients */
438 static DEFINE_SPINLOCK(rbd_client_list_lock);
439
440 /* Slab caches for frequently-allocated structures */
441
442 static struct kmem_cache        *rbd_img_request_cache;
443 static struct kmem_cache        *rbd_obj_request_cache;
444
445 static struct bio_set           *rbd_bio_clone;
446
447 static int rbd_major;
448 static DEFINE_IDA(rbd_dev_id_ida);
449
450 static struct workqueue_struct *rbd_wq;
451
452 /*
453  * Default to false for now, as single-major requires >= 0.75 version of
454  * userspace rbd utility.
455  */
456 static bool single_major = false;
457 module_param(single_major, bool, S_IRUGO);
458 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
459
460 static int rbd_img_request_submit(struct rbd_img_request *img_request);
461
462 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
463                        size_t count);
464 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
465                           size_t count);
466 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
467                                     size_t count);
468 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
469                                        size_t count);
470 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
471 static void rbd_spec_put(struct rbd_spec *spec);
472
473 static int rbd_dev_id_to_minor(int dev_id)
474 {
475         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
476 }
477
478 static int minor_to_rbd_dev_id(int minor)
479 {
480         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
481 }
482
483 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
484 {
485         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
486                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
487 }
488
489 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
490 {
491         bool is_lock_owner;
492
493         down_read(&rbd_dev->lock_rwsem);
494         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
495         up_read(&rbd_dev->lock_rwsem);
496         return is_lock_owner;
497 }
498
499 static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
500 {
501         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
502 }
503
504 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
505 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
506 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
507 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
508 static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);
509
510 static struct attribute *rbd_bus_attrs[] = {
511         &bus_attr_add.attr,
512         &bus_attr_remove.attr,
513         &bus_attr_add_single_major.attr,
514         &bus_attr_remove_single_major.attr,
515         &bus_attr_supported_features.attr,
516         NULL,
517 };
518
519 static umode_t rbd_bus_is_visible(struct kobject *kobj,
520                                   struct attribute *attr, int index)
521 {
522         if (!single_major &&
523             (attr == &bus_attr_add_single_major.attr ||
524              attr == &bus_attr_remove_single_major.attr))
525                 return 0;
526
527         return attr->mode;
528 }
529
530 static const struct attribute_group rbd_bus_group = {
531         .attrs = rbd_bus_attrs,
532         .is_visible = rbd_bus_is_visible,
533 };
534 __ATTRIBUTE_GROUPS(rbd_bus);
535
536 static struct bus_type rbd_bus_type = {
537         .name           = "rbd",
538         .bus_groups     = rbd_bus_groups,
539 };
540
541 static void rbd_root_dev_release(struct device *dev)
542 {
543 }
544
545 static struct device rbd_root_dev = {
546         .init_name =    "rbd",
547         .release =      rbd_root_dev_release,
548 };
549
550 static __printf(2, 3)
551 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
552 {
553         struct va_format vaf;
554         va_list args;
555
556         va_start(args, fmt);
557         vaf.fmt = fmt;
558         vaf.va = &args;
559
560         if (!rbd_dev)
561                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
562         else if (rbd_dev->disk)
563                 printk(KERN_WARNING "%s: %s: %pV\n",
564                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
565         else if (rbd_dev->spec && rbd_dev->spec->image_name)
566                 printk(KERN_WARNING "%s: image %s: %pV\n",
567                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
568         else if (rbd_dev->spec && rbd_dev->spec->image_id)
569                 printk(KERN_WARNING "%s: id %s: %pV\n",
570                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
571         else    /* punt */
572                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
573                         RBD_DRV_NAME, rbd_dev, &vaf);
574         va_end(args);
575 }
576
577 #ifdef RBD_DEBUG
578 #define rbd_assert(expr)                                                \
579                 if (unlikely(!(expr))) {                                \
580                         printk(KERN_ERR "\nAssertion failure in %s() "  \
581                                                 "at line %d:\n\n"       \
582                                         "\trbd_assert(%s);\n\n",        \
583                                         __func__, __LINE__, #expr);     \
584                         BUG();                                          \
585                 }
586 #else /* !RBD_DEBUG */
587 #  define rbd_assert(expr)      ((void) 0)
588 #endif /* !RBD_DEBUG */
589
590 static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
591 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
592 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
593 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
594
595 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
596 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
597 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
598 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
599 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
600                                         u64 snap_id);
601 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
602                                 u8 *order, u64 *snap_size);
603 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
604                 u64 *snap_features);
605
606 static int rbd_open(struct block_device *bdev, fmode_t mode)
607 {
608         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
609         bool removing = false;
610
611         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
612                 return -EROFS;
613
614         spin_lock_irq(&rbd_dev->lock);
615         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
616                 removing = true;
617         else
618                 rbd_dev->open_count++;
619         spin_unlock_irq(&rbd_dev->lock);
620         if (removing)
621                 return -ENOENT;
622
623         (void) get_device(&rbd_dev->dev);
624
625         return 0;
626 }
627
628 static void rbd_release(struct gendisk *disk, fmode_t mode)
629 {
630         struct rbd_device *rbd_dev = disk->private_data;
631         unsigned long open_count_before;
632
633         spin_lock_irq(&rbd_dev->lock);
634         open_count_before = rbd_dev->open_count--;
635         spin_unlock_irq(&rbd_dev->lock);
636         rbd_assert(open_count_before > 0);
637
638         put_device(&rbd_dev->dev);
639 }
640
641 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
642 {
643         int ret = 0;
644         int val;
645         bool ro;
646         bool ro_changed = false;
647
648         /* get_user() may sleep, so call it before taking rbd_dev->lock */
649         if (get_user(val, (int __user *)(arg)))
650                 return -EFAULT;
651
652         ro = val ? true : false;
653         /* Snapshot doesn't allow to write*/
654         if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
655                 return -EROFS;
656
657         spin_lock_irq(&rbd_dev->lock);
658         /* prevent others open this device */
659         if (rbd_dev->open_count > 1) {
660                 ret = -EBUSY;
661                 goto out;
662         }
663
664         if (rbd_dev->mapping.read_only != ro) {
665                 rbd_dev->mapping.read_only = ro;
666                 ro_changed = true;
667         }
668
669 out:
670         spin_unlock_irq(&rbd_dev->lock);
671         /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
672         if (ret == 0 && ro_changed)
673                 set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
674
675         return ret;
676 }
677
678 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
679                         unsigned int cmd, unsigned long arg)
680 {
681         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
682         int ret = 0;
683
684         switch (cmd) {
685         case BLKROSET:
686                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
687                 break;
688         default:
689                 ret = -ENOTTY;
690         }
691
692         return ret;
693 }
694
695 #ifdef CONFIG_COMPAT
696 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
697                                 unsigned int cmd, unsigned long arg)
698 {
699         return rbd_ioctl(bdev, mode, cmd, arg);
700 }
701 #endif /* CONFIG_COMPAT */
702
703 static const struct block_device_operations rbd_bd_ops = {
704         .owner                  = THIS_MODULE,
705         .open                   = rbd_open,
706         .release                = rbd_release,
707         .ioctl                  = rbd_ioctl,
708 #ifdef CONFIG_COMPAT
709         .compat_ioctl           = rbd_compat_ioctl,
710 #endif
711 };
712
713 /*
714  * Initialize an rbd client instance.  Success or not, this function
715  * consumes ceph_opts.  Caller holds client_mutex.
716  */
717 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
718 {
719         struct rbd_client *rbdc;
720         int ret = -ENOMEM;
721
722         dout("%s:\n", __func__);
723         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
724         if (!rbdc)
725                 goto out_opt;
726
727         kref_init(&rbdc->kref);
728         INIT_LIST_HEAD(&rbdc->node);
729
730         rbdc->client = ceph_create_client(ceph_opts, rbdc);
731         if (IS_ERR(rbdc->client))
732                 goto out_rbdc;
733         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
734
735         ret = ceph_open_session(rbdc->client);
736         if (ret < 0)
737                 goto out_client;
738
739         spin_lock(&rbd_client_list_lock);
740         list_add_tail(&rbdc->node, &rbd_client_list);
741         spin_unlock(&rbd_client_list_lock);
742
743         dout("%s: rbdc %p\n", __func__, rbdc);
744
745         return rbdc;
746 out_client:
747         ceph_destroy_client(rbdc->client);
748 out_rbdc:
749         kfree(rbdc);
750 out_opt:
751         if (ceph_opts)
752                 ceph_destroy_options(ceph_opts);
753         dout("%s: error %d\n", __func__, ret);
754
755         return ERR_PTR(ret);
756 }
757
758 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
759 {
760         kref_get(&rbdc->kref);
761
762         return rbdc;
763 }
764
765 /*
766  * Find a ceph client with specific addr and configuration.  If
767  * found, bump its reference count.
768  */
769 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
770 {
771         struct rbd_client *client_node;
772         bool found = false;
773
774         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
775                 return NULL;
776
777         spin_lock(&rbd_client_list_lock);
778         list_for_each_entry(client_node, &rbd_client_list, node) {
779                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
780                         __rbd_get_client(client_node);
781
782                         found = true;
783                         break;
784                 }
785         }
786         spin_unlock(&rbd_client_list_lock);
787
788         return found ? client_node : NULL;
789 }
790
791 /*
792  * (Per device) rbd map options
793  */
794 enum {
795         Opt_queue_depth,
796         Opt_last_int,
797         /* int args above */
798         Opt_last_string,
799         /* string args above */
800         Opt_read_only,
801         Opt_read_write,
802         Opt_lock_on_read,
803         Opt_exclusive,
804         Opt_err
805 };
806
807 static match_table_t rbd_opts_tokens = {
808         {Opt_queue_depth, "queue_depth=%d"},
809         /* int args above */
810         /* string args above */
811         {Opt_read_only, "read_only"},
812         {Opt_read_only, "ro"},          /* Alternate spelling */
813         {Opt_read_write, "read_write"},
814         {Opt_read_write, "rw"},         /* Alternate spelling */
815         {Opt_lock_on_read, "lock_on_read"},
816         {Opt_exclusive, "exclusive"},
817         {Opt_err, NULL}
818 };
819
820 struct rbd_options {
821         int     queue_depth;
822         bool    read_only;
823         bool    lock_on_read;
824         bool    exclusive;
825 };
826
827 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
828 #define RBD_READ_ONLY_DEFAULT   false
829 #define RBD_LOCK_ON_READ_DEFAULT false
830 #define RBD_EXCLUSIVE_DEFAULT   false
831
832 static int parse_rbd_opts_token(char *c, void *private)
833 {
834         struct rbd_options *rbd_opts = private;
835         substring_t argstr[MAX_OPT_ARGS];
836         int token, intval, ret;
837
838         token = match_token(c, rbd_opts_tokens, argstr);
839         if (token < Opt_last_int) {
840                 ret = match_int(&argstr[0], &intval);
841                 if (ret < 0) {
842                         pr_err("bad mount option arg (not int) at '%s'\n", c);
843                         return ret;
844                 }
845                 dout("got int token %d val %d\n", token, intval);
846         } else if (token > Opt_last_int && token < Opt_last_string) {
847                 dout("got string token %d val %s\n", token, argstr[0].from);
848         } else {
849                 dout("got token %d\n", token);
850         }
851
852         switch (token) {
853         case Opt_queue_depth:
854                 if (intval < 1) {
855                         pr_err("queue_depth out of range\n");
856                         return -EINVAL;
857                 }
858                 rbd_opts->queue_depth = intval;
859                 break;
860         case Opt_read_only:
861                 rbd_opts->read_only = true;
862                 break;
863         case Opt_read_write:
864                 rbd_opts->read_only = false;
865                 break;
866         case Opt_lock_on_read:
867                 rbd_opts->lock_on_read = true;
868                 break;
869         case Opt_exclusive:
870                 rbd_opts->exclusive = true;
871                 break;
872         default:
873                 /* libceph prints "bad option" msg */
874                 return -EINVAL;
875         }
876
877         return 0;
878 }
879
880 static char* obj_op_name(enum obj_operation_type op_type)
881 {
882         switch (op_type) {
883         case OBJ_OP_READ:
884                 return "read";
885         case OBJ_OP_WRITE:
886                 return "write";
887         case OBJ_OP_DISCARD:
888                 return "discard";
889         default:
890                 return "???";
891         }
892 }
893
894 /*
895  * Get a ceph client with specific addr and configuration, if one does
896  * not exist create it.  Either way, ceph_opts is consumed by this
897  * function.
898  */
899 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
900 {
901         struct rbd_client *rbdc;
902
903         mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
904         rbdc = rbd_client_find(ceph_opts);
905         if (rbdc)       /* using an existing client */
906                 ceph_destroy_options(ceph_opts);
907         else
908                 rbdc = rbd_client_create(ceph_opts);
909         mutex_unlock(&client_mutex);
910
911         return rbdc;
912 }
913
914 /*
915  * Destroy ceph client
916  *
917  * Caller must hold rbd_client_list_lock.
918  */
919 static void rbd_client_release(struct kref *kref)
920 {
921         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
922
923         dout("%s: rbdc %p\n", __func__, rbdc);
924         spin_lock(&rbd_client_list_lock);
925         list_del(&rbdc->node);
926         spin_unlock(&rbd_client_list_lock);
927
928         ceph_destroy_client(rbdc->client);
929         kfree(rbdc);
930 }
931
932 /*
933  * Drop reference to ceph client node. If it's not referenced anymore, release
934  * it.
935  */
936 static void rbd_put_client(struct rbd_client *rbdc)
937 {
938         if (rbdc)
939                 kref_put(&rbdc->kref, rbd_client_release);
940 }
941
942 static bool rbd_image_format_valid(u32 image_format)
943 {
944         return image_format == 1 || image_format == 2;
945 }
946
947 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
948 {
949         size_t size;
950         u32 snap_count;
951
952         /* The header has to start with the magic rbd header text */
953         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
954                 return false;
955
956         /* The bio layer requires at least sector-sized I/O */
957
958         if (ondisk->options.order < SECTOR_SHIFT)
959                 return false;
960
961         /* If we use u64 in a few spots we may be able to loosen this */
962
963         if (ondisk->options.order > 8 * sizeof (int) - 1)
964                 return false;
965
966         /*
967          * The size of a snapshot header has to fit in a size_t, and
968          * that limits the number of snapshots.
969          */
970         snap_count = le32_to_cpu(ondisk->snap_count);
971         size = SIZE_MAX - sizeof (struct ceph_snap_context);
972         if (snap_count > size / sizeof (__le64))
973                 return false;
974
975         /*
976          * Not only that, but the size of the entire the snapshot
977          * header must also be representable in a size_t.
978          */
979         size -= snap_count * sizeof (__le64);
980         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
981                 return false;
982
983         return true;
984 }
985
986 /*
987  * returns the size of an object in the image
988  */
989 static u32 rbd_obj_bytes(struct rbd_image_header *header)
990 {
991         return 1U << header->obj_order;
992 }
993
994 static void rbd_init_layout(struct rbd_device *rbd_dev)
995 {
996         if (rbd_dev->header.stripe_unit == 0 ||
997             rbd_dev->header.stripe_count == 0) {
998                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
999                 rbd_dev->header.stripe_count = 1;
1000         }
1001
1002         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1003         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1004         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1005         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1006                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1007         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1008 }
1009
1010 /*
1011  * Fill an rbd image header with information from the given format 1
1012  * on-disk header.
1013  */
1014 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1015                                  struct rbd_image_header_ondisk *ondisk)
1016 {
1017         struct rbd_image_header *header = &rbd_dev->header;
1018         bool first_time = header->object_prefix == NULL;
1019         struct ceph_snap_context *snapc;
1020         char *object_prefix = NULL;
1021         char *snap_names = NULL;
1022         u64 *snap_sizes = NULL;
1023         u32 snap_count;
1024         int ret = -ENOMEM;
1025         u32 i;
1026
1027         /* Allocate this now to avoid having to handle failure below */
1028
1029         if (first_time) {
1030                 object_prefix = kstrndup(ondisk->object_prefix,
1031                                          sizeof(ondisk->object_prefix),
1032                                          GFP_KERNEL);
1033                 if (!object_prefix)
1034                         return -ENOMEM;
1035         }
1036
1037         /* Allocate the snapshot context and fill it in */
1038
1039         snap_count = le32_to_cpu(ondisk->snap_count);
1040         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1041         if (!snapc)
1042                 goto out_err;
1043         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1044         if (snap_count) {
1045                 struct rbd_image_snap_ondisk *snaps;
1046                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1047
1048                 /* We'll keep a copy of the snapshot names... */
1049
1050                 if (snap_names_len > (u64)SIZE_MAX)
1051                         goto out_2big;
1052                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1053                 if (!snap_names)
1054                         goto out_err;
1055
1056                 /* ...as well as the array of their sizes. */
1057                 snap_sizes = kmalloc_array(snap_count,
1058                                            sizeof(*header->snap_sizes),
1059                                            GFP_KERNEL);
1060                 if (!snap_sizes)
1061                         goto out_err;
1062
1063                 /*
1064                  * Copy the names, and fill in each snapshot's id
1065                  * and size.
1066                  *
1067                  * Note that rbd_dev_v1_header_info() guarantees the
1068                  * ondisk buffer we're working with has
1069                  * snap_names_len bytes beyond the end of the
1070                  * snapshot id array, this memcpy() is safe.
1071                  */
1072                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1073                 snaps = ondisk->snaps;
1074                 for (i = 0; i < snap_count; i++) {
1075                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1076                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1077                 }
1078         }
1079
1080         /* We won't fail any more, fill in the header */
1081
1082         if (first_time) {
1083                 header->object_prefix = object_prefix;
1084                 header->obj_order = ondisk->options.order;
1085                 rbd_init_layout(rbd_dev);
1086         } else {
1087                 ceph_put_snap_context(header->snapc);
1088                 kfree(header->snap_names);
1089                 kfree(header->snap_sizes);
1090         }
1091
1092         /* The remaining fields always get updated (when we refresh) */
1093
1094         header->image_size = le64_to_cpu(ondisk->image_size);
1095         header->snapc = snapc;
1096         header->snap_names = snap_names;
1097         header->snap_sizes = snap_sizes;
1098
1099         return 0;
1100 out_2big:
1101         ret = -EIO;
1102 out_err:
1103         kfree(snap_sizes);
1104         kfree(snap_names);
1105         ceph_put_snap_context(snapc);
1106         kfree(object_prefix);
1107
1108         return ret;
1109 }
1110
1111 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1112 {
1113         const char *snap_name;
1114
1115         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1116
1117         /* Skip over names until we find the one we are looking for */
1118
1119         snap_name = rbd_dev->header.snap_names;
1120         while (which--)
1121                 snap_name += strlen(snap_name) + 1;
1122
1123         return kstrdup(snap_name, GFP_KERNEL);
1124 }
1125
1126 /*
1127  * Snapshot id comparison function for use with qsort()/bsearch().
1128  * Note that result is for snapshots in *descending* order.
1129  */
1130 static int snapid_compare_reverse(const void *s1, const void *s2)
1131 {
1132         u64 snap_id1 = *(u64 *)s1;
1133         u64 snap_id2 = *(u64 *)s2;
1134
1135         if (snap_id1 < snap_id2)
1136                 return 1;
1137         return snap_id1 == snap_id2 ? 0 : -1;
1138 }
1139
1140 /*
1141  * Search a snapshot context to see if the given snapshot id is
1142  * present.
1143  *
1144  * Returns the position of the snapshot id in the array if it's found,
1145  * or BAD_SNAP_INDEX otherwise.
1146  *
1147  * Note: The snapshot array is in kept sorted (by the osd) in
1148  * reverse order, highest snapshot id first.
1149  */
1150 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1151 {
1152         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1153         u64 *found;
1154
1155         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1156                                 sizeof (snap_id), snapid_compare_reverse);
1157
1158         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1159 }
1160
1161 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1162                                         u64 snap_id)
1163 {
1164         u32 which;
1165         const char *snap_name;
1166
1167         which = rbd_dev_snap_index(rbd_dev, snap_id);
1168         if (which == BAD_SNAP_INDEX)
1169                 return ERR_PTR(-ENOENT);
1170
1171         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1172         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1173 }
1174
1175 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1176 {
1177         if (snap_id == CEPH_NOSNAP)
1178                 return RBD_SNAP_HEAD_NAME;
1179
1180         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1181         if (rbd_dev->image_format == 1)
1182                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1183
1184         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1185 }
1186
1187 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1188                                 u64 *snap_size)
1189 {
1190         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1191         if (snap_id == CEPH_NOSNAP) {
1192                 *snap_size = rbd_dev->header.image_size;
1193         } else if (rbd_dev->image_format == 1) {
1194                 u32 which;
1195
1196                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1197                 if (which == BAD_SNAP_INDEX)
1198                         return -ENOENT;
1199
1200                 *snap_size = rbd_dev->header.snap_sizes[which];
1201         } else {
1202                 u64 size = 0;
1203                 int ret;
1204
1205                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1206                 if (ret)
1207                         return ret;
1208
1209                 *snap_size = size;
1210         }
1211         return 0;
1212 }
1213
1214 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1215                         u64 *snap_features)
1216 {
1217         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1218         if (snap_id == CEPH_NOSNAP) {
1219                 *snap_features = rbd_dev->header.features;
1220         } else if (rbd_dev->image_format == 1) {
1221                 *snap_features = 0;     /* No features for format 1 */
1222         } else {
1223                 u64 features = 0;
1224                 int ret;
1225
1226                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1227                 if (ret)
1228                         return ret;
1229
1230                 *snap_features = features;
1231         }
1232         return 0;
1233 }
1234
1235 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1236 {
1237         u64 snap_id = rbd_dev->spec->snap_id;
1238         u64 size = 0;
1239         u64 features = 0;
1240         int ret;
1241
1242         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1243         if (ret)
1244                 return ret;
1245         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1246         if (ret)
1247                 return ret;
1248
1249         rbd_dev->mapping.size = size;
1250         rbd_dev->mapping.features = features;
1251
1252         return 0;
1253 }
1254
1255 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1256 {
1257         rbd_dev->mapping.size = 0;
1258         rbd_dev->mapping.features = 0;
1259 }
1260
1261 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1262 {
1263         u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1264
1265         return offset & (segment_size - 1);
1266 }
1267
1268 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1269                                 u64 offset, u64 length)
1270 {
1271         u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1272
1273         offset &= segment_size - 1;
1274
1275         rbd_assert(length <= U64_MAX - offset);
1276         if (offset + length > segment_size)
1277                 length = segment_size - offset;
1278
1279         return length;
1280 }
1281
1282 /*
1283  * bio helpers
1284  */
1285
1286 static void bio_chain_put(struct bio *chain)
1287 {
1288         struct bio *tmp;
1289
1290         while (chain) {
1291                 tmp = chain;
1292                 chain = chain->bi_next;
1293                 bio_put(tmp);
1294         }
1295 }
1296
1297 /*
1298  * zeros a bio chain, starting at specific offset
1299  */
1300 static void zero_bio_chain(struct bio *chain, int start_ofs)
1301 {
1302         struct bio_vec bv;
1303         struct bvec_iter iter;
1304         unsigned long flags;
1305         void *buf;
1306         int pos = 0;
1307
1308         while (chain) {
1309                 bio_for_each_segment(bv, chain, iter) {
1310                         if (pos + bv.bv_len > start_ofs) {
1311                                 int remainder = max(start_ofs - pos, 0);
1312                                 buf = bvec_kmap_irq(&bv, &flags);
1313                                 memset(buf + remainder, 0,
1314                                        bv.bv_len - remainder);
1315                                 flush_dcache_page(bv.bv_page);
1316                                 bvec_kunmap_irq(buf, &flags);
1317                         }
1318                         pos += bv.bv_len;
1319                 }
1320
1321                 chain = chain->bi_next;
1322         }
1323 }
1324
1325 /*
1326  * similar to zero_bio_chain(), zeros data defined by a page array,
1327  * starting at the given byte offset from the start of the array and
1328  * continuing up to the given end offset.  The pages array is
1329  * assumed to be big enough to hold all bytes up to the end.
1330  */
1331 static void zero_pages(struct page **pages, u64 offset, u64 end)
1332 {
1333         struct page **page = &pages[offset >> PAGE_SHIFT];
1334
1335         rbd_assert(end > offset);
1336         rbd_assert(end - offset <= (u64)SIZE_MAX);
1337         while (offset < end) {
1338                 size_t page_offset;
1339                 size_t length;
1340                 unsigned long flags;
1341                 void *kaddr;
1342
1343                 page_offset = offset & ~PAGE_MASK;
1344                 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1345                 local_irq_save(flags);
1346                 kaddr = kmap_atomic(*page);
1347                 memset(kaddr + page_offset, 0, length);
1348                 flush_dcache_page(*page);
1349                 kunmap_atomic(kaddr);
1350                 local_irq_restore(flags);
1351
1352                 offset += length;
1353                 page++;
1354         }
1355 }
1356
1357 /*
1358  * Clone a portion of a bio, starting at the given byte offset
1359  * and continuing for the number of bytes indicated.
1360  */
1361 static struct bio *bio_clone_range(struct bio *bio_src,
1362                                         unsigned int offset,
1363                                         unsigned int len,
1364                                         gfp_t gfpmask)
1365 {
1366         struct bio *bio;
1367
1368         bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
1369         if (!bio)
1370                 return NULL;    /* ENOMEM */
1371
1372         bio_advance(bio, offset);
1373         bio->bi_iter.bi_size = len;
1374
1375         return bio;
1376 }
1377
1378 /*
1379  * Clone a portion of a bio chain, starting at the given byte offset
1380  * into the first bio in the source chain and continuing for the
1381  * number of bytes indicated.  The result is another bio chain of
1382  * exactly the given length, or a null pointer on error.
1383  *
1384  * The bio_src and offset parameters are both in-out.  On entry they
1385  * refer to the first source bio and the offset into that bio where
1386  * the start of data to be cloned is located.
1387  *
1388  * On return, bio_src is updated to refer to the bio in the source
1389  * chain that contains first un-cloned byte, and *offset will
1390  * contain the offset of that byte within that bio.
1391  */
1392 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1393                                         unsigned int *offset,
1394                                         unsigned int len,
1395                                         gfp_t gfpmask)
1396 {
1397         struct bio *bi = *bio_src;
1398         unsigned int off = *offset;
1399         struct bio *chain = NULL;
1400         struct bio **end;
1401
1402         /* Build up a chain of clone bios up to the limit */
1403
1404         if (!bi || off >= bi->bi_iter.bi_size || !len)
1405                 return NULL;            /* Nothing to clone */
1406
1407         end = &chain;
1408         while (len) {
1409                 unsigned int bi_size;
1410                 struct bio *bio;
1411
1412                 if (!bi) {
1413                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1414                         goto out_err;   /* EINVAL; ran out of bio's */
1415                 }
1416                 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1417                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1418                 if (!bio)
1419                         goto out_err;   /* ENOMEM */
1420
1421                 *end = bio;
1422                 end = &bio->bi_next;
1423
1424                 off += bi_size;
1425                 if (off == bi->bi_iter.bi_size) {
1426                         bi = bi->bi_next;
1427                         off = 0;
1428                 }
1429                 len -= bi_size;
1430         }
1431         *bio_src = bi;
1432         *offset = off;
1433
1434         return chain;
1435 out_err:
1436         bio_chain_put(chain);
1437
1438         return NULL;
1439 }
1440
1441 /*
1442  * The default/initial value for all object request flags is 0.  For
1443  * each flag, once its value is set to 1 it is never reset to 0
1444  * again.
1445  */
1446 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1447 {
1448         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1449                 struct rbd_device *rbd_dev;
1450
1451                 rbd_dev = obj_request->img_request->rbd_dev;
1452                 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1453                         obj_request);
1454         }
1455 }
1456
1457 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1458 {
1459         smp_mb();
1460         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1461 }
1462
1463 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1464 {
1465         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1466                 struct rbd_device *rbd_dev = NULL;
1467
1468                 if (obj_request_img_data_test(obj_request))
1469                         rbd_dev = obj_request->img_request->rbd_dev;
1470                 rbd_warn(rbd_dev, "obj_request %p already marked done",
1471                         obj_request);
1472         }
1473 }
1474
1475 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1476 {
1477         smp_mb();
1478         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1479 }
1480
1481 /*
1482  * This sets the KNOWN flag after (possibly) setting the EXISTS
1483  * flag.  The latter is set based on the "exists" value provided.
1484  *
1485  * Note that for our purposes once an object exists it never goes
1486  * away again.  It's possible that the response from two existence
1487  * checks are separated by the creation of the target object, and
1488  * the first ("doesn't exist") response arrives *after* the second
1489  * ("does exist").  In that case we ignore the second one.
1490  */
1491 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1492                                 bool exists)
1493 {
1494         if (exists)
1495                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1496         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1497         smp_mb();
1498 }
1499
1500 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1501 {
1502         smp_mb();
1503         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1504 }
1505
1506 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1507 {
1508         smp_mb();
1509         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1510 }
1511
1512 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1513 {
1514         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1515
1516         return obj_request->img_offset <
1517             round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1518 }
1519
1520 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1521 {
1522         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1523                 kref_read(&obj_request->kref));
1524         kref_get(&obj_request->kref);
1525 }
1526
1527 static void rbd_obj_request_destroy(struct kref *kref);
1528 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1529 {
1530         rbd_assert(obj_request != NULL);
1531         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1532                 kref_read(&obj_request->kref));
1533         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1534 }
1535
1536 static void rbd_img_request_get(struct rbd_img_request *img_request)
1537 {
1538         dout("%s: img %p (was %d)\n", __func__, img_request,
1539              kref_read(&img_request->kref));
1540         kref_get(&img_request->kref);
1541 }
1542
1543 static bool img_request_child_test(struct rbd_img_request *img_request);
1544 static void rbd_parent_request_destroy(struct kref *kref);
1545 static void rbd_img_request_destroy(struct kref *kref);
1546 static void rbd_img_request_put(struct rbd_img_request *img_request)
1547 {
1548         rbd_assert(img_request != NULL);
1549         dout("%s: img %p (was %d)\n", __func__, img_request,
1550                 kref_read(&img_request->kref));
1551         if (img_request_child_test(img_request))
1552                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1553         else
1554                 kref_put(&img_request->kref, rbd_img_request_destroy);
1555 }
1556
1557 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1558                                         struct rbd_obj_request *obj_request)
1559 {
1560         rbd_assert(obj_request->img_request == NULL);
1561
1562         /* Image request now owns object's original reference */
1563         obj_request->img_request = img_request;
1564         obj_request->which = img_request->obj_request_count;
1565         rbd_assert(!obj_request_img_data_test(obj_request));
1566         obj_request_img_data_set(obj_request);
1567         rbd_assert(obj_request->which != BAD_WHICH);
1568         img_request->obj_request_count++;
1569         list_add_tail(&obj_request->links, &img_request->obj_requests);
1570         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1571                 obj_request->which);
1572 }
1573
1574 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1575                                         struct rbd_obj_request *obj_request)
1576 {
1577         rbd_assert(obj_request->which != BAD_WHICH);
1578
1579         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1580                 obj_request->which);
1581         list_del(&obj_request->links);
1582         rbd_assert(img_request->obj_request_count > 0);
1583         img_request->obj_request_count--;
1584         rbd_assert(obj_request->which == img_request->obj_request_count);
1585         obj_request->which = BAD_WHICH;
1586         rbd_assert(obj_request_img_data_test(obj_request));
1587         rbd_assert(obj_request->img_request == img_request);
1588         obj_request->img_request = NULL;
1589         obj_request->callback = NULL;
1590         rbd_obj_request_put(obj_request);
1591 }
1592
1593 static bool obj_request_type_valid(enum obj_request_type type)
1594 {
1595         switch (type) {
1596         case OBJ_REQUEST_NODATA:
1597         case OBJ_REQUEST_BIO:
1598         case OBJ_REQUEST_PAGES:
1599                 return true;
1600         default:
1601                 return false;
1602         }
1603 }
1604
1605 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1606
1607 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1608 {
1609         struct ceph_osd_request *osd_req = obj_request->osd_req;
1610
1611         dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1612              obj_request, obj_request->object_no, obj_request->offset,
1613              obj_request->length, osd_req);
1614         if (obj_request_img_data_test(obj_request)) {
1615                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1616                 rbd_img_request_get(obj_request->img_request);
1617         }
1618         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1619 }
1620
1621 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1622 {
1623
1624         dout("%s: img %p\n", __func__, img_request);
1625
1626         /*
1627          * If no error occurred, compute the aggregate transfer
1628          * count for the image request.  We could instead use
1629          * atomic64_cmpxchg() to update it as each object request
1630          * completes; not clear which way is better off hand.
1631          */
1632         if (!img_request->result) {
1633                 struct rbd_obj_request *obj_request;
1634                 u64 xferred = 0;
1635
1636                 for_each_obj_request(img_request, obj_request)
1637                         xferred += obj_request->xferred;
1638                 img_request->xferred = xferred;
1639         }
1640
1641         if (img_request->callback)
1642                 img_request->callback(img_request);
1643         else
1644                 rbd_img_request_put(img_request);
1645 }
1646
1647 /*
1648  * The default/initial value for all image request flags is 0.  Each
1649  * is conditionally set to 1 at image request initialization time
1650  * and currently never change thereafter.
1651  */
1652 static void img_request_write_set(struct rbd_img_request *img_request)
1653 {
1654         set_bit(IMG_REQ_WRITE, &img_request->flags);
1655         smp_mb();
1656 }
1657
1658 static bool img_request_write_test(struct rbd_img_request *img_request)
1659 {
1660         smp_mb();
1661         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1662 }
1663
1664 /*
1665  * Set the discard flag when the img_request is an discard request
1666  */
1667 static void img_request_discard_set(struct rbd_img_request *img_request)
1668 {
1669         set_bit(IMG_REQ_DISCARD, &img_request->flags);
1670         smp_mb();
1671 }
1672
1673 static bool img_request_discard_test(struct rbd_img_request *img_request)
1674 {
1675         smp_mb();
1676         return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1677 }
1678
1679 static void img_request_child_set(struct rbd_img_request *img_request)
1680 {
1681         set_bit(IMG_REQ_CHILD, &img_request->flags);
1682         smp_mb();
1683 }
1684
1685 static void img_request_child_clear(struct rbd_img_request *img_request)
1686 {
1687         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1688         smp_mb();
1689 }
1690
1691 static bool img_request_child_test(struct rbd_img_request *img_request)
1692 {
1693         smp_mb();
1694         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1695 }
1696
1697 static void img_request_layered_set(struct rbd_img_request *img_request)
1698 {
1699         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1700         smp_mb();
1701 }
1702
1703 static void img_request_layered_clear(struct rbd_img_request *img_request)
1704 {
1705         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1706         smp_mb();
1707 }
1708
1709 static bool img_request_layered_test(struct rbd_img_request *img_request)
1710 {
1711         smp_mb();
1712         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1713 }
1714
1715 static enum obj_operation_type
1716 rbd_img_request_op_type(struct rbd_img_request *img_request)
1717 {
1718         if (img_request_write_test(img_request))
1719                 return OBJ_OP_WRITE;
1720         else if (img_request_discard_test(img_request))
1721                 return OBJ_OP_DISCARD;
1722         else
1723                 return OBJ_OP_READ;
1724 }
1725
1726 static void
1727 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1728 {
1729         u64 xferred = obj_request->xferred;
1730         u64 length = obj_request->length;
1731
1732         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1733                 obj_request, obj_request->img_request, obj_request->result,
1734                 xferred, length);
1735         /*
1736          * ENOENT means a hole in the image.  We zero-fill the entire
1737          * length of the request.  A short read also implies zero-fill
1738          * to the end of the request.  An error requires the whole
1739          * length of the request to be reported finished with an error
1740          * to the block layer.  In each case we update the xferred
1741          * count to indicate the whole request was satisfied.
1742          */
1743         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1744         if (obj_request->result == -ENOENT) {
1745                 if (obj_request->type == OBJ_REQUEST_BIO)
1746                         zero_bio_chain(obj_request->bio_list, 0);
1747                 else
1748                         zero_pages(obj_request->pages, 0, length);
1749                 obj_request->result = 0;
1750         } else if (xferred < length && !obj_request->result) {
1751                 if (obj_request->type == OBJ_REQUEST_BIO)
1752                         zero_bio_chain(obj_request->bio_list, xferred);
1753                 else
1754                         zero_pages(obj_request->pages, xferred, length);
1755         }
1756         obj_request->xferred = length;
1757         obj_request_done_set(obj_request);
1758 }
1759
1760 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1761 {
1762         dout("%s: obj %p cb %p\n", __func__, obj_request,
1763                 obj_request->callback);
1764         if (obj_request->callback)
1765                 obj_request->callback(obj_request);
1766         else
1767                 complete_all(&obj_request->completion);
1768 }
1769
1770 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1771 {
1772         obj_request->result = err;
1773         obj_request->xferred = 0;
1774         /*
1775          * kludge - mirror rbd_obj_request_submit() to match a put in
1776          * rbd_img_obj_callback()
1777          */
1778         if (obj_request_img_data_test(obj_request)) {
1779                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1780                 rbd_img_request_get(obj_request->img_request);
1781         }
1782         obj_request_done_set(obj_request);
1783         rbd_obj_request_complete(obj_request);
1784 }
1785
1786 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1787 {
1788         struct rbd_img_request *img_request = NULL;
1789         struct rbd_device *rbd_dev = NULL;
1790         bool layered = false;
1791
1792         if (obj_request_img_data_test(obj_request)) {
1793                 img_request = obj_request->img_request;
1794                 layered = img_request && img_request_layered_test(img_request);
1795                 rbd_dev = img_request->rbd_dev;
1796         }
1797
1798         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1799                 obj_request, img_request, obj_request->result,
1800                 obj_request->xferred, obj_request->length);
1801         if (layered && obj_request->result == -ENOENT &&
1802                         obj_request->img_offset < rbd_dev->parent_overlap)
1803                 rbd_img_parent_read(obj_request);
1804         else if (img_request)
1805                 rbd_img_obj_request_read_callback(obj_request);
1806         else
1807                 obj_request_done_set(obj_request);
1808 }
1809
1810 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1811 {
1812         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1813                 obj_request->result, obj_request->length);
1814         /*
1815          * There is no such thing as a successful short write.  Set
1816          * it to our originally-requested length.
1817          */
1818         obj_request->xferred = obj_request->length;
1819         obj_request_done_set(obj_request);
1820 }
1821
1822 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1823 {
1824         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1825                 obj_request->result, obj_request->length);
1826         /*
1827          * There is no such thing as a successful short discard.  Set
1828          * it to our originally-requested length.
1829          */
1830         obj_request->xferred = obj_request->length;
1831         /* discarding a non-existent object is not a problem */
1832         if (obj_request->result == -ENOENT)
1833                 obj_request->result = 0;
1834         obj_request_done_set(obj_request);
1835 }
1836
1837 /*
1838  * For a simple stat call there's nothing to do.  We'll do more if
1839  * this is part of a write sequence for a layered image.
1840  */
1841 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1842 {
1843         dout("%s: obj %p\n", __func__, obj_request);
1844         obj_request_done_set(obj_request);
1845 }
1846
1847 static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1848 {
1849         dout("%s: obj %p\n", __func__, obj_request);
1850
1851         if (obj_request_img_data_test(obj_request))
1852                 rbd_osd_copyup_callback(obj_request);
1853         else
1854                 obj_request_done_set(obj_request);
1855 }
1856
1857 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1858 {
1859         struct rbd_obj_request *obj_request = osd_req->r_priv;
1860         u16 opcode;
1861
1862         dout("%s: osd_req %p\n", __func__, osd_req);
1863         rbd_assert(osd_req == obj_request->osd_req);
1864         if (obj_request_img_data_test(obj_request)) {
1865                 rbd_assert(obj_request->img_request);
1866                 rbd_assert(obj_request->which != BAD_WHICH);
1867         } else {
1868                 rbd_assert(obj_request->which == BAD_WHICH);
1869         }
1870
1871         if (osd_req->r_result < 0)
1872                 obj_request->result = osd_req->r_result;
1873
1874         /*
1875          * We support a 64-bit length, but ultimately it has to be
1876          * passed to the block layer, which just supports a 32-bit
1877          * length field.
1878          */
1879         obj_request->xferred = osd_req->r_ops[0].outdata_len;
1880         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1881
1882         opcode = osd_req->r_ops[0].op;
1883         switch (opcode) {
1884         case CEPH_OSD_OP_READ:
1885                 rbd_osd_read_callback(obj_request);
1886                 break;
1887         case CEPH_OSD_OP_SETALLOCHINT:
1888                 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1889                            osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1890                 /* fall through */
1891         case CEPH_OSD_OP_WRITE:
1892         case CEPH_OSD_OP_WRITEFULL:
1893                 rbd_osd_write_callback(obj_request);
1894                 break;
1895         case CEPH_OSD_OP_STAT:
1896                 rbd_osd_stat_callback(obj_request);
1897                 break;
1898         case CEPH_OSD_OP_DELETE:
1899         case CEPH_OSD_OP_TRUNCATE:
1900         case CEPH_OSD_OP_ZERO:
1901                 rbd_osd_discard_callback(obj_request);
1902                 break;
1903         case CEPH_OSD_OP_CALL:
1904                 rbd_osd_call_callback(obj_request);
1905                 break;
1906         default:
1907                 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1908                          obj_request->object_no, opcode);
1909                 break;
1910         }
1911
1912         if (obj_request_done_test(obj_request))
1913                 rbd_obj_request_complete(obj_request);
1914 }
1915
1916 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1917 {
1918         struct ceph_osd_request *osd_req = obj_request->osd_req;
1919
1920         rbd_assert(obj_request_img_data_test(obj_request));
1921         osd_req->r_snapid = obj_request->img_request->snap_id;
1922 }
1923
1924 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1925 {
1926         struct ceph_osd_request *osd_req = obj_request->osd_req;
1927
1928         ktime_get_real_ts(&osd_req->r_mtime);
1929         osd_req->r_data_offset = obj_request->offset;
1930 }
1931
1932 static struct ceph_osd_request *
1933 __rbd_osd_req_create(struct rbd_device *rbd_dev,
1934                      struct ceph_snap_context *snapc,
1935                      int num_ops, unsigned int flags,
1936                      struct rbd_obj_request *obj_request)
1937 {
1938         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1939         struct ceph_osd_request *req;
1940         const char *name_format = rbd_dev->image_format == 1 ?
1941                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1942
1943         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1944         if (!req)
1945                 return NULL;
1946
1947         req->r_flags = flags;
1948         req->r_callback = rbd_osd_req_callback;
1949         req->r_priv = obj_request;
1950
1951         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1952         if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1953                         rbd_dev->header.object_prefix, obj_request->object_no))
1954                 goto err_req;
1955
1956         if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1957                 goto err_req;
1958
1959         return req;
1960
1961 err_req:
1962         ceph_osdc_put_request(req);
1963         return NULL;
1964 }
1965
1966 /*
1967  * Create an osd request.  A read request has one osd op (read).
1968  * A write request has either one (watch) or two (hint+write) osd ops.
1969  * (All rbd data writes are prefixed with an allocation hint op, but
1970  * technically osd watch is a write request, hence this distinction.)
1971  */
1972 static struct ceph_osd_request *rbd_osd_req_create(
1973                                         struct rbd_device *rbd_dev,
1974                                         enum obj_operation_type op_type,
1975                                         unsigned int num_ops,
1976                                         struct rbd_obj_request *obj_request)
1977 {
1978         struct ceph_snap_context *snapc = NULL;
1979
1980         if (obj_request_img_data_test(obj_request) &&
1981                 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1982                 struct rbd_img_request *img_request = obj_request->img_request;
1983                 if (op_type == OBJ_OP_WRITE) {
1984                         rbd_assert(img_request_write_test(img_request));
1985                 } else {
1986                         rbd_assert(img_request_discard_test(img_request));
1987                 }
1988                 snapc = img_request->snapc;
1989         }
1990
1991         rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1992
1993         return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1994             (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
1995             CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
1996 }
1997
1998 /*
1999  * Create a copyup osd request based on the information in the object
2000  * request supplied.  A copyup request has two or three osd ops, a
2001  * copyup method call, potentially a hint op, and a write or truncate
2002  * or zero op.
2003  */
2004 static struct ceph_osd_request *
2005 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2006 {
2007         struct rbd_img_request *img_request;
2008         int num_osd_ops = 3;
2009
2010         rbd_assert(obj_request_img_data_test(obj_request));
2011         img_request = obj_request->img_request;
2012         rbd_assert(img_request);
2013         rbd_assert(img_request_write_test(img_request) ||
2014                         img_request_discard_test(img_request));
2015
2016         if (img_request_discard_test(img_request))
2017                 num_osd_ops = 2;
2018
2019         return __rbd_osd_req_create(img_request->rbd_dev,
2020                                     img_request->snapc, num_osd_ops,
2021                                     CEPH_OSD_FLAG_WRITE, obj_request);
2022 }
2023
2024 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2025 {
2026         ceph_osdc_put_request(osd_req);
2027 }
2028
2029 static struct rbd_obj_request *
2030 rbd_obj_request_create(enum obj_request_type type)
2031 {
2032         struct rbd_obj_request *obj_request;
2033
2034         rbd_assert(obj_request_type_valid(type));
2035
2036         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2037         if (!obj_request)
2038                 return NULL;
2039
2040         obj_request->which = BAD_WHICH;
2041         obj_request->type = type;
2042         INIT_LIST_HEAD(&obj_request->links);
2043         init_completion(&obj_request->completion);
2044         kref_init(&obj_request->kref);
2045
2046         dout("%s %p\n", __func__, obj_request);
2047         return obj_request;
2048 }
2049
2050 static void rbd_obj_request_destroy(struct kref *kref)
2051 {
2052         struct rbd_obj_request *obj_request;
2053
2054         obj_request = container_of(kref, struct rbd_obj_request, kref);
2055
2056         dout("%s: obj %p\n", __func__, obj_request);
2057
2058         rbd_assert(obj_request->img_request == NULL);
2059         rbd_assert(obj_request->which == BAD_WHICH);
2060
2061         if (obj_request->osd_req)
2062                 rbd_osd_req_destroy(obj_request->osd_req);
2063
2064         rbd_assert(obj_request_type_valid(obj_request->type));
2065         switch (obj_request->type) {
2066         case OBJ_REQUEST_NODATA:
2067                 break;          /* Nothing to do */
2068         case OBJ_REQUEST_BIO:
2069                 if (obj_request->bio_list)
2070                         bio_chain_put(obj_request->bio_list);
2071                 break;
2072         case OBJ_REQUEST_PAGES:
2073                 /* img_data requests don't own their page array */
2074                 if (obj_request->pages &&
2075                     !obj_request_img_data_test(obj_request))
2076                         ceph_release_page_vector(obj_request->pages,
2077                                                 obj_request->page_count);
2078                 break;
2079         }
2080
2081         kmem_cache_free(rbd_obj_request_cache, obj_request);
2082 }
2083
2084 /* It's OK to call this for a device with no parent */
2085
2086 static void rbd_spec_put(struct rbd_spec *spec);
2087 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2088 {
2089         rbd_dev_remove_parent(rbd_dev);
2090         rbd_spec_put(rbd_dev->parent_spec);
2091         rbd_dev->parent_spec = NULL;
2092         rbd_dev->parent_overlap = 0;
2093 }
2094
2095 /*
2096  * Parent image reference counting is used to determine when an
2097  * image's parent fields can be safely torn down--after there are no
2098  * more in-flight requests to the parent image.  When the last
2099  * reference is dropped, cleaning them up is safe.
2100  */
2101 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2102 {
2103         int counter;
2104
2105         if (!rbd_dev->parent_spec)
2106                 return;
2107
2108         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2109         if (counter > 0)
2110                 return;
2111
2112         /* Last reference; clean up parent data structures */
2113
2114         if (!counter)
2115                 rbd_dev_unparent(rbd_dev);
2116         else
2117                 rbd_warn(rbd_dev, "parent reference underflow");
2118 }
2119
2120 /*
2121  * If an image has a non-zero parent overlap, get a reference to its
2122  * parent.
2123  *
2124  * Returns true if the rbd device has a parent with a non-zero
2125  * overlap and a reference for it was successfully taken, or
2126  * false otherwise.
2127  */
2128 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2129 {
2130         int counter = 0;
2131
2132         if (!rbd_dev->parent_spec)
2133                 return false;
2134
2135         down_read(&rbd_dev->header_rwsem);
2136         if (rbd_dev->parent_overlap)
2137                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2138         up_read(&rbd_dev->header_rwsem);
2139
2140         if (counter < 0)
2141                 rbd_warn(rbd_dev, "parent reference overflow");
2142
2143         return counter > 0;
2144 }
2145
2146 /*
2147  * Caller is responsible for filling in the list of object requests
2148  * that comprises the image request, and the Linux request pointer
2149  * (if there is one).
2150  */
2151 static struct rbd_img_request *rbd_img_request_create(
2152                                         struct rbd_device *rbd_dev,
2153                                         u64 offset, u64 length,
2154                                         enum obj_operation_type op_type,
2155                                         struct ceph_snap_context *snapc)
2156 {
2157         struct rbd_img_request *img_request;
2158
2159         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2160         if (!img_request)
2161                 return NULL;
2162
2163         img_request->rq = NULL;
2164         img_request->rbd_dev = rbd_dev;
2165         img_request->offset = offset;
2166         img_request->length = length;
2167         img_request->flags = 0;
2168         if (op_type == OBJ_OP_DISCARD) {
2169                 img_request_discard_set(img_request);
2170                 img_request->snapc = snapc;
2171         } else if (op_type == OBJ_OP_WRITE) {
2172                 img_request_write_set(img_request);
2173                 img_request->snapc = snapc;
2174         } else {
2175                 img_request->snap_id = rbd_dev->spec->snap_id;
2176         }
2177         if (rbd_dev_parent_get(rbd_dev))
2178                 img_request_layered_set(img_request);
2179         spin_lock_init(&img_request->completion_lock);
2180         img_request->next_completion = 0;
2181         img_request->callback = NULL;
2182         img_request->result = 0;
2183         img_request->obj_request_count = 0;
2184         INIT_LIST_HEAD(&img_request->obj_requests);
2185         kref_init(&img_request->kref);
2186
2187         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2188                 obj_op_name(op_type), offset, length, img_request);
2189
2190         return img_request;
2191 }
2192
2193 static void rbd_img_request_destroy(struct kref *kref)
2194 {
2195         struct rbd_img_request *img_request;
2196         struct rbd_obj_request *obj_request;
2197         struct rbd_obj_request *next_obj_request;
2198
2199         img_request = container_of(kref, struct rbd_img_request, kref);
2200
2201         dout("%s: img %p\n", __func__, img_request);
2202
2203         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2204                 rbd_img_obj_request_del(img_request, obj_request);
2205         rbd_assert(img_request->obj_request_count == 0);
2206
2207         if (img_request_layered_test(img_request)) {
2208                 img_request_layered_clear(img_request);
2209                 rbd_dev_parent_put(img_request->rbd_dev);
2210         }
2211
2212         if (img_request_write_test(img_request) ||
2213                 img_request_discard_test(img_request))
2214                 ceph_put_snap_context(img_request->snapc);
2215
2216         kmem_cache_free(rbd_img_request_cache, img_request);
2217 }
2218
2219 static struct rbd_img_request *rbd_parent_request_create(
2220                                         struct rbd_obj_request *obj_request,
2221                                         u64 img_offset, u64 length)
2222 {
2223         struct rbd_img_request *parent_request;
2224         struct rbd_device *rbd_dev;
2225
2226         rbd_assert(obj_request->img_request);
2227         rbd_dev = obj_request->img_request->rbd_dev;
2228
2229         parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2230                                                 length, OBJ_OP_READ, NULL);
2231         if (!parent_request)
2232                 return NULL;
2233
2234         img_request_child_set(parent_request);
2235         rbd_obj_request_get(obj_request);
2236         parent_request->obj_request = obj_request;
2237
2238         return parent_request;
2239 }
2240
2241 static void rbd_parent_request_destroy(struct kref *kref)
2242 {
2243         struct rbd_img_request *parent_request;
2244         struct rbd_obj_request *orig_request;
2245
2246         parent_request = container_of(kref, struct rbd_img_request, kref);
2247         orig_request = parent_request->obj_request;
2248
2249         parent_request->obj_request = NULL;
2250         rbd_obj_request_put(orig_request);
2251         img_request_child_clear(parent_request);
2252
2253         rbd_img_request_destroy(kref);
2254 }
2255
2256 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2257 {
2258         struct rbd_img_request *img_request;
2259         unsigned int xferred;
2260         int result;
2261         bool more;
2262
2263         rbd_assert(obj_request_img_data_test(obj_request));
2264         img_request = obj_request->img_request;
2265
2266         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2267         xferred = (unsigned int)obj_request->xferred;
2268         result = obj_request->result;
2269         if (result) {
2270                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2271                 enum obj_operation_type op_type;
2272
2273                 if (img_request_discard_test(img_request))
2274                         op_type = OBJ_OP_DISCARD;
2275                 else if (img_request_write_test(img_request))
2276                         op_type = OBJ_OP_WRITE;
2277                 else
2278                         op_type = OBJ_OP_READ;
2279
2280                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2281                         obj_op_name(op_type), obj_request->length,
2282                         obj_request->img_offset, obj_request->offset);
2283                 rbd_warn(rbd_dev, "  result %d xferred %x",
2284                         result, xferred);
2285                 if (!img_request->result)
2286                         img_request->result = result;
2287                 /*
2288                  * Need to end I/O on the entire obj_request worth of
2289                  * bytes in case of error.
2290                  */
2291                 xferred = obj_request->length;
2292         }
2293
2294         if (img_request_child_test(img_request)) {
2295                 rbd_assert(img_request->obj_request != NULL);
2296                 more = obj_request->which < img_request->obj_request_count - 1;
2297         } else {
2298                 blk_status_t status = errno_to_blk_status(result);
2299
2300                 rbd_assert(img_request->rq != NULL);
2301
2302                 more = blk_update_request(img_request->rq, status, xferred);
2303                 if (!more)
2304                         __blk_mq_end_request(img_request->rq, status);
2305         }
2306
2307         return more;
2308 }
2309
2310 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2311 {
2312         struct rbd_img_request *img_request;
2313         u32 which = obj_request->which;
2314         bool more = true;
2315
2316         rbd_assert(obj_request_img_data_test(obj_request));
2317         img_request = obj_request->img_request;
2318
2319         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2320         rbd_assert(img_request != NULL);
2321         rbd_assert(img_request->obj_request_count > 0);
2322         rbd_assert(which != BAD_WHICH);
2323         rbd_assert(which < img_request->obj_request_count);
2324
2325         spin_lock_irq(&img_request->completion_lock);
2326         if (which != img_request->next_completion)
2327                 goto out;
2328
2329         for_each_obj_request_from(img_request, obj_request) {
2330                 rbd_assert(more);
2331                 rbd_assert(which < img_request->obj_request_count);
2332
2333                 if (!obj_request_done_test(obj_request))
2334                         break;
2335                 more = rbd_img_obj_end_request(obj_request);
2336                 which++;
2337         }
2338
2339         rbd_assert(more ^ (which == img_request->obj_request_count));
2340         img_request->next_completion = which;
2341 out:
2342         spin_unlock_irq(&img_request->completion_lock);
2343         rbd_img_request_put(img_request);
2344
2345         if (!more)
2346                 rbd_img_request_complete(img_request);
2347 }
2348
2349 /*
2350  * Add individual osd ops to the given ceph_osd_request and prepare
2351  * them for submission. num_ops is the current number of
2352  * osd operations already to the object request.
2353  */
2354 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2355                                 struct ceph_osd_request *osd_request,
2356                                 enum obj_operation_type op_type,
2357                                 unsigned int num_ops)
2358 {
2359         struct rbd_img_request *img_request = obj_request->img_request;
2360         struct rbd_device *rbd_dev = img_request->rbd_dev;
2361         u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2362         u64 offset = obj_request->offset;
2363         u64 length = obj_request->length;
2364         u64 img_end;
2365         u16 opcode;
2366
2367         if (op_type == OBJ_OP_DISCARD) {
2368                 if (!offset && length == object_size &&
2369                     (!img_request_layered_test(img_request) ||
2370                      !obj_request_overlaps_parent(obj_request))) {
2371                         opcode = CEPH_OSD_OP_DELETE;
2372                 } else if ((offset + length == object_size)) {
2373                         opcode = CEPH_OSD_OP_TRUNCATE;
2374                 } else {
2375                         down_read(&rbd_dev->header_rwsem);
2376                         img_end = rbd_dev->header.image_size;
2377                         up_read(&rbd_dev->header_rwsem);
2378
2379                         if (obj_request->img_offset + length == img_end)
2380                                 opcode = CEPH_OSD_OP_TRUNCATE;
2381                         else
2382                                 opcode = CEPH_OSD_OP_ZERO;
2383                 }
2384         } else if (op_type == OBJ_OP_WRITE) {
2385                 if (!offset && length == object_size)
2386                         opcode = CEPH_OSD_OP_WRITEFULL;
2387                 else
2388                         opcode = CEPH_OSD_OP_WRITE;
2389                 osd_req_op_alloc_hint_init(osd_request, num_ops,
2390                                         object_size, object_size);
2391                 num_ops++;
2392         } else {
2393                 opcode = CEPH_OSD_OP_READ;
2394         }
2395
2396         if (opcode == CEPH_OSD_OP_DELETE)
2397                 osd_req_op_init(osd_request, num_ops, opcode, 0);
2398         else
2399                 osd_req_op_extent_init(osd_request, num_ops, opcode,
2400                                        offset, length, 0, 0);
2401
2402         if (obj_request->type == OBJ_REQUEST_BIO)
2403                 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2404                                         obj_request->bio_list, length);
2405         else if (obj_request->type == OBJ_REQUEST_PAGES)
2406                 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2407                                         obj_request->pages, length,
2408                                         offset & ~PAGE_MASK, false, false);
2409
2410         /* Discards are also writes */
2411         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2412                 rbd_osd_req_format_write(obj_request);
2413         else
2414                 rbd_osd_req_format_read(obj_request);
2415 }
2416
2417 /*
2418  * Split up an image request into one or more object requests, each
2419  * to a different object.  The "type" parameter indicates whether
2420  * "data_desc" is the pointer to the head of a list of bio
2421  * structures, or the base of a page array.  In either case this
2422  * function assumes data_desc describes memory sufficient to hold
2423  * all data described by the image request.
2424  */
2425 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2426                                         enum obj_request_type type,
2427                                         void *data_desc)
2428 {
2429         struct rbd_device *rbd_dev = img_request->rbd_dev;
2430         struct rbd_obj_request *obj_request = NULL;
2431         struct rbd_obj_request *next_obj_request;
2432         struct bio *bio_list = NULL;
2433         unsigned int bio_offset = 0;
2434         struct page **pages = NULL;
2435         enum obj_operation_type op_type;
2436         u64 img_offset;
2437         u64 resid;
2438
2439         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2440                 (int)type, data_desc);
2441
2442         img_offset = img_request->offset;
2443         resid = img_request->length;
2444         rbd_assert(resid > 0);
2445         op_type = rbd_img_request_op_type(img_request);
2446
2447         if (type == OBJ_REQUEST_BIO) {
2448                 bio_list = data_desc;
2449                 rbd_assert(img_offset ==
2450                            bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2451         } else if (type == OBJ_REQUEST_PAGES) {
2452                 pages = data_desc;
2453         }
2454
2455         while (resid) {
2456                 struct ceph_osd_request *osd_req;
2457                 u64 object_no = img_offset >> rbd_dev->header.obj_order;
2458                 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2459                 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
2460
2461                 obj_request = rbd_obj_request_create(type);
2462                 if (!obj_request)
2463                         goto out_unwind;
2464
2465                 obj_request->object_no = object_no;
2466                 obj_request->offset = offset;
2467                 obj_request->length = length;
2468
2469                 /*
2470                  * set obj_request->img_request before creating the
2471                  * osd_request so that it gets the right snapc
2472                  */
2473                 rbd_img_obj_request_add(img_request, obj_request);
2474
2475                 if (type == OBJ_REQUEST_BIO) {
2476                         unsigned int clone_size;
2477
2478                         rbd_assert(length <= (u64)UINT_MAX);
2479                         clone_size = (unsigned int)length;
2480                         obj_request->bio_list =
2481                                         bio_chain_clone_range(&bio_list,
2482                                                                 &bio_offset,
2483                                                                 clone_size,
2484                                                                 GFP_NOIO);
2485                         if (!obj_request->bio_list)
2486                                 goto out_unwind;
2487                 } else if (type == OBJ_REQUEST_PAGES) {
2488                         unsigned int page_count;
2489
2490                         obj_request->pages = pages;
2491                         page_count = (u32)calc_pages_for(offset, length);
2492                         obj_request->page_count = page_count;
2493                         if ((offset + length) & ~PAGE_MASK)
2494                                 page_count--;   /* more on last page */
2495                         pages += page_count;
2496                 }
2497
2498                 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2499                                         (op_type == OBJ_OP_WRITE) ? 2 : 1,
2500                                         obj_request);
2501                 if (!osd_req)
2502                         goto out_unwind;
2503
2504                 obj_request->osd_req = osd_req;
2505                 obj_request->callback = rbd_img_obj_callback;
2506                 obj_request->img_offset = img_offset;
2507
2508                 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2509
2510                 img_offset += length;
2511                 resid -= length;
2512         }
2513
2514         return 0;
2515
2516 out_unwind:
2517         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2518                 rbd_img_obj_request_del(img_request, obj_request);
2519
2520         return -ENOMEM;
2521 }
2522
2523 static void
2524 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2525 {
2526         struct rbd_img_request *img_request;
2527         struct rbd_device *rbd_dev;
2528         struct page **pages;
2529         u32 page_count;
2530
2531         dout("%s: obj %p\n", __func__, obj_request);
2532
2533         rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2534                 obj_request->type == OBJ_REQUEST_NODATA);
2535         rbd_assert(obj_request_img_data_test(obj_request));
2536         img_request = obj_request->img_request;
2537         rbd_assert(img_request);
2538
2539         rbd_dev = img_request->rbd_dev;
2540         rbd_assert(rbd_dev);
2541
2542         pages = obj_request->copyup_pages;
2543         rbd_assert(pages != NULL);
2544         obj_request->copyup_pages = NULL;
2545         page_count = obj_request->copyup_page_count;
2546         rbd_assert(page_count);
2547         obj_request->copyup_page_count = 0;
2548         ceph_release_page_vector(pages, page_count);
2549
2550         /*
2551          * We want the transfer count to reflect the size of the
2552          * original write request.  There is no such thing as a
2553          * successful short write, so if the request was successful
2554          * we can just set it to the originally-requested length.
2555          */
2556         if (!obj_request->result)
2557                 obj_request->xferred = obj_request->length;
2558
2559         obj_request_done_set(obj_request);
2560 }
2561
2562 static void
2563 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2564 {
2565         struct rbd_obj_request *orig_request;
2566         struct ceph_osd_request *osd_req;
2567         struct rbd_device *rbd_dev;
2568         struct page **pages;
2569         enum obj_operation_type op_type;
2570         u32 page_count;
2571         int img_result;
2572         u64 parent_length;
2573
2574         rbd_assert(img_request_child_test(img_request));
2575
2576         /* First get what we need from the image request */
2577
2578         pages = img_request->copyup_pages;
2579         rbd_assert(pages != NULL);
2580         img_request->copyup_pages = NULL;
2581         page_count = img_request->copyup_page_count;
2582         rbd_assert(page_count);
2583         img_request->copyup_page_count = 0;
2584
2585         orig_request = img_request->obj_request;
2586         rbd_assert(orig_request != NULL);
2587         rbd_assert(obj_request_type_valid(orig_request->type));
2588         img_result = img_request->result;
2589         parent_length = img_request->length;
2590         rbd_assert(img_result || parent_length == img_request->xferred);
2591         rbd_img_request_put(img_request);
2592
2593         rbd_assert(orig_request->img_request);
2594         rbd_dev = orig_request->img_request->rbd_dev;
2595         rbd_assert(rbd_dev);
2596
2597         /*
2598          * If the overlap has become 0 (most likely because the
2599          * image has been flattened) we need to free the pages
2600          * and re-submit the original write request.
2601          */
2602         if (!rbd_dev->parent_overlap) {
2603                 ceph_release_page_vector(pages, page_count);
2604                 rbd_obj_request_submit(orig_request);
2605                 return;
2606         }
2607
2608         if (img_result)
2609                 goto out_err;
2610
2611         /*
2612          * The original osd request is of no use to use any more.
2613          * We need a new one that can hold the three ops in a copyup
2614          * request.  Allocate the new copyup osd request for the
2615          * original request, and release the old one.
2616          */
2617         img_result = -ENOMEM;
2618         osd_req = rbd_osd_req_create_copyup(orig_request);
2619         if (!osd_req)
2620                 goto out_err;
2621         rbd_osd_req_destroy(orig_request->osd_req);
2622         orig_request->osd_req = osd_req;
2623         orig_request->copyup_pages = pages;
2624         orig_request->copyup_page_count = page_count;
2625
2626         /* Initialize the copyup op */
2627
2628         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2629         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2630                                                 false, false);
2631
2632         /* Add the other op(s) */
2633
2634         op_type = rbd_img_request_op_type(orig_request->img_request);
2635         rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2636
2637         /* All set, send it off. */
2638
2639         rbd_obj_request_submit(orig_request);
2640         return;
2641
2642 out_err:
2643         ceph_release_page_vector(pages, page_count);
2644         rbd_obj_request_error(orig_request, img_result);
2645 }
2646
2647 /*
2648  * Read from the parent image the range of data that covers the
2649  * entire target of the given object request.  This is used for
2650  * satisfying a layered image write request when the target of an
2651  * object request from the image request does not exist.
2652  *
2653  * A page array big enough to hold the returned data is allocated
2654  * and supplied to rbd_img_request_fill() as the "data descriptor."
2655  * When the read completes, this page array will be transferred to
2656  * the original object request for the copyup operation.
2657  *
2658  * If an error occurs, it is recorded as the result of the original
2659  * object request in rbd_img_obj_exists_callback().
2660  */
2661 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2662 {
2663         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2664         struct rbd_img_request *parent_request = NULL;
2665         u64 img_offset;
2666         u64 length;
2667         struct page **pages = NULL;
2668         u32 page_count;
2669         int result;
2670
2671         rbd_assert(rbd_dev->parent != NULL);
2672
2673         /*
2674          * Determine the byte range covered by the object in the
2675          * child image to which the original request was to be sent.
2676          */
2677         img_offset = obj_request->img_offset - obj_request->offset;
2678         length = rbd_obj_bytes(&rbd_dev->header);
2679
2680         /*
2681          * There is no defined parent data beyond the parent
2682          * overlap, so limit what we read at that boundary if
2683          * necessary.
2684          */
2685         if (img_offset + length > rbd_dev->parent_overlap) {
2686                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2687                 length = rbd_dev->parent_overlap - img_offset;
2688         }
2689
2690         /*
2691          * Allocate a page array big enough to receive the data read
2692          * from the parent.
2693          */
2694         page_count = (u32)calc_pages_for(0, length);
2695         pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2696         if (IS_ERR(pages)) {
2697                 result = PTR_ERR(pages);
2698                 pages = NULL;
2699                 goto out_err;
2700         }
2701
2702         result = -ENOMEM;
2703         parent_request = rbd_parent_request_create(obj_request,
2704                                                 img_offset, length);
2705         if (!parent_request)
2706                 goto out_err;
2707
2708         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2709         if (result)
2710                 goto out_err;
2711
2712         parent_request->copyup_pages = pages;
2713         parent_request->copyup_page_count = page_count;
2714         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2715
2716         result = rbd_img_request_submit(parent_request);
2717         if (!result)
2718                 return 0;
2719
2720         parent_request->copyup_pages = NULL;
2721         parent_request->copyup_page_count = 0;
2722         parent_request->obj_request = NULL;
2723         rbd_obj_request_put(obj_request);
2724 out_err:
2725         if (pages)
2726                 ceph_release_page_vector(pages, page_count);
2727         if (parent_request)
2728                 rbd_img_request_put(parent_request);
2729         return result;
2730 }
2731
2732 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2733 {
2734         struct rbd_obj_request *orig_request;
2735         struct rbd_device *rbd_dev;
2736         int result;
2737
2738         rbd_assert(!obj_request_img_data_test(obj_request));
2739
2740         /*
2741          * All we need from the object request is the original
2742          * request and the result of the STAT op.  Grab those, then
2743          * we're done with the request.
2744          */
2745         orig_request = obj_request->obj_request;
2746         obj_request->obj_request = NULL;
2747         rbd_obj_request_put(orig_request);
2748         rbd_assert(orig_request);
2749         rbd_assert(orig_request->img_request);
2750
2751         result = obj_request->result;
2752         obj_request->result = 0;
2753
2754         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2755                 obj_request, orig_request, result,
2756                 obj_request->xferred, obj_request->length);
2757         rbd_obj_request_put(obj_request);
2758
2759         /*
2760          * If the overlap has become 0 (most likely because the
2761          * image has been flattened) we need to re-submit the
2762          * original request.
2763          */
2764         rbd_dev = orig_request->img_request->rbd_dev;
2765         if (!rbd_dev->parent_overlap) {
2766                 rbd_obj_request_submit(orig_request);
2767                 return;
2768         }
2769
2770         /*
2771          * Our only purpose here is to determine whether the object
2772          * exists, and we don't want to treat the non-existence as
2773          * an error.  If something else comes back, transfer the
2774          * error to the original request and complete it now.
2775          */
2776         if (!result) {
2777                 obj_request_existence_set(orig_request, true);
2778         } else if (result == -ENOENT) {
2779                 obj_request_existence_set(orig_request, false);
2780         } else {
2781                 goto fail_orig_request;
2782         }
2783
2784         /*
2785          * Resubmit the original request now that we have recorded
2786          * whether the target object exists.
2787          */
2788         result = rbd_img_obj_request_submit(orig_request);
2789         if (result)
2790                 goto fail_orig_request;
2791
2792         return;
2793
2794 fail_orig_request:
2795         rbd_obj_request_error(orig_request, result);
2796 }
2797
2798 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2799 {
2800         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2801         struct rbd_obj_request *stat_request;
2802         struct page **pages;
2803         u32 page_count;
2804         size_t size;
2805         int ret;
2806
2807         stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
2808         if (!stat_request)
2809                 return -ENOMEM;
2810
2811         stat_request->object_no = obj_request->object_no;
2812
2813         stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2814                                                    stat_request);
2815         if (!stat_request->osd_req) {
2816                 ret = -ENOMEM;
2817                 goto fail_stat_request;
2818         }
2819
2820         /*
2821          * The response data for a STAT call consists of:
2822          *     le64 length;
2823          *     struct {
2824          *         le32 tv_sec;
2825          *         le32 tv_nsec;
2826          *     } mtime;
2827          */
2828         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2829         page_count = (u32)calc_pages_for(0, size);
2830         pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2831         if (IS_ERR(pages)) {
2832                 ret = PTR_ERR(pages);
2833                 goto fail_stat_request;
2834         }
2835
2836         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2837         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2838                                      false, false);
2839
2840         rbd_obj_request_get(obj_request);
2841         stat_request->obj_request = obj_request;
2842         stat_request->pages = pages;
2843         stat_request->page_count = page_count;
2844         stat_request->callback = rbd_img_obj_exists_callback;
2845
2846         rbd_obj_request_submit(stat_request);
2847         return 0;
2848
2849 fail_stat_request:
2850         rbd_obj_request_put(stat_request);
2851         return ret;
2852 }
2853
2854 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2855 {
2856         struct rbd_img_request *img_request = obj_request->img_request;
2857         struct rbd_device *rbd_dev = img_request->rbd_dev;
2858
2859         /* Reads */
2860         if (!img_request_write_test(img_request) &&
2861             !img_request_discard_test(img_request))
2862                 return true;
2863
2864         /* Non-layered writes */
2865         if (!img_request_layered_test(img_request))
2866                 return true;
2867
2868         /*
2869          * Layered writes outside of the parent overlap range don't
2870          * share any data with the parent.
2871          */
2872         if (!obj_request_overlaps_parent(obj_request))
2873                 return true;
2874
2875         /*
2876          * Entire-object layered writes - we will overwrite whatever
2877          * parent data there is anyway.
2878          */
2879         if (!obj_request->offset &&
2880             obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2881                 return true;
2882
2883         /*
2884          * If the object is known to already exist, its parent data has
2885          * already been copied.
2886          */
2887         if (obj_request_known_test(obj_request) &&
2888             obj_request_exists_test(obj_request))
2889                 return true;
2890
2891         return false;
2892 }
2893
2894 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2895 {
2896         rbd_assert(obj_request_img_data_test(obj_request));
2897         rbd_assert(obj_request_type_valid(obj_request->type));
2898         rbd_assert(obj_request->img_request);
2899
2900         if (img_obj_request_simple(obj_request)) {
2901                 rbd_obj_request_submit(obj_request);
2902                 return 0;
2903         }
2904
2905         /*
2906          * It's a layered write.  The target object might exist but
2907          * we may not know that yet.  If we know it doesn't exist,
2908          * start by reading the data for the full target object from
2909          * the parent so we can use it for a copyup to the target.
2910          */
2911         if (obj_request_known_test(obj_request))
2912                 return rbd_img_obj_parent_read_full(obj_request);
2913
2914         /* We don't know whether the target exists.  Go find out. */
2915
2916         return rbd_img_obj_exists_submit(obj_request);
2917 }
2918
2919 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2920 {
2921         struct rbd_obj_request *obj_request;
2922         struct rbd_obj_request *next_obj_request;
2923         int ret = 0;
2924
2925         dout("%s: img %p\n", __func__, img_request);
2926
2927         rbd_img_request_get(img_request);
2928         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2929                 ret = rbd_img_obj_request_submit(obj_request);
2930                 if (ret)
2931                         goto out_put_ireq;
2932         }
2933
2934 out_put_ireq:
2935         rbd_img_request_put(img_request);
2936         return ret;
2937 }
2938
2939 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2940 {
2941         struct rbd_obj_request *obj_request;
2942         struct rbd_device *rbd_dev;
2943         u64 obj_end;
2944         u64 img_xferred;
2945         int img_result;
2946
2947         rbd_assert(img_request_child_test(img_request));
2948
2949         /* First get what we need from the image request and release it */
2950
2951         obj_request = img_request->obj_request;
2952         img_xferred = img_request->xferred;
2953         img_result = img_request->result;
2954         rbd_img_request_put(img_request);
2955
2956         /*
2957          * If the overlap has become 0 (most likely because the
2958          * image has been flattened) we need to re-submit the
2959          * original request.
2960          */
2961         rbd_assert(obj_request);
2962         rbd_assert(obj_request->img_request);
2963         rbd_dev = obj_request->img_request->rbd_dev;
2964         if (!rbd_dev->parent_overlap) {
2965                 rbd_obj_request_submit(obj_request);
2966                 return;
2967         }
2968
2969         obj_request->result = img_result;
2970         if (obj_request->result)
2971                 goto out;
2972
2973         /*
2974          * We need to zero anything beyond the parent overlap
2975          * boundary.  Since rbd_img_obj_request_read_callback()
2976          * will zero anything beyond the end of a short read, an
2977          * easy way to do this is to pretend the data from the
2978          * parent came up short--ending at the overlap boundary.
2979          */
2980         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2981         obj_end = obj_request->img_offset + obj_request->length;
2982         if (obj_end > rbd_dev->parent_overlap) {
2983                 u64 xferred = 0;
2984
2985                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2986                         xferred = rbd_dev->parent_overlap -
2987                                         obj_request->img_offset;
2988
2989                 obj_request->xferred = min(img_xferred, xferred);
2990         } else {
2991                 obj_request->xferred = img_xferred;
2992         }
2993 out:
2994         rbd_img_obj_request_read_callback(obj_request);
2995         rbd_obj_request_complete(obj_request);
2996 }
2997
2998 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2999 {
3000         struct rbd_img_request *img_request;
3001         int result;
3002
3003         rbd_assert(obj_request_img_data_test(obj_request));
3004         rbd_assert(obj_request->img_request != NULL);
3005         rbd_assert(obj_request->result == (s32) -ENOENT);
3006         rbd_assert(obj_request_type_valid(obj_request->type));
3007
3008         /* rbd_read_finish(obj_request, obj_request->length); */
3009         img_request = rbd_parent_request_create(obj_request,
3010                                                 obj_request->img_offset,
3011                                                 obj_request->length);
3012         result = -ENOMEM;
3013         if (!img_request)
3014                 goto out_err;
3015
3016         if (obj_request->type == OBJ_REQUEST_BIO)
3017                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3018                                                 obj_request->bio_list);
3019         else
3020                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3021                                                 obj_request->pages);
3022         if (result)
3023                 goto out_err;
3024
3025         img_request->callback = rbd_img_parent_read_callback;
3026         result = rbd_img_request_submit(img_request);
3027         if (result)
3028                 goto out_err;
3029
3030         return;
3031 out_err:
3032         if (img_request)
3033                 rbd_img_request_put(img_request);
3034         obj_request->result = result;
3035         obj_request->xferred = 0;
3036         obj_request_done_set(obj_request);
3037 }
3038
3039 static const struct rbd_client_id rbd_empty_cid;
3040
3041 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3042                           const struct rbd_client_id *rhs)
3043 {
3044         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3045 }
3046
3047 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3048 {
3049         struct rbd_client_id cid;
3050
3051         mutex_lock(&rbd_dev->watch_mutex);
3052         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3053         cid.handle = rbd_dev->watch_cookie;
3054         mutex_unlock(&rbd_dev->watch_mutex);
3055         return cid;
3056 }
3057
3058 /*
3059  * lock_rwsem must be held for write
3060  */
3061 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3062                               const struct rbd_client_id *cid)
3063 {
3064         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3065              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3066              cid->gid, cid->handle);
3067         rbd_dev->owner_cid = *cid; /* struct */
3068 }
3069
3070 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3071 {
3072         mutex_lock(&rbd_dev->watch_mutex);
3073         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3074         mutex_unlock(&rbd_dev->watch_mutex);
3075 }
3076
3077 /*
3078  * lock_rwsem must be held for write
3079  */
3080 static int rbd_lock(struct rbd_device *rbd_dev)
3081 {
3082         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3083         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3084         char cookie[32];
3085         int ret;
3086
3087         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3088                 rbd_dev->lock_cookie[0] != '\0');
3089
3090         format_lock_cookie(rbd_dev, cookie);
3091         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3092                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3093                             RBD_LOCK_TAG, "", 0);
3094         if (ret)
3095                 return ret;
3096
3097         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3098         strcpy(rbd_dev->lock_cookie, cookie);
3099         rbd_set_owner_cid(rbd_dev, &cid);
3100         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3101         return 0;
3102 }
3103
3104 /*
3105  * lock_rwsem must be held for write
3106  */
3107 static void rbd_unlock(struct rbd_device *rbd_dev)
3108 {
3109         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3110         int ret;
3111
3112         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3113                 rbd_dev->lock_cookie[0] == '\0');
3114
3115         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3116                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
3117         if (ret && ret != -ENOENT)
3118                 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
3119
3120         /* treat errors as the image is unlocked */
3121         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3122         rbd_dev->lock_cookie[0] = '\0';
3123         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3124         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3125 }
3126
3127 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3128                                 enum rbd_notify_op notify_op,
3129                                 struct page ***preply_pages,
3130                                 size_t *preply_len)
3131 {
3132         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3133         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3134         int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3135         char buf[buf_size];
3136         void *p = buf;
3137
3138         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3139
3140         /* encode *LockPayload NotifyMessage (op + ClientId) */
3141         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3142         ceph_encode_32(&p, notify_op);
3143         ceph_encode_64(&p, cid.gid);
3144         ceph_encode_64(&p, cid.handle);
3145
3146         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3147                                 &rbd_dev->header_oloc, buf, buf_size,
3148                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3149 }
3150
3151 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3152                                enum rbd_notify_op notify_op)
3153 {
3154         struct page **reply_pages;
3155         size_t reply_len;
3156
3157         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3158         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3159 }
3160
3161 static void rbd_notify_acquired_lock(struct work_struct *work)
3162 {
3163         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3164                                                   acquired_lock_work);
3165
3166         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3167 }
3168
3169 static void rbd_notify_released_lock(struct work_struct *work)
3170 {
3171         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3172                                                   released_lock_work);
3173
3174         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3175 }
3176
3177 static int rbd_request_lock(struct rbd_device *rbd_dev)
3178 {
3179         struct page **reply_pages;
3180         size_t reply_len;
3181         bool lock_owner_responded = false;
3182         int ret;
3183
3184         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3185
3186         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3187                                    &reply_pages, &reply_len);
3188         if (ret && ret != -ETIMEDOUT) {
3189                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3190                 goto out;
3191         }
3192
3193         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3194                 void *p = page_address(reply_pages[0]);
3195                 void *const end = p + reply_len;
3196                 u32 n;
3197
3198                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3199                 while (n--) {
3200                         u8 struct_v;
3201                         u32 len;
3202
3203                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3204                         p += 8 + 8; /* skip gid and cookie */
3205
3206                         ceph_decode_32_safe(&p, end, len, e_inval);
3207                         if (!len)
3208                                 continue;
3209
3210                         if (lock_owner_responded) {
3211                                 rbd_warn(rbd_dev,
3212                                          "duplicate lock owners detected");
3213                                 ret = -EIO;
3214                                 goto out;
3215                         }
3216
3217                         lock_owner_responded = true;
3218                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3219                                                   &struct_v, &len);
3220                         if (ret) {
3221                                 rbd_warn(rbd_dev,
3222                                          "failed to decode ResponseMessage: %d",
3223                                          ret);
3224                                 goto e_inval;
3225                         }
3226
3227                         ret = ceph_decode_32(&p);
3228                 }
3229         }
3230
3231         if (!lock_owner_responded) {
3232                 rbd_warn(rbd_dev, "no lock owners detected");
3233                 ret = -ETIMEDOUT;
3234         }
3235
3236 out:
3237         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3238         return ret;
3239
3240 e_inval:
3241         ret = -EINVAL;
3242         goto out;
3243 }
3244
3245 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3246 {
3247         dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3248
3249         cancel_delayed_work(&rbd_dev->lock_dwork);
3250         if (wake_all)
3251                 wake_up_all(&rbd_dev->lock_waitq);
3252         else
3253                 wake_up(&rbd_dev->lock_waitq);
3254 }
3255
3256 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3257                                struct ceph_locker **lockers, u32 *num_lockers)
3258 {
3259         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3260         u8 lock_type;
3261         char *lock_tag;
3262         int ret;
3263
3264         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3265
3266         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3267                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3268                                  &lock_type, &lock_tag, lockers, num_lockers);
3269         if (ret)
3270                 return ret;
3271
3272         if (*num_lockers == 0) {
3273                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3274                 goto out;
3275         }
3276
3277         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3278                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3279                          lock_tag);
3280                 ret = -EBUSY;
3281                 goto out;
3282         }
3283
3284         if (lock_type == CEPH_CLS_LOCK_SHARED) {
3285                 rbd_warn(rbd_dev, "shared lock type detected");
3286                 ret = -EBUSY;
3287                 goto out;
3288         }
3289
3290         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3291                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
3292                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3293                          (*lockers)[0].id.cookie);
3294                 ret = -EBUSY;
3295                 goto out;
3296         }
3297
3298 out:
3299         kfree(lock_tag);
3300         return ret;
3301 }
3302
3303 static int find_watcher(struct rbd_device *rbd_dev,
3304                         const struct ceph_locker *locker)
3305 {
3306         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3307         struct ceph_watch_item *watchers;
3308         u32 num_watchers;
3309         u64 cookie;
3310         int i;
3311         int ret;
3312
3313         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3314                                       &rbd_dev->header_oloc, &watchers,
3315                                       &num_watchers);
3316         if (ret)
3317                 return ret;
3318
3319         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3320         for (i = 0; i < num_watchers; i++) {
3321                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3322                             sizeof(locker->info.addr)) &&
3323                     watchers[i].cookie == cookie) {
3324                         struct rbd_client_id cid = {
3325                                 .gid = le64_to_cpu(watchers[i].name.num),
3326                                 .handle = cookie,
3327                         };
3328
3329                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3330                              rbd_dev, cid.gid, cid.handle);
3331                         rbd_set_owner_cid(rbd_dev, &cid);
3332                         ret = 1;
3333                         goto out;
3334                 }
3335         }
3336
3337         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3338         ret = 0;
3339 out:
3340         kfree(watchers);
3341         return ret;
3342 }
3343
3344 /*
3345  * lock_rwsem must be held for write
3346  */
3347 static int rbd_try_lock(struct rbd_device *rbd_dev)
3348 {
3349         struct ceph_client *client = rbd_dev->rbd_client->client;
3350         struct ceph_locker *lockers;
3351         u32 num_lockers;
3352         int ret;
3353
3354         for (;;) {
3355                 ret = rbd_lock(rbd_dev);
3356                 if (ret != -EBUSY)
3357                         return ret;
3358
3359                 /* determine if the current lock holder is still alive */
3360                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3361                 if (ret)
3362                         return ret;
3363
3364                 if (num_lockers == 0)
3365                         goto again;
3366
3367                 ret = find_watcher(rbd_dev, lockers);
3368                 if (ret) {
3369                         if (ret > 0)
3370                                 ret = 0; /* have to request lock */
3371                         goto out;
3372                 }
3373
3374                 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3375                          ENTITY_NAME(lockers[0].id.name));
3376
3377                 ret = ceph_monc_blacklist_add(&client->monc,
3378                                               &lockers[0].info.addr);
3379                 if (ret) {
3380                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3381                                  ENTITY_NAME(lockers[0].id.name), ret);
3382                         goto out;
3383                 }
3384
3385                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3386                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
3387                                           lockers[0].id.cookie,
3388                                           &lockers[0].id.name);
3389                 if (ret && ret != -ENOENT)
3390                         goto out;
3391
3392 again:
3393                 ceph_free_lockers(lockers, num_lockers);
3394         }
3395
3396 out:
3397         ceph_free_lockers(lockers, num_lockers);
3398         return ret;
3399 }
3400
3401 /*
3402  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3403  */
3404 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3405                                                 int *pret)
3406 {
3407         enum rbd_lock_state lock_state;
3408
3409         down_read(&rbd_dev->lock_rwsem);
3410         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3411              rbd_dev->lock_state);
3412         if (__rbd_is_lock_owner(rbd_dev)) {
3413                 lock_state = rbd_dev->lock_state;
3414                 up_read(&rbd_dev->lock_rwsem);
3415                 return lock_state;
3416         }
3417
3418         up_read(&rbd_dev->lock_rwsem);
3419         down_write(&rbd_dev->lock_rwsem);
3420         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3421              rbd_dev->lock_state);
3422         if (!__rbd_is_lock_owner(rbd_dev)) {
3423                 *pret = rbd_try_lock(rbd_dev);
3424                 if (*pret)
3425                         rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3426         }
3427
3428         lock_state = rbd_dev->lock_state;
3429         up_write(&rbd_dev->lock_rwsem);
3430         return lock_state;
3431 }
3432
3433 static void rbd_acquire_lock(struct work_struct *work)
3434 {
3435         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3436                                             struct rbd_device, lock_dwork);
3437         enum rbd_lock_state lock_state;
3438         int ret = 0;
3439
3440         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3441 again:
3442         lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3443         if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3444                 if (lock_state == RBD_LOCK_STATE_LOCKED)
3445                         wake_requests(rbd_dev, true);
3446                 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3447                      rbd_dev, lock_state, ret);
3448                 return;
3449         }
3450
3451         ret = rbd_request_lock(rbd_dev);
3452         if (ret == -ETIMEDOUT) {
3453                 goto again; /* treat this as a dead client */
3454         } else if (ret == -EROFS) {
3455                 rbd_warn(rbd_dev, "peer will not release lock");
3456                 /*
3457                  * If this is rbd_add_acquire_lock(), we want to fail
3458                  * immediately -- reuse BLACKLISTED flag.  Otherwise we
3459                  * want to block.
3460                  */
3461                 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3462                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3463                         /* wake "rbd map --exclusive" process */
3464                         wake_requests(rbd_dev, false);
3465                 }
3466         } else if (ret < 0) {
3467                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3468                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3469                                  RBD_RETRY_DELAY);
3470         } else {
3471                 /*
3472                  * lock owner acked, but resend if we don't see them
3473                  * release the lock
3474                  */
3475                 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3476                      rbd_dev);
3477                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3478                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3479         }
3480 }
3481
3482 /*
3483  * lock_rwsem must be held for write
3484  */
3485 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3486 {
3487         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3488              rbd_dev->lock_state);
3489         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3490                 return false;
3491
3492         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3493         downgrade_write(&rbd_dev->lock_rwsem);
3494         /*
3495          * Ensure that all in-flight IO is flushed.
3496          *
3497          * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3498          * may be shared with other devices.
3499          */
3500         ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3501         up_read(&rbd_dev->lock_rwsem);
3502
3503         down_write(&rbd_dev->lock_rwsem);
3504         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3505              rbd_dev->lock_state);
3506         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3507                 return false;
3508
3509         rbd_unlock(rbd_dev);
3510         /*
3511          * Give others a chance to grab the lock - we would re-acquire
3512          * almost immediately if we got new IO during ceph_osdc_sync()
3513          * otherwise.  We need to ack our own notifications, so this
3514          * lock_dwork will be requeued from rbd_wait_state_locked()
3515          * after wake_requests() in rbd_handle_released_lock().
3516          */
3517         cancel_delayed_work(&rbd_dev->lock_dwork);
3518         return true;
3519 }
3520
3521 static void rbd_release_lock_work(struct work_struct *work)
3522 {
3523         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3524                                                   unlock_work);
3525
3526         down_write(&rbd_dev->lock_rwsem);
3527         rbd_release_lock(rbd_dev);
3528         up_write(&rbd_dev->lock_rwsem);
3529 }
3530
3531 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3532                                      void **p)
3533 {
3534         struct rbd_client_id cid = { 0 };
3535
3536         if (struct_v >= 2) {
3537                 cid.gid = ceph_decode_64(p);
3538                 cid.handle = ceph_decode_64(p);
3539         }
3540
3541         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3542              cid.handle);
3543         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3544                 down_write(&rbd_dev->lock_rwsem);
3545                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3546                         /*
3547                          * we already know that the remote client is
3548                          * the owner
3549                          */
3550                         up_write(&rbd_dev->lock_rwsem);
3551                         return;
3552                 }
3553
3554                 rbd_set_owner_cid(rbd_dev, &cid);
3555                 downgrade_write(&rbd_dev->lock_rwsem);
3556         } else {
3557                 down_read(&rbd_dev->lock_rwsem);
3558         }
3559
3560         if (!__rbd_is_lock_owner(rbd_dev))
3561                 wake_requests(rbd_dev, false);
3562         up_read(&rbd_dev->lock_rwsem);
3563 }
3564
3565 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3566                                      void **p)
3567 {
3568         struct rbd_client_id cid = { 0 };
3569
3570         if (struct_v >= 2) {
3571                 cid.gid = ceph_decode_64(p);
3572                 cid.handle = ceph_decode_64(p);
3573         }
3574
3575         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3576              cid.handle);
3577         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3578                 down_write(&rbd_dev->lock_rwsem);
3579                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3580                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3581                              __func__, rbd_dev, cid.gid, cid.handle,
3582                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3583                         up_write(&rbd_dev->lock_rwsem);
3584                         return;
3585                 }
3586
3587                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3588                 downgrade_write(&rbd_dev->lock_rwsem);
3589         } else {
3590                 down_read(&rbd_dev->lock_rwsem);
3591         }
3592
3593         if (!__rbd_is_lock_owner(rbd_dev))
3594                 wake_requests(rbd_dev, false);
3595         up_read(&rbd_dev->lock_rwsem);
3596 }
3597
3598 /*
3599  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3600  * ResponseMessage is needed.
3601  */
3602 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3603                                    void **p)
3604 {
3605         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3606         struct rbd_client_id cid = { 0 };
3607         int result = 1;
3608
3609         if (struct_v >= 2) {
3610                 cid.gid = ceph_decode_64(p);
3611                 cid.handle = ceph_decode_64(p);
3612         }
3613
3614         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3615              cid.handle);
3616         if (rbd_cid_equal(&cid, &my_cid))
3617                 return result;
3618
3619         down_read(&rbd_dev->lock_rwsem);
3620         if (__rbd_is_lock_owner(rbd_dev)) {
3621                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3622                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3623                         goto out_unlock;
3624
3625                 /*
3626                  * encode ResponseMessage(0) so the peer can detect
3627                  * a missing owner
3628                  */
3629                 result = 0;
3630
3631                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3632                         if (!rbd_dev->opts->exclusive) {
3633                                 dout("%s rbd_dev %p queueing unlock_work\n",
3634                                      __func__, rbd_dev);
3635                                 queue_work(rbd_dev->task_wq,
3636                                            &rbd_dev->unlock_work);
3637                         } else {
3638                                 /* refuse to release the lock */
3639                                 result = -EROFS;
3640                         }
3641                 }
3642         }
3643
3644 out_unlock:
3645         up_read(&rbd_dev->lock_rwsem);
3646         return result;
3647 }
3648
3649 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3650                                      u64 notify_id, u64 cookie, s32 *result)
3651 {
3652         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3653         int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3654         char buf[buf_size];
3655         int ret;
3656
3657         if (result) {
3658                 void *p = buf;
3659
3660                 /* encode ResponseMessage */
3661                 ceph_start_encoding(&p, 1, 1,
3662                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
3663                 ceph_encode_32(&p, *result);
3664         } else {
3665                 buf_size = 0;
3666         }
3667
3668         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3669                                    &rbd_dev->header_oloc, notify_id, cookie,
3670                                    buf, buf_size);
3671         if (ret)
3672                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3673 }
3674
3675 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3676                                    u64 cookie)
3677 {
3678         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3679         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3680 }
3681
3682 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3683                                           u64 notify_id, u64 cookie, s32 result)
3684 {
3685         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3686         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3687 }
3688
3689 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3690                          u64 notifier_id, void *data, size_t data_len)
3691 {
3692         struct rbd_device *rbd_dev = arg;
3693         void *p = data;
3694         void *const end = p + data_len;
3695         u8 struct_v = 0;
3696         u32 len;
3697         u32 notify_op;
3698         int ret;
3699
3700         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3701              __func__, rbd_dev, cookie, notify_id, data_len);
3702         if (data_len) {
3703                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3704                                           &struct_v, &len);
3705                 if (ret) {
3706                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3707                                  ret);
3708                         return;
3709                 }
3710
3711                 notify_op = ceph_decode_32(&p);
3712         } else {
3713                 /* legacy notification for header updates */
3714                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3715                 len = 0;
3716         }
3717
3718         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3719         switch (notify_op) {
3720         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3721                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3722                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3723                 break;
3724         case RBD_NOTIFY_OP_RELEASED_LOCK:
3725                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3726                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3727                 break;
3728         case RBD_NOTIFY_OP_REQUEST_LOCK:
3729                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3730                 if (ret <= 0)
3731                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3732                                                       cookie, ret);
3733                 else
3734                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3735                 break;
3736         case RBD_NOTIFY_OP_HEADER_UPDATE:
3737                 ret = rbd_dev_refresh(rbd_dev);
3738                 if (ret)
3739                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
3740
3741                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3742                 break;
3743         default:
3744                 if (rbd_is_lock_owner(rbd_dev))
3745                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3746                                                       cookie, -EOPNOTSUPP);
3747                 else
3748                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3749                 break;
3750         }
3751 }
3752
3753 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3754
3755 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3756 {
3757         struct rbd_device *rbd_dev = arg;
3758
3759         rbd_warn(rbd_dev, "encountered watch error: %d", err);
3760
3761         down_write(&rbd_dev->lock_rwsem);
3762         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3763         up_write(&rbd_dev->lock_rwsem);
3764
3765         mutex_lock(&rbd_dev->watch_mutex);
3766         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3767                 __rbd_unregister_watch(rbd_dev);
3768                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3769
3770                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3771         }
3772         mutex_unlock(&rbd_dev->watch_mutex);
3773 }
3774
3775 /*
3776  * watch_mutex must be locked
3777  */
3778 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3779 {
3780         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3781         struct ceph_osd_linger_request *handle;
3782
3783         rbd_assert(!rbd_dev->watch_handle);
3784         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3785
3786         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3787                                  &rbd_dev->header_oloc, rbd_watch_cb,
3788                                  rbd_watch_errcb, rbd_dev);
3789         if (IS_ERR(handle))
3790                 return PTR_ERR(handle);
3791
3792         rbd_dev->watch_handle = handle;
3793         return 0;
3794 }
3795
3796 /*
3797  * watch_mutex must be locked
3798  */
3799 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3800 {
3801         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3802         int ret;
3803
3804         rbd_assert(rbd_dev->watch_handle);
3805         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3806
3807         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3808         if (ret)
3809                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3810
3811         rbd_dev->watch_handle = NULL;
3812 }
3813
3814 static int rbd_register_watch(struct rbd_device *rbd_dev)
3815 {
3816         int ret;
3817
3818         mutex_lock(&rbd_dev->watch_mutex);
3819         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3820         ret = __rbd_register_watch(rbd_dev);
3821         if (ret)
3822                 goto out;
3823
3824         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3825         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3826
3827 out:
3828         mutex_unlock(&rbd_dev->watch_mutex);
3829         return ret;
3830 }
3831
3832 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3833 {
3834         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3835
3836         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3837         cancel_work_sync(&rbd_dev->acquired_lock_work);
3838         cancel_work_sync(&rbd_dev->released_lock_work);
3839         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3840         cancel_work_sync(&rbd_dev->unlock_work);
3841 }
3842
3843 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3844 {
3845         WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3846         cancel_tasks_sync(rbd_dev);
3847
3848         mutex_lock(&rbd_dev->watch_mutex);
3849         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3850                 __rbd_unregister_watch(rbd_dev);
3851         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3852         mutex_unlock(&rbd_dev->watch_mutex);
3853
3854         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3855 }
3856
3857 /*
3858  * lock_rwsem must be held for write
3859  */
3860 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3861 {
3862         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3863         char cookie[32];
3864         int ret;
3865
3866         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3867
3868         format_lock_cookie(rbd_dev, cookie);
3869         ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3870                                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
3871                                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3872                                   RBD_LOCK_TAG, cookie);
3873         if (ret) {
3874                 if (ret != -EOPNOTSUPP)
3875                         rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3876                                  ret);
3877
3878                 /*
3879                  * Lock cookie cannot be updated on older OSDs, so do
3880                  * a manual release and queue an acquire.
3881                  */
3882                 if (rbd_release_lock(rbd_dev))
3883                         queue_delayed_work(rbd_dev->task_wq,
3884                                            &rbd_dev->lock_dwork, 0);
3885         } else {
3886                 strcpy(rbd_dev->lock_cookie, cookie);
3887         }
3888 }
3889
3890 static void rbd_reregister_watch(struct work_struct *work)
3891 {
3892         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3893                                             struct rbd_device, watch_dwork);
3894         int ret;
3895
3896         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3897
3898         mutex_lock(&rbd_dev->watch_mutex);
3899         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3900                 mutex_unlock(&rbd_dev->watch_mutex);
3901                 return;
3902         }
3903
3904         ret = __rbd_register_watch(rbd_dev);
3905         if (ret) {
3906                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3907                 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3908                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3909                         wake_requests(rbd_dev, true);
3910                 } else {
3911                         queue_delayed_work(rbd_dev->task_wq,
3912                                            &rbd_dev->watch_dwork,
3913                                            RBD_RETRY_DELAY);
3914                 }
3915                 mutex_unlock(&rbd_dev->watch_mutex);
3916                 return;
3917         }
3918
3919         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3920         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3921         mutex_unlock(&rbd_dev->watch_mutex);
3922
3923         down_write(&rbd_dev->lock_rwsem);
3924         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3925                 rbd_reacquire_lock(rbd_dev);
3926         up_write(&rbd_dev->lock_rwsem);
3927
3928         ret = rbd_dev_refresh(rbd_dev);
3929         if (ret)
3930                 rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
3931 }
3932
3933 /*
3934  * Synchronous osd object method call.  Returns the number of bytes
3935  * returned in the outbound buffer, or a negative error code.
3936  */
3937 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3938                              struct ceph_object_id *oid,
3939                              struct ceph_object_locator *oloc,
3940                              const char *method_name,
3941                              const void *outbound,
3942                              size_t outbound_size,
3943                              void *inbound,
3944                              size_t inbound_size)
3945 {
3946         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3947         struct page *req_page = NULL;
3948         struct page *reply_page;
3949         int ret;
3950
3951         /*
3952          * Method calls are ultimately read operations.  The result
3953          * should placed into the inbound buffer provided.  They
3954          * also supply outbound data--parameters for the object
3955          * method.  Currently if this is present it will be a
3956          * snapshot id.
3957          */
3958         if (outbound) {
3959                 if (outbound_size > PAGE_SIZE)
3960                         return -E2BIG;
3961
3962                 req_page = alloc_page(GFP_KERNEL);
3963                 if (!req_page)
3964                         return -ENOMEM;
3965
3966                 memcpy(page_address(req_page), outbound, outbound_size);
3967         }
3968
3969         reply_page = alloc_page(GFP_KERNEL);
3970         if (!reply_page) {
3971                 if (req_page)
3972                         __free_page(req_page);
3973                 return -ENOMEM;
3974         }
3975
3976         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3977                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
3978                              reply_page, &inbound_size);
3979         if (!ret) {
3980                 memcpy(inbound, page_address(reply_page), inbound_size);
3981                 ret = inbound_size;
3982         }
3983
3984         if (req_page)
3985                 __free_page(req_page);
3986         __free_page(reply_page);
3987         return ret;
3988 }
3989
3990 /*
3991  * lock_rwsem must be held for read
3992  */
3993 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3994 {
3995         DEFINE_WAIT(wait);
3996
3997         do {
3998                 /*
3999                  * Note the use of mod_delayed_work() in rbd_acquire_lock()
4000                  * and cancel_delayed_work() in wake_requests().
4001                  */
4002                 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4003                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4004                 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4005                                           TASK_UNINTERRUPTIBLE);
4006                 up_read(&rbd_dev->lock_rwsem);
4007                 schedule();
4008                 down_read(&rbd_dev->lock_rwsem);
4009         } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4010                  !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4011
4012         finish_wait(&rbd_dev->lock_waitq, &wait);
4013 }
4014
4015 static void rbd_queue_workfn(struct work_struct *work)
4016 {
4017         struct request *rq = blk_mq_rq_from_pdu(work);
4018         struct rbd_device *rbd_dev = rq->q->queuedata;
4019         struct rbd_img_request *img_request;
4020         struct ceph_snap_context *snapc = NULL;
4021         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4022         u64 length = blk_rq_bytes(rq);
4023         enum obj_operation_type op_type;
4024         u64 mapping_size;
4025         bool must_be_locked;
4026         int result;
4027
4028         switch (req_op(rq)) {
4029         case REQ_OP_DISCARD:
4030         case REQ_OP_WRITE_ZEROES:
4031                 op_type = OBJ_OP_DISCARD;
4032                 break;
4033         case REQ_OP_WRITE:
4034                 op_type = OBJ_OP_WRITE;
4035                 break;
4036         case REQ_OP_READ:
4037                 op_type = OBJ_OP_READ;
4038                 break;
4039         default:
4040                 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4041                 result = -EIO;
4042                 goto err;
4043         }
4044
4045         /* Ignore/skip any zero-length requests */
4046
4047         if (!length) {
4048                 dout("%s: zero-length request\n", __func__);
4049                 result = 0;
4050                 goto err_rq;
4051         }
4052
4053         /* Only reads are allowed to a read-only device */
4054
4055         if (op_type != OBJ_OP_READ) {
4056                 if (rbd_dev->mapping.read_only) {
4057                         result = -EROFS;
4058                         goto err_rq;
4059                 }
4060                 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4061         }
4062
4063         /*
4064          * Quit early if the mapped snapshot no longer exists.  It's
4065          * still possible the snapshot will have disappeared by the
4066          * time our request arrives at the osd, but there's no sense in
4067          * sending it if we already know.
4068          */
4069         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4070                 dout("request for non-existent snapshot");
4071                 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4072                 result = -ENXIO;
4073                 goto err_rq;
4074         }
4075
4076         if (offset && length > U64_MAX - offset + 1) {
4077                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4078                          length);
4079                 result = -EINVAL;
4080                 goto err_rq;    /* Shouldn't happen */
4081         }
4082
4083         blk_mq_start_request(rq);
4084
4085         down_read(&rbd_dev->header_rwsem);
4086         mapping_size = rbd_dev->mapping.size;
4087         if (op_type != OBJ_OP_READ) {
4088                 snapc = rbd_dev->header.snapc;
4089                 ceph_get_snap_context(snapc);
4090         }
4091         up_read(&rbd_dev->header_rwsem);
4092
4093         if (offset + length > mapping_size) {
4094                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4095                          length, mapping_size);
4096                 result = -EIO;
4097                 goto err_rq;
4098         }
4099
4100         must_be_locked =
4101             (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
4102             (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
4103         if (must_be_locked) {
4104                 down_read(&rbd_dev->lock_rwsem);
4105                 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4106                     !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4107                         if (rbd_dev->opts->exclusive) {
4108                                 rbd_warn(rbd_dev, "exclusive lock required");
4109                                 result = -EROFS;
4110                                 goto err_unlock;
4111                         }
4112                         rbd_wait_state_locked(rbd_dev);
4113                 }
4114                 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4115                         result = -EBLACKLISTED;
4116                         goto err_unlock;
4117                 }
4118         }
4119
4120         img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4121                                              snapc);
4122         if (!img_request) {
4123                 result = -ENOMEM;
4124                 goto err_unlock;
4125         }
4126         img_request->rq = rq;
4127         snapc = NULL; /* img_request consumes a ref */
4128
4129         if (op_type == OBJ_OP_DISCARD)
4130                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4131                                               NULL);
4132         else
4133                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4134                                               rq->bio);
4135         if (result)
4136                 goto err_img_request;
4137
4138         result = rbd_img_request_submit(img_request);
4139         if (result)
4140                 goto err_img_request;
4141
4142         if (must_be_locked)
4143                 up_read(&rbd_dev->lock_rwsem);
4144         return;
4145
4146 err_img_request:
4147         rbd_img_request_put(img_request);
4148 err_unlock:
4149         if (must_be_locked)
4150                 up_read(&rbd_dev->lock_rwsem);
4151 err_rq:
4152         if (result)
4153                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4154                          obj_op_name(op_type), length, offset, result);
4155         ceph_put_snap_context(snapc);
4156 err:
4157         blk_mq_end_request(rq, errno_to_blk_status(result));
4158 }
4159
4160 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4161                 const struct blk_mq_queue_data *bd)
4162 {
4163         struct request *rq = bd->rq;
4164         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4165
4166         queue_work(rbd_wq, work);
4167         return BLK_STS_OK;
4168 }
4169
4170 static void rbd_free_disk(struct rbd_device *rbd_dev)
4171 {
4172         blk_cleanup_queue(rbd_dev->disk->queue);
4173         blk_mq_free_tag_set(&rbd_dev->tag_set);
4174         put_disk(rbd_dev->disk);
4175         rbd_dev->disk = NULL;
4176 }
4177
4178 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4179                              struct ceph_object_id *oid,
4180                              struct ceph_object_locator *oloc,
4181                              void *buf, int buf_len)
4182
4183 {
4184         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4185         struct ceph_osd_request *req;
4186         struct page **pages;
4187         int num_pages = calc_pages_for(0, buf_len);
4188         int ret;
4189
4190         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4191         if (!req)
4192                 return -ENOMEM;
4193
4194         ceph_oid_copy(&req->r_base_oid, oid);
4195         ceph_oloc_copy(&req->r_base_oloc, oloc);
4196         req->r_flags = CEPH_OSD_FLAG_READ;
4197
4198         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4199         if (ret)
4200                 goto out_req;
4201
4202         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4203         if (IS_ERR(pages)) {
4204                 ret = PTR_ERR(pages);
4205                 goto out_req;
4206         }
4207
4208         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4209         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4210                                          true);
4211
4212         ceph_osdc_start_request(osdc, req, false);
4213         ret = ceph_osdc_wait_request(osdc, req);
4214         if (ret >= 0)
4215                 ceph_copy_from_page_vector(pages, buf, 0, ret);
4216
4217 out_req:
4218         ceph_osdc_put_request(req);
4219         return ret;
4220 }
4221
4222 /*
4223  * Read the complete header for the given rbd device.  On successful
4224  * return, the rbd_dev->header field will contain up-to-date
4225  * information about the image.
4226  */
4227 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4228 {
4229         struct rbd_image_header_ondisk *ondisk = NULL;
4230         u32 snap_count = 0;
4231         u64 names_size = 0;
4232         u32 want_count;
4233         int ret;
4234
4235         /*
4236          * The complete header will include an array of its 64-bit
4237          * snapshot ids, followed by the names of those snapshots as
4238          * a contiguous block of NUL-terminated strings.  Note that
4239          * the number of snapshots could change by the time we read
4240          * it in, in which case we re-read it.
4241          */
4242         do {
4243                 size_t size;
4244
4245                 kfree(ondisk);
4246
4247                 size = sizeof (*ondisk);
4248                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4249                 size += names_size;
4250                 ondisk = kmalloc(size, GFP_KERNEL);
4251                 if (!ondisk)
4252                         return -ENOMEM;
4253
4254                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4255                                         &rbd_dev->header_oloc, ondisk, size);
4256                 if (ret < 0)
4257                         goto out;
4258                 if ((size_t)ret < size) {
4259                         ret = -ENXIO;
4260                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4261                                 size, ret);
4262                         goto out;
4263                 }
4264                 if (!rbd_dev_ondisk_valid(ondisk)) {
4265                         ret = -ENXIO;
4266                         rbd_warn(rbd_dev, "invalid header");
4267                         goto out;
4268                 }
4269
4270                 names_size = le64_to_cpu(ondisk->snap_names_len);
4271                 want_count = snap_count;
4272                 snap_count = le32_to_cpu(ondisk->snap_count);
4273         } while (snap_count != want_count);
4274
4275         ret = rbd_header_from_disk(rbd_dev, ondisk);
4276 out:
4277         kfree(ondisk);
4278
4279         return ret;
4280 }
4281
4282 /*
4283  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4284  * has disappeared from the (just updated) snapshot context.
4285  */
4286 static void rbd_exists_validate(struct rbd_device *rbd_dev)
4287 {
4288         u64 snap_id;
4289
4290         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4291                 return;
4292
4293         snap_id = rbd_dev->spec->snap_id;
4294         if (snap_id == CEPH_NOSNAP)
4295                 return;
4296
4297         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4298                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4299 }
4300
4301 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4302 {
4303         sector_t size;
4304
4305         /*
4306          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4307          * try to update its size.  If REMOVING is set, updating size
4308          * is just useless work since the device can't be opened.
4309          */
4310         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4311             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4312                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4313                 dout("setting size to %llu sectors", (unsigned long long)size);
4314                 set_capacity(rbd_dev->disk, size);
4315                 revalidate_disk(rbd_dev->disk);
4316         }
4317 }
4318
4319 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4320 {
4321         u64 mapping_size;
4322         int ret;
4323
4324         down_write(&rbd_dev->header_rwsem);
4325         mapping_size = rbd_dev->mapping.size;
4326
4327         ret = rbd_dev_header_info(rbd_dev);
4328         if (ret)
4329                 goto out;
4330
4331         /*
4332          * If there is a parent, see if it has disappeared due to the
4333          * mapped image getting flattened.
4334          */
4335         if (rbd_dev->parent) {
4336                 ret = rbd_dev_v2_parent_info(rbd_dev);
4337                 if (ret)
4338                         goto out;
4339         }
4340
4341         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4342                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4343         } else {
4344                 /* validate mapped snapshot's EXISTS flag */
4345                 rbd_exists_validate(rbd_dev);
4346         }
4347
4348 out:
4349         up_write(&rbd_dev->header_rwsem);
4350         if (!ret && mapping_size != rbd_dev->mapping.size)
4351                 rbd_dev_update_size(rbd_dev);
4352
4353         return ret;
4354 }
4355
4356 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4357                 unsigned int hctx_idx, unsigned int numa_node)
4358 {
4359         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4360
4361         INIT_WORK(work, rbd_queue_workfn);
4362         return 0;
4363 }
4364
4365 static const struct blk_mq_ops rbd_mq_ops = {
4366         .queue_rq       = rbd_queue_rq,
4367         .init_request   = rbd_init_request,
4368 };
4369
4370 static int rbd_init_disk(struct rbd_device *rbd_dev)
4371 {
4372         struct gendisk *disk;
4373         struct request_queue *q;
4374         u64 segment_size;
4375         int err;
4376
4377         /* create gendisk info */
4378         disk = alloc_disk(single_major ?
4379                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4380                           RBD_MINORS_PER_MAJOR);
4381         if (!disk)
4382                 return -ENOMEM;
4383
4384         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4385                  rbd_dev->dev_id);
4386         disk->major = rbd_dev->major;
4387         disk->first_minor = rbd_dev->minor;
4388         if (single_major)
4389                 disk->flags |= GENHD_FL_EXT_DEVT;
4390         disk->fops = &rbd_bd_ops;
4391         disk->private_data = rbd_dev;
4392
4393         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4394         rbd_dev->tag_set.ops = &rbd_mq_ops;
4395         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4396         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4397         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4398         rbd_dev->tag_set.nr_hw_queues = 1;
4399         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4400
4401         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4402         if (err)
4403                 goto out_disk;
4404
4405         q = blk_mq_init_queue(&rbd_dev->tag_set);
4406         if (IS_ERR(q)) {
4407                 err = PTR_ERR(q);
4408                 goto out_tag_set;
4409         }
4410
4411         queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4412         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4413
4414         /* set io sizes to object size */
4415         segment_size = rbd_obj_bytes(&rbd_dev->header);
4416         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4417         q->limits.max_sectors = queue_max_hw_sectors(q);
4418         blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
4419         blk_queue_max_segment_size(q, segment_size);
4420         blk_queue_io_min(q, segment_size);
4421         blk_queue_io_opt(q, segment_size);
4422
4423         /* enable the discard support */
4424         queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4425         q->limits.discard_granularity = segment_size;
4426         q->limits.discard_alignment = segment_size;
4427         blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4428         blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
4429
4430         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4431                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4432
4433         /*
4434          * disk_release() expects a queue ref from add_disk() and will
4435          * put it.  Hold an extra ref until add_disk() is called.
4436          */
4437         WARN_ON(!blk_get_queue(q));
4438         disk->queue = q;
4439         q->queuedata = rbd_dev;
4440
4441         rbd_dev->disk = disk;
4442
4443         return 0;
4444 out_tag_set:
4445         blk_mq_free_tag_set(&rbd_dev->tag_set);
4446 out_disk:
4447         put_disk(disk);
4448         return err;
4449 }
4450
4451 /*
4452   sysfs
4453 */
4454
4455 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4456 {
4457         return container_of(dev, struct rbd_device, dev);
4458 }
4459
4460 static ssize_t rbd_size_show(struct device *dev,
4461                              struct device_attribute *attr, char *buf)
4462 {
4463         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4464
4465         return sprintf(buf, "%llu\n",
4466                 (unsigned long long)rbd_dev->mapping.size);
4467 }
4468
4469 /*
4470  * Note this shows the features for whatever's mapped, which is not
4471  * necessarily the base image.
4472  */
4473 static ssize_t rbd_features_show(struct device *dev,
4474                              struct device_attribute *attr, char *buf)
4475 {
4476         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4477
4478         return sprintf(buf, "0x%016llx\n",
4479                         (unsigned long long)rbd_dev->mapping.features);
4480 }
4481
4482 static ssize_t rbd_major_show(struct device *dev,
4483                               struct device_attribute *attr, char *buf)
4484 {
4485         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4486
4487         if (rbd_dev->major)
4488                 return sprintf(buf, "%d\n", rbd_dev->major);
4489
4490         return sprintf(buf, "(none)\n");
4491 }
4492
4493 static ssize_t rbd_minor_show(struct device *dev,
4494                               struct device_attribute *attr, char *buf)
4495 {
4496         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4497
4498         return sprintf(buf, "%d\n", rbd_dev->minor);
4499 }
4500
4501 static ssize_t rbd_client_addr_show(struct device *dev,
4502                                     struct device_attribute *attr, char *buf)
4503 {
4504         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4505         struct ceph_entity_addr *client_addr =
4506             ceph_client_addr(rbd_dev->rbd_client->client);
4507
4508         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4509                        le32_to_cpu(client_addr->nonce));
4510 }
4511
4512 static ssize_t rbd_client_id_show(struct device *dev,
4513                                   struct device_attribute *attr, char *buf)
4514 {
4515         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4516
4517         return sprintf(buf, "client%lld\n",
4518                        ceph_client_gid(rbd_dev->rbd_client->client));
4519 }
4520
4521 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4522                                      struct device_attribute *attr, char *buf)
4523 {
4524         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4525
4526         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4527 }
4528
4529 static ssize_t rbd_config_info_show(struct device *dev,
4530                                     struct device_attribute *attr, char *buf)
4531 {
4532         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4533
4534         return sprintf(buf, "%s\n", rbd_dev->config_info);
4535 }
4536
4537 static ssize_t rbd_pool_show(struct device *dev,
4538                              struct device_attribute *attr, char *buf)
4539 {
4540         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4541
4542         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4543 }
4544
4545 static ssize_t rbd_pool_id_show(struct device *dev,
4546                              struct device_attribute *attr, char *buf)
4547 {
4548         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4549
4550         return sprintf(buf, "%llu\n",
4551                         (unsigned long long) rbd_dev->spec->pool_id);
4552 }
4553
4554 static ssize_t rbd_name_show(struct device *dev,
4555                              struct device_attribute *attr, char *buf)
4556 {
4557         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4558
4559         if (rbd_dev->spec->image_name)
4560                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4561
4562         return sprintf(buf, "(unknown)\n");
4563 }
4564
4565 static ssize_t rbd_image_id_show(struct device *dev,
4566                              struct device_attribute *attr, char *buf)
4567 {
4568         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4569
4570         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4571 }
4572
4573 /*
4574  * Shows the name of the currently-mapped snapshot (or
4575  * RBD_SNAP_HEAD_NAME for the base image).
4576  */
4577 static ssize_t rbd_snap_show(struct device *dev,
4578                              struct device_attribute *attr,
4579                              char *buf)
4580 {
4581         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4582
4583         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4584 }
4585
4586 static ssize_t rbd_snap_id_show(struct device *dev,
4587                                 struct device_attribute *attr, char *buf)
4588 {
4589         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4590
4591         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4592 }
4593
4594 /*
4595  * For a v2 image, shows the chain of parent images, separated by empty
4596  * lines.  For v1 images or if there is no parent, shows "(no parent
4597  * image)".
4598  */
4599 static ssize_t rbd_parent_show(struct device *dev,
4600                                struct device_attribute *attr,
4601                                char *buf)
4602 {
4603         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4604         ssize_t count = 0;
4605
4606         if (!rbd_dev->parent)
4607                 return sprintf(buf, "(no parent image)\n");
4608
4609         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4610                 struct rbd_spec *spec = rbd_dev->parent_spec;
4611
4612                 count += sprintf(&buf[count], "%s"
4613                             "pool_id %llu\npool_name %s\n"
4614                             "image_id %s\nimage_name %s\n"
4615                             "snap_id %llu\nsnap_name %s\n"
4616                             "overlap %llu\n",
4617                             !count ? "" : "\n", /* first? */
4618                             spec->pool_id, spec->pool_name,
4619                             spec->image_id, spec->image_name ?: "(unknown)",
4620                             spec->snap_id, spec->snap_name,
4621                             rbd_dev->parent_overlap);
4622         }
4623
4624         return count;
4625 }
4626
4627 static ssize_t rbd_image_refresh(struct device *dev,
4628                                  struct device_attribute *attr,
4629                                  const char *buf,
4630                                  size_t size)
4631 {
4632         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4633         int ret;
4634
4635         ret = rbd_dev_refresh(rbd_dev);
4636         if (ret)
4637                 return ret;
4638
4639         return size;
4640 }
4641
4642 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4643 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4644 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4645 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4646 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4647 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4648 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4649 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4650 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4651 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4652 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4653 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4654 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4655 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4656 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4657 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4658
4659 static struct attribute *rbd_attrs[] = {
4660         &dev_attr_size.attr,
4661         &dev_attr_features.attr,
4662         &dev_attr_major.attr,
4663         &dev_attr_minor.attr,
4664         &dev_attr_client_addr.attr,
4665         &dev_attr_client_id.attr,
4666         &dev_attr_cluster_fsid.attr,
4667         &dev_attr_config_info.attr,
4668         &dev_attr_pool.attr,
4669         &dev_attr_pool_id.attr,
4670         &dev_attr_name.attr,
4671         &dev_attr_image_id.attr,
4672         &dev_attr_current_snap.attr,
4673         &dev_attr_snap_id.attr,
4674         &dev_attr_parent.attr,
4675         &dev_attr_refresh.attr,
4676         NULL
4677 };
4678
4679 static struct attribute_group rbd_attr_group = {
4680         .attrs = rbd_attrs,
4681 };
4682
4683 static const struct attribute_group *rbd_attr_groups[] = {
4684         &rbd_attr_group,
4685         NULL
4686 };
4687
4688 static void rbd_dev_release(struct device *dev);
4689
4690 static const struct device_type rbd_device_type = {
4691         .name           = "rbd",
4692         .groups         = rbd_attr_groups,
4693         .release        = rbd_dev_release,
4694 };
4695
4696 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4697 {
4698         kref_get(&spec->kref);
4699
4700         return spec;
4701 }
4702
4703 static void rbd_spec_free(struct kref *kref);
4704 static void rbd_spec_put(struct rbd_spec *spec)
4705 {
4706         if (spec)
4707                 kref_put(&spec->kref, rbd_spec_free);
4708 }
4709
4710 static struct rbd_spec *rbd_spec_alloc(void)
4711 {
4712         struct rbd_spec *spec;
4713
4714         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4715         if (!spec)
4716                 return NULL;
4717
4718         spec->pool_id = CEPH_NOPOOL;
4719         spec->snap_id = CEPH_NOSNAP;
4720         kref_init(&spec->kref);
4721
4722         return spec;
4723 }
4724
4725 static void rbd_spec_free(struct kref *kref)
4726 {
4727         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4728
4729         kfree(spec->pool_name);
4730         kfree(spec->image_id);
4731         kfree(spec->image_name);
4732         kfree(spec->snap_name);
4733         kfree(spec);
4734 }
4735
4736 static void rbd_dev_free(struct rbd_device *rbd_dev)
4737 {
4738         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4739         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4740
4741         ceph_oid_destroy(&rbd_dev->header_oid);
4742         ceph_oloc_destroy(&rbd_dev->header_oloc);
4743         kfree(rbd_dev->config_info);
4744
4745         rbd_put_client(rbd_dev->rbd_client);
4746         rbd_spec_put(rbd_dev->spec);
4747         kfree(rbd_dev->opts);
4748         kfree(rbd_dev);
4749 }
4750
4751 static void rbd_dev_release(struct device *dev)
4752 {
4753         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4754         bool need_put = !!rbd_dev->opts;
4755
4756         if (need_put) {
4757                 destroy_workqueue(rbd_dev->task_wq);
4758                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4759         }
4760
4761         rbd_dev_free(rbd_dev);
4762
4763         /*
4764          * This is racy, but way better than putting module outside of
4765          * the release callback.  The race window is pretty small, so
4766          * doing something similar to dm (dm-builtin.c) is overkill.
4767          */
4768         if (need_put)
4769                 module_put(THIS_MODULE);
4770 }
4771
4772 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4773                                            struct rbd_spec *spec)
4774 {
4775         struct rbd_device *rbd_dev;
4776
4777         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4778         if (!rbd_dev)
4779                 return NULL;
4780
4781         spin_lock_init(&rbd_dev->lock);
4782         INIT_LIST_HEAD(&rbd_dev->node);
4783         init_rwsem(&rbd_dev->header_rwsem);
4784
4785         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4786         ceph_oid_init(&rbd_dev->header_oid);
4787         rbd_dev->header_oloc.pool = spec->pool_id;
4788
4789         mutex_init(&rbd_dev->watch_mutex);
4790         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4791         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4792
4793         init_rwsem(&rbd_dev->lock_rwsem);
4794         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4795         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4796         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4797         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4798         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4799         init_waitqueue_head(&rbd_dev->lock_waitq);
4800
4801         rbd_dev->dev.bus = &rbd_bus_type;
4802         rbd_dev->dev.type = &rbd_device_type;
4803         rbd_dev->dev.parent = &rbd_root_dev;
4804         device_initialize(&rbd_dev->dev);
4805
4806         rbd_dev->rbd_client = rbdc;
4807         rbd_dev->spec = spec;
4808
4809         return rbd_dev;
4810 }
4811
4812 /*
4813  * Create a mapping rbd_dev.
4814  */
4815 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4816                                          struct rbd_spec *spec,
4817                                          struct rbd_options *opts)
4818 {
4819         struct rbd_device *rbd_dev;
4820
4821         rbd_dev = __rbd_dev_create(rbdc, spec);
4822         if (!rbd_dev)
4823                 return NULL;
4824
4825         rbd_dev->opts = opts;
4826
4827         /* get an id and fill in device name */
4828         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4829                                          minor_to_rbd_dev_id(1 << MINORBITS),
4830                                          GFP_KERNEL);
4831         if (rbd_dev->dev_id < 0)
4832                 goto fail_rbd_dev;
4833
4834         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4835         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4836                                                    rbd_dev->name);
4837         if (!rbd_dev->task_wq)
4838                 goto fail_dev_id;
4839
4840         /* we have a ref from do_rbd_add() */
4841         __module_get(THIS_MODULE);
4842
4843         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4844         return rbd_dev;
4845
4846 fail_dev_id:
4847         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4848 fail_rbd_dev:
4849         rbd_dev_free(rbd_dev);
4850         return NULL;
4851 }
4852
4853 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4854 {
4855         if (rbd_dev)
4856                 put_device(&rbd_dev->dev);
4857 }
4858
4859 /*
4860  * Get the size and object order for an image snapshot, or if
4861  * snap_id is CEPH_NOSNAP, gets this information for the base
4862  * image.
4863  */
4864 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4865                                 u8 *order, u64 *snap_size)
4866 {
4867         __le64 snapid = cpu_to_le64(snap_id);
4868         int ret;
4869         struct {
4870                 u8 order;
4871                 __le64 size;
4872         } __attribute__ ((packed)) size_buf = { 0 };
4873
4874         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4875                                   &rbd_dev->header_oloc, "get_size",
4876                                   &snapid, sizeof(snapid),
4877                                   &size_buf, sizeof(size_buf));
4878         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4879         if (ret < 0)
4880                 return ret;
4881         if (ret < sizeof (size_buf))
4882                 return -ERANGE;
4883
4884         if (order) {
4885                 *order = size_buf.order;
4886                 dout("  order %u", (unsigned int)*order);
4887         }
4888         *snap_size = le64_to_cpu(size_buf.size);
4889
4890         dout("  snap_id 0x%016llx snap_size = %llu\n",
4891                 (unsigned long long)snap_id,
4892                 (unsigned long long)*snap_size);
4893
4894         return 0;
4895 }
4896
4897 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4898 {
4899         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4900                                         &rbd_dev->header.obj_order,
4901                                         &rbd_dev->header.image_size);
4902 }
4903
4904 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4905 {
4906         void *reply_buf;
4907         int ret;
4908         void *p;
4909
4910         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4911         if (!reply_buf)
4912                 return -ENOMEM;
4913
4914         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4915                                   &rbd_dev->header_oloc, "get_object_prefix",
4916                                   NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4917         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4918         if (ret < 0)
4919                 goto out;
4920
4921         p = reply_buf;
4922         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4923                                                 p + ret, NULL, GFP_NOIO);
4924         ret = 0;
4925
4926         if (IS_ERR(rbd_dev->header.object_prefix)) {
4927                 ret = PTR_ERR(rbd_dev->header.object_prefix);
4928                 rbd_dev->header.object_prefix = NULL;
4929         } else {
4930                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4931         }
4932 out:
4933         kfree(reply_buf);
4934
4935         return ret;
4936 }
4937
4938 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4939                 u64 *snap_features)
4940 {
4941         __le64 snapid = cpu_to_le64(snap_id);
4942         struct {
4943                 __le64 features;
4944                 __le64 incompat;
4945         } __attribute__ ((packed)) features_buf = { 0 };
4946         u64 unsup;
4947         int ret;
4948
4949         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4950                                   &rbd_dev->header_oloc, "get_features",
4951                                   &snapid, sizeof(snapid),
4952                                   &features_buf, sizeof(features_buf));
4953         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4954         if (ret < 0)
4955                 return ret;
4956         if (ret < sizeof (features_buf))
4957                 return -ERANGE;
4958
4959         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4960         if (unsup) {
4961                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4962                          unsup);
4963                 return -ENXIO;
4964         }
4965
4966         *snap_features = le64_to_cpu(features_buf.features);
4967
4968         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4969                 (unsigned long long)snap_id,
4970                 (unsigned long long)*snap_features,
4971                 (unsigned long long)le64_to_cpu(features_buf.incompat));
4972
4973         return 0;
4974 }
4975
4976 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4977 {
4978         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4979                                                 &rbd_dev->header.features);
4980 }
4981
4982 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4983 {
4984         struct rbd_spec *parent_spec;
4985         size_t size;
4986         void *reply_buf = NULL;
4987         __le64 snapid;
4988         void *p;
4989         void *end;
4990         u64 pool_id;
4991         char *image_id;
4992         u64 snap_id;
4993         u64 overlap;
4994         int ret;
4995
4996         parent_spec = rbd_spec_alloc();
4997         if (!parent_spec)
4998                 return -ENOMEM;
4999
5000         size = sizeof (__le64) +                                /* pool_id */
5001                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
5002                 sizeof (__le64) +                               /* snap_id */
5003                 sizeof (__le64);                                /* overlap */
5004         reply_buf = kmalloc(size, GFP_KERNEL);
5005         if (!reply_buf) {
5006                 ret = -ENOMEM;
5007                 goto out_err;
5008         }
5009
5010         snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5011         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5012                                   &rbd_dev->header_oloc, "get_parent",
5013                                   &snapid, sizeof(snapid), reply_buf, size);
5014         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5015         if (ret < 0)
5016                 goto out_err;
5017
5018         p = reply_buf;
5019         end = reply_buf + ret;
5020         ret = -ERANGE;
5021         ceph_decode_64_safe(&p, end, pool_id, out_err);
5022         if (pool_id == CEPH_NOPOOL) {
5023                 /*
5024                  * Either the parent never existed, or we have
5025                  * record of it but the image got flattened so it no
5026                  * longer has a parent.  When the parent of a
5027                  * layered image disappears we immediately set the
5028                  * overlap to 0.  The effect of this is that all new
5029                  * requests will be treated as if the image had no
5030                  * parent.
5031                  */
5032                 if (rbd_dev->parent_overlap) {
5033                         rbd_dev->parent_overlap = 0;
5034                         rbd_dev_parent_put(rbd_dev);
5035                         pr_info("%s: clone image has been flattened\n",
5036                                 rbd_dev->disk->disk_name);
5037                 }
5038
5039                 goto out;       /* No parent?  No problem. */
5040         }
5041
5042         /* The ceph file layout needs to fit pool id in 32 bits */
5043
5044         ret = -EIO;
5045         if (pool_id > (u64)U32_MAX) {
5046                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5047                         (unsigned long long)pool_id, U32_MAX);
5048                 goto out_err;
5049         }
5050
5051         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5052         if (IS_ERR(image_id)) {
5053                 ret = PTR_ERR(image_id);
5054                 goto out_err;
5055         }
5056         ceph_decode_64_safe(&p, end, snap_id, out_err);
5057         ceph_decode_64_safe(&p, end, overlap, out_err);
5058
5059         /*
5060          * The parent won't change (except when the clone is
5061          * flattened, already handled that).  So we only need to
5062          * record the parent spec we have not already done so.
5063          */
5064         if (!rbd_dev->parent_spec) {
5065                 parent_spec->pool_id = pool_id;
5066                 parent_spec->image_id = image_id;
5067                 parent_spec->snap_id = snap_id;
5068                 rbd_dev->parent_spec = parent_spec;
5069                 parent_spec = NULL;     /* rbd_dev now owns this */
5070         } else {
5071                 kfree(image_id);
5072         }
5073
5074         /*
5075          * We always update the parent overlap.  If it's zero we issue
5076          * a warning, as we will proceed as if there was no parent.
5077          */
5078         if (!overlap) {
5079                 if (parent_spec) {
5080                         /* refresh, careful to warn just once */
5081                         if (rbd_dev->parent_overlap)
5082                                 rbd_warn(rbd_dev,
5083                                     "clone now standalone (overlap became 0)");
5084                 } else {
5085                         /* initial probe */
5086                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5087                 }
5088         }
5089         rbd_dev->parent_overlap = overlap;
5090
5091 out:
5092         ret = 0;
5093 out_err:
5094         kfree(reply_buf);
5095         rbd_spec_put(parent_spec);
5096
5097         return ret;
5098 }
5099
5100 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5101 {
5102         struct {
5103                 __le64 stripe_unit;
5104                 __le64 stripe_count;
5105         } __attribute__ ((packed)) striping_info_buf = { 0 };
5106         size_t size = sizeof (striping_info_buf);
5107         void *p;
5108         u64 obj_size;
5109         u64 stripe_unit;
5110         u64 stripe_count;
5111         int ret;
5112
5113         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5114                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
5115                                 NULL, 0, &striping_info_buf, size);
5116         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5117         if (ret < 0)
5118                 return ret;
5119         if (ret < size)
5120                 return -ERANGE;
5121
5122         /*
5123          * We don't actually support the "fancy striping" feature
5124          * (STRIPINGV2) yet, but if the striping sizes are the
5125          * defaults the behavior is the same as before.  So find
5126          * out, and only fail if the image has non-default values.
5127          */
5128         ret = -EINVAL;
5129         obj_size = rbd_obj_bytes(&rbd_dev->header);
5130         p = &striping_info_buf;
5131         stripe_unit = ceph_decode_64(&p);
5132         if (stripe_unit != obj_size) {
5133                 rbd_warn(rbd_dev, "unsupported stripe unit "
5134                                 "(got %llu want %llu)",
5135                                 stripe_unit, obj_size);
5136                 return -EINVAL;
5137         }
5138         stripe_count = ceph_decode_64(&p);
5139         if (stripe_count != 1) {
5140                 rbd_warn(rbd_dev, "unsupported stripe count "
5141                                 "(got %llu want 1)", stripe_count);
5142                 return -EINVAL;
5143         }
5144         rbd_dev->header.stripe_unit = stripe_unit;
5145         rbd_dev->header.stripe_count = stripe_count;
5146
5147         return 0;
5148 }
5149
5150 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5151 {
5152         __le64 data_pool_id;
5153         int ret;
5154
5155         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5156                                   &rbd_dev->header_oloc, "get_data_pool",
5157                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
5158         if (ret < 0)
5159                 return ret;
5160         if (ret < sizeof(data_pool_id))
5161                 return -EBADMSG;
5162
5163         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5164         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5165         return 0;
5166 }
5167
5168 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5169 {
5170         CEPH_DEFINE_OID_ONSTACK(oid);
5171         size_t image_id_size;
5172         char *image_id;
5173         void *p;
5174         void *end;
5175         size_t size;
5176         void *reply_buf = NULL;
5177         size_t len = 0;
5178         char *image_name = NULL;
5179         int ret;
5180
5181         rbd_assert(!rbd_dev->spec->image_name);
5182
5183         len = strlen(rbd_dev->spec->image_id);
5184         image_id_size = sizeof (__le32) + len;
5185         image_id = kmalloc(image_id_size, GFP_KERNEL);
5186         if (!image_id)
5187                 return NULL;
5188
5189         p = image_id;
5190         end = image_id + image_id_size;
5191         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5192
5193         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5194         reply_buf = kmalloc(size, GFP_KERNEL);
5195         if (!reply_buf)
5196                 goto out;
5197
5198         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5199         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5200                                   "dir_get_name", image_id, image_id_size,
5201                                   reply_buf, size);
5202         if (ret < 0)
5203                 goto out;
5204         p = reply_buf;
5205         end = reply_buf + ret;
5206
5207         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5208         if (IS_ERR(image_name))
5209                 image_name = NULL;
5210         else
5211                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5212 out:
5213         kfree(reply_buf);
5214         kfree(image_id);
5215
5216         return image_name;
5217 }
5218
5219 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5220 {
5221         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5222         const char *snap_name;
5223         u32 which = 0;
5224
5225         /* Skip over names until we find the one we are looking for */
5226
5227         snap_name = rbd_dev->header.snap_names;
5228         while (which < snapc->num_snaps) {
5229                 if (!strcmp(name, snap_name))
5230                         return snapc->snaps[which];
5231                 snap_name += strlen(snap_name) + 1;
5232                 which++;
5233         }
5234         return CEPH_NOSNAP;
5235 }
5236
5237 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5238 {
5239         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5240         u32 which;
5241         bool found = false;
5242         u64 snap_id;
5243
5244         for (which = 0; !found && which < snapc->num_snaps; which++) {
5245                 const char *snap_name;
5246
5247                 snap_id = snapc->snaps[which];
5248                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5249                 if (IS_ERR(snap_name)) {
5250                         /* ignore no-longer existing snapshots */
5251                         if (PTR_ERR(snap_name) == -ENOENT)
5252                                 continue;
5253                         else
5254                                 break;
5255                 }
5256                 found = !strcmp(name, snap_name);
5257                 kfree(snap_name);
5258         }
5259         return found ? snap_id : CEPH_NOSNAP;
5260 }
5261
5262 /*
5263  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5264  * no snapshot by that name is found, or if an error occurs.
5265  */
5266 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5267 {
5268         if (rbd_dev->image_format == 1)
5269                 return rbd_v1_snap_id_by_name(rbd_dev, name);
5270
5271         return rbd_v2_snap_id_by_name(rbd_dev, name);
5272 }
5273
5274 /*
5275  * An image being mapped will have everything but the snap id.
5276  */
5277 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5278 {
5279         struct rbd_spec *spec = rbd_dev->spec;
5280
5281         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5282         rbd_assert(spec->image_id && spec->image_name);
5283         rbd_assert(spec->snap_name);
5284
5285         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5286                 u64 snap_id;
5287
5288                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5289                 if (snap_id == CEPH_NOSNAP)
5290                         return -ENOENT;
5291
5292                 spec->snap_id = snap_id;
5293         } else {
5294                 spec->snap_id = CEPH_NOSNAP;
5295         }
5296
5297         return 0;
5298 }
5299
5300 /*
5301  * A parent image will have all ids but none of the names.
5302  *
5303  * All names in an rbd spec are dynamically allocated.  It's OK if we
5304  * can't figure out the name for an image id.
5305  */
5306 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5307 {
5308         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5309         struct rbd_spec *spec = rbd_dev->spec;
5310         const char *pool_name;
5311         const char *image_name;
5312         const char *snap_name;
5313         int ret;
5314
5315         rbd_assert(spec->pool_id != CEPH_NOPOOL);
5316         rbd_assert(spec->image_id);
5317         rbd_assert(spec->snap_id != CEPH_NOSNAP);
5318
5319         /* Get the pool name; we have to make our own copy of this */
5320
5321         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5322         if (!pool_name) {
5323                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5324                 return -EIO;
5325         }
5326         pool_name = kstrdup(pool_name, GFP_KERNEL);
5327         if (!pool_name)
5328                 return -ENOMEM;
5329
5330         /* Fetch the image name; tolerate failure here */
5331
5332         image_name = rbd_dev_image_name(rbd_dev);
5333         if (!image_name)
5334                 rbd_warn(rbd_dev, "unable to get image name");
5335
5336         /* Fetch the snapshot name */
5337
5338         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5339         if (IS_ERR(snap_name)) {
5340                 ret = PTR_ERR(snap_name);
5341                 goto out_err;
5342         }
5343
5344         spec->pool_name = pool_name;
5345         spec->image_name = image_name;
5346         spec->snap_name = snap_name;
5347
5348         return 0;
5349
5350 out_err:
5351         kfree(image_name);
5352         kfree(pool_name);
5353         return ret;
5354 }
5355
5356 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5357 {
5358         size_t size;
5359         int ret;
5360         void *reply_buf;
5361         void *p;
5362         void *end;
5363         u64 seq;
5364         u32 snap_count;
5365         struct ceph_snap_context *snapc;
5366         u32 i;
5367
5368         /*
5369          * We'll need room for the seq value (maximum snapshot id),
5370          * snapshot count, and array of that many snapshot ids.
5371          * For now we have a fixed upper limit on the number we're
5372          * prepared to receive.
5373          */
5374         size = sizeof (__le64) + sizeof (__le32) +
5375                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
5376         reply_buf = kzalloc(size, GFP_KERNEL);
5377         if (!reply_buf)
5378                 return -ENOMEM;
5379
5380         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5381                                   &rbd_dev->header_oloc, "get_snapcontext",
5382                                   NULL, 0, reply_buf, size);
5383         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5384         if (ret < 0)
5385                 goto out;
5386
5387         p = reply_buf;
5388         end = reply_buf + ret;
5389         ret = -ERANGE;
5390         ceph_decode_64_safe(&p, end, seq, out);
5391         ceph_decode_32_safe(&p, end, snap_count, out);
5392
5393         /*
5394          * Make sure the reported number of snapshot ids wouldn't go
5395          * beyond the end of our buffer.  But before checking that,
5396          * make sure the computed size of the snapshot context we
5397          * allocate is representable in a size_t.
5398          */
5399         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5400                                  / sizeof (u64)) {
5401                 ret = -EINVAL;
5402                 goto out;
5403         }
5404         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5405                 goto out;
5406         ret = 0;
5407
5408         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5409         if (!snapc) {
5410                 ret = -ENOMEM;
5411                 goto out;
5412         }
5413         snapc->seq = seq;
5414         for (i = 0; i < snap_count; i++)
5415                 snapc->snaps[i] = ceph_decode_64(&p);
5416
5417         ceph_put_snap_context(rbd_dev->header.snapc);
5418         rbd_dev->header.snapc = snapc;
5419
5420         dout("  snap context seq = %llu, snap_count = %u\n",
5421                 (unsigned long long)seq, (unsigned int)snap_count);
5422 out:
5423         kfree(reply_buf);
5424
5425         return ret;
5426 }
5427
5428 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5429                                         u64 snap_id)
5430 {
5431         size_t size;
5432         void *reply_buf;
5433         __le64 snapid;
5434         int ret;
5435         void *p;
5436         void *end;
5437         char *snap_name;
5438
5439         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5440         reply_buf = kmalloc(size, GFP_KERNEL);
5441         if (!reply_buf)
5442                 return ERR_PTR(-ENOMEM);
5443
5444         snapid = cpu_to_le64(snap_id);
5445         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5446                                   &rbd_dev->header_oloc, "get_snapshot_name",
5447                                   &snapid, sizeof(snapid), reply_buf, size);
5448         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5449         if (ret < 0) {
5450                 snap_name = ERR_PTR(ret);
5451                 goto out;
5452         }
5453
5454         p = reply_buf;
5455         end = reply_buf + ret;
5456         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5457         if (IS_ERR(snap_name))
5458                 goto out;
5459
5460         dout("  snap_id 0x%016llx snap_name = %s\n",
5461                 (unsigned long long)snap_id, snap_name);
5462 out:
5463         kfree(reply_buf);
5464
5465         return snap_name;
5466 }
5467
5468 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5469 {
5470         bool first_time = rbd_dev->header.object_prefix == NULL;
5471         int ret;
5472
5473         ret = rbd_dev_v2_image_size(rbd_dev);
5474         if (ret)
5475                 return ret;
5476
5477         if (first_time) {
5478                 ret = rbd_dev_v2_header_onetime(rbd_dev);
5479                 if (ret)
5480                         return ret;
5481         }
5482
5483         ret = rbd_dev_v2_snap_context(rbd_dev);
5484         if (ret && first_time) {
5485                 kfree(rbd_dev->header.object_prefix);
5486                 rbd_dev->header.object_prefix = NULL;
5487         }
5488
5489         return ret;
5490 }
5491
5492 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5493 {
5494         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5495
5496         if (rbd_dev->image_format == 1)
5497                 return rbd_dev_v1_header_info(rbd_dev);
5498
5499         return rbd_dev_v2_header_info(rbd_dev);
5500 }
5501
5502 /*
5503  * Skips over white space at *buf, and updates *buf to point to the
5504  * first found non-space character (if any). Returns the length of
5505  * the token (string of non-white space characters) found.  Note
5506  * that *buf must be terminated with '\0'.
5507  */
5508 static inline size_t next_token(const char **buf)
5509 {
5510         /*
5511         * These are the characters that produce nonzero for
5512         * isspace() in the "C" and "POSIX" locales.
5513         */
5514         const char *spaces = " \f\n\r\t\v";
5515
5516         *buf += strspn(*buf, spaces);   /* Find start of token */
5517
5518         return strcspn(*buf, spaces);   /* Return token length */
5519 }
5520
5521 /*
5522  * Finds the next token in *buf, dynamically allocates a buffer big
5523  * enough to hold a copy of it, and copies the token into the new
5524  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
5525  * that a duplicate buffer is created even for a zero-length token.
5526  *
5527  * Returns a pointer to the newly-allocated duplicate, or a null
5528  * pointer if memory for the duplicate was not available.  If
5529  * the lenp argument is a non-null pointer, the length of the token
5530  * (not including the '\0') is returned in *lenp.
5531  *
5532  * If successful, the *buf pointer will be updated to point beyond
5533  * the end of the found token.
5534  *
5535  * Note: uses GFP_KERNEL for allocation.
5536  */
5537 static inline char *dup_token(const char **buf, size_t *lenp)
5538 {
5539         char *dup;
5540         size_t len;
5541
5542         len = next_token(buf);
5543         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5544         if (!dup)
5545                 return NULL;
5546         *(dup + len) = '\0';
5547         *buf += len;
5548
5549         if (lenp)
5550                 *lenp = len;
5551
5552         return dup;
5553 }
5554
5555 /*
5556  * Parse the options provided for an "rbd add" (i.e., rbd image
5557  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
5558  * and the data written is passed here via a NUL-terminated buffer.
5559  * Returns 0 if successful or an error code otherwise.
5560  *
5561  * The information extracted from these options is recorded in
5562  * the other parameters which return dynamically-allocated
5563  * structures:
5564  *  ceph_opts
5565  *      The address of a pointer that will refer to a ceph options
5566  *      structure.  Caller must release the returned pointer using
5567  *      ceph_destroy_options() when it is no longer needed.
5568  *  rbd_opts
5569  *      Address of an rbd options pointer.  Fully initialized by
5570  *      this function; caller must release with kfree().
5571  *  spec
5572  *      Address of an rbd image specification pointer.  Fully
5573  *      initialized by this function based on parsed options.
5574  *      Caller must release with rbd_spec_put().
5575  *
5576  * The options passed take this form:
5577  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
5578  * where:
5579  *  <mon_addrs>
5580  *      A comma-separated list of one or more monitor addresses.
5581  *      A monitor address is an ip address, optionally followed
5582  *      by a port number (separated by a colon).
5583  *        I.e.:  ip1[:port1][,ip2[:port2]...]
5584  *  <options>
5585  *      A comma-separated list of ceph and/or rbd options.
5586  *  <pool_name>
5587  *      The name of the rados pool containing the rbd image.
5588  *  <image_name>
5589  *      The name of the image in that pool to map.
5590  *  <snap_id>
5591  *      An optional snapshot id.  If provided, the mapping will
5592  *      present data from the image at the time that snapshot was
5593  *      created.  The image head is used if no snapshot id is
5594  *      provided.  Snapshot mappings are always read-only.
5595  */
5596 static int rbd_add_parse_args(const char *buf,
5597                                 struct ceph_options **ceph_opts,
5598                                 struct rbd_options **opts,
5599                                 struct rbd_spec **rbd_spec)
5600 {
5601         size_t len;
5602         char *options;
5603         const char *mon_addrs;
5604         char *snap_name;
5605         size_t mon_addrs_size;
5606         struct rbd_spec *spec = NULL;
5607         struct rbd_options *rbd_opts = NULL;
5608         struct ceph_options *copts;
5609         int ret;
5610
5611         /* The first four tokens are required */
5612
5613         len = next_token(&buf);
5614         if (!len) {
5615                 rbd_warn(NULL, "no monitor address(es) provided");
5616                 return -EINVAL;
5617         }
5618         mon_addrs = buf;
5619         mon_addrs_size = len + 1;
5620         buf += len;
5621
5622         ret = -EINVAL;
5623         options = dup_token(&buf, NULL);
5624         if (!options)
5625                 return -ENOMEM;
5626         if (!*options) {
5627                 rbd_warn(NULL, "no options provided");
5628                 goto out_err;
5629         }
5630
5631         spec = rbd_spec_alloc();
5632         if (!spec)
5633                 goto out_mem;
5634
5635         spec->pool_name = dup_token(&buf, NULL);
5636         if (!spec->pool_name)
5637                 goto out_mem;
5638         if (!*spec->pool_name) {
5639                 rbd_warn(NULL, "no pool name provided");
5640                 goto out_err;
5641         }
5642
5643         spec->image_name = dup_token(&buf, NULL);
5644         if (!spec->image_name)
5645                 goto out_mem;
5646         if (!*spec->image_name) {
5647                 rbd_warn(NULL, "no image name provided");
5648                 goto out_err;
5649         }
5650
5651         /*
5652          * Snapshot name is optional; default is to use "-"
5653          * (indicating the head/no snapshot).
5654          */
5655         len = next_token(&buf);
5656         if (!len) {
5657                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5658                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5659         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5660                 ret = -ENAMETOOLONG;
5661                 goto out_err;
5662         }
5663         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5664         if (!snap_name)
5665                 goto out_mem;
5666         *(snap_name + len) = '\0';
5667         spec->snap_name = snap_name;
5668
5669         /* Initialize all rbd options to the defaults */
5670
5671         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5672         if (!rbd_opts)
5673                 goto out_mem;
5674
5675         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5676         rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5677         rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5678         rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5679
5680         copts = ceph_parse_options(options, mon_addrs,
5681                                         mon_addrs + mon_addrs_size - 1,
5682                                         parse_rbd_opts_token, rbd_opts);
5683         if (IS_ERR(copts)) {
5684                 ret = PTR_ERR(copts);
5685                 goto out_err;
5686         }
5687         kfree(options);
5688
5689         *ceph_opts = copts;
5690         *opts = rbd_opts;
5691         *rbd_spec = spec;
5692
5693         return 0;
5694 out_mem:
5695         ret = -ENOMEM;
5696 out_err:
5697         kfree(rbd_opts);
5698         rbd_spec_put(spec);
5699         kfree(options);
5700
5701         return ret;
5702 }
5703
5704 /*
5705  * Return pool id (>= 0) or a negative error code.
5706  */
5707 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5708 {
5709         struct ceph_options *opts = rbdc->client->options;
5710         u64 newest_epoch;
5711         int tries = 0;
5712         int ret;
5713
5714 again:
5715         ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5716         if (ret == -ENOENT && tries++ < 1) {
5717                 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5718                                             &newest_epoch);
5719                 if (ret < 0)
5720                         return ret;
5721
5722                 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5723                         ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5724                         (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5725                                                      newest_epoch,
5726                                                      opts->mount_timeout);
5727                         goto again;
5728                 } else {
5729                         /* the osdmap we have is new enough */
5730                         return -ENOENT;
5731                 }
5732         }
5733
5734         return ret;
5735 }
5736
5737 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5738 {
5739         down_write(&rbd_dev->lock_rwsem);
5740         if (__rbd_is_lock_owner(rbd_dev))
5741                 rbd_unlock(rbd_dev);
5742         up_write(&rbd_dev->lock_rwsem);
5743 }
5744
5745 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5746 {
5747         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5748                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5749                 return -EINVAL;
5750         }
5751
5752         /* FIXME: "rbd map --exclusive" should be in interruptible */
5753         down_read(&rbd_dev->lock_rwsem);
5754         rbd_wait_state_locked(rbd_dev);
5755         up_read(&rbd_dev->lock_rwsem);
5756         if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5757                 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5758                 return -EROFS;
5759         }
5760
5761         return 0;
5762 }
5763
5764 /*
5765  * An rbd format 2 image has a unique identifier, distinct from the
5766  * name given to it by the user.  Internally, that identifier is
5767  * what's used to specify the names of objects related to the image.
5768  *
5769  * A special "rbd id" object is used to map an rbd image name to its
5770  * id.  If that object doesn't exist, then there is no v2 rbd image
5771  * with the supplied name.
5772  *
5773  * This function will record the given rbd_dev's image_id field if
5774  * it can be determined, and in that case will return 0.  If any
5775  * errors occur a negative errno will be returned and the rbd_dev's
5776  * image_id field will be unchanged (and should be NULL).
5777  */
5778 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5779 {
5780         int ret;
5781         size_t size;
5782         CEPH_DEFINE_OID_ONSTACK(oid);
5783         void *response;
5784         char *image_id;
5785
5786         /*
5787          * When probing a parent image, the image id is already
5788          * known (and the image name likely is not).  There's no
5789          * need to fetch the image id again in this case.  We
5790          * do still need to set the image format though.
5791          */
5792         if (rbd_dev->spec->image_id) {
5793                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5794
5795                 return 0;
5796         }
5797
5798         /*
5799          * First, see if the format 2 image id file exists, and if
5800          * so, get the image's persistent id from it.
5801          */
5802         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5803                                rbd_dev->spec->image_name);
5804         if (ret)
5805                 return ret;
5806
5807         dout("rbd id object name is %s\n", oid.name);
5808
5809         /* Response will be an encoded string, which includes a length */
5810
5811         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5812         response = kzalloc(size, GFP_NOIO);
5813         if (!response) {
5814                 ret = -ENOMEM;
5815                 goto out;
5816         }
5817
5818         /* If it doesn't exist we'll assume it's a format 1 image */
5819
5820         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5821                                   "get_id", NULL, 0,
5822                                   response, RBD_IMAGE_ID_LEN_MAX);
5823         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5824         if (ret == -ENOENT) {
5825                 image_id = kstrdup("", GFP_KERNEL);
5826                 ret = image_id ? 0 : -ENOMEM;
5827                 if (!ret)
5828                         rbd_dev->image_format = 1;
5829         } else if (ret >= 0) {
5830                 void *p = response;
5831
5832                 image_id = ceph_extract_encoded_string(&p, p + ret,
5833                                                 NULL, GFP_NOIO);
5834                 ret = PTR_ERR_OR_ZERO(image_id);
5835                 if (!ret)
5836                         rbd_dev->image_format = 2;
5837         }
5838
5839         if (!ret) {
5840                 rbd_dev->spec->image_id = image_id;
5841                 dout("image_id is %s\n", image_id);
5842         }
5843 out:
5844         kfree(response);
5845         ceph_oid_destroy(&oid);
5846         return ret;
5847 }
5848
5849 /*
5850  * Undo whatever state changes are made by v1 or v2 header info
5851  * call.
5852  */
5853 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5854 {
5855         struct rbd_image_header *header;
5856
5857         rbd_dev_parent_put(rbd_dev);
5858
5859         /* Free dynamic fields from the header, then zero it out */
5860
5861         header = &rbd_dev->header;
5862         ceph_put_snap_context(header->snapc);
5863         kfree(header->snap_sizes);
5864         kfree(header->snap_names);
5865         kfree(header->object_prefix);
5866         memset(header, 0, sizeof (*header));
5867 }
5868
5869 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5870 {
5871         int ret;
5872
5873         ret = rbd_dev_v2_object_prefix(rbd_dev);
5874         if (ret)
5875                 goto out_err;
5876
5877         /*
5878          * Get the and check features for the image.  Currently the
5879          * features are assumed to never change.
5880          */
5881         ret = rbd_dev_v2_features(rbd_dev);
5882         if (ret)
5883                 goto out_err;
5884
5885         /* If the image supports fancy striping, get its parameters */
5886
5887         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5888                 ret = rbd_dev_v2_striping_info(rbd_dev);
5889                 if (ret < 0)
5890                         goto out_err;
5891         }
5892
5893         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5894                 ret = rbd_dev_v2_data_pool(rbd_dev);
5895                 if (ret)
5896                         goto out_err;
5897         }
5898
5899         rbd_init_layout(rbd_dev);
5900         return 0;
5901
5902 out_err:
5903         rbd_dev->header.features = 0;
5904         kfree(rbd_dev->header.object_prefix);
5905         rbd_dev->header.object_prefix = NULL;
5906         return ret;
5907 }
5908
5909 /*
5910  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5911  * rbd_dev_image_probe() recursion depth, which means it's also the
5912  * length of the already discovered part of the parent chain.
5913  */
5914 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5915 {
5916         struct rbd_device *parent = NULL;
5917         int ret;
5918
5919         if (!rbd_dev->parent_spec)
5920                 return 0;
5921
5922         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5923                 pr_info("parent chain is too long (%d)\n", depth);
5924                 ret = -EINVAL;
5925                 goto out_err;
5926         }
5927
5928         parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5929         if (!parent) {
5930                 ret = -ENOMEM;
5931                 goto out_err;
5932         }
5933
5934         /*
5935          * Images related by parent/child relationships always share
5936          * rbd_client and spec/parent_spec, so bump their refcounts.
5937          */
5938         __rbd_get_client(rbd_dev->rbd_client);
5939         rbd_spec_get(rbd_dev->parent_spec);
5940
5941         ret = rbd_dev_image_probe(parent, depth);
5942         if (ret < 0)
5943                 goto out_err;
5944
5945         rbd_dev->parent = parent;
5946         atomic_set(&rbd_dev->parent_ref, 1);
5947         return 0;
5948
5949 out_err:
5950         rbd_dev_unparent(rbd_dev);
5951         rbd_dev_destroy(parent);
5952         return ret;
5953 }
5954
5955 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5956 {
5957         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5958         rbd_dev_mapping_clear(rbd_dev);
5959         rbd_free_disk(rbd_dev);
5960         if (!single_major)
5961                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5962 }
5963
5964 /*
5965  * rbd_dev->header_rwsem must be locked for write and will be unlocked
5966  * upon return.
5967  */
5968 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5969 {
5970         int ret;
5971
5972         /* Record our major and minor device numbers. */
5973
5974         if (!single_major) {
5975                 ret = register_blkdev(0, rbd_dev->name);
5976                 if (ret < 0)
5977                         goto err_out_unlock;
5978
5979                 rbd_dev->major = ret;
5980                 rbd_dev->minor = 0;
5981         } else {
5982                 rbd_dev->major = rbd_major;
5983                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5984         }
5985
5986         /* Set up the blkdev mapping. */
5987
5988         ret = rbd_init_disk(rbd_dev);
5989         if (ret)
5990                 goto err_out_blkdev;
5991
5992         ret = rbd_dev_mapping_set(rbd_dev);
5993         if (ret)
5994                 goto err_out_disk;
5995
5996         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5997         set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5998
5999         ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6000         if (ret)
6001                 goto err_out_mapping;
6002
6003         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6004         up_write(&rbd_dev->header_rwsem);
6005         return 0;
6006
6007 err_out_mapping:
6008         rbd_dev_mapping_clear(rbd_dev);
6009 err_out_disk:
6010         rbd_free_disk(rbd_dev);
6011 err_out_blkdev:
6012         if (!single_major)
6013                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6014 err_out_unlock:
6015         up_write(&rbd_dev->header_rwsem);
6016         return ret;
6017 }
6018
6019 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6020 {
6021         struct rbd_spec *spec = rbd_dev->spec;
6022         int ret;
6023
6024         /* Record the header object name for this rbd image. */
6025
6026         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6027         if (rbd_dev->image_format == 1)
6028                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6029                                        spec->image_name, RBD_SUFFIX);
6030         else
6031                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6032                                        RBD_HEADER_PREFIX, spec->image_id);
6033
6034         return ret;
6035 }
6036
6037 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6038 {
6039         rbd_dev_unprobe(rbd_dev);
6040         if (rbd_dev->opts)
6041                 rbd_unregister_watch(rbd_dev);
6042         rbd_dev->image_format = 0;
6043         kfree(rbd_dev->spec->image_id);
6044         rbd_dev->spec->image_id = NULL;
6045 }
6046
6047 /*
6048  * Probe for the existence of the header object for the given rbd
6049  * device.  If this image is the one being mapped (i.e., not a
6050  * parent), initiate a watch on its header object before using that
6051  * object to get detailed information about the rbd image.
6052  */
6053 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6054 {
6055         int ret;
6056
6057         /*
6058          * Get the id from the image id object.  Unless there's an
6059          * error, rbd_dev->spec->image_id will be filled in with
6060          * a dynamically-allocated string, and rbd_dev->image_format
6061          * will be set to either 1 or 2.
6062          */
6063         ret = rbd_dev_image_id(rbd_dev);
6064         if (ret)
6065                 return ret;
6066
6067         ret = rbd_dev_header_name(rbd_dev);
6068         if (ret)
6069                 goto err_out_format;
6070
6071         if (!depth) {
6072                 ret = rbd_register_watch(rbd_dev);
6073                 if (ret) {
6074                         if (ret == -ENOENT)
6075                                 pr_info("image %s/%s does not exist\n",
6076                                         rbd_dev->spec->pool_name,
6077                                         rbd_dev->spec->image_name);
6078                         goto err_out_format;
6079                 }
6080         }
6081
6082         ret = rbd_dev_header_info(rbd_dev);
6083         if (ret)
6084                 goto err_out_watch;
6085
6086         /*
6087          * If this image is the one being mapped, we have pool name and
6088          * id, image name and id, and snap name - need to fill snap id.
6089          * Otherwise this is a parent image, identified by pool, image
6090          * and snap ids - need to fill in names for those ids.
6091          */
6092         if (!depth)
6093                 ret = rbd_spec_fill_snap_id(rbd_dev);
6094         else
6095                 ret = rbd_spec_fill_names(rbd_dev);
6096         if (ret) {
6097                 if (ret == -ENOENT)
6098                         pr_info("snap %s/%s@%s does not exist\n",
6099                                 rbd_dev->spec->pool_name,
6100                                 rbd_dev->spec->image_name,
6101                                 rbd_dev->spec->snap_name);
6102                 goto err_out_probe;
6103         }
6104
6105         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6106                 ret = rbd_dev_v2_parent_info(rbd_dev);
6107                 if (ret)
6108                         goto err_out_probe;
6109
6110                 /*
6111                  * Need to warn users if this image is the one being
6112                  * mapped and has a parent.
6113                  */
6114                 if (!depth && rbd_dev->parent_spec)
6115                         rbd_warn(rbd_dev,
6116                                  "WARNING: kernel layering is EXPERIMENTAL!");
6117         }
6118
6119         ret = rbd_dev_probe_parent(rbd_dev, depth);
6120         if (ret)
6121                 goto err_out_probe;
6122
6123         dout("discovered format %u image, header name is %s\n",
6124                 rbd_dev->image_format, rbd_dev->header_oid.name);
6125         return 0;
6126
6127 err_out_probe:
6128         rbd_dev_unprobe(rbd_dev);
6129 err_out_watch:
6130         if (!depth)
6131                 rbd_unregister_watch(rbd_dev);
6132 err_out_format:
6133         rbd_dev->image_format = 0;
6134         kfree(rbd_dev->spec->image_id);
6135         rbd_dev->spec->image_id = NULL;
6136         return ret;
6137 }
6138
6139 static ssize_t do_rbd_add(struct bus_type *bus,
6140                           const char *buf,
6141                           size_t count)
6142 {
6143         struct rbd_device *rbd_dev = NULL;
6144         struct ceph_options *ceph_opts = NULL;
6145         struct rbd_options *rbd_opts = NULL;
6146         struct rbd_spec *spec = NULL;
6147         struct rbd_client *rbdc;
6148         bool read_only;
6149         int rc;
6150
6151         if (!try_module_get(THIS_MODULE))
6152                 return -ENODEV;
6153
6154         /* parse add command */
6155         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6156         if (rc < 0)
6157                 goto out;
6158
6159         rbdc = rbd_get_client(ceph_opts);
6160         if (IS_ERR(rbdc)) {
6161                 rc = PTR_ERR(rbdc);
6162                 goto err_out_args;
6163         }
6164
6165         /* pick the pool */
6166         rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6167         if (rc < 0) {
6168                 if (rc == -ENOENT)
6169                         pr_info("pool %s does not exist\n", spec->pool_name);
6170                 goto err_out_client;
6171         }
6172         spec->pool_id = (u64)rc;
6173
6174         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6175         if (!rbd_dev) {
6176                 rc = -ENOMEM;
6177                 goto err_out_client;
6178         }
6179         rbdc = NULL;            /* rbd_dev now owns this */
6180         spec = NULL;            /* rbd_dev now owns this */
6181         rbd_opts = NULL;        /* rbd_dev now owns this */
6182
6183         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6184         if (!rbd_dev->config_info) {
6185                 rc = -ENOMEM;
6186                 goto err_out_rbd_dev;
6187         }
6188
6189         down_write(&rbd_dev->header_rwsem);
6190         rc = rbd_dev_image_probe(rbd_dev, 0);
6191         if (rc < 0) {
6192                 up_write(&rbd_dev->header_rwsem);
6193                 goto err_out_rbd_dev;
6194         }
6195
6196         /* If we are mapping a snapshot it must be marked read-only */
6197
6198         read_only = rbd_dev->opts->read_only;
6199         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6200                 read_only = true;
6201         rbd_dev->mapping.read_only = read_only;
6202
6203         rc = rbd_dev_device_setup(rbd_dev);
6204         if (rc)
6205                 goto err_out_image_probe;
6206
6207         if (rbd_dev->opts->exclusive) {
6208                 rc = rbd_add_acquire_lock(rbd_dev);
6209                 if (rc)
6210                         goto err_out_device_setup;
6211         }
6212
6213         /* Everything's ready.  Announce the disk to the world. */
6214
6215         rc = device_add(&rbd_dev->dev);
6216         if (rc)
6217                 goto err_out_image_lock;
6218
6219         add_disk(rbd_dev->disk);
6220         /* see rbd_init_disk() */
6221         blk_put_queue(rbd_dev->disk->queue);
6222
6223         spin_lock(&rbd_dev_list_lock);
6224         list_add_tail(&rbd_dev->node, &rbd_dev_list);
6225         spin_unlock(&rbd_dev_list_lock);
6226
6227         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6228                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6229                 rbd_dev->header.features);
6230         rc = count;
6231 out:
6232         module_put(THIS_MODULE);
6233         return rc;
6234
6235 err_out_image_lock:
6236         rbd_dev_image_unlock(rbd_dev);
6237 err_out_device_setup:
6238         rbd_dev_device_release(rbd_dev);
6239 err_out_image_probe:
6240         rbd_dev_image_release(rbd_dev);
6241 err_out_rbd_dev:
6242         rbd_dev_destroy(rbd_dev);
6243 err_out_client:
6244         rbd_put_client(rbdc);
6245 err_out_args:
6246         rbd_spec_put(spec);
6247         kfree(rbd_opts);
6248         goto out;
6249 }
6250
6251 static ssize_t rbd_add(struct bus_type *bus,
6252                        const char *buf,
6253                        size_t count)
6254 {
6255         if (single_major)
6256                 return -EINVAL;
6257
6258         return do_rbd_add(bus, buf, count);
6259 }
6260
6261 static ssize_t rbd_add_single_major(struct bus_type *bus,
6262                                     const char *buf,
6263                                     size_t count)
6264 {
6265         return do_rbd_add(bus, buf, count);
6266 }
6267
6268 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6269 {
6270         while (rbd_dev->parent) {
6271                 struct rbd_device *first = rbd_dev;
6272                 struct rbd_device *second = first->parent;
6273                 struct rbd_device *third;
6274
6275                 /*
6276                  * Follow to the parent with no grandparent and
6277                  * remove it.
6278                  */
6279                 while (second && (third = second->parent)) {
6280                         first = second;
6281                         second = third;
6282                 }
6283                 rbd_assert(second);
6284                 rbd_dev_image_release(second);
6285                 rbd_dev_destroy(second);
6286                 first->parent = NULL;
6287                 first->parent_overlap = 0;
6288
6289                 rbd_assert(first->parent_spec);
6290                 rbd_spec_put(first->parent_spec);
6291                 first->parent_spec = NULL;
6292         }
6293 }
6294
6295 static ssize_t do_rbd_remove(struct bus_type *bus,
6296                              const char *buf,
6297                              size_t count)
6298 {
6299         struct rbd_device *rbd_dev = NULL;
6300         struct list_head *tmp;
6301         int dev_id;
6302         char opt_buf[6];
6303         bool already = false;
6304         bool force = false;
6305         int ret;
6306
6307         dev_id = -1;
6308         opt_buf[0] = '\0';
6309         sscanf(buf, "%d %5s", &dev_id, opt_buf);
6310         if (dev_id < 0) {
6311                 pr_err("dev_id out of range\n");
6312                 return -EINVAL;
6313         }
6314         if (opt_buf[0] != '\0') {
6315                 if (!strcmp(opt_buf, "force")) {
6316                         force = true;
6317                 } else {
6318                         pr_err("bad remove option at '%s'\n", opt_buf);
6319                         return -EINVAL;
6320                 }
6321         }
6322
6323         ret = -ENOENT;
6324         spin_lock(&rbd_dev_list_lock);
6325         list_for_each(tmp, &rbd_dev_list) {
6326                 rbd_dev = list_entry(tmp, struct rbd_device, node);
6327                 if (rbd_dev->dev_id == dev_id) {
6328                         ret = 0;
6329                         break;
6330                 }
6331         }
6332         if (!ret) {
6333                 spin_lock_irq(&rbd_dev->lock);
6334                 if (rbd_dev->open_count && !force)
6335                         ret = -EBUSY;
6336                 else
6337                         already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6338                                                         &rbd_dev->flags);
6339                 spin_unlock_irq(&rbd_dev->lock);
6340         }
6341         spin_unlock(&rbd_dev_list_lock);
6342         if (ret < 0 || already)
6343                 return ret;
6344
6345         if (force) {
6346                 /*
6347                  * Prevent new IO from being queued and wait for existing
6348                  * IO to complete/fail.
6349                  */
6350                 blk_mq_freeze_queue(rbd_dev->disk->queue);
6351                 blk_set_queue_dying(rbd_dev->disk->queue);
6352         }
6353
6354         del_gendisk(rbd_dev->disk);
6355         spin_lock(&rbd_dev_list_lock);
6356         list_del_init(&rbd_dev->node);
6357         spin_unlock(&rbd_dev_list_lock);
6358         device_del(&rbd_dev->dev);
6359
6360         rbd_dev_image_unlock(rbd_dev);
6361         rbd_dev_device_release(rbd_dev);
6362         rbd_dev_image_release(rbd_dev);
6363         rbd_dev_destroy(rbd_dev);
6364         return count;
6365 }
6366
6367 static ssize_t rbd_remove(struct bus_type *bus,
6368                           const char *buf,
6369                           size_t count)
6370 {
6371         if (single_major)
6372                 return -EINVAL;
6373
6374         return do_rbd_remove(bus, buf, count);
6375 }
6376
6377 static ssize_t rbd_remove_single_major(struct bus_type *bus,
6378                                        const char *buf,
6379                                        size_t count)
6380 {
6381         return do_rbd_remove(bus, buf, count);
6382 }
6383
6384 /*
6385  * create control files in sysfs
6386  * /sys/bus/rbd/...
6387  */
6388 static int rbd_sysfs_init(void)
6389 {
6390         int ret;
6391
6392         ret = device_register(&rbd_root_dev);
6393         if (ret < 0)
6394                 return ret;
6395
6396         ret = bus_register(&rbd_bus_type);
6397         if (ret < 0)
6398                 device_unregister(&rbd_root_dev);
6399
6400         return ret;
6401 }
6402
6403 static void rbd_sysfs_cleanup(void)
6404 {
6405         bus_unregister(&rbd_bus_type);
6406         device_unregister(&rbd_root_dev);
6407 }
6408
6409 static int rbd_slab_init(void)
6410 {
6411         rbd_assert(!rbd_img_request_cache);
6412         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6413         if (!rbd_img_request_cache)
6414                 return -ENOMEM;
6415
6416         rbd_assert(!rbd_obj_request_cache);
6417         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6418         if (!rbd_obj_request_cache)
6419                 goto out_err;
6420
6421         rbd_assert(!rbd_bio_clone);
6422         rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6423         if (!rbd_bio_clone)
6424                 goto out_err_clone;
6425
6426         return 0;
6427
6428 out_err_clone:
6429         kmem_cache_destroy(rbd_obj_request_cache);
6430         rbd_obj_request_cache = NULL;
6431 out_err:
6432         kmem_cache_destroy(rbd_img_request_cache);
6433         rbd_img_request_cache = NULL;
6434         return -ENOMEM;
6435 }
6436
6437 static void rbd_slab_exit(void)
6438 {
6439         rbd_assert(rbd_obj_request_cache);
6440         kmem_cache_destroy(rbd_obj_request_cache);
6441         rbd_obj_request_cache = NULL;
6442
6443         rbd_assert(rbd_img_request_cache);
6444         kmem_cache_destroy(rbd_img_request_cache);
6445         rbd_img_request_cache = NULL;
6446
6447         rbd_assert(rbd_bio_clone);
6448         bioset_free(rbd_bio_clone);
6449         rbd_bio_clone = NULL;
6450 }
6451
6452 static int __init rbd_init(void)
6453 {
6454         int rc;
6455
6456         if (!libceph_compatible(NULL)) {
6457                 rbd_warn(NULL, "libceph incompatibility (quitting)");
6458                 return -EINVAL;
6459         }
6460
6461         rc = rbd_slab_init();
6462         if (rc)
6463                 return rc;
6464
6465         /*
6466          * The number of active work items is limited by the number of
6467          * rbd devices * queue depth, so leave @max_active at default.
6468          */
6469         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6470         if (!rbd_wq) {
6471                 rc = -ENOMEM;
6472                 goto err_out_slab;
6473         }
6474
6475         if (single_major) {
6476                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6477                 if (rbd_major < 0) {
6478                         rc = rbd_major;
6479                         goto err_out_wq;
6480                 }
6481         }
6482
6483         rc = rbd_sysfs_init();
6484         if (rc)
6485                 goto err_out_blkdev;
6486
6487         if (single_major)
6488                 pr_info("loaded (major %d)\n", rbd_major);
6489         else
6490                 pr_info("loaded\n");
6491
6492         return 0;
6493
6494 err_out_blkdev:
6495         if (single_major)
6496                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6497 err_out_wq:
6498         destroy_workqueue(rbd_wq);
6499 err_out_slab:
6500         rbd_slab_exit();
6501         return rc;
6502 }
6503
6504 static void __exit rbd_exit(void)
6505 {
6506         ida_destroy(&rbd_dev_id_ida);
6507         rbd_sysfs_cleanup();
6508         if (single_major)
6509                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6510         destroy_workqueue(rbd_wq);
6511         rbd_slab_exit();
6512 }
6513
6514 module_init(rbd_init);
6515 module_exit(rbd_exit);
6516
6517 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6518 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6519 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6520 /* following authorship retained from original osdblk.c */
6521 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6522
6523 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6524 MODULE_LICENSE("GPL");