1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/decode.h>
36 #include <linux/parser.h>
37 #include <linux/bsearch.h>
38
39 #include <linux/kernel.h>
40 #include <linux/device.h>
41 #include <linux/module.h>
42 #include <linux/blk-mq.h>
43 #include <linux/fs.h>
44 #include <linux/blkdev.h>
45 #include <linux/slab.h>
46 #include <linux/idr.h>
47 #include <linux/workqueue.h>
48
49 #include "rbd_types.h"
50
51 #define RBD_DEBUG       /* Activate rbd_assert() calls */
52
53 /*
54  * The basic unit of block I/O is a sector.  It is interpreted in a
55  * number of contexts in Linux (blk, bio, genhd), but the default is
56  * universally 512 bytes.  These symbols are just slightly more
57  * meaningful than the bare numbers they represent.
58  */
59 #define SECTOR_SHIFT    9
60 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
61
62 /*
63  * Increment the given counter and return its updated value.
64  * If the counter is already 0 it will not be incremented.
65  * If the counter is already at its maximum value returns
66  * -EINVAL without updating it.
67  */
68 static int atomic_inc_return_safe(atomic_t *v)
69 {
70         unsigned int counter;
71
72         counter = (unsigned int)__atomic_add_unless(v, 1, 0);
73         if (counter <= (unsigned int)INT_MAX)
74                 return (int)counter;
75
76         atomic_dec(v);
77
78         return -EINVAL;
79 }
80
81 /* Decrement the counter.  Return the resulting value, or -EINVAL */
82 static int atomic_dec_return_safe(atomic_t *v)
83 {
84         int counter;
85
86         counter = atomic_dec_return(v);
87         if (counter >= 0)
88                 return counter;
89
90         atomic_inc(v);
91
92         return -EINVAL;
93 }
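
/*
 * A minimal usage sketch (illustrative only, not part of the driver):
 * the pair above guards a reference count in which 0 means "no new
 * reference may be taken" and a negative return signals an
 * overflow/underflow, much like the parent_ref handling further down.
 * The example_* names are hypothetical.
 */
#if 0
static bool example_ref_get(atomic_t *refcount)
{
        /* Fails if the count is 0 or would exceed INT_MAX. */
        return atomic_inc_return_safe(refcount) > 0;
}

static void example_ref_put(atomic_t *refcount)
{
        if (atomic_dec_return_safe(refcount) < 0)
                pr_warn("reference count underflow\n");
}
#endif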
94
95 #define RBD_DRV_NAME "rbd"
96
97 #define RBD_MINORS_PER_MAJOR            256
98 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
99
100 #define RBD_MAX_PARENT_CHAIN_LEN        16
101
102 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
103 #define RBD_MAX_SNAP_NAME_LEN   \
104                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
105
106 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
107
108 #define RBD_SNAP_HEAD_NAME      "-"
109
110 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
111
112 /* This allows a single page to hold an image name sent by OSD */
113 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
114 #define RBD_IMAGE_ID_LEN_MAX    64
115
116 #define RBD_OBJ_PREFIX_LEN_MAX  64
117
118 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
119 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
120
121 /* Feature bits */
122
123 #define RBD_FEATURE_LAYERING            (1ULL<<0)
124 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
125 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
126 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
127
128 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
129                                  RBD_FEATURE_STRIPINGV2 |       \
130                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
131                                  RBD_FEATURE_DATA_POOL)
132
133 /* Features supported by this (client software) implementation. */
134
135 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
136
137 /*
138  * An RBD device name will be "rbd#", where the "rbd" comes from
139  * RBD_DRV_NAME above, and # is a unique integer identifier.
140  */
141 #define DEV_NAME_LEN            32
142
143 /*
144  * block device image metadata (in-memory version)
145  */
146 struct rbd_image_header {
147         /* These six fields never change for a given rbd image */
148         char *object_prefix;
149         __u8 obj_order;
150         u64 stripe_unit;
151         u64 stripe_count;
152         s64 data_pool_id;
153         u64 features;           /* Might be changeable someday? */
154
155         /* The remaining fields need to be updated occasionally */
156         u64 image_size;
157         struct ceph_snap_context *snapc;
158         char *snap_names;       /* format 1 only */
159         u64 *snap_sizes;        /* format 1 only */
160 };
161
162 /*
163  * An rbd image specification.
164  *
165  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
166  * identify an image.  Each rbd_dev structure includes a pointer to
167  * an rbd_spec structure that encapsulates this identity.
168  *
169  * Each of the id's in an rbd_spec has an associated name.  For a
170  * user-mapped image, the names are supplied and the id's associated
171  * with them are looked up.  For a layered image, a parent image is
172  * defined by the tuple, and the names are looked up.
173  *
174  * An rbd_dev structure contains a parent_spec pointer which is
175  * non-null if the image it represents is a child in a layered
176  * image.  This pointer will refer to the rbd_spec structure used
177  * by the parent rbd_dev for its own identity (i.e., the structure
178  * is shared between the parent and child).
179  *
180  * Since these structures are populated once, during the discovery
181  * phase of image construction, they are effectively immutable so
182  * we make no effort to synchronize access to them.
183  *
184  * Note that code herein does not assume the image name is known (it
185  * could be a null pointer).
186  */
187 struct rbd_spec {
188         u64             pool_id;
189         const char      *pool_name;
190
191         const char      *image_id;
192         const char      *image_name;
193
194         u64             snap_id;
195         const char      *snap_name;
196
197         struct kref     kref;
198 };
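
/*
 * Example (illustrative): mapping pool "rbd", image "foo", snapshot
 * "snap1" supplies the three names and has the matching pool_id,
 * image_id and snap_id looked up; an image cloned from that snapshot
 * is mapped with a parent_spec carrying the same tuple.
 */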
199
200 /*
201  * an instance of the client.  multiple devices may share an rbd client.
202  */
203 struct rbd_client {
204         struct ceph_client      *client;
205         struct kref             kref;
206         struct list_head        node;
207 };
208
209 struct rbd_img_request;
210 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
211
212 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
213
214 struct rbd_obj_request;
215 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
216
217 enum obj_request_type {
218         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
219 };
220
221 enum obj_operation_type {
222         OBJ_OP_WRITE,
223         OBJ_OP_READ,
224         OBJ_OP_DISCARD,
225 };
226
227 enum obj_req_flags {
228         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
229         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
230         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
231         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
232 };
233
234 struct rbd_obj_request {
235         u64                     object_no;
236         u64                     offset;         /* object start byte */
237         u64                     length;         /* bytes from offset */
238         unsigned long           flags;
239
240         /*
241          * An object request associated with an image will have its
242          * img_data flag set; a standalone object request will not.
243          *
244          * A standalone object request will have which == BAD_WHICH
245          * and a null obj_request pointer.
246          *
247          * An object request initiated in support of a layered image
248          * object (to check for its existence before a write) will
249          * have which == BAD_WHICH and a non-null obj_request pointer.
250          *
251          * Finally, an object request for rbd image data will have
252          * which != BAD_WHICH, and will have a non-null img_request
253          * pointer.  The value of which will be in the range
254          * 0..(img_request->obj_request_count-1).
255          */
256         union {
257                 struct rbd_obj_request  *obj_request;   /* STAT op */
258                 struct {
259                         struct rbd_img_request  *img_request;
260                         u64                     img_offset;
261                         /* links for img_request->obj_requests list */
262                         struct list_head        links;
263                 };
264         };
265         u32                     which;          /* posn image request list */
266
267         enum obj_request_type   type;
268         union {
269                 struct bio      *bio_list;
270                 struct {
271                         struct page     **pages;
272                         u32             page_count;
273                 };
274         };
275         struct page             **copyup_pages;
276         u32                     copyup_page_count;
277
278         struct ceph_osd_request *osd_req;
279
280         u64                     xferred;        /* bytes transferred */
281         int                     result;
282
283         rbd_obj_callback_t      callback;
284         struct completion       completion;
285
286         struct kref             kref;
287 };
288
289 enum img_req_flags {
290         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
291         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
292         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
293         IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
294 };
295
296 struct rbd_img_request {
297         struct rbd_device       *rbd_dev;
298         u64                     offset; /* starting image byte offset */
299         u64                     length; /* byte count from offset */
300         unsigned long           flags;
301         union {
302                 u64                     snap_id;        /* for reads */
303                 struct ceph_snap_context *snapc;        /* for writes */
304         };
305         union {
306                 struct request          *rq;            /* block request */
307                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
308         };
309         struct page             **copyup_pages;
310         u32                     copyup_page_count;
311         spinlock_t              completion_lock;/* protects next_completion */
312         u32                     next_completion;
313         rbd_img_callback_t      callback;
314         u64                     xferred;/* aggregate bytes transferred */
315         int                     result; /* first nonzero obj_request result */
316
317         u32                     obj_request_count;
318         struct list_head        obj_requests;   /* rbd_obj_request structs */
319
320         struct kref             kref;
321 };
322
323 #define for_each_obj_request(ireq, oreq) \
324         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
325 #define for_each_obj_request_from(ireq, oreq) \
326         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
327 #define for_each_obj_request_safe(ireq, oreq, n) \
328         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
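
/*
 * A minimal usage sketch (illustrative only, not part of the driver):
 * walking an image request's object requests with
 * for_each_obj_request(), as rbd_img_request_complete() does when it
 * totals the bytes transferred.  The example_* name is hypothetical.
 */
#if 0
static u64 example_img_request_bytes(struct rbd_img_request *img_request)
{
        struct rbd_obj_request *obj_request;
        u64 xferred = 0;

        for_each_obj_request(img_request, obj_request)
                xferred += obj_request->xferred;

        return xferred;
}
#endif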
329
330 enum rbd_watch_state {
331         RBD_WATCH_STATE_UNREGISTERED,
332         RBD_WATCH_STATE_REGISTERED,
333         RBD_WATCH_STATE_ERROR,
334 };
335
336 enum rbd_lock_state {
337         RBD_LOCK_STATE_UNLOCKED,
338         RBD_LOCK_STATE_LOCKED,
339         RBD_LOCK_STATE_RELEASING,
340 };
341
342 /* WatchNotify::ClientId */
343 struct rbd_client_id {
344         u64 gid;
345         u64 handle;
346 };
347
348 struct rbd_mapping {
349         u64                     size;
350         u64                     features;
351 };
352
353 /*
354  * a single device
355  */
356 struct rbd_device {
357         int                     dev_id;         /* blkdev unique id */
358
359         int                     major;          /* blkdev assigned major */
360         int                     minor;
361         struct gendisk          *disk;          /* blkdev's gendisk and rq */
362
363         u32                     image_format;   /* Either 1 or 2 */
364         struct rbd_client       *rbd_client;
365
366         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
367
368         spinlock_t              lock;           /* queue, flags, open_count */
369
370         struct rbd_image_header header;
371         unsigned long           flags;          /* possibly lock protected */
372         struct rbd_spec         *spec;
373         struct rbd_options      *opts;
374         char                    *config_info;   /* add{,_single_major} string */
375
376         struct ceph_object_id   header_oid;
377         struct ceph_object_locator header_oloc;
378
379         struct ceph_file_layout layout;         /* used for all rbd requests */
380
381         struct mutex            watch_mutex;
382         enum rbd_watch_state    watch_state;
383         struct ceph_osd_linger_request *watch_handle;
384         u64                     watch_cookie;
385         struct delayed_work     watch_dwork;
386
387         struct rw_semaphore     lock_rwsem;
388         enum rbd_lock_state     lock_state;
389         char                    lock_cookie[32];
390         struct rbd_client_id    owner_cid;
391         struct work_struct      acquired_lock_work;
392         struct work_struct      released_lock_work;
393         struct delayed_work     lock_dwork;
394         struct work_struct      unlock_work;
395         wait_queue_head_t       lock_waitq;
396
397         struct workqueue_struct *task_wq;
398
399         struct rbd_spec         *parent_spec;
400         u64                     parent_overlap;
401         atomic_t                parent_ref;
402         struct rbd_device       *parent;
403
404         /* Block layer tags. */
405         struct blk_mq_tag_set   tag_set;
406
407         /* protects updating the header */
408         struct rw_semaphore     header_rwsem;
409
410         struct rbd_mapping      mapping;
411
412         struct list_head        node;
413
414         /* sysfs related */
415         struct device           dev;
416         unsigned long           open_count;     /* protected by lock */
417 };
418
419 /*
420  * Flag bits for rbd_dev->flags:
421  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
422  *   by rbd_dev->lock
423  * - BLACKLISTED is protected by rbd_dev->lock_rwsem
424  */
425 enum rbd_dev_flags {
426         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
427         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
428         RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
429 };
430
431 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
432
433 static LIST_HEAD(rbd_dev_list);    /* devices */
434 static DEFINE_SPINLOCK(rbd_dev_list_lock);
435
436 static LIST_HEAD(rbd_client_list);              /* clients */
437 static DEFINE_SPINLOCK(rbd_client_list_lock);
438
439 /* Slab caches for frequently-allocated structures */
440
441 static struct kmem_cache        *rbd_img_request_cache;
442 static struct kmem_cache        *rbd_obj_request_cache;
443
444 static struct bio_set           *rbd_bio_clone;
445
446 static int rbd_major;
447 static DEFINE_IDA(rbd_dev_id_ida);
448
449 static struct workqueue_struct *rbd_wq;
450
451 /*
452  * single-major requires >= 0.75 version of userspace rbd utility.
453  */
454 static bool single_major = true;
455 module_param(single_major, bool, S_IRUGO);
456 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
457
458 static int rbd_img_request_submit(struct rbd_img_request *img_request);
459
460 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
461                        size_t count);
462 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
463                           size_t count);
464 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
465                                     size_t count);
466 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
467                                        size_t count);
468 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
469 static void rbd_spec_put(struct rbd_spec *spec);
470
471 static int rbd_dev_id_to_minor(int dev_id)
472 {
473         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
474 }
475
476 static int minor_to_rbd_dev_id(int minor)
477 {
478         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
479 }
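
/*
 * A worked example (illustrative only): with
 * RBD_SINGLE_MAJOR_PART_SHIFT == 4, each device owns 16 minors under
 * the single major number (one whole-disk minor plus 15 partitions).
 * The example_* name is hypothetical.
 */
#if 0
static void example_single_major_layout(void)
{
        BUG_ON(rbd_dev_id_to_minor(3) != 48);   /* rbd3 starts at minor 48 */
        BUG_ON(minor_to_rbd_dev_id(48) != 3);   /* whole-disk minor */
        BUG_ON(minor_to_rbd_dev_id(63) != 3);   /* last partition minor */
}
#endif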
480
481 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
482 {
483         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
484                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
485 }
486
487 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
488 {
489         bool is_lock_owner;
490
491         down_read(&rbd_dev->lock_rwsem);
492         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
493         up_read(&rbd_dev->lock_rwsem);
494         return is_lock_owner;
495 }
496
497 static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
498 {
499         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
500 }
501
502 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
503 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
504 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
505 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
506 static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);
507
508 static struct attribute *rbd_bus_attrs[] = {
509         &bus_attr_add.attr,
510         &bus_attr_remove.attr,
511         &bus_attr_add_single_major.attr,
512         &bus_attr_remove_single_major.attr,
513         &bus_attr_supported_features.attr,
514         NULL,
515 };
516
517 static umode_t rbd_bus_is_visible(struct kobject *kobj,
518                                   struct attribute *attr, int index)
519 {
520         if (!single_major &&
521             (attr == &bus_attr_add_single_major.attr ||
522              attr == &bus_attr_remove_single_major.attr))
523                 return 0;
524
525         return attr->mode;
526 }
527
528 static const struct attribute_group rbd_bus_group = {
529         .attrs = rbd_bus_attrs,
530         .is_visible = rbd_bus_is_visible,
531 };
532 __ATTRIBUTE_GROUPS(rbd_bus);
533
534 static struct bus_type rbd_bus_type = {
535         .name           = "rbd",
536         .bus_groups     = rbd_bus_groups,
537 };
538
539 static void rbd_root_dev_release(struct device *dev)
540 {
541 }
542
543 static struct device rbd_root_dev = {
544         .init_name =    "rbd",
545         .release =      rbd_root_dev_release,
546 };
547
548 static __printf(2, 3)
549 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
550 {
551         struct va_format vaf;
552         va_list args;
553
554         va_start(args, fmt);
555         vaf.fmt = fmt;
556         vaf.va = &args;
557
558         if (!rbd_dev)
559                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
560         else if (rbd_dev->disk)
561                 printk(KERN_WARNING "%s: %s: %pV\n",
562                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
563         else if (rbd_dev->spec && rbd_dev->spec->image_name)
564                 printk(KERN_WARNING "%s: image %s: %pV\n",
565                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
566         else if (rbd_dev->spec && rbd_dev->spec->image_id)
567                 printk(KERN_WARNING "%s: id %s: %pV\n",
568                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
569         else    /* punt */
570                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
571                         RBD_DRV_NAME, rbd_dev, &vaf);
572         va_end(args);
573 }
574
575 #ifdef RBD_DEBUG
576 #define rbd_assert(expr)                                                \
577                 if (unlikely(!(expr))) {                                \
578                         printk(KERN_ERR "\nAssertion failure in %s() "  \
579                                                 "at line %d:\n\n"       \
580                                         "\trbd_assert(%s);\n\n",        \
581                                         __func__, __LINE__, #expr);     \
582                         BUG();                                          \
583                 }
584 #else /* !RBD_DEBUG */
585 #  define rbd_assert(expr)      ((void) 0)
586 #endif /* !RBD_DEBUG */
587
588 static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
589 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
590 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
591 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
592
593 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
594 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
595 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
596 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
597 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
598                                         u64 snap_id);
599 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
600                                 u8 *order, u64 *snap_size);
601 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
602                 u64 *snap_features);
603
604 static int rbd_open(struct block_device *bdev, fmode_t mode)
605 {
606         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
607         bool removing = false;
608
609         spin_lock_irq(&rbd_dev->lock);
610         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
611                 removing = true;
612         else
613                 rbd_dev->open_count++;
614         spin_unlock_irq(&rbd_dev->lock);
615         if (removing)
616                 return -ENOENT;
617
618         (void) get_device(&rbd_dev->dev);
619
620         return 0;
621 }
622
623 static void rbd_release(struct gendisk *disk, fmode_t mode)
624 {
625         struct rbd_device *rbd_dev = disk->private_data;
626         unsigned long open_count_before;
627
628         spin_lock_irq(&rbd_dev->lock);
629         open_count_before = rbd_dev->open_count--;
630         spin_unlock_irq(&rbd_dev->lock);
631         rbd_assert(open_count_before > 0);
632
633         put_device(&rbd_dev->dev);
634 }
635
636 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
637 {
638         int ro;
639
640         if (get_user(ro, (int __user *)arg))
641                 return -EFAULT;
642
643         /* Snapshots can't be marked read-write */
644         if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
645                 return -EROFS;
646
647         /* Let blkdev_roset() handle it */
648         return -ENOTTY;
649 }
650
651 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
652                         unsigned int cmd, unsigned long arg)
653 {
654         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
655         int ret;
656
657         switch (cmd) {
658         case BLKROSET:
659                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
660                 break;
661         default:
662                 ret = -ENOTTY;
663         }
664
665         return ret;
666 }
667
668 #ifdef CONFIG_COMPAT
669 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
670                                 unsigned int cmd, unsigned long arg)
671 {
672         return rbd_ioctl(bdev, mode, cmd, arg);
673 }
674 #endif /* CONFIG_COMPAT */
675
676 static const struct block_device_operations rbd_bd_ops = {
677         .owner                  = THIS_MODULE,
678         .open                   = rbd_open,
679         .release                = rbd_release,
680         .ioctl                  = rbd_ioctl,
681 #ifdef CONFIG_COMPAT
682         .compat_ioctl           = rbd_compat_ioctl,
683 #endif
684 };
685
686 /*
687  * Initialize an rbd client instance.  Success or not, this function
688  * consumes ceph_opts.  Caller holds client_mutex.
689  */
690 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
691 {
692         struct rbd_client *rbdc;
693         int ret = -ENOMEM;
694
695         dout("%s:\n", __func__);
696         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
697         if (!rbdc)
698                 goto out_opt;
699
700         kref_init(&rbdc->kref);
701         INIT_LIST_HEAD(&rbdc->node);
702
703         rbdc->client = ceph_create_client(ceph_opts, rbdc);
704         if (IS_ERR(rbdc->client))
705                 goto out_rbdc;
706         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
707
708         ret = ceph_open_session(rbdc->client);
709         if (ret < 0)
710                 goto out_client;
711
712         spin_lock(&rbd_client_list_lock);
713         list_add_tail(&rbdc->node, &rbd_client_list);
714         spin_unlock(&rbd_client_list_lock);
715
716         dout("%s: rbdc %p\n", __func__, rbdc);
717
718         return rbdc;
719 out_client:
720         ceph_destroy_client(rbdc->client);
721 out_rbdc:
722         kfree(rbdc);
723 out_opt:
724         if (ceph_opts)
725                 ceph_destroy_options(ceph_opts);
726         dout("%s: error %d\n", __func__, ret);
727
728         return ERR_PTR(ret);
729 }
730
731 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
732 {
733         kref_get(&rbdc->kref);
734
735         return rbdc;
736 }
737
738 /*
739  * Find a ceph client with specific addr and configuration.  If
740  * found, bump its reference count.
741  */
742 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
743 {
744         struct rbd_client *client_node;
745         bool found = false;
746
747         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
748                 return NULL;
749
750         spin_lock(&rbd_client_list_lock);
751         list_for_each_entry(client_node, &rbd_client_list, node) {
752                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
753                         __rbd_get_client(client_node);
754
755                         found = true;
756                         break;
757                 }
758         }
759         spin_unlock(&rbd_client_list_lock);
760
761         return found ? client_node : NULL;
762 }
763
764 /*
765  * (Per device) rbd map options
766  */
767 enum {
768         Opt_queue_depth,
769         Opt_last_int,
770         /* int args above */
771         Opt_last_string,
772         /* string args above */
773         Opt_read_only,
774         Opt_read_write,
775         Opt_lock_on_read,
776         Opt_exclusive,
777         Opt_err
778 };
779
780 static match_table_t rbd_opts_tokens = {
781         {Opt_queue_depth, "queue_depth=%d"},
782         /* int args above */
783         /* string args above */
784         {Opt_read_only, "read_only"},
785         {Opt_read_only, "ro"},          /* Alternate spelling */
786         {Opt_read_write, "read_write"},
787         {Opt_read_write, "rw"},         /* Alternate spelling */
788         {Opt_lock_on_read, "lock_on_read"},
789         {Opt_exclusive, "exclusive"},
790         {Opt_err, NULL}
791 };
792
793 struct rbd_options {
794         int     queue_depth;
795         bool    read_only;
796         bool    lock_on_read;
797         bool    exclusive;
798 };
799
800 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
801 #define RBD_READ_ONLY_DEFAULT   false
802 #define RBD_LOCK_ON_READ_DEFAULT false
803 #define RBD_EXCLUSIVE_DEFAULT   false
804
805 static int parse_rbd_opts_token(char *c, void *private)
806 {
807         struct rbd_options *rbd_opts = private;
808         substring_t argstr[MAX_OPT_ARGS];
809         int token, intval, ret;
810
811         token = match_token(c, rbd_opts_tokens, argstr);
812         if (token < Opt_last_int) {
813                 ret = match_int(&argstr[0], &intval);
814                 if (ret < 0) {
815                         pr_err("bad mount option arg (not int) at '%s'\n", c);
816                         return ret;
817                 }
818                 dout("got int token %d val %d\n", token, intval);
819         } else if (token > Opt_last_int && token < Opt_last_string) {
820                 dout("got string token %d val %s\n", token, argstr[0].from);
821         } else {
822                 dout("got token %d\n", token);
823         }
824
825         switch (token) {
826         case Opt_queue_depth:
827                 if (intval < 1) {
828                         pr_err("queue_depth out of range\n");
829                         return -EINVAL;
830                 }
831                 rbd_opts->queue_depth = intval;
832                 break;
833         case Opt_read_only:
834                 rbd_opts->read_only = true;
835                 break;
836         case Opt_read_write:
837                 rbd_opts->read_only = false;
838                 break;
839         case Opt_lock_on_read:
840                 rbd_opts->lock_on_read = true;
841                 break;
842         case Opt_exclusive:
843                 rbd_opts->exclusive = true;
844                 break;
845         default:
846                 /* libceph prints "bad option" msg */
847                 return -EINVAL;
848         }
849
850         return 0;
851 }
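
/*
 * A minimal usage sketch (illustrative only, not part of the driver):
 * each comma-separated token of a map-option string such as
 * "queue_depth=128,lock_on_read" is handed to parse_rbd_opts_token(),
 * with the rbd_options being filled in passed as the private pointer.
 * The example_* name is hypothetical.
 */
#if 0
static void example_parse_one_option(struct rbd_options *rbd_opts)
{
        char token[] = "queue_depth=128";

        if (!parse_rbd_opts_token(token, rbd_opts))
                pr_info("queue_depth is now %d\n", rbd_opts->queue_depth);
}
#endif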
852
853 static char* obj_op_name(enum obj_operation_type op_type)
854 {
855         switch (op_type) {
856         case OBJ_OP_READ:
857                 return "read";
858         case OBJ_OP_WRITE:
859                 return "write";
860         case OBJ_OP_DISCARD:
861                 return "discard";
862         default:
863                 return "???";
864         }
865 }
866
867 /*
868  * Get a ceph client with specific addr and configuration; if one does
869  * not exist, create it.  Either way, ceph_opts is consumed by this
870  * function.
871  */
872 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
873 {
874         struct rbd_client *rbdc;
875
876         mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
877         rbdc = rbd_client_find(ceph_opts);
878         if (rbdc)       /* using an existing client */
879                 ceph_destroy_options(ceph_opts);
880         else
881                 rbdc = rbd_client_create(ceph_opts);
882         mutex_unlock(&client_mutex);
883
884         return rbdc;
885 }
886
887 /*
888  * Destroy ceph client
889  *
890  * Caller must hold rbd_client_list_lock.
891  */
892 static void rbd_client_release(struct kref *kref)
893 {
894         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
895
896         dout("%s: rbdc %p\n", __func__, rbdc);
897         spin_lock(&rbd_client_list_lock);
898         list_del(&rbdc->node);
899         spin_unlock(&rbd_client_list_lock);
900
901         ceph_destroy_client(rbdc->client);
902         kfree(rbdc);
903 }
904
905 /*
906  * Drop reference to ceph client node. If it's not referenced anymore, release
907  * it.
908  */
909 static void rbd_put_client(struct rbd_client *rbdc)
910 {
911         if (rbdc)
912                 kref_put(&rbdc->kref, rbd_client_release);
913 }
914
915 static bool rbd_image_format_valid(u32 image_format)
916 {
917         return image_format == 1 || image_format == 2;
918 }
919
920 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
921 {
922         size_t size;
923         u32 snap_count;
924
925         /* The header has to start with the magic rbd header text */
926         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
927                 return false;
928
929         /* The bio layer requires at least sector-sized I/O */
930
931         if (ondisk->options.order < SECTOR_SHIFT)
932                 return false;
933
934         /* If we use u64 in a few spots we may be able to loosen this */
935
936         if (ondisk->options.order > 8 * sizeof (int) - 1)
937                 return false;
938
939         /*
940          * The size of a snapshot header has to fit in a size_t, and
941          * that limits the number of snapshots.
942          */
943         snap_count = le32_to_cpu(ondisk->snap_count);
944         size = SIZE_MAX - sizeof (struct ceph_snap_context);
945         if (snap_count > size / sizeof (__le64))
946                 return false;
947
948         /*
949          * Not only that, but the size of the entire snapshot
950          * header must also be representable in a size_t.
951          */
952         size -= snap_count * sizeof (__le64);
953         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
954                 return false;
955
956         return true;
957 }
958
959 /*
960  * returns the size of an object in the image
961  */
962 static u32 rbd_obj_bytes(struct rbd_image_header *header)
963 {
964         return 1U << header->obj_order;
965 }
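
/*
 * Example: the common default object order of 22 gives 1U << 22, i.e.
 * 4 MiB objects.  rbd_dev_ondisk_valid() above bounds a format 1
 * image's order to the range [SECTOR_SHIFT, 31].
 */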
966
967 static void rbd_init_layout(struct rbd_device *rbd_dev)
968 {
969         if (rbd_dev->header.stripe_unit == 0 ||
970             rbd_dev->header.stripe_count == 0) {
971                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
972                 rbd_dev->header.stripe_count = 1;
973         }
974
975         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
976         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
977         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
978         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
979                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
980         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
981 }
982
983 /*
984  * Fill an rbd image header with information from the given format 1
985  * on-disk header.
986  */
987 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
988                                  struct rbd_image_header_ondisk *ondisk)
989 {
990         struct rbd_image_header *header = &rbd_dev->header;
991         bool first_time = header->object_prefix == NULL;
992         struct ceph_snap_context *snapc;
993         char *object_prefix = NULL;
994         char *snap_names = NULL;
995         u64 *snap_sizes = NULL;
996         u32 snap_count;
997         int ret = -ENOMEM;
998         u32 i;
999
1000         /* Allocate this now to avoid having to handle failure below */
1001
1002         if (first_time) {
1003                 object_prefix = kstrndup(ondisk->object_prefix,
1004                                          sizeof(ondisk->object_prefix),
1005                                          GFP_KERNEL);
1006                 if (!object_prefix)
1007                         return -ENOMEM;
1008         }
1009
1010         /* Allocate the snapshot context and fill it in */
1011
1012         snap_count = le32_to_cpu(ondisk->snap_count);
1013         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1014         if (!snapc)
1015                 goto out_err;
1016         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1017         if (snap_count) {
1018                 struct rbd_image_snap_ondisk *snaps;
1019                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1020
1021                 /* We'll keep a copy of the snapshot names... */
1022
1023                 if (snap_names_len > (u64)SIZE_MAX)
1024                         goto out_2big;
1025                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1026                 if (!snap_names)
1027                         goto out_err;
1028
1029                 /* ...as well as the array of their sizes. */
1030                 snap_sizes = kmalloc_array(snap_count,
1031                                            sizeof(*header->snap_sizes),
1032                                            GFP_KERNEL);
1033                 if (!snap_sizes)
1034                         goto out_err;
1035
1036                 /*
1037                  * Copy the names, and fill in each snapshot's id
1038                  * and size.
1039                  *
1040                  * Note that rbd_dev_v1_header_info() guarantees the
1041                  * ondisk buffer we're working with has
1042                  * snap_names_len bytes beyond the end of the
1043                  * snapshot id array, so this memcpy() is safe.
1044                  */
1045                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1046                 snaps = ondisk->snaps;
1047                 for (i = 0; i < snap_count; i++) {
1048                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1049                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1050                 }
1051         }
1052
1053         /* We won't fail any more, fill in the header */
1054
1055         if (first_time) {
1056                 header->object_prefix = object_prefix;
1057                 header->obj_order = ondisk->options.order;
1058                 rbd_init_layout(rbd_dev);
1059         } else {
1060                 ceph_put_snap_context(header->snapc);
1061                 kfree(header->snap_names);
1062                 kfree(header->snap_sizes);
1063         }
1064
1065         /* The remaining fields always get updated (when we refresh) */
1066
1067         header->image_size = le64_to_cpu(ondisk->image_size);
1068         header->snapc = snapc;
1069         header->snap_names = snap_names;
1070         header->snap_sizes = snap_sizes;
1071
1072         return 0;
1073 out_2big:
1074         ret = -EIO;
1075 out_err:
1076         kfree(snap_sizes);
1077         kfree(snap_names);
1078         ceph_put_snap_context(snapc);
1079         kfree(object_prefix);
1080
1081         return ret;
1082 }
1083
1084 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1085 {
1086         const char *snap_name;
1087
1088         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1089
1090         /* Skip over names until we find the one we are looking for */
1091
1092         snap_name = rbd_dev->header.snap_names;
1093         while (which--)
1094                 snap_name += strlen(snap_name) + 1;
1095
1096         return kstrdup(snap_name, GFP_KERNEL);
1097 }
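
/*
 * Example: with two snapshots the format 1 name blob is laid out as
 * "snap1\0snap2\0", so which == 1 skips past "snap1\0" and returns a
 * duplicate of "snap2".
 */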
1098
1099 /*
1100  * Snapshot id comparison function for use with qsort()/bsearch().
1101  * Note that result is for snapshots in *descending* order.
1102  */
1103 static int snapid_compare_reverse(const void *s1, const void *s2)
1104 {
1105         u64 snap_id1 = *(u64 *)s1;
1106         u64 snap_id2 = *(u64 *)s2;
1107
1108         if (snap_id1 < snap_id2)
1109                 return 1;
1110         return snap_id1 == snap_id2 ? 0 : -1;
1111 }
1112
1113 /*
1114  * Search a snapshot context to see if the given snapshot id is
1115  * present.
1116  *
1117  * Returns the position of the snapshot id in the array if it's found,
1118  * or BAD_SNAP_INDEX otherwise.
1119  *
1120  * Note: The snapshot array is kept sorted (by the osd) in
1121  * reverse order, highest snapshot id first.
1122  */
1123 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1124 {
1125         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1126         u64 *found;
1127
1128         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1129                                 sizeof (snap_id), snapid_compare_reverse);
1130
1131         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1132 }
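
/*
 * Example: with header.snapc->snaps holding { 40, 25, 10 } (newest,
 * i.e. highest id, first), rbd_dev_snap_index() returns 1 for snap_id
 * 25 and BAD_SNAP_INDEX for an id such as 17 that is not present.
 */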
1133
1134 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1135                                         u64 snap_id)
1136 {
1137         u32 which;
1138         const char *snap_name;
1139
1140         which = rbd_dev_snap_index(rbd_dev, snap_id);
1141         if (which == BAD_SNAP_INDEX)
1142                 return ERR_PTR(-ENOENT);
1143
1144         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1145         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1146 }
1147
1148 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1149 {
1150         if (snap_id == CEPH_NOSNAP)
1151                 return RBD_SNAP_HEAD_NAME;
1152
1153         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1154         if (rbd_dev->image_format == 1)
1155                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1156
1157         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1158 }
1159
1160 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1161                                 u64 *snap_size)
1162 {
1163         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1164         if (snap_id == CEPH_NOSNAP) {
1165                 *snap_size = rbd_dev->header.image_size;
1166         } else if (rbd_dev->image_format == 1) {
1167                 u32 which;
1168
1169                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1170                 if (which == BAD_SNAP_INDEX)
1171                         return -ENOENT;
1172
1173                 *snap_size = rbd_dev->header.snap_sizes[which];
1174         } else {
1175                 u64 size = 0;
1176                 int ret;
1177
1178                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1179                 if (ret)
1180                         return ret;
1181
1182                 *snap_size = size;
1183         }
1184         return 0;
1185 }
1186
1187 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1188                         u64 *snap_features)
1189 {
1190         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1191         if (snap_id == CEPH_NOSNAP) {
1192                 *snap_features = rbd_dev->header.features;
1193         } else if (rbd_dev->image_format == 1) {
1194                 *snap_features = 0;     /* No features for format 1 */
1195         } else {
1196                 u64 features = 0;
1197                 int ret;
1198
1199                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1200                 if (ret)
1201                         return ret;
1202
1203                 *snap_features = features;
1204         }
1205         return 0;
1206 }
1207
1208 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1209 {
1210         u64 snap_id = rbd_dev->spec->snap_id;
1211         u64 size = 0;
1212         u64 features = 0;
1213         int ret;
1214
1215         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1216         if (ret)
1217                 return ret;
1218         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1219         if (ret)
1220                 return ret;
1221
1222         rbd_dev->mapping.size = size;
1223         rbd_dev->mapping.features = features;
1224
1225         return 0;
1226 }
1227
1228 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1229 {
1230         rbd_dev->mapping.size = 0;
1231         rbd_dev->mapping.features = 0;
1232 }
1233
1234 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1235 {
1236         u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1237
1238         return offset & (segment_size - 1);
1239 }
1240
1241 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1242                                 u64 offset, u64 length)
1243 {
1244         u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1245
1246         offset &= segment_size - 1;
1247
1248         rbd_assert(length <= U64_MAX - offset);
1249         if (offset + length > segment_size)
1250                 length = segment_size - offset;
1251
1252         return length;
1253 }
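
/*
 * Worked example: with 4 MiB objects (segment_size == 0x400000), image
 * offset 0x500000 lies 0x100000 bytes into its object, so a 0x400000
 * byte request starting there is clipped by rbd_segment_length() to
 * 0x300000 bytes, the remainder of that object.
 */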
1254
1255 /*
1256  * bio helpers
1257  */
1258
1259 static void bio_chain_put(struct bio *chain)
1260 {
1261         struct bio *tmp;
1262
1263         while (chain) {
1264                 tmp = chain;
1265                 chain = chain->bi_next;
1266                 bio_put(tmp);
1267         }
1268 }
1269
1270 /*
1271  * zeros a bio chain, starting at specific offset
1272  */
1273 static void zero_bio_chain(struct bio *chain, int start_ofs)
1274 {
1275         struct bio_vec bv;
1276         struct bvec_iter iter;
1277         unsigned long flags;
1278         void *buf;
1279         int pos = 0;
1280
1281         while (chain) {
1282                 bio_for_each_segment(bv, chain, iter) {
1283                         if (pos + bv.bv_len > start_ofs) {
1284                                 int remainder = max(start_ofs - pos, 0);
1285                                 buf = bvec_kmap_irq(&bv, &flags);
1286                                 memset(buf + remainder, 0,
1287                                        bv.bv_len - remainder);
1288                                 flush_dcache_page(bv.bv_page);
1289                                 bvec_kunmap_irq(buf, &flags);
1290                         }
1291                         pos += bv.bv_len;
1292                 }
1293
1294                 chain = chain->bi_next;
1295         }
1296 }
1297
1298 /*
1299  * similar to zero_bio_chain(), zeros data defined by a page array,
1300  * starting at the given byte offset from the start of the array and
1301  * continuing up to the given end offset.  The pages array is
1302  * assumed to be big enough to hold all bytes up to the end.
1303  */
1304 static void zero_pages(struct page **pages, u64 offset, u64 end)
1305 {
1306         struct page **page = &pages[offset >> PAGE_SHIFT];
1307
1308         rbd_assert(end > offset);
1309         rbd_assert(end - offset <= (u64)SIZE_MAX);
1310         while (offset < end) {
1311                 size_t page_offset;
1312                 size_t length;
1313                 unsigned long flags;
1314                 void *kaddr;
1315
1316                 page_offset = offset & ~PAGE_MASK;
1317                 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1318                 local_irq_save(flags);
1319                 kaddr = kmap_atomic(*page);
1320                 memset(kaddr + page_offset, 0, length);
1321                 flush_dcache_page(*page);
1322                 kunmap_atomic(kaddr);
1323                 local_irq_restore(flags);
1324
1325                 offset += length;
1326                 page++;
1327         }
1328 }
1329
1330 /*
1331  * Clone a portion of a bio, starting at the given byte offset
1332  * and continuing for the number of bytes indicated.
1333  */
1334 static struct bio *bio_clone_range(struct bio *bio_src,
1335                                         unsigned int offset,
1336                                         unsigned int len,
1337                                         gfp_t gfpmask)
1338 {
1339         struct bio *bio;
1340
1341         bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
1342         if (!bio)
1343                 return NULL;    /* ENOMEM */
1344
1345         bio_advance(bio, offset);
1346         bio->bi_iter.bi_size = len;
1347
1348         return bio;
1349 }
1350
1351 /*
1352  * Clone a portion of a bio chain, starting at the given byte offset
1353  * into the first bio in the source chain and continuing for the
1354  * number of bytes indicated.  The result is another bio chain of
1355  * exactly the given length, or a null pointer on error.
1356  *
1357  * The bio_src and offset parameters are both in-out.  On entry they
1358  * refer to the first source bio and the offset into that bio where
1359  * the start of data to be cloned is located.
1360  *
1361  * On return, bio_src is updated to refer to the bio in the source
1362  * chain that contains the first un-cloned byte, and *offset will
1363  * contain the offset of that byte within that bio.
1364  */
1365 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1366                                         unsigned int *offset,
1367                                         unsigned int len,
1368                                         gfp_t gfpmask)
1369 {
1370         struct bio *bi = *bio_src;
1371         unsigned int off = *offset;
1372         struct bio *chain = NULL;
1373         struct bio **end;
1374
1375         /* Build up a chain of clone bios up to the limit */
1376
1377         if (!bi || off >= bi->bi_iter.bi_size || !len)
1378                 return NULL;            /* Nothing to clone */
1379
1380         end = &chain;
1381         while (len) {
1382                 unsigned int bi_size;
1383                 struct bio *bio;
1384
1385                 if (!bi) {
1386                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1387                         goto out_err;   /* EINVAL; ran out of bio's */
1388                 }
1389                 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1390                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1391                 if (!bio)
1392                         goto out_err;   /* ENOMEM */
1393
1394                 *end = bio;
1395                 end = &bio->bi_next;
1396
1397                 off += bi_size;
1398                 if (off == bi->bi_iter.bi_size) {
1399                         bi = bi->bi_next;
1400                         off = 0;
1401                 }
1402                 len -= bi_size;
1403         }
1404         *bio_src = bi;
1405         *offset = off;
1406
1407         return chain;
1408 out_err:
1409         bio_chain_put(chain);
1410
1411         return NULL;
1412 }
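
/*
 * A minimal usage sketch (illustrative only, not part of the driver):
 * carving fixed-size clones out of a request's bio chain.  Each call
 * consumes obj_len bytes, and the in-out cursor (*bio_list,
 * *bio_offset) is left pointing at the first un-cloned byte for the
 * next call.  The example_* name and GFP_NOIO choice are assumptions.
 */
#if 0
static struct bio *example_clone_next_object(struct bio **bio_list,
                                             unsigned int *bio_offset,
                                             unsigned int obj_len)
{
        return bio_chain_clone_range(bio_list, bio_offset, obj_len,
                                     GFP_NOIO);
}
#endif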
1413
1414 /*
1415  * The default/initial value for all object request flags is 0.  For
1416  * each flag, once its value is set to 1 it is never reset to 0
1417  * again.
1418  */
1419 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1420 {
1421         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1422                 struct rbd_device *rbd_dev;
1423
1424                 rbd_dev = obj_request->img_request->rbd_dev;
1425                 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1426                         obj_request);
1427         }
1428 }
1429
1430 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1431 {
1432         smp_mb();
1433         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1434 }
1435
1436 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1437 {
1438         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1439                 struct rbd_device *rbd_dev = NULL;
1440
1441                 if (obj_request_img_data_test(obj_request))
1442                         rbd_dev = obj_request->img_request->rbd_dev;
1443                 rbd_warn(rbd_dev, "obj_request %p already marked done",
1444                         obj_request);
1445         }
1446 }
1447
1448 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1449 {
1450         smp_mb();
1451         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1452 }
1453
1454 /*
1455  * This sets the KNOWN flag after (possibly) setting the EXISTS
1456  * flag.  The latter is set based on the "exists" value provided.
1457  *
1458  * Note that for our purposes once an object exists it never goes
1459  * away again.  It's possible that the responses from two existence
1460  * checks are separated by the creation of the target object, and
1461  * the first ("doesn't exist") response arrives *after* the second
1462  * ("does exist").  In that case we ignore the second one.
1463  */
1464 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1465                                 bool exists)
1466 {
1467         if (exists)
1468                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1469         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1470         smp_mb();
1471 }
1472
1473 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1474 {
1475         smp_mb();
1476         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1477 }
1478
1479 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1480 {
1481         smp_mb();
1482         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1483 }
1484
1485 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1486 {
1487         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1488
1489         return obj_request->img_offset <
1490             round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1491 }
1492
1493 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1494 {
1495         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1496                 kref_read(&obj_request->kref));
1497         kref_get(&obj_request->kref);
1498 }
1499
1500 static void rbd_obj_request_destroy(struct kref *kref);
1501 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1502 {
1503         rbd_assert(obj_request != NULL);
1504         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1505                 kref_read(&obj_request->kref));
1506         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1507 }
1508
1509 static void rbd_img_request_get(struct rbd_img_request *img_request)
1510 {
1511         dout("%s: img %p (was %d)\n", __func__, img_request,
1512              kref_read(&img_request->kref));
1513         kref_get(&img_request->kref);
1514 }
1515
1516 static bool img_request_child_test(struct rbd_img_request *img_request);
1517 static void rbd_parent_request_destroy(struct kref *kref);
1518 static void rbd_img_request_destroy(struct kref *kref);
1519 static void rbd_img_request_put(struct rbd_img_request *img_request)
1520 {
1521         rbd_assert(img_request != NULL);
1522         dout("%s: img %p (was %d)\n", __func__, img_request,
1523                 kref_read(&img_request->kref));
1524         if (img_request_child_test(img_request))
1525                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1526         else
1527                 kref_put(&img_request->kref, rbd_img_request_destroy);
1528 }
1529
1530 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1531                                         struct rbd_obj_request *obj_request)
1532 {
1533         rbd_assert(obj_request->img_request == NULL);
1534
1535         /* Image request now owns object's original reference */
1536         obj_request->img_request = img_request;
1537         obj_request->which = img_request->obj_request_count;
1538         rbd_assert(!obj_request_img_data_test(obj_request));
1539         obj_request_img_data_set(obj_request);
1540         rbd_assert(obj_request->which != BAD_WHICH);
1541         img_request->obj_request_count++;
1542         list_add_tail(&obj_request->links, &img_request->obj_requests);
1543         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1544                 obj_request->which);
1545 }
1546
1547 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1548                                         struct rbd_obj_request *obj_request)
1549 {
1550         rbd_assert(obj_request->which != BAD_WHICH);
1551
1552         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1553                 obj_request->which);
1554         list_del(&obj_request->links);
1555         rbd_assert(img_request->obj_request_count > 0);
1556         img_request->obj_request_count--;
1557         rbd_assert(obj_request->which == img_request->obj_request_count);
1558         obj_request->which = BAD_WHICH;
1559         rbd_assert(obj_request_img_data_test(obj_request));
1560         rbd_assert(obj_request->img_request == img_request);
1561         obj_request->img_request = NULL;
1562         obj_request->callback = NULL;
1563         rbd_obj_request_put(obj_request);
1564 }
1565
1566 static bool obj_request_type_valid(enum obj_request_type type)
1567 {
1568         switch (type) {
1569         case OBJ_REQUEST_NODATA:
1570         case OBJ_REQUEST_BIO:
1571         case OBJ_REQUEST_PAGES:
1572                 return true;
1573         default:
1574                 return false;
1575         }
1576 }
1577
1578 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1579
1580 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1581 {
1582         struct ceph_osd_request *osd_req = obj_request->osd_req;
1583
1584         dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1585              obj_request, obj_request->object_no, obj_request->offset,
1586              obj_request->length, osd_req);
1587         if (obj_request_img_data_test(obj_request)) {
1588                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1589                 rbd_img_request_get(obj_request->img_request);
1590         }
1591         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1592 }
1593
1594 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1595 {
1596
1597         dout("%s: img %p\n", __func__, img_request);
1598
1599         /*
1600          * If no error occurred, compute the aggregate transfer
1601          * count for the image request.  We could instead use
1602          * atomic64_cmpxchg() to update it as each object request
1603          * completes; it's not clear offhand which way is better.
1604          */
1605         if (!img_request->result) {
1606                 struct rbd_obj_request *obj_request;
1607                 u64 xferred = 0;
1608
1609                 for_each_obj_request(img_request, obj_request)
1610                         xferred += obj_request->xferred;
1611                 img_request->xferred = xferred;
1612         }
1613
1614         if (img_request->callback)
1615                 img_request->callback(img_request);
1616         else
1617                 rbd_img_request_put(img_request);
1618 }
1619
1620 /*
1621  * The default/initial value for all image request flags is 0.  Each
1622  * is conditionally set to 1 at image request initialization time
1623  * and currently never changes thereafter.
1624  */
1625 static void img_request_write_set(struct rbd_img_request *img_request)
1626 {
1627         set_bit(IMG_REQ_WRITE, &img_request->flags);
1628         smp_mb();
1629 }
1630
1631 static bool img_request_write_test(struct rbd_img_request *img_request)
1632 {
1633         smp_mb();
1634         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1635 }
1636
1637 /*
1638  * Set the discard flag when the img_request is a discard request
1639  */
1640 static void img_request_discard_set(struct rbd_img_request *img_request)
1641 {
1642         set_bit(IMG_REQ_DISCARD, &img_request->flags);
1643         smp_mb();
1644 }
1645
1646 static bool img_request_discard_test(struct rbd_img_request *img_request)
1647 {
1648         smp_mb();
1649         return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1650 }
1651
1652 static void img_request_child_set(struct rbd_img_request *img_request)
1653 {
1654         set_bit(IMG_REQ_CHILD, &img_request->flags);
1655         smp_mb();
1656 }
1657
1658 static void img_request_child_clear(struct rbd_img_request *img_request)
1659 {
1660         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1661         smp_mb();
1662 }
1663
1664 static bool img_request_child_test(struct rbd_img_request *img_request)
1665 {
1666         smp_mb();
1667         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1668 }
1669
1670 static void img_request_layered_set(struct rbd_img_request *img_request)
1671 {
1672         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1673         smp_mb();
1674 }
1675
1676 static void img_request_layered_clear(struct rbd_img_request *img_request)
1677 {
1678         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1679         smp_mb();
1680 }
1681
1682 static bool img_request_layered_test(struct rbd_img_request *img_request)
1683 {
1684         smp_mb();
1685         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1686 }
1687
1688 static enum obj_operation_type
1689 rbd_img_request_op_type(struct rbd_img_request *img_request)
1690 {
1691         if (img_request_write_test(img_request))
1692                 return OBJ_OP_WRITE;
1693         else if (img_request_discard_test(img_request))
1694                 return OBJ_OP_DISCARD;
1695         else
1696                 return OBJ_OP_READ;
1697 }
1698
1699 static void
1700 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1701 {
1702         u64 xferred = obj_request->xferred;
1703         u64 length = obj_request->length;
1704
1705         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1706                 obj_request, obj_request->img_request, obj_request->result,
1707                 xferred, length);
1708         /*
1709          * ENOENT means a hole in the image.  We zero-fill the entire
1710          * length of the request.  A short read also implies zero-fill
1711          * to the end of the request.  An error requires the whole
1712          * length of the request to be reported finished with an error
1713          * to the block layer.  In each case we update the xferred
1714          * count to indicate the whole request was satisfied.
1715          */
1716         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1717         if (obj_request->result == -ENOENT) {
1718                 if (obj_request->type == OBJ_REQUEST_BIO)
1719                         zero_bio_chain(obj_request->bio_list, 0);
1720                 else
1721                         zero_pages(obj_request->pages, 0, length);
1722                 obj_request->result = 0;
1723         } else if (xferred < length && !obj_request->result) {
1724                 if (obj_request->type == OBJ_REQUEST_BIO)
1725                         zero_bio_chain(obj_request->bio_list, xferred);
1726                 else
1727                         zero_pages(obj_request->pages, xferred, length);
1728         }
1729         obj_request->xferred = length;
1730         obj_request_done_set(obj_request);
1731 }
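
/*
 * Worked example (illustrative only): for a 4 MiB object read where the
 * OSD returns just 1 MiB with a zero result (a short read), the final
 * 3 MiB of the bio chain or page array is zero-filled and xferred is
 * reported as the full 4 MiB.  An -ENOENT result (the backing object is
 * missing, i.e. a hole in the image) zero-fills all 4 MiB and clears
 * the error before completion is reported.
 */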
1732
1733 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1734 {
1735         dout("%s: obj %p cb %p\n", __func__, obj_request,
1736                 obj_request->callback);
1737         if (obj_request->callback)
1738                 obj_request->callback(obj_request);
1739         else
1740                 complete_all(&obj_request->completion);
1741 }
1742
1743 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1744 {
1745         obj_request->result = err;
1746         obj_request->xferred = 0;
1747         /*
1748          * kludge - mirror rbd_obj_request_submit() to match a put in
1749          * rbd_img_obj_callback()
1750          */
1751         if (obj_request_img_data_test(obj_request)) {
1752                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1753                 rbd_img_request_get(obj_request->img_request);
1754         }
1755         obj_request_done_set(obj_request);
1756         rbd_obj_request_complete(obj_request);
1757 }
1758
1759 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1760 {
1761         struct rbd_img_request *img_request = NULL;
1762         struct rbd_device *rbd_dev = NULL;
1763         bool layered = false;
1764
1765         if (obj_request_img_data_test(obj_request)) {
1766                 img_request = obj_request->img_request;
1767                 layered = img_request && img_request_layered_test(img_request);
1768                 rbd_dev = img_request->rbd_dev;
1769         }
1770
1771         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1772                 obj_request, img_request, obj_request->result,
1773                 obj_request->xferred, obj_request->length);
1774         if (layered && obj_request->result == -ENOENT &&
1775                         obj_request->img_offset < rbd_dev->parent_overlap)
1776                 rbd_img_parent_read(obj_request);
1777         else if (img_request)
1778                 rbd_img_obj_request_read_callback(obj_request);
1779         else
1780                 obj_request_done_set(obj_request);
1781 }
1782
1783 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1784 {
1785         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1786                 obj_request->result, obj_request->length);
1787         /*
1788          * There is no such thing as a successful short write.  Set
1789          * it to our originally-requested length.
1790          */
1791         obj_request->xferred = obj_request->length;
1792         obj_request_done_set(obj_request);
1793 }
1794
1795 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1796 {
1797         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1798                 obj_request->result, obj_request->length);
1799         /*
1800          * There is no such thing as a successful short discard.  Set
1801          * it to our originally-requested length.
1802          */
1803         obj_request->xferred = obj_request->length;
1804         /* discarding a non-existent object is not a problem */
1805         if (obj_request->result == -ENOENT)
1806                 obj_request->result = 0;
1807         obj_request_done_set(obj_request);
1808 }
1809
1810 /*
1811  * For a simple stat call there's nothing to do.  We'll do more if
1812  * this is part of a write sequence for a layered image.
1813  */
1814 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1815 {
1816         dout("%s: obj %p\n", __func__, obj_request);
1817         obj_request_done_set(obj_request);
1818 }
1819
1820 static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1821 {
1822         dout("%s: obj %p\n", __func__, obj_request);
1823
1824         if (obj_request_img_data_test(obj_request))
1825                 rbd_osd_copyup_callback(obj_request);
1826         else
1827                 obj_request_done_set(obj_request);
1828 }
1829
1830 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1831 {
1832         struct rbd_obj_request *obj_request = osd_req->r_priv;
1833         u16 opcode;
1834
1835         dout("%s: osd_req %p\n", __func__, osd_req);
1836         rbd_assert(osd_req == obj_request->osd_req);
1837         if (obj_request_img_data_test(obj_request)) {
1838                 rbd_assert(obj_request->img_request);
1839                 rbd_assert(obj_request->which != BAD_WHICH);
1840         } else {
1841                 rbd_assert(obj_request->which == BAD_WHICH);
1842         }
1843
1844         if (osd_req->r_result < 0)
1845                 obj_request->result = osd_req->r_result;
1846
1847         /*
1848          * We support a 64-bit length, but ultimately it has to be
1849          * passed to the block layer, which just supports a 32-bit
1850          * length field.
1851          */
1852         obj_request->xferred = osd_req->r_ops[0].outdata_len;
1853         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1854
1855         opcode = osd_req->r_ops[0].op;
1856         switch (opcode) {
1857         case CEPH_OSD_OP_READ:
1858                 rbd_osd_read_callback(obj_request);
1859                 break;
1860         case CEPH_OSD_OP_SETALLOCHINT:
1861                 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1862                            osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1863                 /* fall through */
1864         case CEPH_OSD_OP_WRITE:
1865         case CEPH_OSD_OP_WRITEFULL:
1866                 rbd_osd_write_callback(obj_request);
1867                 break;
1868         case CEPH_OSD_OP_STAT:
1869                 rbd_osd_stat_callback(obj_request);
1870                 break;
1871         case CEPH_OSD_OP_DELETE:
1872         case CEPH_OSD_OP_TRUNCATE:
1873         case CEPH_OSD_OP_ZERO:
1874                 rbd_osd_discard_callback(obj_request);
1875                 break;
1876         case CEPH_OSD_OP_CALL:
1877                 rbd_osd_call_callback(obj_request);
1878                 break;
1879         default:
1880                 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1881                          obj_request->object_no, opcode);
1882                 break;
1883         }
1884
1885         if (obj_request_done_test(obj_request))
1886                 rbd_obj_request_complete(obj_request);
1887 }
1888
1889 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1890 {
1891         struct ceph_osd_request *osd_req = obj_request->osd_req;
1892
1893         rbd_assert(obj_request_img_data_test(obj_request));
1894         osd_req->r_snapid = obj_request->img_request->snap_id;
1895 }
1896
1897 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1898 {
1899         struct ceph_osd_request *osd_req = obj_request->osd_req;
1900
1901         ktime_get_real_ts(&osd_req->r_mtime);
1902         osd_req->r_data_offset = obj_request->offset;
1903 }
1904
1905 static struct ceph_osd_request *
1906 __rbd_osd_req_create(struct rbd_device *rbd_dev,
1907                      struct ceph_snap_context *snapc,
1908                      int num_ops, unsigned int flags,
1909                      struct rbd_obj_request *obj_request)
1910 {
1911         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1912         struct ceph_osd_request *req;
1913         const char *name_format = rbd_dev->image_format == 1 ?
1914                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1915
1916         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1917         if (!req)
1918                 return NULL;
1919
1920         req->r_flags = flags;
1921         req->r_callback = rbd_osd_req_callback;
1922         req->r_priv = obj_request;
1923
1924         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1925         if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1926                         rbd_dev->header.object_prefix, obj_request->object_no))
1927                 goto err_req;
1928
1929         if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1930                 goto err_req;
1931
1932         return req;
1933
1934 err_req:
1935         ceph_osdc_put_request(req);
1936         return NULL;
1937 }
1938
1939 /*
1940  * Create an osd request.  A read request has one osd op (read).
1941  * A write request has either one (watch) or two (hint+write) osd ops.
1942  * (All rbd data writes are prefixed with an allocation hint op, but
1943  * technically osd watch is a write request, hence this distinction.)
1944  */
1945 static struct ceph_osd_request *rbd_osd_req_create(
1946                                         struct rbd_device *rbd_dev,
1947                                         enum obj_operation_type op_type,
1948                                         unsigned int num_ops,
1949                                         struct rbd_obj_request *obj_request)
1950 {
1951         struct ceph_snap_context *snapc = NULL;
1952
1953         if (obj_request_img_data_test(obj_request) &&
1954                 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1955                 struct rbd_img_request *img_request = obj_request->img_request;
1956                 if (op_type == OBJ_OP_WRITE) {
1957                         rbd_assert(img_request_write_test(img_request));
1958                 } else {
1959                         rbd_assert(img_request_discard_test(img_request));
1960                 }
1961                 snapc = img_request->snapc;
1962         }
1963
1964         rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1965
1966         return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1967             (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
1968             CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
1969 }
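
/*
 * Illustrative sketch (not part of the driver logic): the ops of the
 * request returned above are filled in later by rbd_img_obj_request_fill().
 * For a plain data write the layout ends up as
 *
 *      osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 2, obj_request);
 *      r_ops[0]: CEPH_OSD_OP_SETALLOCHINT
 *      r_ops[1]: CEPH_OSD_OP_WRITE or CEPH_OSD_OP_WRITEFULL
 *
 * while reads and discards carry a single op in r_ops[0].
 */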
1970
1971 /*
1972  * Create a copyup osd request based on the information in the object
1973  * request supplied.  A copyup request has two or three osd ops, a
1974  * copyup method call, potentially a hint op, and a write or truncate
1975  * or zero op.
1976  */
1977 static struct ceph_osd_request *
1978 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1979 {
1980         struct rbd_img_request *img_request;
1981         int num_osd_ops = 3;
1982
1983         rbd_assert(obj_request_img_data_test(obj_request));
1984         img_request = obj_request->img_request;
1985         rbd_assert(img_request);
1986         rbd_assert(img_request_write_test(img_request) ||
1987                         img_request_discard_test(img_request));
1988
1989         if (img_request_discard_test(img_request))
1990                 num_osd_ops = 2;
1991
1992         return __rbd_osd_req_create(img_request->rbd_dev,
1993                                     img_request->snapc, num_osd_ops,
1994                                     CEPH_OSD_FLAG_WRITE, obj_request);
1995 }
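
/*
 * Illustrative sketch (not part of the driver logic): after the copyup
 * class method call is installed at index 0 and rbd_img_obj_request_fill()
 * is invoked with num_ops == 1, the ops of the request created above are
 *
 *      r_ops[0]: CEPH_OSD_OP_CALL ("rbd" class, "copyup" method)
 *      r_ops[1]: CEPH_OSD_OP_SETALLOCHINT         (write requests)
 *      r_ops[2]: CEPH_OSD_OP_WRITE/WRITEFULL
 *
 * or, for a discard, just CEPH_OSD_OP_TRUNCATE or CEPH_OSD_OP_ZERO at
 * r_ops[1].
 */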
1996
1997 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1998 {
1999         ceph_osdc_put_request(osd_req);
2000 }
2001
2002 static struct rbd_obj_request *
2003 rbd_obj_request_create(enum obj_request_type type)
2004 {
2005         struct rbd_obj_request *obj_request;
2006
2007         rbd_assert(obj_request_type_valid(type));
2008
2009         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2010         if (!obj_request)
2011                 return NULL;
2012
2013         obj_request->which = BAD_WHICH;
2014         obj_request->type = type;
2015         INIT_LIST_HEAD(&obj_request->links);
2016         init_completion(&obj_request->completion);
2017         kref_init(&obj_request->kref);
2018
2019         dout("%s %p\n", __func__, obj_request);
2020         return obj_request;
2021 }
2022
2023 static void rbd_obj_request_destroy(struct kref *kref)
2024 {
2025         struct rbd_obj_request *obj_request;
2026
2027         obj_request = container_of(kref, struct rbd_obj_request, kref);
2028
2029         dout("%s: obj %p\n", __func__, obj_request);
2030
2031         rbd_assert(obj_request->img_request == NULL);
2032         rbd_assert(obj_request->which == BAD_WHICH);
2033
2034         if (obj_request->osd_req)
2035                 rbd_osd_req_destroy(obj_request->osd_req);
2036
2037         rbd_assert(obj_request_type_valid(obj_request->type));
2038         switch (obj_request->type) {
2039         case OBJ_REQUEST_NODATA:
2040                 break;          /* Nothing to do */
2041         case OBJ_REQUEST_BIO:
2042                 if (obj_request->bio_list)
2043                         bio_chain_put(obj_request->bio_list);
2044                 break;
2045         case OBJ_REQUEST_PAGES:
2046                 /* img_data requests don't own their page array */
2047                 if (obj_request->pages &&
2048                     !obj_request_img_data_test(obj_request))
2049                         ceph_release_page_vector(obj_request->pages,
2050                                                 obj_request->page_count);
2051                 break;
2052         }
2053
2054         kmem_cache_free(rbd_obj_request_cache, obj_request);
2055 }
2056
2057 /* It's OK to call this for a device with no parent */
2058
2059 static void rbd_spec_put(struct rbd_spec *spec);
2060 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2061 {
2062         rbd_dev_remove_parent(rbd_dev);
2063         rbd_spec_put(rbd_dev->parent_spec);
2064         rbd_dev->parent_spec = NULL;
2065         rbd_dev->parent_overlap = 0;
2066 }
2067
2068 /*
2069  * Parent image reference counting is used to determine when an
2070  * image's parent fields can be safely torn down--after there are no
2071  * more in-flight requests to the parent image.  When the last
2072  * reference is dropped, cleaning them up is safe.
2073  */
2074 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2075 {
2076         int counter;
2077
2078         if (!rbd_dev->parent_spec)
2079                 return;
2080
2081         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2082         if (counter > 0)
2083                 return;
2084
2085         /* Last reference; clean up parent data structures */
2086
2087         if (!counter)
2088                 rbd_dev_unparent(rbd_dev);
2089         else
2090                 rbd_warn(rbd_dev, "parent reference underflow");
2091 }
2092
2093 /*
2094  * If an image has a non-zero parent overlap, get a reference to its
2095  * parent.
2096  *
2097  * Returns true if the rbd device has a parent with a non-zero
2098  * overlap and a reference for it was successfully taken, or
2099  * false otherwise.
2100  */
2101 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2102 {
2103         int counter = 0;
2104
2105         if (!rbd_dev->parent_spec)
2106                 return false;
2107
2108         down_read(&rbd_dev->header_rwsem);
2109         if (rbd_dev->parent_overlap)
2110                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2111         up_read(&rbd_dev->header_rwsem);
2112
2113         if (counter < 0)
2114                 rbd_warn(rbd_dev, "parent reference overflow");
2115
2116         return counter > 0;
2117 }
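
/*
 * A minimal usage sketch (assumed caller, for illustration): any code
 * that issues I/O against rbd_dev->parent should hold a parent reference
 * for the duration of that I/O:
 *
 *      if (rbd_dev_parent_get(rbd_dev)) {
 *              ... issue requests against rbd_dev->parent ...
 *              rbd_dev_parent_put(rbd_dev);
 *      }
 *
 * In this driver the get is taken in rbd_img_request_create() (which also
 * sets IMG_REQ_LAYERED) and the put happens in rbd_img_request_destroy().
 */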
2118
2119 /*
2120  * Caller is responsible for filling in the list of object requests
2121  * that comprises the image request, and the Linux request pointer
2122  * (if there is one).
2123  */
2124 static struct rbd_img_request *rbd_img_request_create(
2125                                         struct rbd_device *rbd_dev,
2126                                         u64 offset, u64 length,
2127                                         enum obj_operation_type op_type,
2128                                         struct ceph_snap_context *snapc)
2129 {
2130         struct rbd_img_request *img_request;
2131
2132         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2133         if (!img_request)
2134                 return NULL;
2135
2136         img_request->rq = NULL;
2137         img_request->rbd_dev = rbd_dev;
2138         img_request->offset = offset;
2139         img_request->length = length;
2140         img_request->flags = 0;
2141         if (op_type == OBJ_OP_DISCARD) {
2142                 img_request_discard_set(img_request);
2143                 img_request->snapc = snapc;
2144         } else if (op_type == OBJ_OP_WRITE) {
2145                 img_request_write_set(img_request);
2146                 img_request->snapc = snapc;
2147         } else {
2148                 img_request->snap_id = rbd_dev->spec->snap_id;
2149         }
2150         if (rbd_dev_parent_get(rbd_dev))
2151                 img_request_layered_set(img_request);
2152         spin_lock_init(&img_request->completion_lock);
2153         img_request->next_completion = 0;
2154         img_request->callback = NULL;
2155         img_request->result = 0;
2156         img_request->obj_request_count = 0;
2157         INIT_LIST_HEAD(&img_request->obj_requests);
2158         kref_init(&img_request->kref);
2159
2160         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2161                 obj_op_name(op_type), offset, length, img_request);
2162
2163         return img_request;
2164 }
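
/*
 * Rough sketch of how the image request helpers fit together
 * (illustrative only; the real block-layer entry point also validates
 * the mapping and handles its own error paths):
 *
 *      img_request = rbd_img_request_create(rbd_dev, offset, length,
 *                                           op_type, snapc);
 *      result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, bio);
 *      if (!result)
 *              result = rbd_img_request_submit(img_request);
 *      if (result)
 *              rbd_img_request_put(img_request);
 */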
2165
2166 static void rbd_img_request_destroy(struct kref *kref)
2167 {
2168         struct rbd_img_request *img_request;
2169         struct rbd_obj_request *obj_request;
2170         struct rbd_obj_request *next_obj_request;
2171
2172         img_request = container_of(kref, struct rbd_img_request, kref);
2173
2174         dout("%s: img %p\n", __func__, img_request);
2175
2176         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2177                 rbd_img_obj_request_del(img_request, obj_request);
2178         rbd_assert(img_request->obj_request_count == 0);
2179
2180         if (img_request_layered_test(img_request)) {
2181                 img_request_layered_clear(img_request);
2182                 rbd_dev_parent_put(img_request->rbd_dev);
2183         }
2184
2185         if (img_request_write_test(img_request) ||
2186                 img_request_discard_test(img_request))
2187                 ceph_put_snap_context(img_request->snapc);
2188
2189         kmem_cache_free(rbd_img_request_cache, img_request);
2190 }
2191
2192 static struct rbd_img_request *rbd_parent_request_create(
2193                                         struct rbd_obj_request *obj_request,
2194                                         u64 img_offset, u64 length)
2195 {
2196         struct rbd_img_request *parent_request;
2197         struct rbd_device *rbd_dev;
2198
2199         rbd_assert(obj_request->img_request);
2200         rbd_dev = obj_request->img_request->rbd_dev;
2201
2202         parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2203                                                 length, OBJ_OP_READ, NULL);
2204         if (!parent_request)
2205                 return NULL;
2206
2207         img_request_child_set(parent_request);
2208         rbd_obj_request_get(obj_request);
2209         parent_request->obj_request = obj_request;
2210
2211         return parent_request;
2212 }
2213
2214 static void rbd_parent_request_destroy(struct kref *kref)
2215 {
2216         struct rbd_img_request *parent_request;
2217         struct rbd_obj_request *orig_request;
2218
2219         parent_request = container_of(kref, struct rbd_img_request, kref);
2220         orig_request = parent_request->obj_request;
2221
2222         parent_request->obj_request = NULL;
2223         rbd_obj_request_put(orig_request);
2224         img_request_child_clear(parent_request);
2225
2226         rbd_img_request_destroy(kref);
2227 }
2228
2229 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2230 {
2231         struct rbd_img_request *img_request;
2232         unsigned int xferred;
2233         int result;
2234         bool more;
2235
2236         rbd_assert(obj_request_img_data_test(obj_request));
2237         img_request = obj_request->img_request;
2238
2239         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2240         xferred = (unsigned int)obj_request->xferred;
2241         result = obj_request->result;
2242         if (result) {
2243                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2244                 enum obj_operation_type op_type;
2245
2246                 if (img_request_discard_test(img_request))
2247                         op_type = OBJ_OP_DISCARD;
2248                 else if (img_request_write_test(img_request))
2249                         op_type = OBJ_OP_WRITE;
2250                 else
2251                         op_type = OBJ_OP_READ;
2252
2253                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2254                         obj_op_name(op_type), obj_request->length,
2255                         obj_request->img_offset, obj_request->offset);
2256                 rbd_warn(rbd_dev, "  result %d xferred %x",
2257                         result, xferred);
2258                 if (!img_request->result)
2259                         img_request->result = result;
2260                 /*
2261                  * Need to end I/O on the entire obj_request worth of
2262                  * bytes in case of error.
2263                  */
2264                 xferred = obj_request->length;
2265         }
2266
2267         if (img_request_child_test(img_request)) {
2268                 rbd_assert(img_request->obj_request != NULL);
2269                 more = obj_request->which < img_request->obj_request_count - 1;
2270         } else {
2271                 blk_status_t status = errno_to_blk_status(result);
2272
2273                 rbd_assert(img_request->rq != NULL);
2274
2275                 more = blk_update_request(img_request->rq, status, xferred);
2276                 if (!more)
2277                         __blk_mq_end_request(img_request->rq, status);
2278         }
2279
2280         return more;
2281 }
2282
2283 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2284 {
2285         struct rbd_img_request *img_request;
2286         u32 which = obj_request->which;
2287         bool more = true;
2288
2289         rbd_assert(obj_request_img_data_test(obj_request));
2290         img_request = obj_request->img_request;
2291
2292         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2293         rbd_assert(img_request != NULL);
2294         rbd_assert(img_request->obj_request_count > 0);
2295         rbd_assert(which != BAD_WHICH);
2296         rbd_assert(which < img_request->obj_request_count);
2297
2298         spin_lock_irq(&img_request->completion_lock);
2299         if (which != img_request->next_completion)
2300                 goto out;
2301
2302         for_each_obj_request_from(img_request, obj_request) {
2303                 rbd_assert(more);
2304                 rbd_assert(which < img_request->obj_request_count);
2305
2306                 if (!obj_request_done_test(obj_request))
2307                         break;
2308                 more = rbd_img_obj_end_request(obj_request);
2309                 which++;
2310         }
2311
2312         rbd_assert(more ^ (which == img_request->obj_request_count));
2313         img_request->next_completion = which;
2314 out:
2315         spin_unlock_irq(&img_request->completion_lock);
2316         rbd_img_request_put(img_request);
2317
2318         if (!more)
2319                 rbd_img_request_complete(img_request);
2320 }
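
/*
 * Completion ordering example (illustrative): suppose an image request
 * has object requests 0..3 and request 2 finishes first.  Its callback
 * sees which != next_completion (still 0) and returns without ending
 * any I/O.  When request 0 completes, the loop above ends I/O for 0 and
 * stops at the not-yet-done request 1; when 1 completes, it ends I/O
 * for 1 and for the already-done 2, leaving next_completion == 3.
 */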
2321
2322 /*
2323  * Add individual osd ops to the given ceph_osd_request and prepare
2324  * them for submission.  num_ops is the current number of
2325  * osd operations already added to the osd request.
2326  */
2327 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2328                                 struct ceph_osd_request *osd_request,
2329                                 enum obj_operation_type op_type,
2330                                 unsigned int num_ops)
2331 {
2332         struct rbd_img_request *img_request = obj_request->img_request;
2333         struct rbd_device *rbd_dev = img_request->rbd_dev;
2334         u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2335         u64 offset = obj_request->offset;
2336         u64 length = obj_request->length;
2337         u64 img_end;
2338         u16 opcode;
2339
2340         if (op_type == OBJ_OP_DISCARD) {
2341                 if (!offset && length == object_size &&
2342                     (!img_request_layered_test(img_request) ||
2343                      !obj_request_overlaps_parent(obj_request))) {
2344                         opcode = CEPH_OSD_OP_DELETE;
2345                 } else if (offset + length == object_size) {
2346                         opcode = CEPH_OSD_OP_TRUNCATE;
2347                 } else {
2348                         down_read(&rbd_dev->header_rwsem);
2349                         img_end = rbd_dev->header.image_size;
2350                         up_read(&rbd_dev->header_rwsem);
2351
2352                         if (obj_request->img_offset + length == img_end)
2353                                 opcode = CEPH_OSD_OP_TRUNCATE;
2354                         else
2355                                 opcode = CEPH_OSD_OP_ZERO;
2356                 }
2357         } else if (op_type == OBJ_OP_WRITE) {
2358                 if (!offset && length == object_size)
2359                         opcode = CEPH_OSD_OP_WRITEFULL;
2360                 else
2361                         opcode = CEPH_OSD_OP_WRITE;
2362                 osd_req_op_alloc_hint_init(osd_request, num_ops,
2363                                         object_size, object_size);
2364                 num_ops++;
2365         } else {
2366                 opcode = CEPH_OSD_OP_READ;
2367         }
2368
2369         if (opcode == CEPH_OSD_OP_DELETE)
2370                 osd_req_op_init(osd_request, num_ops, opcode, 0);
2371         else
2372                 osd_req_op_extent_init(osd_request, num_ops, opcode,
2373                                        offset, length, 0, 0);
2374
2375         if (obj_request->type == OBJ_REQUEST_BIO)
2376                 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2377                                         obj_request->bio_list, length);
2378         else if (obj_request->type == OBJ_REQUEST_PAGES)
2379                 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2380                                         obj_request->pages, length,
2381                                         offset & ~PAGE_MASK, false, false);
2382
2383         /* Discards are also writes */
2384         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2385                 rbd_osd_req_format_write(obj_request);
2386         else
2387                 rbd_osd_req_format_read(obj_request);
2388 }
2389
2390 /*
2391  * Split up an image request into one or more object requests, each
2392  * to a different object.  The "type" parameter indicates whether
2393  * "data_desc" is the pointer to the head of a list of bio
2394  * structures, or the base of a page array.  In either case this
2395  * function assumes data_desc describes memory sufficient to hold
2396  * all data described by the image request.
2397  */
2398 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2399                                         enum obj_request_type type,
2400                                         void *data_desc)
2401 {
2402         struct rbd_device *rbd_dev = img_request->rbd_dev;
2403         struct rbd_obj_request *obj_request = NULL;
2404         struct rbd_obj_request *next_obj_request;
2405         struct bio *bio_list = NULL;
2406         unsigned int bio_offset = 0;
2407         struct page **pages = NULL;
2408         enum obj_operation_type op_type;
2409         u64 img_offset;
2410         u64 resid;
2411
2412         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2413                 (int)type, data_desc);
2414
2415         img_offset = img_request->offset;
2416         resid = img_request->length;
2417         rbd_assert(resid > 0);
2418         op_type = rbd_img_request_op_type(img_request);
2419
2420         if (type == OBJ_REQUEST_BIO) {
2421                 bio_list = data_desc;
2422                 rbd_assert(img_offset ==
2423                            bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2424         } else if (type == OBJ_REQUEST_PAGES) {
2425                 pages = data_desc;
2426         }
2427
2428         while (resid) {
2429                 struct ceph_osd_request *osd_req;
2430                 u64 object_no = img_offset >> rbd_dev->header.obj_order;
2431                 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2432                 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
2433
2434                 obj_request = rbd_obj_request_create(type);
2435                 if (!obj_request)
2436                         goto out_unwind;
2437
2438                 obj_request->object_no = object_no;
2439                 obj_request->offset = offset;
2440                 obj_request->length = length;
2441
2442                 /*
2443                  * set obj_request->img_request before creating the
2444                  * osd_request so that it gets the right snapc
2445                  */
2446                 rbd_img_obj_request_add(img_request, obj_request);
2447
2448                 if (type == OBJ_REQUEST_BIO) {
2449                         unsigned int clone_size;
2450
2451                         rbd_assert(length <= (u64)UINT_MAX);
2452                         clone_size = (unsigned int)length;
2453                         obj_request->bio_list =
2454                                         bio_chain_clone_range(&bio_list,
2455                                                                 &bio_offset,
2456                                                                 clone_size,
2457                                                                 GFP_NOIO);
2458                         if (!obj_request->bio_list)
2459                                 goto out_unwind;
2460                 } else if (type == OBJ_REQUEST_PAGES) {
2461                         unsigned int page_count;
2462
2463                         obj_request->pages = pages;
2464                         page_count = (u32)calc_pages_for(offset, length);
2465                         obj_request->page_count = page_count;
2466                         if ((offset + length) & ~PAGE_MASK)
2467                                 page_count--;   /* more on last page */
2468                         pages += page_count;
2469                 }
2470
2471                 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2472                                         (op_type == OBJ_OP_WRITE) ? 2 : 1,
2473                                         obj_request);
2474                 if (!osd_req)
2475                         goto out_unwind;
2476
2477                 obj_request->osd_req = osd_req;
2478                 obj_request->callback = rbd_img_obj_callback;
2479                 obj_request->img_offset = img_offset;
2480
2481                 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2482
2483                 img_offset += length;
2484                 resid -= length;
2485         }
2486
2487         return 0;
2488
2489 out_unwind:
2490         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2491                 rbd_img_obj_request_del(img_request, obj_request);
2492
2493         return -ENOMEM;
2494 }
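
/*
 * Segmenting example (illustrative, assuming the common 4 MiB object
 * size, i.e. obj_order == 22): a 6 MiB request starting at image offset
 * 3 MiB is split into three object requests:
 *
 *      object_no 0: offset 3 MiB, length 1 MiB
 *      object_no 1: offset 0,     length 4 MiB
 *      object_no 2: offset 0,     length 1 MiB
 *
 * object_no is img_offset >> obj_order, and each piece is clipped to its
 * object by rbd_segment_offset()/rbd_segment_length().
 */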
2495
2496 static void
2497 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2498 {
2499         struct rbd_img_request *img_request;
2500         struct rbd_device *rbd_dev;
2501         struct page **pages;
2502         u32 page_count;
2503
2504         dout("%s: obj %p\n", __func__, obj_request);
2505
2506         rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2507                 obj_request->type == OBJ_REQUEST_NODATA);
2508         rbd_assert(obj_request_img_data_test(obj_request));
2509         img_request = obj_request->img_request;
2510         rbd_assert(img_request);
2511
2512         rbd_dev = img_request->rbd_dev;
2513         rbd_assert(rbd_dev);
2514
2515         pages = obj_request->copyup_pages;
2516         rbd_assert(pages != NULL);
2517         obj_request->copyup_pages = NULL;
2518         page_count = obj_request->copyup_page_count;
2519         rbd_assert(page_count);
2520         obj_request->copyup_page_count = 0;
2521         ceph_release_page_vector(pages, page_count);
2522
2523         /*
2524          * We want the transfer count to reflect the size of the
2525          * original write request.  There is no such thing as a
2526          * successful short write, so if the request was successful
2527          * we can just set it to the originally-requested length.
2528          */
2529         if (!obj_request->result)
2530                 obj_request->xferred = obj_request->length;
2531
2532         obj_request_done_set(obj_request);
2533 }
2534
2535 static void
2536 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2537 {
2538         struct rbd_obj_request *orig_request;
2539         struct ceph_osd_request *osd_req;
2540         struct rbd_device *rbd_dev;
2541         struct page **pages;
2542         enum obj_operation_type op_type;
2543         u32 page_count;
2544         int img_result;
2545         u64 parent_length;
2546
2547         rbd_assert(img_request_child_test(img_request));
2548
2549         /* First get what we need from the image request */
2550
2551         pages = img_request->copyup_pages;
2552         rbd_assert(pages != NULL);
2553         img_request->copyup_pages = NULL;
2554         page_count = img_request->copyup_page_count;
2555         rbd_assert(page_count);
2556         img_request->copyup_page_count = 0;
2557
2558         orig_request = img_request->obj_request;
2559         rbd_assert(orig_request != NULL);
2560         rbd_assert(obj_request_type_valid(orig_request->type));
2561         img_result = img_request->result;
2562         parent_length = img_request->length;
2563         rbd_assert(img_result || parent_length == img_request->xferred);
2564         rbd_img_request_put(img_request);
2565
2566         rbd_assert(orig_request->img_request);
2567         rbd_dev = orig_request->img_request->rbd_dev;
2568         rbd_assert(rbd_dev);
2569
2570         /*
2571          * If the overlap has become 0 (most likely because the
2572          * image has been flattened) we need to free the pages
2573          * and re-submit the original write request.
2574          */
2575         if (!rbd_dev->parent_overlap) {
2576                 ceph_release_page_vector(pages, page_count);
2577                 rbd_obj_request_submit(orig_request);
2578                 return;
2579         }
2580
2581         if (img_result)
2582                 goto out_err;
2583
2584         /*
2585          * The original osd request is of no use to us any more.
2586          * We need a new one that can hold the two or three ops in a copyup
2587          * request.  Allocate the new copyup osd request for the
2588          * original request, and release the old one.
2589          */
2590         img_result = -ENOMEM;
2591         osd_req = rbd_osd_req_create_copyup(orig_request);
2592         if (!osd_req)
2593                 goto out_err;
2594         rbd_osd_req_destroy(orig_request->osd_req);
2595         orig_request->osd_req = osd_req;
2596         orig_request->copyup_pages = pages;
2597         orig_request->copyup_page_count = page_count;
2598
2599         /* Initialize the copyup op */
2600
2601         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2602         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2603                                                 false, false);
2604
2605         /* Add the other op(s) */
2606
2607         op_type = rbd_img_request_op_type(orig_request->img_request);
2608         rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2609
2610         /* All set, send it off. */
2611
2612         rbd_obj_request_submit(orig_request);
2613         return;
2614
2615 out_err:
2616         ceph_release_page_vector(pages, page_count);
2617         rbd_obj_request_error(orig_request, img_result);
2618 }
2619
2620 /*
2621  * Read from the parent image the range of data that covers the
2622  * entire target of the given object request.  This is used for
2623  * satisfying a layered image write request when the target of an
2624  * object request from the image request does not exist.
2625  *
2626  * A page array big enough to hold the returned data is allocated
2627  * and supplied to rbd_img_request_fill() as the "data descriptor."
2628  * When the read completes, this page array will be transferred to
2629  * the original object request for the copyup operation.
2630  *
2631  * If an error occurs, it is recorded as the result of the original
2632  * object request in rbd_img_obj_exists_callback().
2633  */
2634 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2635 {
2636         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2637         struct rbd_img_request *parent_request = NULL;
2638         u64 img_offset;
2639         u64 length;
2640         struct page **pages = NULL;
2641         u32 page_count;
2642         int result;
2643
2644         rbd_assert(rbd_dev->parent != NULL);
2645
2646         /*
2647          * Determine the byte range covered by the object in the
2648          * child image to which the original request was to be sent.
2649          */
2650         img_offset = obj_request->img_offset - obj_request->offset;
2651         length = rbd_obj_bytes(&rbd_dev->header);
2652
2653         /*
2654          * There is no defined parent data beyond the parent
2655          * overlap, so limit what we read at that boundary if
2656          * necessary.
2657          */
2658         if (img_offset + length > rbd_dev->parent_overlap) {
2659                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2660                 length = rbd_dev->parent_overlap - img_offset;
2661         }
2662
2663         /*
2664          * Allocate a page array big enough to receive the data read
2665          * from the parent.
2666          */
2667         page_count = (u32)calc_pages_for(0, length);
2668         pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2669         if (IS_ERR(pages)) {
2670                 result = PTR_ERR(pages);
2671                 pages = NULL;
2672                 goto out_err;
2673         }
2674
2675         result = -ENOMEM;
2676         parent_request = rbd_parent_request_create(obj_request,
2677                                                 img_offset, length);
2678         if (!parent_request)
2679                 goto out_err;
2680
2681         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2682         if (result)
2683                 goto out_err;
2684
2685         parent_request->copyup_pages = pages;
2686         parent_request->copyup_page_count = page_count;
2687         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2688
2689         result = rbd_img_request_submit(parent_request);
2690         if (!result)
2691                 return 0;
2692
2693         parent_request->copyup_pages = NULL;
2694         parent_request->copyup_page_count = 0;
2695         parent_request->obj_request = NULL;
2696         rbd_obj_request_put(obj_request);
2697 out_err:
2698         if (pages)
2699                 ceph_release_page_vector(pages, page_count);
2700         if (parent_request)
2701                 rbd_img_request_put(parent_request);
2702         return result;
2703 }
2704
2705 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2706 {
2707         struct rbd_obj_request *orig_request;
2708         struct rbd_device *rbd_dev;
2709         int result;
2710
2711         rbd_assert(!obj_request_img_data_test(obj_request));
2712
2713         /*
2714          * All we need from the object request is the original
2715          * request and the result of the STAT op.  Grab those, then
2716          * we're done with the request.
2717          */
2718         orig_request = obj_request->obj_request;
2719         obj_request->obj_request = NULL;
2720         rbd_obj_request_put(orig_request);
2721         rbd_assert(orig_request);
2722         rbd_assert(orig_request->img_request);
2723
2724         result = obj_request->result;
2725         obj_request->result = 0;
2726
2727         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2728                 obj_request, orig_request, result,
2729                 obj_request->xferred, obj_request->length);
2730         rbd_obj_request_put(obj_request);
2731
2732         /*
2733          * If the overlap has become 0 (most likely because the
2734          * image has been flattened) we need to re-submit the
2735          * original request.
2736          */
2737         rbd_dev = orig_request->img_request->rbd_dev;
2738         if (!rbd_dev->parent_overlap) {
2739                 rbd_obj_request_submit(orig_request);
2740                 return;
2741         }
2742
2743         /*
2744          * Our only purpose here is to determine whether the object
2745          * exists, and we don't want to treat the non-existence as
2746          * an error.  If something else comes back, transfer the
2747          * error to the original request and complete it now.
2748          */
2749         if (!result) {
2750                 obj_request_existence_set(orig_request, true);
2751         } else if (result == -ENOENT) {
2752                 obj_request_existence_set(orig_request, false);
2753         } else {
2754                 goto fail_orig_request;
2755         }
2756
2757         /*
2758          * Resubmit the original request now that we have recorded
2759          * whether the target object exists.
2760          */
2761         result = rbd_img_obj_request_submit(orig_request);
2762         if (result)
2763                 goto fail_orig_request;
2764
2765         return;
2766
2767 fail_orig_request:
2768         rbd_obj_request_error(orig_request, result);
2769 }
2770
2771 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2772 {
2773         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2774         struct rbd_obj_request *stat_request;
2775         struct page **pages;
2776         u32 page_count;
2777         size_t size;
2778         int ret;
2779
2780         stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
2781         if (!stat_request)
2782                 return -ENOMEM;
2783
2784         stat_request->object_no = obj_request->object_no;
2785
2786         stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2787                                                    stat_request);
2788         if (!stat_request->osd_req) {
2789                 ret = -ENOMEM;
2790                 goto fail_stat_request;
2791         }
2792
2793         /*
2794          * The response data for a STAT call consists of:
2795          *     le64 length;
2796          *     struct {
2797          *         le32 tv_sec;
2798          *         le32 tv_nsec;
2799          *     } mtime;
2800          */
2801         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2802         page_count = (u32)calc_pages_for(0, size);
2803         pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2804         if (IS_ERR(pages)) {
2805                 ret = PTR_ERR(pages);
2806                 goto fail_stat_request;
2807         }
2808
2809         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2810         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2811                                      false, false);
2812
2813         rbd_obj_request_get(obj_request);
2814         stat_request->obj_request = obj_request;
2815         stat_request->pages = pages;
2816         stat_request->page_count = page_count;
2817         stat_request->callback = rbd_img_obj_exists_callback;
2818
2819         rbd_obj_request_submit(stat_request);
2820         return 0;
2821
2822 fail_stat_request:
2823         rbd_obj_request_put(stat_request);
2824         return ret;
2825 }
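
/*
 * Note: the driver only looks at the result of the STAT op (0 vs
 * -ENOENT); the 16-byte payload described above is never decoded.
 * Purely as an illustration (hypothetical, not driver code), decoding
 * it would look roughly like:
 *
 *      void *p = page_address(pages[0]);
 *      u64 obj_len = ceph_decode_64(&p);
 *      u32 tv_sec  = ceph_decode_32(&p);
 *      u32 tv_nsec = ceph_decode_32(&p);
 */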
2826
2827 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2828 {
2829         struct rbd_img_request *img_request = obj_request->img_request;
2830         struct rbd_device *rbd_dev = img_request->rbd_dev;
2831
2832         /* Reads */
2833         if (!img_request_write_test(img_request) &&
2834             !img_request_discard_test(img_request))
2835                 return true;
2836
2837         /* Non-layered writes */
2838         if (!img_request_layered_test(img_request))
2839                 return true;
2840
2841         /*
2842          * Layered writes outside of the parent overlap range don't
2843          * share any data with the parent.
2844          */
2845         if (!obj_request_overlaps_parent(obj_request))
2846                 return true;
2847
2848         /*
2849          * Entire-object layered writes - we will overwrite whatever
2850          * parent data there is anyway.
2851          */
2852         if (!obj_request->offset &&
2853             obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2854                 return true;
2855
2856         /*
2857          * If the object is known to already exist, its parent data has
2858          * already been copied.
2859          */
2860         if (obj_request_known_test(obj_request) &&
2861             obj_request_exists_test(obj_request))
2862                 return true;
2863
2864         return false;
2865 }
2866
2867 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2868 {
2869         rbd_assert(obj_request_img_data_test(obj_request));
2870         rbd_assert(obj_request_type_valid(obj_request->type));
2871         rbd_assert(obj_request->img_request);
2872
2873         if (img_obj_request_simple(obj_request)) {
2874                 rbd_obj_request_submit(obj_request);
2875                 return 0;
2876         }
2877
2878         /*
2879          * It's a layered write.  The target object might exist but
2880          * we may not know that yet.  If we know it doesn't exist,
2881          * start by reading the data for the full target object from
2882          * the parent so we can use it for a copyup to the target.
2883          */
2884         if (obj_request_known_test(obj_request))
2885                 return rbd_img_obj_parent_read_full(obj_request);
2886
2887         /* We don't know whether the target exists.  Go find out. */
2888
2889         return rbd_img_obj_exists_submit(obj_request);
2890 }
2891
2892 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2893 {
2894         struct rbd_obj_request *obj_request;
2895         struct rbd_obj_request *next_obj_request;
2896         int ret = 0;
2897
2898         dout("%s: img %p\n", __func__, img_request);
2899
2900         rbd_img_request_get(img_request);
2901         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2902                 ret = rbd_img_obj_request_submit(obj_request);
2903                 if (ret)
2904                         goto out_put_ireq;
2905         }
2906
2907 out_put_ireq:
2908         rbd_img_request_put(img_request);
2909         return ret;
2910 }
2911
2912 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2913 {
2914         struct rbd_obj_request *obj_request;
2915         struct rbd_device *rbd_dev;
2916         u64 obj_end;
2917         u64 img_xferred;
2918         int img_result;
2919
2920         rbd_assert(img_request_child_test(img_request));
2921
2922         /* First get what we need from the image request and release it */
2923
2924         obj_request = img_request->obj_request;
2925         img_xferred = img_request->xferred;
2926         img_result = img_request->result;
2927         rbd_img_request_put(img_request);
2928
2929         /*
2930          * If the overlap has become 0 (most likely because the
2931          * image has been flattened) we need to re-submit the
2932          * original request.
2933          */
2934         rbd_assert(obj_request);
2935         rbd_assert(obj_request->img_request);
2936         rbd_dev = obj_request->img_request->rbd_dev;
2937         if (!rbd_dev->parent_overlap) {
2938                 rbd_obj_request_submit(obj_request);
2939                 return;
2940         }
2941
2942         obj_request->result = img_result;
2943         if (obj_request->result)
2944                 goto out;
2945
2946         /*
2947          * We need to zero anything beyond the parent overlap
2948          * boundary.  Since rbd_img_obj_request_read_callback()
2949          * will zero anything beyond the end of a short read, an
2950          * easy way to do this is to pretend the data from the
2951          * parent came up short--ending at the overlap boundary.
2952          */
2953         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2954         obj_end = obj_request->img_offset + obj_request->length;
2955         if (obj_end > rbd_dev->parent_overlap) {
2956                 u64 xferred = 0;
2957
2958                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2959                         xferred = rbd_dev->parent_overlap -
2960                                         obj_request->img_offset;
2961
2962                 obj_request->xferred = min(img_xferred, xferred);
2963         } else {
2964                 obj_request->xferred = img_xferred;
2965         }
2966 out:
2967         rbd_img_obj_request_read_callback(obj_request);
2968         rbd_obj_request_complete(obj_request);
2969 }
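
/*
 * Worked example (illustrative): with a parent overlap of 2 MiB, a child
 * read covering image offsets 1.5 MiB..2.5 MiB that falls through to the
 * parent has its xferred clamped to 0.5 MiB above, so that
 * rbd_img_obj_request_read_callback() zero-fills the remaining 0.5 MiB
 * of the request that lies beyond the overlap.
 */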
2970
2971 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2972 {
2973         struct rbd_img_request *img_request;
2974         int result;
2975
2976         rbd_assert(obj_request_img_data_test(obj_request));
2977         rbd_assert(obj_request->img_request != NULL);
2978         rbd_assert(obj_request->result == (s32) -ENOENT);
2979         rbd_assert(obj_request_type_valid(obj_request->type));
2980
2981         /* rbd_read_finish(obj_request, obj_request->length); */
2982         img_request = rbd_parent_request_create(obj_request,
2983                                                 obj_request->img_offset,
2984                                                 obj_request->length);
2985         result = -ENOMEM;
2986         if (!img_request)
2987                 goto out_err;
2988
2989         if (obj_request->type == OBJ_REQUEST_BIO)
2990                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2991                                                 obj_request->bio_list);
2992         else
2993                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2994                                                 obj_request->pages);
2995         if (result)
2996                 goto out_err;
2997
2998         img_request->callback = rbd_img_parent_read_callback;
2999         result = rbd_img_request_submit(img_request);
3000         if (result)
3001                 goto out_err;
3002
3003         return;
3004 out_err:
3005         if (img_request)
3006                 rbd_img_request_put(img_request);
3007         obj_request->result = result;
3008         obj_request->xferred = 0;
3009         obj_request_done_set(obj_request);
3010 }
3011
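/*
 * A client id (gid + handle, i.e. watch cookie) identifies an exclusive
 * lock owner.  rbd_empty_cid (all zeroes) stands for "no owner".
 */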
3012 static const struct rbd_client_id rbd_empty_cid;
3013
3014 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3015                           const struct rbd_client_id *rhs)
3016 {
3017         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3018 }
3019
3020 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3021 {
3022         struct rbd_client_id cid;
3023
3024         mutex_lock(&rbd_dev->watch_mutex);
3025         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3026         cid.handle = rbd_dev->watch_cookie;
3027         mutex_unlock(&rbd_dev->watch_mutex);
3028         return cid;
3029 }
3030
3031 /*
3032  * lock_rwsem must be held for write
3033  */
3034 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3035                               const struct rbd_client_id *cid)
3036 {
3037         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3038              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3039              cid->gid, cid->handle);
3040         rbd_dev->owner_cid = *cid; /* struct */
3041 }
3042
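/*
 * Format the lock cookie for this client into buf:
 * RBD_LOCK_COOKIE_PREFIX followed by the current watch cookie.
 * buf is assumed to be large enough for the result.
 */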
3043 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3044 {
3045         mutex_lock(&rbd_dev->watch_mutex);
3046         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3047         mutex_unlock(&rbd_dev->watch_mutex);
3048 }
3049
3050 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3051 {
3052         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3053
3054         strcpy(rbd_dev->lock_cookie, cookie);
3055         rbd_set_owner_cid(rbd_dev, &cid);
3056         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3057 }
3058
3059 /*
3060  * lock_rwsem must be held for write
3061  */
3062 static int rbd_lock(struct rbd_device *rbd_dev)
3063 {
3064         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3065         char cookie[32];
3066         int ret;
3067
3068         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3069                 rbd_dev->lock_cookie[0] != '\0');
3070
3071         format_lock_cookie(rbd_dev, cookie);
3072         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3073                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3074                             RBD_LOCK_TAG, "", 0);
3075         if (ret)
3076                 return ret;
3077
3078         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3079         __rbd_lock(rbd_dev, cookie);
3080         return 0;
3081 }
3082
3083 /*
3084  * lock_rwsem must be held for write
3085  */
3086 static void rbd_unlock(struct rbd_device *rbd_dev)
3087 {
3088         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3089         int ret;
3090
3091         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3092                 rbd_dev->lock_cookie[0] == '\0');
3093
3094         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3095                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
3096         if (ret && ret != -ENOENT)
3097                 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
3098
3099         /* treat errors as if the unlock had succeeded */
3100         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3101         rbd_dev->lock_cookie[0] = '\0';
3102         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3103         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3104 }
3105
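/*
 * Send a notify on the header object carrying a *LockPayload
 * NotifyMessage (notify_op plus this client's ClientId).  If
 * preply_pages is non-NULL, the acks from other watchers are returned
 * there for the caller to decode and release.
 */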
3106 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3107                                 enum rbd_notify_op notify_op,
3108                                 struct page ***preply_pages,
3109                                 size_t *preply_len)
3110 {
3111         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3112         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3113         int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3114         char buf[buf_size];
3115         void *p = buf;
3116
3117         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3118
3119         /* encode *LockPayload NotifyMessage (op + ClientId) */
3120         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3121         ceph_encode_32(&p, notify_op);
3122         ceph_encode_64(&p, cid.gid);
3123         ceph_encode_64(&p, cid.handle);
3124
3125         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3126                                 &rbd_dev->header_oloc, buf, buf_size,
3127                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3128 }
3129
3130 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3131                                enum rbd_notify_op notify_op)
3132 {
3133         struct page **reply_pages;
3134         size_t reply_len;
3135
3136         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3137         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3138 }
3139
3140 static void rbd_notify_acquired_lock(struct work_struct *work)
3141 {
3142         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3143                                                   acquired_lock_work);
3144
3145         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3146 }
3147
3148 static void rbd_notify_released_lock(struct work_struct *work)
3149 {
3150         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3151                                                   released_lock_work);
3152
3153         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3154 }
3155
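/*
 * Ask the current lock owner to release the lock via a REQUEST_LOCK
 * notify and decode the acks.  Returns the owner's ResponseMessage
 * result (0 if it will release the lock, -EROFS if it refuses),
 * -ETIMEDOUT if no owner responded, or a negative error on failure.
 */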
3156 static int rbd_request_lock(struct rbd_device *rbd_dev)
3157 {
3158         struct page **reply_pages;
3159         size_t reply_len;
3160         bool lock_owner_responded = false;
3161         int ret;
3162
3163         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3164
3165         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3166                                    &reply_pages, &reply_len);
3167         if (ret && ret != -ETIMEDOUT) {
3168                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3169                 goto out;
3170         }
3171
3172         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3173                 void *p = page_address(reply_pages[0]);
3174                 void *const end = p + reply_len;
3175                 u32 n;
3176
3177                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3178                 while (n--) {
3179                         u8 struct_v;
3180                         u32 len;
3181
3182                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3183                         p += 8 + 8; /* skip gid and cookie */
3184
3185                         ceph_decode_32_safe(&p, end, len, e_inval);
3186                         if (!len)
3187                                 continue;
3188
3189                         if (lock_owner_responded) {
3190                                 rbd_warn(rbd_dev,
3191                                          "duplicate lock owners detected");
3192                                 ret = -EIO;
3193                                 goto out;
3194                         }
3195
3196                         lock_owner_responded = true;
3197                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3198                                                   &struct_v, &len);
3199                         if (ret) {
3200                                 rbd_warn(rbd_dev,
3201                                          "failed to decode ResponseMessage: %d",
3202                                          ret);
3203                                 goto e_inval;
3204                         }
3205
3206                         ret = ceph_decode_32(&p);
3207                 }
3208         }
3209
3210         if (!lock_owner_responded) {
3211                 rbd_warn(rbd_dev, "no lock owners detected");
3212                 ret = -ETIMEDOUT;
3213         }
3214
3215 out:
3216         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3217         return ret;
3218
3219 e_inval:
3220         ret = -EINVAL;
3221         goto out;
3222 }
3223
3224 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3225 {
3226         dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3227
3228         cancel_delayed_work(&rbd_dev->lock_dwork);
3229         if (wake_all)
3230                 wake_up_all(&rbd_dev->lock_waitq);
3231         else
3232                 wake_up(&rbd_dev->lock_waitq);
3233 }
3234
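/*
 * Fetch the current lockers of the header object.  A shared lock or a
 * lock taken by something other than rbd (foreign tag or cookie prefix)
 * is reported as -EBUSY.  On success the caller owns *lockers and must
 * free it with ceph_free_lockers().
 */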
3235 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3236                                struct ceph_locker **lockers, u32 *num_lockers)
3237 {
3238         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3239         u8 lock_type;
3240         char *lock_tag;
3241         int ret;
3242
3243         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3244
3245         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3246                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3247                                  &lock_type, &lock_tag, lockers, num_lockers);
3248         if (ret)
3249                 return ret;
3250
3251         if (*num_lockers == 0) {
3252                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3253                 goto out;
3254         }
3255
3256         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3257                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3258                          lock_tag);
3259                 ret = -EBUSY;
3260                 goto out;
3261         }
3262
3263         if (lock_type == CEPH_CLS_LOCK_SHARED) {
3264                 rbd_warn(rbd_dev, "shared lock type detected");
3265                 ret = -EBUSY;
3266                 goto out;
3267         }
3268
3269         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3270                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
3271                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3272                          (*lockers)[0].id.cookie);
3273                 ret = -EBUSY;
3274                 goto out;
3275         }
3276
3277 out:
3278         kfree(lock_tag);
3279         return ret;
3280 }
3281
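/*
 * Check whether the given locker still has a watch established on the
 * header object (i.e. appears to be alive).  Returns 1 and records its
 * cid as the owner if a matching watcher is found, 0 if not, or a
 * negative error code.
 */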
3282 static int find_watcher(struct rbd_device *rbd_dev,
3283                         const struct ceph_locker *locker)
3284 {
3285         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3286         struct ceph_watch_item *watchers;
3287         u32 num_watchers;
3288         u64 cookie;
3289         int i;
3290         int ret;
3291
3292         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3293                                       &rbd_dev->header_oloc, &watchers,
3294                                       &num_watchers);
3295         if (ret)
3296                 return ret;
3297
3298         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3299         for (i = 0; i < num_watchers; i++) {
3300                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3301                             sizeof(locker->info.addr)) &&
3302                     watchers[i].cookie == cookie) {
3303                         struct rbd_client_id cid = {
3304                                 .gid = le64_to_cpu(watchers[i].name.num),
3305                                 .handle = cookie,
3306                         };
3307
3308                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3309                              rbd_dev, cid.gid, cid.handle);
3310                         rbd_set_owner_cid(rbd_dev, &cid);
3311                         ret = 1;
3312                         goto out;
3313                 }
3314         }
3315
3316         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3317         ret = 0;
3318 out:
3319         kfree(watchers);
3320         return ret;
3321 }
3322
3323 /*
3324  * lock_rwsem must be held for write
3325  */
3326 static int rbd_try_lock(struct rbd_device *rbd_dev)
3327 {
3328         struct ceph_client *client = rbd_dev->rbd_client->client;
3329         struct ceph_locker *lockers;
3330         u32 num_lockers;
3331         int ret;
3332
3333         for (;;) {
3334                 ret = rbd_lock(rbd_dev);
3335                 if (ret != -EBUSY)
3336                         return ret;
3337
3338                 /* determine if the current lock holder is still alive */
3339                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3340                 if (ret)
3341                         return ret;
3342
3343                 if (num_lockers == 0)
3344                         goto again;
3345
3346                 ret = find_watcher(rbd_dev, lockers);
3347                 if (ret) {
3348                         if (ret > 0)
3349                                 ret = 0; /* have to request lock */
3350                         goto out;
3351                 }
3352
3353                 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3354                          ENTITY_NAME(lockers[0].id.name));
3355
3356                 ret = ceph_monc_blacklist_add(&client->monc,
3357                                               &lockers[0].info.addr);
3358                 if (ret) {
3359                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3360                                  ENTITY_NAME(lockers[0].id.name), ret);
3361                         goto out;
3362                 }
3363
3364                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3365                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
3366                                           lockers[0].id.cookie,
3367                                           &lockers[0].id.name);
3368                 if (ret && ret != -ENOENT)
3369                         goto out;
3370
3371 again:
3372                 ceph_free_lockers(lockers, num_lockers);
3373         }
3374
3375 out:
3376         ceph_free_lockers(lockers, num_lockers);
3377         return ret;
3378 }
3379
3380 /*
3381  * *pret is set only if the returned lock_state is RBD_LOCK_STATE_UNLOCKED
3382  */
3383 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3384                                                 int *pret)
3385 {
3386         enum rbd_lock_state lock_state;
3387
3388         down_read(&rbd_dev->lock_rwsem);
3389         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3390              rbd_dev->lock_state);
3391         if (__rbd_is_lock_owner(rbd_dev)) {
3392                 lock_state = rbd_dev->lock_state;
3393                 up_read(&rbd_dev->lock_rwsem);
3394                 return lock_state;
3395         }
3396
3397         up_read(&rbd_dev->lock_rwsem);
3398         down_write(&rbd_dev->lock_rwsem);
3399         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3400              rbd_dev->lock_state);
3401         if (!__rbd_is_lock_owner(rbd_dev)) {
3402                 *pret = rbd_try_lock(rbd_dev);
3403                 if (*pret)
3404                         rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3405         }
3406
3407         lock_state = rbd_dev->lock_state;
3408         up_write(&rbd_dev->lock_rwsem);
3409         return lock_state;
3410 }
3411
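/*
 * Delayed work: try to acquire the exclusive lock.  If another client
 * holds it, ask that client to release the lock and requeue ourselves
 * until the lock is acquired or we are blacklisted.
 */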
3412 static void rbd_acquire_lock(struct work_struct *work)
3413 {
3414         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3415                                             struct rbd_device, lock_dwork);
3416         enum rbd_lock_state lock_state;
3417         int ret = 0;
3418
3419         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3420 again:
3421         lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3422         if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3423                 if (lock_state == RBD_LOCK_STATE_LOCKED)
3424                         wake_requests(rbd_dev, true);
3425                 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3426                      rbd_dev, lock_state, ret);
3427                 return;
3428         }
3429
3430         ret = rbd_request_lock(rbd_dev);
3431         if (ret == -ETIMEDOUT) {
3432                 goto again; /* treat this as a dead client */
3433         } else if (ret == -EROFS) {
3434                 rbd_warn(rbd_dev, "peer will not release lock");
3435                 /*
3436                  * If this is rbd_add_acquire_lock(), we want to fail
3437                  * immediately -- reuse BLACKLISTED flag.  Otherwise we
3438                  * want to block.
3439                  */
3440                 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3441                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3442                         /* wake "rbd map --exclusive" process */
3443                         wake_requests(rbd_dev, false);
3444                 }
3445         } else if (ret < 0) {
3446                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3447                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3448                                  RBD_RETRY_DELAY);
3449         } else {
3450                 /*
3451                  * lock owner acked, but resend if we don't see them
3452                  * release the lock
3453                  */
3454                 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3455                      rbd_dev);
3456                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3457                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3458         }
3459 }
3460
3461 /*
3462  * lock_rwsem must be held for write
3463  */
3464 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3465 {
3466         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3467              rbd_dev->lock_state);
3468         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3469                 return false;
3470
3471         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3472         downgrade_write(&rbd_dev->lock_rwsem);
3473         /*
3474          * Ensure that all in-flight IO is flushed.
3475          *
3476          * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3477          * may be shared with other devices.
3478          */
3479         ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3480         up_read(&rbd_dev->lock_rwsem);
3481
3482         down_write(&rbd_dev->lock_rwsem);
3483         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3484              rbd_dev->lock_state);
3485         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3486                 return false;
3487
3488         rbd_unlock(rbd_dev);
3489         /*
3490          * Give others a chance to grab the lock - we would re-acquire
3491          * almost immediately if we got new IO during ceph_osdc_sync()
3492          * otherwise.  We need to ack our own notifications, so this
3493          * lock_dwork will be requeued from rbd_wait_state_locked()
3494          * after wake_requests() in rbd_handle_released_lock().
3495          */
3496         cancel_delayed_work(&rbd_dev->lock_dwork);
3497         return true;
3498 }
3499
3500 static void rbd_release_lock_work(struct work_struct *work)
3501 {
3502         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3503                                                   unlock_work);
3504
3505         down_write(&rbd_dev->lock_rwsem);
3506         rbd_release_lock(rbd_dev);
3507         up_write(&rbd_dev->lock_rwsem);
3508 }
3509
3510 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3511                                      void **p)
3512 {
3513         struct rbd_client_id cid = { 0 };
3514
3515         if (struct_v >= 2) {
3516                 cid.gid = ceph_decode_64(p);
3517                 cid.handle = ceph_decode_64(p);
3518         }
3519
3520         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3521              cid.handle);
3522         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3523                 down_write(&rbd_dev->lock_rwsem);
3524                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3525                         /*
3526                          * we already know that the remote client is
3527                          * the owner
3528                          */
3529                         up_write(&rbd_dev->lock_rwsem);
3530                         return;
3531                 }
3532
3533                 rbd_set_owner_cid(rbd_dev, &cid);
3534                 downgrade_write(&rbd_dev->lock_rwsem);
3535         } else {
3536                 down_read(&rbd_dev->lock_rwsem);
3537         }
3538
3539         if (!__rbd_is_lock_owner(rbd_dev))
3540                 wake_requests(rbd_dev, false);
3541         up_read(&rbd_dev->lock_rwsem);
3542 }
3543
3544 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3545                                      void **p)
3546 {
3547         struct rbd_client_id cid = { 0 };
3548
3549         if (struct_v >= 2) {
3550                 cid.gid = ceph_decode_64(p);
3551                 cid.handle = ceph_decode_64(p);
3552         }
3553
3554         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3555              cid.handle);
3556         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3557                 down_write(&rbd_dev->lock_rwsem);
3558                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3559                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3560                              __func__, rbd_dev, cid.gid, cid.handle,
3561                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3562                         up_write(&rbd_dev->lock_rwsem);
3563                         return;
3564                 }
3565
3566                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3567                 downgrade_write(&rbd_dev->lock_rwsem);
3568         } else {
3569                 down_read(&rbd_dev->lock_rwsem);
3570         }
3571
3572         if (!__rbd_is_lock_owner(rbd_dev))
3573                 wake_requests(rbd_dev, false);
3574         up_read(&rbd_dev->lock_rwsem);
3575 }
3576
3577 /*
3578  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3579  * ResponseMessage is needed.
3580  */
3581 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3582                                    void **p)
3583 {
3584         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3585         struct rbd_client_id cid = { 0 };
3586         int result = 1;
3587
3588         if (struct_v >= 2) {
3589                 cid.gid = ceph_decode_64(p);
3590                 cid.handle = ceph_decode_64(p);
3591         }
3592
3593         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3594              cid.handle);
3595         if (rbd_cid_equal(&cid, &my_cid))
3596                 return result;
3597
3598         down_read(&rbd_dev->lock_rwsem);
3599         if (__rbd_is_lock_owner(rbd_dev)) {
3600                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3601                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3602                         goto out_unlock;
3603
3604                 /*
3605                  * encode ResponseMessage(0) so the peer can detect
3606                  * a missing owner
3607                  */
3608                 result = 0;
3609
3610                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3611                         if (!rbd_dev->opts->exclusive) {
3612                                 dout("%s rbd_dev %p queueing unlock_work\n",
3613                                      __func__, rbd_dev);
3614                                 queue_work(rbd_dev->task_wq,
3615                                            &rbd_dev->unlock_work);
3616                         } else {
3617                                 /* refuse to release the lock */
3618                                 result = -EROFS;
3619                         }
3620                 }
3621         }
3622
3623 out_unlock:
3624         up_read(&rbd_dev->lock_rwsem);
3625         return result;
3626 }
3627
3628 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3629                                      u64 notify_id, u64 cookie, s32 *result)
3630 {
3631         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3632         int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3633         char buf[buf_size];
3634         int ret;
3635
3636         if (result) {
3637                 void *p = buf;
3638
3639                 /* encode ResponseMessage */
3640                 ceph_start_encoding(&p, 1, 1,
3641                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
3642                 ceph_encode_32(&p, *result);
3643         } else {
3644                 buf_size = 0;
3645         }
3646
3647         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3648                                    &rbd_dev->header_oloc, notify_id, cookie,
3649                                    buf, buf_size);
3650         if (ret)
3651                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3652 }
3653
3654 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3655                                    u64 cookie)
3656 {
3657         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3658         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3659 }
3660
3661 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3662                                           u64 notify_id, u64 cookie, s32 result)
3663 {
3664         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3665         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3666 }
3667
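/*
 * watch/notify callback for the header object: decode the NotifyMessage
 * (an empty payload is a legacy header update), dispatch on notify_op
 * and acknowledge the notify.
 */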
3668 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3669                          u64 notifier_id, void *data, size_t data_len)
3670 {
3671         struct rbd_device *rbd_dev = arg;
3672         void *p = data;
3673         void *const end = p + data_len;
3674         u8 struct_v = 0;
3675         u32 len;
3676         u32 notify_op;
3677         int ret;
3678
3679         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3680              __func__, rbd_dev, cookie, notify_id, data_len);
3681         if (data_len) {
3682                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3683                                           &struct_v, &len);
3684                 if (ret) {
3685                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3686                                  ret);
3687                         return;
3688                 }
3689
3690                 notify_op = ceph_decode_32(&p);
3691         } else {
3692                 /* legacy notification for header updates */
3693                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3694                 len = 0;
3695         }
3696
3697         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3698         switch (notify_op) {
3699         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3700                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3701                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3702                 break;
3703         case RBD_NOTIFY_OP_RELEASED_LOCK:
3704                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3705                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3706                 break;
3707         case RBD_NOTIFY_OP_REQUEST_LOCK:
3708                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3709                 if (ret <= 0)
3710                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3711                                                       cookie, ret);
3712                 else
3713                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3714                 break;
3715         case RBD_NOTIFY_OP_HEADER_UPDATE:
3716                 ret = rbd_dev_refresh(rbd_dev);
3717                 if (ret)
3718                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
3719
3720                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3721                 break;
3722         default:
3723                 if (rbd_is_lock_owner(rbd_dev))
3724                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3725                                                       cookie, -EOPNOTSUPP);
3726                 else
3727                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3728                 break;
3729         }
3730 }
3731
3732 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3733
3734 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3735 {
3736         struct rbd_device *rbd_dev = arg;
3737
3738         rbd_warn(rbd_dev, "encountered watch error: %d", err);
3739
3740         down_write(&rbd_dev->lock_rwsem);
3741         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3742         up_write(&rbd_dev->lock_rwsem);
3743
3744         mutex_lock(&rbd_dev->watch_mutex);
3745         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3746                 __rbd_unregister_watch(rbd_dev);
3747                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3748
3749                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3750         }
3751         mutex_unlock(&rbd_dev->watch_mutex);
3752 }
3753
3754 /*
3755  * watch_mutex must be locked
3756  */
3757 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3758 {
3759         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3760         struct ceph_osd_linger_request *handle;
3761
3762         rbd_assert(!rbd_dev->watch_handle);
3763         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3764
3765         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3766                                  &rbd_dev->header_oloc, rbd_watch_cb,
3767                                  rbd_watch_errcb, rbd_dev);
3768         if (IS_ERR(handle))
3769                 return PTR_ERR(handle);
3770
3771         rbd_dev->watch_handle = handle;
3772         return 0;
3773 }
3774
3775 /*
3776  * watch_mutex must be locked
3777  */
3778 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3779 {
3780         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3781         int ret;
3782
3783         rbd_assert(rbd_dev->watch_handle);
3784         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3785
3786         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3787         if (ret)
3788                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3789
3790         rbd_dev->watch_handle = NULL;
3791 }
3792
3793 static int rbd_register_watch(struct rbd_device *rbd_dev)
3794 {
3795         int ret;
3796
3797         mutex_lock(&rbd_dev->watch_mutex);
3798         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3799         ret = __rbd_register_watch(rbd_dev);
3800         if (ret)
3801                 goto out;
3802
3803         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3804         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3805
3806 out:
3807         mutex_unlock(&rbd_dev->watch_mutex);
3808         return ret;
3809 }
3810
3811 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3812 {
3813         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3814
3815         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3816         cancel_work_sync(&rbd_dev->acquired_lock_work);
3817         cancel_work_sync(&rbd_dev->released_lock_work);
3818         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3819         cancel_work_sync(&rbd_dev->unlock_work);
3820 }
3821
3822 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3823 {
3824         WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3825         cancel_tasks_sync(rbd_dev);
3826
3827         mutex_lock(&rbd_dev->watch_mutex);
3828         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3829                 __rbd_unregister_watch(rbd_dev);
3830         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3831         mutex_unlock(&rbd_dev->watch_mutex);
3832
3833         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3834 }
3835
3836 /*
3837  * lock_rwsem must be held for write
3838  */
3839 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3840 {
3841         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3842         char cookie[32];
3843         int ret;
3844
3845         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3846
3847         format_lock_cookie(rbd_dev, cookie);
3848         ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3849                                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
3850                                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3851                                   RBD_LOCK_TAG, cookie);
3852         if (ret) {
3853                 if (ret != -EOPNOTSUPP)
3854                         rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3855                                  ret);
3856
3857                 /*
3858                  * Lock cookie cannot be updated on older OSDs, so do
3859                  * a manual release and queue an acquire.
3860                  */
3861                 if (rbd_release_lock(rbd_dev))
3862                         queue_delayed_work(rbd_dev->task_wq,
3863                                            &rbd_dev->lock_dwork, 0);
3864         } else {
3865                 __rbd_lock(rbd_dev, cookie);
3866         }
3867 }
3868
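/*
 * Delayed work: re-establish the watch after an error, reacquire the
 * exclusive lock if we were holding it, and refresh the header.
 */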
3869 static void rbd_reregister_watch(struct work_struct *work)
3870 {
3871         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3872                                             struct rbd_device, watch_dwork);
3873         int ret;
3874
3875         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3876
3877         mutex_lock(&rbd_dev->watch_mutex);
3878         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3879                 mutex_unlock(&rbd_dev->watch_mutex);
3880                 return;
3881         }
3882
3883         ret = __rbd_register_watch(rbd_dev);
3884         if (ret) {
3885                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3886                 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3887                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3888                         wake_requests(rbd_dev, true);
3889                 } else {
3890                         queue_delayed_work(rbd_dev->task_wq,
3891                                            &rbd_dev->watch_dwork,
3892                                            RBD_RETRY_DELAY);
3893                 }
3894                 mutex_unlock(&rbd_dev->watch_mutex);
3895                 return;
3896         }
3897
3898         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3899         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3900         mutex_unlock(&rbd_dev->watch_mutex);
3901
3902         down_write(&rbd_dev->lock_rwsem);
3903         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3904                 rbd_reacquire_lock(rbd_dev);
3905         up_write(&rbd_dev->lock_rwsem);
3906
3907         ret = rbd_dev_refresh(rbd_dev);
3908         if (ret)
3909                 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3910 }
3911
3912 /*
3913  * Synchronous osd object method call.  Returns the number of bytes
3914  * returned in the inbound (reply) buffer, or a negative error code.
3915  */
3916 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3917                              struct ceph_object_id *oid,
3918                              struct ceph_object_locator *oloc,
3919                              const char *method_name,
3920                              const void *outbound,
3921                              size_t outbound_size,
3922                              void *inbound,
3923                              size_t inbound_size)
3924 {
3925         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3926         struct page *req_page = NULL;
3927         struct page *reply_page;
3928         int ret;
3929
3930         /*
3931          * Method calls are ultimately read operations.  The result
3932          * should be placed into the inbound buffer provided.  They
3933          * also supply outbound data--parameters for the object
3934          * method.  Currently if this is present it will be a
3935          * snapshot id.
3936          */
3937         if (outbound) {
3938                 if (outbound_size > PAGE_SIZE)
3939                         return -E2BIG;
3940
3941                 req_page = alloc_page(GFP_KERNEL);
3942                 if (!req_page)
3943                         return -ENOMEM;
3944
3945                 memcpy(page_address(req_page), outbound, outbound_size);
3946         }
3947
3948         reply_page = alloc_page(GFP_KERNEL);
3949         if (!reply_page) {
3950                 if (req_page)
3951                         __free_page(req_page);
3952                 return -ENOMEM;
3953         }
3954
3955         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3956                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
3957                              reply_page, &inbound_size);
3958         if (!ret) {
3959                 memcpy(inbound, page_address(reply_page), inbound_size);
3960                 ret = inbound_size;
3961         }
3962
3963         if (req_page)
3964                 __free_page(req_page);
3965         __free_page(reply_page);
3966         return ret;
3967 }
3968
3969 /*
3970  * lock_rwsem must be held for read
3971  */
3972 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3973 {
3974         DEFINE_WAIT(wait);
3975
3976         do {
3977                 /*
3978                  * Note the use of mod_delayed_work() in rbd_acquire_lock()
3979                  * and cancel_delayed_work() in wake_requests().
3980                  */
3981                 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3982                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3983                 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
3984                                           TASK_UNINTERRUPTIBLE);
3985                 up_read(&rbd_dev->lock_rwsem);
3986                 schedule();
3987                 down_read(&rbd_dev->lock_rwsem);
3988         } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
3989                  !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
3990
3991         finish_wait(&rbd_dev->lock_waitq, &wait);
3992 }
3993
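/*
 * Per-request work function: translate a block layer request into an
 * rbd image request and submit it, taking the exclusive lock first if
 * the mapping requires it.  Errors complete the request immediately.
 */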
3994 static void rbd_queue_workfn(struct work_struct *work)
3995 {
3996         struct request *rq = blk_mq_rq_from_pdu(work);
3997         struct rbd_device *rbd_dev = rq->q->queuedata;
3998         struct rbd_img_request *img_request;
3999         struct ceph_snap_context *snapc = NULL;
4000         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4001         u64 length = blk_rq_bytes(rq);
4002         enum obj_operation_type op_type;
4003         u64 mapping_size;
4004         bool must_be_locked;
4005         int result;
4006
4007         switch (req_op(rq)) {
4008         case REQ_OP_DISCARD:
4009         case REQ_OP_WRITE_ZEROES:
4010                 op_type = OBJ_OP_DISCARD;
4011                 break;
4012         case REQ_OP_WRITE:
4013                 op_type = OBJ_OP_WRITE;
4014                 break;
4015         case REQ_OP_READ:
4016                 op_type = OBJ_OP_READ;
4017                 break;
4018         default:
4019                 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4020                 result = -EIO;
4021                 goto err;
4022         }
4023
4024         /* Ignore/skip any zero-length requests */
4025
4026         if (!length) {
4027                 dout("%s: zero-length request\n", __func__);
4028                 result = 0;
4029                 goto err_rq;
4030         }
4031
4032         rbd_assert(op_type == OBJ_OP_READ ||
4033                    rbd_dev->spec->snap_id == CEPH_NOSNAP);
4034
4035         /*
4036          * Quit early if the mapped snapshot no longer exists.  It's
4037          * still possible the snapshot will have disappeared by the
4038          * time our request arrives at the osd, but there's no sense in
4039          * sending it if we already know.
4040          */
4041         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4042                 dout("request for non-existent snapshot");
4043                 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4044                 result = -ENXIO;
4045                 goto err_rq;
4046         }
4047
4048         if (offset && length > U64_MAX - offset + 1) {
4049                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4050                          length);
4051                 result = -EINVAL;
4052                 goto err_rq;    /* Shouldn't happen */
4053         }
4054
4055         blk_mq_start_request(rq);
4056
4057         down_read(&rbd_dev->header_rwsem);
4058         mapping_size = rbd_dev->mapping.size;
4059         if (op_type != OBJ_OP_READ) {
4060                 snapc = rbd_dev->header.snapc;
4061                 ceph_get_snap_context(snapc);
4062         }
4063         up_read(&rbd_dev->header_rwsem);
4064
4065         if (offset + length > mapping_size) {
4066                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4067                          length, mapping_size);
4068                 result = -EIO;
4069                 goto err_rq;
4070         }
4071
4072         must_be_locked =
4073             (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
4074             (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
4075         if (must_be_locked) {
4076                 down_read(&rbd_dev->lock_rwsem);
4077                 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4078                     !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4079                         if (rbd_dev->opts->exclusive) {
4080                                 rbd_warn(rbd_dev, "exclusive lock required");
4081                                 result = -EROFS;
4082                                 goto err_unlock;
4083                         }
4084                         rbd_wait_state_locked(rbd_dev);
4085                 }
4086                 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4087                         result = -EBLACKLISTED;
4088                         goto err_unlock;
4089                 }
4090         }
4091
4092         img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4093                                              snapc);
4094         if (!img_request) {
4095                 result = -ENOMEM;
4096                 goto err_unlock;
4097         }
4098         img_request->rq = rq;
4099         snapc = NULL; /* img_request consumes a ref */
4100
4101         if (op_type == OBJ_OP_DISCARD)
4102                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4103                                               NULL);
4104         else
4105                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4106                                               rq->bio);
4107         if (result)
4108                 goto err_img_request;
4109
4110         result = rbd_img_request_submit(img_request);
4111         if (result)
4112                 goto err_img_request;
4113
4114         if (must_be_locked)
4115                 up_read(&rbd_dev->lock_rwsem);
4116         return;
4117
4118 err_img_request:
4119         rbd_img_request_put(img_request);
4120 err_unlock:
4121         if (must_be_locked)
4122                 up_read(&rbd_dev->lock_rwsem);
4123 err_rq:
4124         if (result)
4125                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4126                          obj_op_name(op_type), length, offset, result);
4127         ceph_put_snap_context(snapc);
4128 err:
4129         blk_mq_end_request(rq, errno_to_blk_status(result));
4130 }
4131
4132 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4133                 const struct blk_mq_queue_data *bd)
4134 {
4135         struct request *rq = bd->rq;
4136         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4137
4138         queue_work(rbd_wq, work);
4139         return BLK_STS_OK;
4140 }
4141
4142 static void rbd_free_disk(struct rbd_device *rbd_dev)
4143 {
4144         blk_cleanup_queue(rbd_dev->disk->queue);
4145         blk_mq_free_tag_set(&rbd_dev->tag_set);
4146         put_disk(rbd_dev->disk);
4147         rbd_dev->disk = NULL;
4148 }
4149
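/*
 * Synchronously read up to buf_len bytes from the start of the given
 * object into buf.  Returns the number of bytes read or a negative
 * error code.
 */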
4150 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4151                              struct ceph_object_id *oid,
4152                              struct ceph_object_locator *oloc,
4153                              void *buf, int buf_len)
4154
4155 {
4156         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4157         struct ceph_osd_request *req;
4158         struct page **pages;
4159         int num_pages = calc_pages_for(0, buf_len);
4160         int ret;
4161
4162         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4163         if (!req)
4164                 return -ENOMEM;
4165
4166         ceph_oid_copy(&req->r_base_oid, oid);
4167         ceph_oloc_copy(&req->r_base_oloc, oloc);
4168         req->r_flags = CEPH_OSD_FLAG_READ;
4169
4170         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4171         if (ret)
4172                 goto out_req;
4173
4174         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4175         if (IS_ERR(pages)) {
4176                 ret = PTR_ERR(pages);
4177                 goto out_req;
4178         }
4179
4180         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4181         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4182                                          true);
4183
4184         ceph_osdc_start_request(osdc, req, false);
4185         ret = ceph_osdc_wait_request(osdc, req);
4186         if (ret >= 0)
4187                 ceph_copy_from_page_vector(pages, buf, 0, ret);
4188
4189 out_req:
4190         ceph_osdc_put_request(req);
4191         return ret;
4192 }
4193
4194 /*
4195  * Read the complete header for the given rbd device.  On successful
4196  * return, the rbd_dev->header field will contain up-to-date
4197  * information about the image.
4198  */
4199 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4200 {
4201         struct rbd_image_header_ondisk *ondisk = NULL;
4202         u32 snap_count = 0;
4203         u64 names_size = 0;
4204         u32 want_count;
4205         int ret;
4206
4207         /*
4208          * The complete header will include an array of its 64-bit
4209          * snapshot ids, followed by the names of those snapshots as
4210          * a contiguous block of NUL-terminated strings.  Note that
4211          * the number of snapshots could change by the time we read
4212          * it in, in which case we re-read it.
4213          */
4214         do {
4215                 size_t size;
4216
4217                 kfree(ondisk);
4218
4219                 size = sizeof (*ondisk);
4220                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4221                 size += names_size;
4222                 ondisk = kmalloc(size, GFP_KERNEL);
4223                 if (!ondisk)
4224                         return -ENOMEM;
4225
4226                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4227                                         &rbd_dev->header_oloc, ondisk, size);
4228                 if (ret < 0)
4229                         goto out;
4230                 if ((size_t)ret < size) {
4231                         ret = -ENXIO;
4232                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4233                                 size, ret);
4234                         goto out;
4235                 }
4236                 if (!rbd_dev_ondisk_valid(ondisk)) {
4237                         ret = -ENXIO;
4238                         rbd_warn(rbd_dev, "invalid header");
4239                         goto out;
4240                 }
4241
4242                 names_size = le64_to_cpu(ondisk->snap_names_len);
4243                 want_count = snap_count;
4244                 snap_count = le32_to_cpu(ondisk->snap_count);
4245         } while (snap_count != want_count);
4246
4247         ret = rbd_header_from_disk(rbd_dev, ondisk);
4248 out:
4249         kfree(ondisk);
4250
4251         return ret;
4252 }
4253
4254 /*
4255  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4256  * has disappeared from the (just updated) snapshot context.
4257  */
4258 static void rbd_exists_validate(struct rbd_device *rbd_dev)
4259 {
4260         u64 snap_id;
4261
4262         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4263                 return;
4264
4265         snap_id = rbd_dev->spec->snap_id;
4266         if (snap_id == CEPH_NOSNAP)
4267                 return;
4268
4269         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4270                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4271 }
4272
4273 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4274 {
4275         sector_t size;
4276
4277         /*
4278          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4279          * try to update its size.  If REMOVING is set, updating size
4280          * is just useless work since the device can't be opened.
4281          */
4282         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4283             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4284                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4285                 dout("setting size to %llu sectors", (unsigned long long)size);
4286                 set_capacity(rbd_dev->disk, size);
4287                 revalidate_disk(rbd_dev->disk);
4288         }
4289 }
4290
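/*
 * Re-read the image header (and parent info, if there is a parent) and
 * propagate any change in mapping size to the block device.
 */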
4291 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4292 {
4293         u64 mapping_size;
4294         int ret;
4295
4296         down_write(&rbd_dev->header_rwsem);
4297         mapping_size = rbd_dev->mapping.size;
4298
4299         ret = rbd_dev_header_info(rbd_dev);
4300         if (ret)
4301                 goto out;
4302
4303         /*
4304          * If there is a parent, see if it has disappeared due to the
4305          * mapped image getting flattened.
4306          */
4307         if (rbd_dev->parent) {
4308                 ret = rbd_dev_v2_parent_info(rbd_dev);
4309                 if (ret)
4310                         goto out;
4311         }
4312
4313         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4314                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4315         } else {
4316                 /* validate mapped snapshot's EXISTS flag */
4317                 rbd_exists_validate(rbd_dev);
4318         }
4319
4320 out:
4321         up_write(&rbd_dev->header_rwsem);
4322         if (!ret && mapping_size != rbd_dev->mapping.size)
4323                 rbd_dev_update_size(rbd_dev);
4324
4325         return ret;
4326 }
4327
4328 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4329                 unsigned int hctx_idx, unsigned int numa_node)
4330 {
4331         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4332
4333         INIT_WORK(work, rbd_queue_workfn);
4334         return 0;
4335 }
4336
4337 static const struct blk_mq_ops rbd_mq_ops = {
4338         .queue_rq       = rbd_queue_rq,
4339         .init_request   = rbd_init_request,
4340 };
4341
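/*
 * Set up the gendisk and blk-mq request queue for this mapping: a
 * single hw queue with per-request work items, I/O limits sized to the
 * rbd object size, and discard/write-zeroes support.
 */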
4342 static int rbd_init_disk(struct rbd_device *rbd_dev)
4343 {
4344         struct gendisk *disk;
4345         struct request_queue *q;
4346         u64 segment_size;
4347         int err;
4348
4349         /* create gendisk info */
4350         disk = alloc_disk(single_major ?
4351                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4352                           RBD_MINORS_PER_MAJOR);
4353         if (!disk)
4354                 return -ENOMEM;
4355
4356         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4357                  rbd_dev->dev_id);
4358         disk->major = rbd_dev->major;
4359         disk->first_minor = rbd_dev->minor;
4360         if (single_major)
4361                 disk->flags |= GENHD_FL_EXT_DEVT;
4362         disk->fops = &rbd_bd_ops;
4363         disk->private_data = rbd_dev;
4364
4365         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4366         rbd_dev->tag_set.ops = &rbd_mq_ops;
4367         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4368         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4369         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4370         rbd_dev->tag_set.nr_hw_queues = 1;
4371         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4372
4373         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4374         if (err)
4375                 goto out_disk;
4376
4377         q = blk_mq_init_queue(&rbd_dev->tag_set);
4378         if (IS_ERR(q)) {
4379                 err = PTR_ERR(q);
4380                 goto out_tag_set;
4381         }
4382
4383         queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4384         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4385
4386         /* set io sizes to object size */
4387         segment_size = rbd_obj_bytes(&rbd_dev->header);
4388         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4389         q->limits.max_sectors = queue_max_hw_sectors(q);
4390         blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
4391         blk_queue_max_segment_size(q, segment_size);
4392         blk_queue_io_min(q, segment_size);
4393         blk_queue_io_opt(q, segment_size);
4394
4395         /* enable the discard support */
4396         queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4397         q->limits.discard_granularity = segment_size;
4398         blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4399         blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
4400
4401         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4402                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4403
4404         /*
4405          * disk_release() expects a queue ref from add_disk() and will
4406          * put it.  Hold an extra ref until add_disk() is called.
4407          */
4408         WARN_ON(!blk_get_queue(q));
4409         disk->queue = q;
4410         q->queuedata = rbd_dev;
4411
4412         rbd_dev->disk = disk;
4413
4414         return 0;
4415 out_tag_set:
4416         blk_mq_free_tag_set(&rbd_dev->tag_set);
4417 out_disk:
4418         put_disk(disk);
4419         return err;
4420 }
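/*
 * Illustrative arithmetic (assuming the common default object order of 22,
 * i.e. 4 MiB objects -- not something rbd_init_disk() itself enforces):
 *
 *     segment_size        = 1 << 22       = 4194304 bytes
 *     max_hw_sectors      = 4194304 / 512 = 8192 sectors
 *     discard_granularity = 4194304 bytes
 *
 * so no single request is ever larger than one backing RADOS object.
 */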
4421
4422 /*
4423   sysfs
4424 */
4425
4426 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4427 {
4428         return container_of(dev, struct rbd_device, dev);
4429 }
4430
4431 static ssize_t rbd_size_show(struct device *dev,
4432                              struct device_attribute *attr, char *buf)
4433 {
4434         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4435
4436         return sprintf(buf, "%llu\n",
4437                 (unsigned long long)rbd_dev->mapping.size);
4438 }
4439
4440 /*
4441  * Note this shows the features for whatever's mapped, which is not
4442  * necessarily the base image.
4443  */
4444 static ssize_t rbd_features_show(struct device *dev,
4445                              struct device_attribute *attr, char *buf)
4446 {
4447         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4448
4449         return sprintf(buf, "0x%016llx\n",
4450                         (unsigned long long)rbd_dev->mapping.features);
4451 }
4452
4453 static ssize_t rbd_major_show(struct device *dev,
4454                               struct device_attribute *attr, char *buf)
4455 {
4456         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4457
4458         if (rbd_dev->major)
4459                 return sprintf(buf, "%d\n", rbd_dev->major);
4460
4461         return sprintf(buf, "(none)\n");
4462 }
4463
4464 static ssize_t rbd_minor_show(struct device *dev,
4465                               struct device_attribute *attr, char *buf)
4466 {
4467         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4468
4469         return sprintf(buf, "%d\n", rbd_dev->minor);
4470 }
4471
4472 static ssize_t rbd_client_addr_show(struct device *dev,
4473                                     struct device_attribute *attr, char *buf)
4474 {
4475         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4476         struct ceph_entity_addr *client_addr =
4477             ceph_client_addr(rbd_dev->rbd_client->client);
4478
4479         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4480                        le32_to_cpu(client_addr->nonce));
4481 }
4482
4483 static ssize_t rbd_client_id_show(struct device *dev,
4484                                   struct device_attribute *attr, char *buf)
4485 {
4486         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4487
4488         return sprintf(buf, "client%lld\n",
4489                        ceph_client_gid(rbd_dev->rbd_client->client));
4490 }
4491
4492 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4493                                      struct device_attribute *attr, char *buf)
4494 {
4495         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4496
4497         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4498 }
4499
4500 static ssize_t rbd_config_info_show(struct device *dev,
4501                                     struct device_attribute *attr, char *buf)
4502 {
4503         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4504
4505         return sprintf(buf, "%s\n", rbd_dev->config_info);
4506 }
4507
4508 static ssize_t rbd_pool_show(struct device *dev,
4509                              struct device_attribute *attr, char *buf)
4510 {
4511         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4512
4513         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4514 }
4515
4516 static ssize_t rbd_pool_id_show(struct device *dev,
4517                              struct device_attribute *attr, char *buf)
4518 {
4519         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4520
4521         return sprintf(buf, "%llu\n",
4522                         (unsigned long long) rbd_dev->spec->pool_id);
4523 }
4524
4525 static ssize_t rbd_name_show(struct device *dev,
4526                              struct device_attribute *attr, char *buf)
4527 {
4528         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4529
4530         if (rbd_dev->spec->image_name)
4531                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4532
4533         return sprintf(buf, "(unknown)\n");
4534 }
4535
4536 static ssize_t rbd_image_id_show(struct device *dev,
4537                              struct device_attribute *attr, char *buf)
4538 {
4539         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4540
4541         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4542 }
4543
4544 /*
4545  * Shows the name of the currently-mapped snapshot (or
4546  * RBD_SNAP_HEAD_NAME for the base image).
4547  */
4548 static ssize_t rbd_snap_show(struct device *dev,
4549                              struct device_attribute *attr,
4550                              char *buf)
4551 {
4552         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4553
4554         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4555 }
4556
4557 static ssize_t rbd_snap_id_show(struct device *dev,
4558                                 struct device_attribute *attr, char *buf)
4559 {
4560         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4561
4562         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4563 }
4564
4565 /*
4566  * For a v2 image, shows the chain of parent images, separated by empty
4567  * lines.  For v1 images or if there is no parent, shows "(no parent
4568  * image)".
4569  */
4570 static ssize_t rbd_parent_show(struct device *dev,
4571                                struct device_attribute *attr,
4572                                char *buf)
4573 {
4574         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4575         ssize_t count = 0;
4576
4577         if (!rbd_dev->parent)
4578                 return sprintf(buf, "(no parent image)\n");
4579
4580         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4581                 struct rbd_spec *spec = rbd_dev->parent_spec;
4582
4583                 count += sprintf(&buf[count], "%s"
4584                             "pool_id %llu\npool_name %s\n"
4585                             "image_id %s\nimage_name %s\n"
4586                             "snap_id %llu\nsnap_name %s\n"
4587                             "overlap %llu\n",
4588                             !count ? "" : "\n", /* first? */
4589                             spec->pool_id, spec->pool_name,
4590                             spec->image_id, spec->image_name ?: "(unknown)",
4591                             spec->snap_id, spec->snap_name,
4592                             rbd_dev->parent_overlap);
4593         }
4594
4595         return count;
4596 }
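/*
 * Sample "parent" attribute output for a one-level chain (illustrative
 * values only; the layout follows the format string above):
 *
 *     pool_id 2
 *     pool_name rbd
 *     image_id 1028b4567
 *     image_name parent-img
 *     snap_id 4
 *     snap_name base
 *     overlap 10737418240
 */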
4597
4598 static ssize_t rbd_image_refresh(struct device *dev,
4599                                  struct device_attribute *attr,
4600                                  const char *buf,
4601                                  size_t size)
4602 {
4603         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4604         int ret;
4605
4606         ret = rbd_dev_refresh(rbd_dev);
4607         if (ret)
4608                 return ret;
4609
4610         return size;
4611 }
4612
4613 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4614 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4615 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4616 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4617 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4618 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4619 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4620 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4621 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4622 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4623 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4624 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4625 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4626 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4627 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4628 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4629
4630 static struct attribute *rbd_attrs[] = {
4631         &dev_attr_size.attr,
4632         &dev_attr_features.attr,
4633         &dev_attr_major.attr,
4634         &dev_attr_minor.attr,
4635         &dev_attr_client_addr.attr,
4636         &dev_attr_client_id.attr,
4637         &dev_attr_cluster_fsid.attr,
4638         &dev_attr_config_info.attr,
4639         &dev_attr_pool.attr,
4640         &dev_attr_pool_id.attr,
4641         &dev_attr_name.attr,
4642         &dev_attr_image_id.attr,
4643         &dev_attr_current_snap.attr,
4644         &dev_attr_snap_id.attr,
4645         &dev_attr_parent.attr,
4646         &dev_attr_refresh.attr,
4647         NULL
4648 };
4649
4650 static struct attribute_group rbd_attr_group = {
4651         .attrs = rbd_attrs,
4652 };
4653
4654 static const struct attribute_group *rbd_attr_groups[] = {
4655         &rbd_attr_group,
4656         NULL
4657 };
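/*
 * These attributes surface under /sys/bus/rbd/devices/<dev_id>/ once the
 * device is registered: all are world-readable except "refresh" (a
 * root-only write trigger) and "config_info" (readable by root only).
 * Reading "size" there, for example, reports the mapped image size in
 * bytes.
 */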
4658
4659 static void rbd_dev_release(struct device *dev);
4660
4661 static const struct device_type rbd_device_type = {
4662         .name           = "rbd",
4663         .groups         = rbd_attr_groups,
4664         .release        = rbd_dev_release,
4665 };
4666
4667 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4668 {
4669         kref_get(&spec->kref);
4670
4671         return spec;
4672 }
4673
4674 static void rbd_spec_free(struct kref *kref);
4675 static void rbd_spec_put(struct rbd_spec *spec)
4676 {
4677         if (spec)
4678                 kref_put(&spec->kref, rbd_spec_free);
4679 }
4680
4681 static struct rbd_spec *rbd_spec_alloc(void)
4682 {
4683         struct rbd_spec *spec;
4684
4685         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4686         if (!spec)
4687                 return NULL;
4688
4689         spec->pool_id = CEPH_NOPOOL;
4690         spec->snap_id = CEPH_NOSNAP;
4691         kref_init(&spec->kref);
4692
4693         return spec;
4694 }
4695
4696 static void rbd_spec_free(struct kref *kref)
4697 {
4698         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4699
4700         kfree(spec->pool_name);
4701         kfree(spec->image_id);
4702         kfree(spec->image_name);
4703         kfree(spec->snap_name);
4704         kfree(spec);
4705 }
4706
4707 static void rbd_dev_free(struct rbd_device *rbd_dev)
4708 {
4709         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4710         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4711
4712         ceph_oid_destroy(&rbd_dev->header_oid);
4713         ceph_oloc_destroy(&rbd_dev->header_oloc);
4714         kfree(rbd_dev->config_info);
4715
4716         rbd_put_client(rbd_dev->rbd_client);
4717         rbd_spec_put(rbd_dev->spec);
4718         kfree(rbd_dev->opts);
4719         kfree(rbd_dev);
4720 }
4721
4722 static void rbd_dev_release(struct device *dev)
4723 {
4724         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4725         bool need_put = !!rbd_dev->opts;
4726
4727         if (need_put) {
4728                 destroy_workqueue(rbd_dev->task_wq);
4729                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4730         }
4731
4732         rbd_dev_free(rbd_dev);
4733
4734         /*
4735          * This is racy, but way better than putting module outside of
4736          * the release callback.  The race window is pretty small, so
4737          * doing something similar to dm (dm-builtin.c) is overkill.
4738          */
4739         if (need_put)
4740                 module_put(THIS_MODULE);
4741 }
4742
4743 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4744                                            struct rbd_spec *spec)
4745 {
4746         struct rbd_device *rbd_dev;
4747
4748         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4749         if (!rbd_dev)
4750                 return NULL;
4751
4752         spin_lock_init(&rbd_dev->lock);
4753         INIT_LIST_HEAD(&rbd_dev->node);
4754         init_rwsem(&rbd_dev->header_rwsem);
4755
4756         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4757         ceph_oid_init(&rbd_dev->header_oid);
4758         rbd_dev->header_oloc.pool = spec->pool_id;
4759
4760         mutex_init(&rbd_dev->watch_mutex);
4761         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4762         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4763
4764         init_rwsem(&rbd_dev->lock_rwsem);
4765         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4766         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4767         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4768         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4769         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4770         init_waitqueue_head(&rbd_dev->lock_waitq);
4771
4772         rbd_dev->dev.bus = &rbd_bus_type;
4773         rbd_dev->dev.type = &rbd_device_type;
4774         rbd_dev->dev.parent = &rbd_root_dev;
4775         device_initialize(&rbd_dev->dev);
4776
4777         rbd_dev->rbd_client = rbdc;
4778         rbd_dev->spec = spec;
4779
4780         return rbd_dev;
4781 }
4782
4783 /*
4784  * Create a mapping rbd_dev.
4785  */
4786 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4787                                          struct rbd_spec *spec,
4788                                          struct rbd_options *opts)
4789 {
4790         struct rbd_device *rbd_dev;
4791
4792         rbd_dev = __rbd_dev_create(rbdc, spec);
4793         if (!rbd_dev)
4794                 return NULL;
4795
4796         rbd_dev->opts = opts;
4797
4798         /* get an id and fill in device name */
4799         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4800                                          minor_to_rbd_dev_id(1 << MINORBITS),
4801                                          GFP_KERNEL);
4802         if (rbd_dev->dev_id < 0)
4803                 goto fail_rbd_dev;
4804
4805         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4806         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4807                                                    rbd_dev->name);
4808         if (!rbd_dev->task_wq)
4809                 goto fail_dev_id;
4810
4811         /* we have a ref from do_rbd_add() */
4812         __module_get(THIS_MODULE);
4813
4814         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4815         return rbd_dev;
4816
4817 fail_dev_id:
4818         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4819 fail_rbd_dev:
4820         rbd_dev_free(rbd_dev);
4821         return NULL;
4822 }
4823
4824 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4825 {
4826         if (rbd_dev)
4827                 put_device(&rbd_dev->dev);
4828 }
4829
4830 /*
4831  * Get the size and object order for an image snapshot, or if
4832  * snap_id is CEPH_NOSNAP, gets this information for the base
4833  * image.
4834  */
4835 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4836                                 u8 *order, u64 *snap_size)
4837 {
4838         __le64 snapid = cpu_to_le64(snap_id);
4839         int ret;
4840         struct {
4841                 u8 order;
4842                 __le64 size;
4843         } __attribute__ ((packed)) size_buf = { 0 };
4844
4845         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4846                                   &rbd_dev->header_oloc, "get_size",
4847                                   &snapid, sizeof(snapid),
4848                                   &size_buf, sizeof(size_buf));
4849         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4850         if (ret < 0)
4851                 return ret;
4852         if (ret < sizeof (size_buf))
4853                 return -ERANGE;
4854
4855         if (order) {
4856                 *order = size_buf.order;
4857                 dout("  order %u", (unsigned int)*order);
4858         }
4859         *snap_size = le64_to_cpu(size_buf.size);
4860
4861         dout("  snap_id 0x%016llx snap_size = %llu\n",
4862                 (unsigned long long)snap_id,
4863                 (unsigned long long)*snap_size);
4864
4865         return 0;
4866 }
4867
4868 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4869 {
4870         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4871                                         &rbd_dev->header.obj_order,
4872                                         &rbd_dev->header.image_size);
4873 }
4874
4875 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4876 {
4877         void *reply_buf;
4878         int ret;
4879         void *p;
4880
4881         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4882         if (!reply_buf)
4883                 return -ENOMEM;
4884
4885         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4886                                   &rbd_dev->header_oloc, "get_object_prefix",
4887                                   NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4888         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4889         if (ret < 0)
4890                 goto out;
4891
4892         p = reply_buf;
4893         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4894                                                 p + ret, NULL, GFP_NOIO);
4895         ret = 0;
4896
4897         if (IS_ERR(rbd_dev->header.object_prefix)) {
4898                 ret = PTR_ERR(rbd_dev->header.object_prefix);
4899                 rbd_dev->header.object_prefix = NULL;
4900         } else {
4901                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4902         }
4903 out:
4904         kfree(reply_buf);
4905
4906         return ret;
4907 }
4908
4909 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4910                 u64 *snap_features)
4911 {
4912         __le64 snapid = cpu_to_le64(snap_id);
4913         struct {
4914                 __le64 features;
4915                 __le64 incompat;
4916         } __attribute__ ((packed)) features_buf = { 0 };
4917         u64 unsup;
4918         int ret;
4919
4920         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4921                                   &rbd_dev->header_oloc, "get_features",
4922                                   &snapid, sizeof(snapid),
4923                                   &features_buf, sizeof(features_buf));
4924         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4925         if (ret < 0)
4926                 return ret;
4927         if (ret < sizeof (features_buf))
4928                 return -ERANGE;
4929
4930         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4931         if (unsup) {
4932                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4933                          unsup);
4934                 return -ENXIO;
4935         }
4936
4937         *snap_features = le64_to_cpu(features_buf.features);
4938
4939         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4940                 (unsigned long long)snap_id,
4941                 (unsigned long long)*snap_features,
4942                 (unsigned long long)le64_to_cpu(features_buf.incompat));
4943
4944         return 0;
4945 }
4946
4947 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4948 {
4949         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4950                                                 &rbd_dev->header.features);
4951 }
4952
4953 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4954 {
4955         struct rbd_spec *parent_spec;
4956         size_t size;
4957         void *reply_buf = NULL;
4958         __le64 snapid;
4959         void *p;
4960         void *end;
4961         u64 pool_id;
4962         char *image_id;
4963         u64 snap_id;
4964         u64 overlap;
4965         int ret;
4966
4967         parent_spec = rbd_spec_alloc();
4968         if (!parent_spec)
4969                 return -ENOMEM;
4970
4971         size = sizeof (__le64) +                                /* pool_id */
4972                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
4973                 sizeof (__le64) +                               /* snap_id */
4974                 sizeof (__le64);                                /* overlap */
4975         reply_buf = kmalloc(size, GFP_KERNEL);
4976         if (!reply_buf) {
4977                 ret = -ENOMEM;
4978                 goto out_err;
4979         }
4980
4981         snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4982         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4983                                   &rbd_dev->header_oloc, "get_parent",
4984                                   &snapid, sizeof(snapid), reply_buf, size);
4985         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4986         if (ret < 0)
4987                 goto out_err;
4988
4989         p = reply_buf;
4990         end = reply_buf + ret;
4991         ret = -ERANGE;
4992         ceph_decode_64_safe(&p, end, pool_id, out_err);
4993         if (pool_id == CEPH_NOPOOL) {
4994                 /*
4995                  * Either the parent never existed, or we have
4996                  * a record of it but the image got flattened so it no
4997                  * longer has a parent.  When the parent of a
4998                  * layered image disappears we immediately set the
4999                  * overlap to 0.  The effect of this is that all new
5000                  * requests will be treated as if the image had no
5001                  * parent.
5002                  */
5003                 if (rbd_dev->parent_overlap) {
5004                         rbd_dev->parent_overlap = 0;
5005                         rbd_dev_parent_put(rbd_dev);
5006                         pr_info("%s: clone image has been flattened\n",
5007                                 rbd_dev->disk->disk_name);
5008                 }
5009
5010                 goto out;       /* No parent?  No problem. */
5011         }
5012
5013         /* The ceph file layout needs to fit pool id in 32 bits */
5014
5015         ret = -EIO;
5016         if (pool_id > (u64)U32_MAX) {
5017                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5018                         (unsigned long long)pool_id, U32_MAX);
5019                 goto out_err;
5020         }
5021
5022         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5023         if (IS_ERR(image_id)) {
5024                 ret = PTR_ERR(image_id);
5025                 goto out_err;
5026         }
5027         ceph_decode_64_safe(&p, end, snap_id, out_err);
5028         ceph_decode_64_safe(&p, end, overlap, out_err);
5029
5030         /*
5031          * The parent won't change (except when the clone is
5032          * flattened, already handled that).  So we only need to
5033                  * record the parent spec if we have not already done so.
5034          */
5035         if (!rbd_dev->parent_spec) {
5036                 parent_spec->pool_id = pool_id;
5037                 parent_spec->image_id = image_id;
5038                 parent_spec->snap_id = snap_id;
5039                 rbd_dev->parent_spec = parent_spec;
5040                 parent_spec = NULL;     /* rbd_dev now owns this */
5041         } else {
5042                 kfree(image_id);
5043         }
5044
5045         /*
5046          * We always update the parent overlap.  If it's zero we issue
5047          * a warning, as we will proceed as if there was no parent.
5048          */
5049         if (!overlap) {
5050                 if (parent_spec) {
5051                         /* refresh, careful to warn just once */
5052                         if (rbd_dev->parent_overlap)
5053                                 rbd_warn(rbd_dev,
5054                                     "clone now standalone (overlap became 0)");
5055                 } else {
5056                         /* initial probe */
5057                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5058                 }
5059         }
5060         rbd_dev->parent_overlap = overlap;
5061
5062 out:
5063         ret = 0;
5064 out_err:
5065         kfree(reply_buf);
5066         rbd_spec_put(parent_spec);
5067
5068         return ret;
5069 }
5070
5071 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5072 {
5073         struct {
5074                 __le64 stripe_unit;
5075                 __le64 stripe_count;
5076         } __attribute__ ((packed)) striping_info_buf = { 0 };
5077         size_t size = sizeof (striping_info_buf);
5078         void *p;
5079         u64 obj_size;
5080         u64 stripe_unit;
5081         u64 stripe_count;
5082         int ret;
5083
5084         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5085                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
5086                                 NULL, 0, &striping_info_buf, size);
5087         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5088         if (ret < 0)
5089                 return ret;
5090         if (ret < size)
5091                 return -ERANGE;
5092
5093         /*
5094          * We don't actually support the "fancy striping" feature
5095          * (STRIPINGV2) yet, but if the striping sizes are the
5096          * defaults the behavior is the same as before.  So find
5097          * out, and only fail if the image has non-default values.
5098          */
5099         ret = -EINVAL;
5100         obj_size = rbd_obj_bytes(&rbd_dev->header);
5101         p = &striping_info_buf;
5102         stripe_unit = ceph_decode_64(&p);
5103         if (stripe_unit != obj_size) {
5104                 rbd_warn(rbd_dev, "unsupported stripe unit "
5105                                 "(got %llu want %llu)",
5106                                 stripe_unit, obj_size);
5107                 return -EINVAL;
5108         }
5109         stripe_count = ceph_decode_64(&p);
5110         if (stripe_count != 1) {
5111                 rbd_warn(rbd_dev, "unsupported stripe count "
5112                                 "(got %llu want 1)", stripe_count);
5113                 return -EINVAL;
5114         }
5115         rbd_dev->header.stripe_unit = stripe_unit;
5116         rbd_dev->header.stripe_count = stripe_count;
5117
5118         return 0;
5119 }
5120
5121 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5122 {
5123         __le64 data_pool_id;
5124         int ret;
5125
5126         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5127                                   &rbd_dev->header_oloc, "get_data_pool",
5128                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
5129         if (ret < 0)
5130                 return ret;
5131         if (ret < sizeof(data_pool_id))
5132                 return -EBADMSG;
5133
5134         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5135         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5136         return 0;
5137 }
5138
5139 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5140 {
5141         CEPH_DEFINE_OID_ONSTACK(oid);
5142         size_t image_id_size;
5143         char *image_id;
5144         void *p;
5145         void *end;
5146         size_t size;
5147         void *reply_buf = NULL;
5148         size_t len = 0;
5149         char *image_name = NULL;
5150         int ret;
5151
5152         rbd_assert(!rbd_dev->spec->image_name);
5153
5154         len = strlen(rbd_dev->spec->image_id);
5155         image_id_size = sizeof (__le32) + len;
5156         image_id = kmalloc(image_id_size, GFP_KERNEL);
5157         if (!image_id)
5158                 return NULL;
5159
5160         p = image_id;
5161         end = image_id + image_id_size;
5162         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5163
5164         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5165         reply_buf = kmalloc(size, GFP_KERNEL);
5166         if (!reply_buf)
5167                 goto out;
5168
5169         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5170         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5171                                   "dir_get_name", image_id, image_id_size,
5172                                   reply_buf, size);
5173         if (ret < 0)
5174                 goto out;
5175         p = reply_buf;
5176         end = reply_buf + ret;
5177
5178         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5179         if (IS_ERR(image_name))
5180                 image_name = NULL;
5181         else
5182                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5183 out:
5184         kfree(reply_buf);
5185         kfree(image_id);
5186
5187         return image_name;
5188 }
5189
5190 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5191 {
5192         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5193         const char *snap_name;
5194         u32 which = 0;
5195
5196         /* Skip over names until we find the one we are looking for */
5197
5198         snap_name = rbd_dev->header.snap_names;
5199         while (which < snapc->num_snaps) {
5200                 if (!strcmp(name, snap_name))
5201                         return snapc->snaps[which];
5202                 snap_name += strlen(snap_name) + 1;
5203                 which++;
5204         }
5205         return CEPH_NOSNAP;
5206 }
5207
5208 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5209 {
5210         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5211         u32 which;
5212         bool found = false;
5213         u64 snap_id;
5214
5215         for (which = 0; !found && which < snapc->num_snaps; which++) {
5216                 const char *snap_name;
5217
5218                 snap_id = snapc->snaps[which];
5219                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5220                 if (IS_ERR(snap_name)) {
5221                         /* ignore no-longer existing snapshots */
5222                         if (PTR_ERR(snap_name) == -ENOENT)
5223                                 continue;
5224                         else
5225                                 break;
5226                 }
5227                 found = !strcmp(name, snap_name);
5228                 kfree(snap_name);
5229         }
5230         return found ? snap_id : CEPH_NOSNAP;
5231 }
5232
5233 /*
5234  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5235  * no snapshot by that name is found, or if an error occurs.
5236  */
5237 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5238 {
5239         if (rbd_dev->image_format == 1)
5240                 return rbd_v1_snap_id_by_name(rbd_dev, name);
5241
5242         return rbd_v2_snap_id_by_name(rbd_dev, name);
5243 }
5244
5245 /*
5246  * An image being mapped will have everything but the snap id.
5247  */
5248 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5249 {
5250         struct rbd_spec *spec = rbd_dev->spec;
5251
5252         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5253         rbd_assert(spec->image_id && spec->image_name);
5254         rbd_assert(spec->snap_name);
5255
5256         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5257                 u64 snap_id;
5258
5259                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5260                 if (snap_id == CEPH_NOSNAP)
5261                         return -ENOENT;
5262
5263                 spec->snap_id = snap_id;
5264         } else {
5265                 spec->snap_id = CEPH_NOSNAP;
5266         }
5267
5268         return 0;
5269 }
5270
5271 /*
5272  * A parent image will have all ids but none of the names.
5273  *
5274  * All names in an rbd spec are dynamically allocated.  It's OK if we
5275  * can't figure out the name for an image id.
5276  */
5277 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5278 {
5279         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5280         struct rbd_spec *spec = rbd_dev->spec;
5281         const char *pool_name;
5282         const char *image_name;
5283         const char *snap_name;
5284         int ret;
5285
5286         rbd_assert(spec->pool_id != CEPH_NOPOOL);
5287         rbd_assert(spec->image_id);
5288         rbd_assert(spec->snap_id != CEPH_NOSNAP);
5289
5290         /* Get the pool name; we have to make our own copy of this */
5291
5292         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5293         if (!pool_name) {
5294                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5295                 return -EIO;
5296         }
5297         pool_name = kstrdup(pool_name, GFP_KERNEL);
5298         if (!pool_name)
5299                 return -ENOMEM;
5300
5301         /* Fetch the image name; tolerate failure here */
5302
5303         image_name = rbd_dev_image_name(rbd_dev);
5304         if (!image_name)
5305                 rbd_warn(rbd_dev, "unable to get image name");
5306
5307         /* Fetch the snapshot name */
5308
5309         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5310         if (IS_ERR(snap_name)) {
5311                 ret = PTR_ERR(snap_name);
5312                 goto out_err;
5313         }
5314
5315         spec->pool_name = pool_name;
5316         spec->image_name = image_name;
5317         spec->snap_name = snap_name;
5318
5319         return 0;
5320
5321 out_err:
5322         kfree(image_name);
5323         kfree(pool_name);
5324         return ret;
5325 }
5326
5327 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5328 {
5329         size_t size;
5330         int ret;
5331         void *reply_buf;
5332         void *p;
5333         void *end;
5334         u64 seq;
5335         u32 snap_count;
5336         struct ceph_snap_context *snapc;
5337         u32 i;
5338
5339         /*
5340          * We'll need room for the seq value (maximum snapshot id),
5341          * snapshot count, and array of that many snapshot ids.
5342          * For now we have a fixed upper limit on the number we're
5343          * prepared to receive.
5344          */
5345         size = sizeof (__le64) + sizeof (__le32) +
5346                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
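        /*
         * With RBD_MAX_SNAP_COUNT of 510 this works out to
         * 8 + 4 + 510 * 8 = 4092 bytes, so the largest snapshot context
         * we are prepared to accept still fits in a single 4 KiB page.
         */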
5347         reply_buf = kzalloc(size, GFP_KERNEL);
5348         if (!reply_buf)
5349                 return -ENOMEM;
5350
5351         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5352                                   &rbd_dev->header_oloc, "get_snapcontext",
5353                                   NULL, 0, reply_buf, size);
5354         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5355         if (ret < 0)
5356                 goto out;
5357
5358         p = reply_buf;
5359         end = reply_buf + ret;
5360         ret = -ERANGE;
5361         ceph_decode_64_safe(&p, end, seq, out);
5362         ceph_decode_32_safe(&p, end, snap_count, out);
5363
5364         /*
5365          * Make sure the reported number of snapshot ids wouldn't go
5366          * beyond the end of our buffer.  But before checking that,
5367          * make sure the computed size of the snapshot context we
5368          * allocate is representable in a size_t.
5369          */
5370         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5371                                  / sizeof (u64)) {
5372                 ret = -EINVAL;
5373                 goto out;
5374         }
5375         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5376                 goto out;
5377         ret = 0;
5378
5379         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5380         if (!snapc) {
5381                 ret = -ENOMEM;
5382                 goto out;
5383         }
5384         snapc->seq = seq;
5385         for (i = 0; i < snap_count; i++)
5386                 snapc->snaps[i] = ceph_decode_64(&p);
5387
5388         ceph_put_snap_context(rbd_dev->header.snapc);
5389         rbd_dev->header.snapc = snapc;
5390
5391         dout("  snap context seq = %llu, snap_count = %u\n",
5392                 (unsigned long long)seq, (unsigned int)snap_count);
5393 out:
5394         kfree(reply_buf);
5395
5396         return ret;
5397 }
5398
5399 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5400                                         u64 snap_id)
5401 {
5402         size_t size;
5403         void *reply_buf;
5404         __le64 snapid;
5405         int ret;
5406         void *p;
5407         void *end;
5408         char *snap_name;
5409
5410         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5411         reply_buf = kmalloc(size, GFP_KERNEL);
5412         if (!reply_buf)
5413                 return ERR_PTR(-ENOMEM);
5414
5415         snapid = cpu_to_le64(snap_id);
5416         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5417                                   &rbd_dev->header_oloc, "get_snapshot_name",
5418                                   &snapid, sizeof(snapid), reply_buf, size);
5419         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5420         if (ret < 0) {
5421                 snap_name = ERR_PTR(ret);
5422                 goto out;
5423         }
5424
5425         p = reply_buf;
5426         end = reply_buf + ret;
5427         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5428         if (IS_ERR(snap_name))
5429                 goto out;
5430
5431         dout("  snap_id 0x%016llx snap_name = %s\n",
5432                 (unsigned long long)snap_id, snap_name);
5433 out:
5434         kfree(reply_buf);
5435
5436         return snap_name;
5437 }
5438
5439 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5440 {
5441         bool first_time = rbd_dev->header.object_prefix == NULL;
5442         int ret;
5443
5444         ret = rbd_dev_v2_image_size(rbd_dev);
5445         if (ret)
5446                 return ret;
5447
5448         if (first_time) {
5449                 ret = rbd_dev_v2_header_onetime(rbd_dev);
5450                 if (ret)
5451                         return ret;
5452         }
5453
5454         ret = rbd_dev_v2_snap_context(rbd_dev);
5455         if (ret && first_time) {
5456                 kfree(rbd_dev->header.object_prefix);
5457                 rbd_dev->header.object_prefix = NULL;
5458         }
5459
5460         return ret;
5461 }
5462
5463 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5464 {
5465         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5466
5467         if (rbd_dev->image_format == 1)
5468                 return rbd_dev_v1_header_info(rbd_dev);
5469
5470         return rbd_dev_v2_header_info(rbd_dev);
5471 }
5472
5473 /*
5474  * Skips over white space at *buf, and updates *buf to point to the
5475  * first found non-space character (if any). Returns the length of
5476  * the token (string of non-white space characters) found.  Note
5477  * that *buf must be terminated with '\0'.
5478  */
5479 static inline size_t next_token(const char **buf)
5480 {
5481         /*
5482          * These are the characters that produce nonzero for
5483          * isspace() in the "C" and "POSIX" locales.
5484          */
5485         const char *spaces = " \f\n\r\t\v";
5486
5487         *buf += strspn(*buf, spaces);   /* Find start of token */
5488
5489         return strcspn(*buf, spaces);   /* Return token length */
5490 }
5491
5492 /*
5493  * Finds the next token in *buf, dynamically allocates a buffer big
5494  * enough to hold a copy of it, and copies the token into the new
5495  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
5496  * that a duplicate buffer is created even for a zero-length token.
5497  *
5498  * Returns a pointer to the newly-allocated duplicate, or a null
5499  * pointer if memory for the duplicate was not available.  If
5500  * the lenp argument is a non-null pointer, the length of the token
5501  * (not including the '\0') is returned in *lenp.
5502  *
5503  * If successful, the *buf pointer will be updated to point beyond
5504  * the end of the found token.
5505  *
5506  * Note: uses GFP_KERNEL for allocation.
5507  */
5508 static inline char *dup_token(const char **buf, size_t *lenp)
5509 {
5510         char *dup;
5511         size_t len;
5512
5513         len = next_token(buf);
5514         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5515         if (!dup)
5516                 return NULL;
5517         *(dup + len) = '\0';
5518         *buf += len;
5519
5520         if (lenp)
5521                 *lenp = len;
5522
5523         return dup;
5524 }
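/*
 * Illustrative sketch (not part of the driver): given a buffer such as
 * "rbd foo snap1", successive dup_token() calls peel tokens off the front:
 *
 *     const char *p = "rbd foo snap1";
 *     char *pool  = dup_token(&p, NULL);   -- returns "rbd",   p -> " foo snap1"
 *     char *image = dup_token(&p, NULL);   -- returns "foo",   p -> " snap1"
 *     char *snap  = dup_token(&p, NULL);   -- returns "snap1", p -> ""
 *
 * Each returned string was allocated with kmemdup() and must eventually
 * be released with kfree().
 */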
5525
5526 /*
5527  * Parse the options provided for an "rbd add" (i.e., rbd image
5528  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
5529  * and the data written is passed here via a NUL-terminated buffer.
5530  * Returns 0 if successful or an error code otherwise.
5531  *
5532  * The information extracted from these options is recorded in
5533  * the other parameters which return dynamically-allocated
5534  * structures:
5535  *  ceph_opts
5536  *      The address of a pointer that will refer to a ceph options
5537  *      structure.  Caller must release the returned pointer using
5538  *      ceph_destroy_options() when it is no longer needed.
5539  *  rbd_opts
5540  *      Address of an rbd options pointer.  Fully initialized by
5541  *      this function; caller must release with kfree().
5542  *  spec
5543  *      Address of an rbd image specification pointer.  Fully
5544  *      initialized by this function based on parsed options.
5545  *      Caller must release with rbd_spec_put().
5546  *
5547  * The options passed take this form:
5548  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5549  * where:
5550  *  <mon_addrs>
5551  *      A comma-separated list of one or more monitor addresses.
5552  *      A monitor address is an ip address, optionally followed
5553  *      by a port number (separated by a colon).
5554  *        I.e.:  ip1[:port1][,ip2[:port2]...]
5555  *  <options>
5556  *      A comma-separated list of ceph and/or rbd options.
5557  *  <pool_name>
5558  *      The name of the rados pool containing the rbd image.
5559  *  <image_name>
5560  *      The name of the image in that pool to map.
5561  *  <snap_name>
5562  *      The name of an optional snapshot.  If provided, the mapping
5563  *      will present data from the image at the time that snapshot
5564  *      was created.  The image head is used if no snapshot name is
5565  *      provided.  Snapshot mappings are always read-only.
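 *
 * Illustrative example (all values made up): writing
 *
 *   1.2.3.4:6789,1.2.3.5:6789 name=admin,queue_depth=128 rbd foo snap1
 *
 * to /sys/bus/rbd/add maps snapshot "snap1" of image "foo" in pool "rbd"
 * via the two listed monitors; dropping the trailing "snap1" maps the
 * image head instead.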
5566  */
5567 static int rbd_add_parse_args(const char *buf,
5568                                 struct ceph_options **ceph_opts,
5569                                 struct rbd_options **opts,
5570                                 struct rbd_spec **rbd_spec)
5571 {
5572         size_t len;
5573         char *options;
5574         const char *mon_addrs;
5575         char *snap_name;
5576         size_t mon_addrs_size;
5577         struct rbd_spec *spec = NULL;
5578         struct rbd_options *rbd_opts = NULL;
5579         struct ceph_options *copts;
5580         int ret;
5581
5582         /* The first four tokens are required */
5583
5584         len = next_token(&buf);
5585         if (!len) {
5586                 rbd_warn(NULL, "no monitor address(es) provided");
5587                 return -EINVAL;
5588         }
5589         mon_addrs = buf;
5590         mon_addrs_size = len + 1;
5591         buf += len;
5592
5593         ret = -EINVAL;
5594         options = dup_token(&buf, NULL);
5595         if (!options)
5596                 return -ENOMEM;
5597         if (!*options) {
5598                 rbd_warn(NULL, "no options provided");
5599                 goto out_err;
5600         }
5601
5602         spec = rbd_spec_alloc();
5603         if (!spec)
5604                 goto out_mem;
5605
5606         spec->pool_name = dup_token(&buf, NULL);
5607         if (!spec->pool_name)
5608                 goto out_mem;
5609         if (!*spec->pool_name) {
5610                 rbd_warn(NULL, "no pool name provided");
5611                 goto out_err;
5612         }
5613
5614         spec->image_name = dup_token(&buf, NULL);
5615         if (!spec->image_name)
5616                 goto out_mem;
5617         if (!*spec->image_name) {
5618                 rbd_warn(NULL, "no image name provided");
5619                 goto out_err;
5620         }
5621
5622         /*
5623          * Snapshot name is optional; default is to use "-"
5624          * (indicating the head/no snapshot).
5625          */
5626         len = next_token(&buf);
5627         if (!len) {
5628                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5629                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5630         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5631                 ret = -ENAMETOOLONG;
5632                 goto out_err;
5633         }
5634         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5635         if (!snap_name)
5636                 goto out_mem;
5637         *(snap_name + len) = '\0';
5638         spec->snap_name = snap_name;
5639
5640         /* Initialize all rbd options to the defaults */
5641
5642         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5643         if (!rbd_opts)
5644                 goto out_mem;
5645
5646         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5647         rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5648         rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5649         rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5650
5651         copts = ceph_parse_options(options, mon_addrs,
5652                                         mon_addrs + mon_addrs_size - 1,
5653                                         parse_rbd_opts_token, rbd_opts);
5654         if (IS_ERR(copts)) {
5655                 ret = PTR_ERR(copts);
5656                 goto out_err;
5657         }
5658         kfree(options);
5659
5660         *ceph_opts = copts;
5661         *opts = rbd_opts;
5662         *rbd_spec = spec;
5663
5664         return 0;
5665 out_mem:
5666         ret = -ENOMEM;
5667 out_err:
5668         kfree(rbd_opts);
5669         rbd_spec_put(spec);
5670         kfree(options);
5671
5672         return ret;
5673 }
5674
5675 /*
5676  * Return pool id (>= 0) or a negative error code.
5677  */
5678 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5679 {
5680         struct ceph_options *opts = rbdc->client->options;
5681         u64 newest_epoch;
5682         int tries = 0;
5683         int ret;
5684
5685 again:
5686         ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5687         if (ret == -ENOENT && tries++ < 1) {
5688                 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5689                                             &newest_epoch);
5690                 if (ret < 0)
5691                         return ret;
5692
5693                 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5694                         ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5695                         (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5696                                                      newest_epoch,
5697                                                      opts->mount_timeout);
5698                         goto again;
5699                 } else {
5700                         /* the osdmap we have is new enough */
5701                         return -ENOENT;
5702                 }
5703         }
5704
5705         return ret;
5706 }
5707
5708 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5709 {
5710         down_write(&rbd_dev->lock_rwsem);
5711         if (__rbd_is_lock_owner(rbd_dev))
5712                 rbd_unlock(rbd_dev);
5713         up_write(&rbd_dev->lock_rwsem);
5714 }
5715
5716 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5717 {
5718         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5719                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5720                 return -EINVAL;
5721         }
5722
5723         /* FIXME: "rbd map --exclusive" should be interruptible */
5724         down_read(&rbd_dev->lock_rwsem);
5725         rbd_wait_state_locked(rbd_dev);
5726         up_read(&rbd_dev->lock_rwsem);
5727         if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5728                 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5729                 return -EROFS;
5730         }
5731
5732         return 0;
5733 }
5734
5735 /*
5736  * An rbd format 2 image has a unique identifier, distinct from the
5737  * name given to it by the user.  Internally, that identifier is
5738  * what's used to specify the names of objects related to the image.
5739  *
5740  * A special "rbd id" object is used to map an rbd image name to its
5741  * id.  If that object doesn't exist, then there is no v2 rbd image
5742  * with the supplied name.
5743  *
5744  * This function will record the given rbd_dev's image_id field if
5745  * it can be determined, and in that case will return 0.  If any
5746  * errors occur a negative errno will be returned and the rbd_dev's
5747  * image_id field will be unchanged (and should be NULL).
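 *
 * Illustrative naming sketch (assuming the usual RBD_ID_PREFIX of
 * "rbd_id."): a format 2 image named "foo" has an id object called
 * "rbd_id.foo"; its "get_id" class method returns the image id, and if
 * that object does not exist the image is treated as format 1.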
5748  */
5749 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5750 {
5751         int ret;
5752         size_t size;
5753         CEPH_DEFINE_OID_ONSTACK(oid);
5754         void *response;
5755         char *image_id;
5756
5757         /*
5758          * When probing a parent image, the image id is already
5759          * known (and the image name likely is not).  There's no
5760          * need to fetch the image id again in this case.  We
5761          * do still need to set the image format though.
5762          */
5763         if (rbd_dev->spec->image_id) {
5764                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5765
5766                 return 0;
5767         }
5768
5769         /*
5770          * First, see if the format 2 image id file exists, and if
5771          * so, get the image's persistent id from it.
5772          */
5773         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5774                                rbd_dev->spec->image_name);
5775         if (ret)
5776                 return ret;
5777
5778         dout("rbd id object name is %s\n", oid.name);
5779
5780         /* Response will be an encoded string, which includes a length */
5781
5782         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5783         response = kzalloc(size, GFP_NOIO);
5784         if (!response) {
5785                 ret = -ENOMEM;
5786                 goto out;
5787         }
5788
5789         /* If it doesn't exist we'll assume it's a format 1 image */
5790
5791         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5792                                   "get_id", NULL, 0,
5793                                   response, RBD_IMAGE_ID_LEN_MAX);
5794         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5795         if (ret == -ENOENT) {
5796                 image_id = kstrdup("", GFP_KERNEL);
5797                 ret = image_id ? 0 : -ENOMEM;
5798                 if (!ret)
5799                         rbd_dev->image_format = 1;
5800         } else if (ret >= 0) {
5801                 void *p = response;
5802
5803                 image_id = ceph_extract_encoded_string(&p, p + ret,
5804                                                 NULL, GFP_NOIO);
5805                 ret = PTR_ERR_OR_ZERO(image_id);
5806                 if (!ret)
5807                         rbd_dev->image_format = 2;
5808         }
5809
5810         if (!ret) {
5811                 rbd_dev->spec->image_id = image_id;
5812                 dout("image_id is %s\n", image_id);
5813         }
5814 out:
5815         kfree(response);
5816         ceph_oid_destroy(&oid);
5817         return ret;
5818 }
5819
5820 /*
5821  * Undo whatever state changes are made by a v1 or v2 header info
5822  * call.
5823  */
5824 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5825 {
5826         struct rbd_image_header *header;
5827
5828         rbd_dev_parent_put(rbd_dev);
5829
5830         /* Free dynamic fields from the header, then zero it out */
5831
5832         header = &rbd_dev->header;
5833         ceph_put_snap_context(header->snapc);
5834         kfree(header->snap_sizes);
5835         kfree(header->snap_names);
5836         kfree(header->object_prefix);
5837         memset(header, 0, sizeof (*header));
5838 }
5839
5840 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5841 {
5842         int ret;
5843
5844         ret = rbd_dev_v2_object_prefix(rbd_dev);
5845         if (ret)
5846                 goto out_err;
5847
5848         /*
5849          * Get and check the features for the image.  Currently the
5850          * features are assumed to never change.
5851          */
5852         ret = rbd_dev_v2_features(rbd_dev);
5853         if (ret)
5854                 goto out_err;
5855
5856         /* If the image supports fancy striping, get its parameters */
5857
5858         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5859                 ret = rbd_dev_v2_striping_info(rbd_dev);
5860                 if (ret < 0)
5861                         goto out_err;
5862         }
5863
5864         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5865                 ret = rbd_dev_v2_data_pool(rbd_dev);
5866                 if (ret)
5867                         goto out_err;
5868         }
5869
5870         rbd_init_layout(rbd_dev);
5871         return 0;
5872
5873 out_err:
5874         rbd_dev->header.features = 0;
5875         kfree(rbd_dev->header.object_prefix);
5876         rbd_dev->header.object_prefix = NULL;
5877         return ret;
5878 }
5879
5880 /*
5881  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5882  * rbd_dev_image_probe() recursion depth, which means it's also the
5883  * length of the already discovered part of the parent chain.
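 *
 * Worked example: probing a mapped clone whose parent is itself a clone
 * runs rbd_dev_image_probe() with depth 0, 1 and then 2; the chain is
 * rejected once depth would exceed RBD_MAX_PARENT_CHAIN_LEN (16).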
5884  */
5885 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5886 {
5887         struct rbd_device *parent = NULL;
5888         int ret;
5889
5890         if (!rbd_dev->parent_spec)
5891                 return 0;
5892
5893         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5894                 pr_info("parent chain is too long (%d)\n", depth);
5895                 ret = -EINVAL;
5896                 goto out_err;
5897         }
5898
5899         parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5900         if (!parent) {
5901                 ret = -ENOMEM;
5902                 goto out_err;
5903         }
5904
5905         /*
5906          * Images related by parent/child relationships always share
5907          * rbd_client and spec/parent_spec, so bump their refcounts.
5908          */
5909         __rbd_get_client(rbd_dev->rbd_client);
5910         rbd_spec_get(rbd_dev->parent_spec);
5911
5912         ret = rbd_dev_image_probe(parent, depth);
5913         if (ret < 0)
5914                 goto out_err;
5915
5916         rbd_dev->parent = parent;
5917         atomic_set(&rbd_dev->parent_ref, 1);
5918         return 0;
5919
5920 out_err:
5921         rbd_dev_unparent(rbd_dev);
5922         rbd_dev_destroy(parent);
5923         return ret;
5924 }
5925
5926 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5927 {
5928         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5929         rbd_dev_mapping_clear(rbd_dev);
5930         rbd_free_disk(rbd_dev);
5931         if (!single_major)
5932                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5933 }
5934
5935 /*
5936  * rbd_dev->header_rwsem must be locked for write and will be unlocked
5937  * upon return.
5938  */
5939 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5940 {
5941         int ret;
5942
5943         /* Record our major and minor device numbers. */
5944
5945         if (!single_major) {
5946                 ret = register_blkdev(0, rbd_dev->name);
5947                 if (ret < 0)
5948                         goto err_out_unlock;
5949
5950                 rbd_dev->major = ret;
5951                 rbd_dev->minor = 0;
5952         } else {
5953                 rbd_dev->major = rbd_major;
5954                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5955         }
5956
5957         /* Set up the blkdev mapping. */
5958
5959         ret = rbd_init_disk(rbd_dev);
5960         if (ret)
5961                 goto err_out_blkdev;
5962
5963         ret = rbd_dev_mapping_set(rbd_dev);
5964         if (ret)
5965                 goto err_out_disk;
5966
5967         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5968         set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
5969
5970         ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
5971         if (ret)
5972                 goto err_out_mapping;
5973
5974         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5975         up_write(&rbd_dev->header_rwsem);
5976         return 0;
5977
5978 err_out_mapping:
5979         rbd_dev_mapping_clear(rbd_dev);
5980 err_out_disk:
5981         rbd_free_disk(rbd_dev);
5982 err_out_blkdev:
5983         if (!single_major)
5984                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5985 err_out_unlock:
5986         up_write(&rbd_dev->header_rwsem);
5987         return ret;
5988 }
5989
5990 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5991 {
5992         struct rbd_spec *spec = rbd_dev->spec;
5993         int ret;
5994
5995         /* Record the header object name for this rbd image. */
5996
5997         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5998         if (rbd_dev->image_format == 1)
5999                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6000                                        spec->image_name, RBD_SUFFIX);
6001         else
6002                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6003                                        RBD_HEADER_PREFIX, spec->image_id);
6004
6005         return ret;
6006 }
6007
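/*
 * Undo rbd_dev_image_probe(): unprobe the image, drop the header
 * watch if one was set up (->opts is only non-NULL for the image
 * actually being mapped, not for parents) and forget the image
 * format and id.
 */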
6008 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6009 {
6010         rbd_dev_unprobe(rbd_dev);
6011         if (rbd_dev->opts)
6012                 rbd_unregister_watch(rbd_dev);
6013         rbd_dev->image_format = 0;
6014         kfree(rbd_dev->spec->image_id);
6015         rbd_dev->spec->image_id = NULL;
6016 }
6017
6018 /*
6019  * Probe for the existence of the header object for the given rbd
6020  * device.  If this image is the one being mapped (i.e., not a
6021  * parent), initiate a watch on its header object before using that
6022  * object to get detailed information about the rbd image.
6023  */
6024 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6025 {
6026         int ret;
6027
6028         /*
6029          * Get the id from the image id object.  Unless there's an
6030          * error, rbd_dev->spec->image_id will be filled in with
6031          * a dynamically-allocated string, and rbd_dev->image_format
6032          * will be set to either 1 or 2.
6033          */
6034         ret = rbd_dev_image_id(rbd_dev);
6035         if (ret)
6036                 return ret;
6037
6038         ret = rbd_dev_header_name(rbd_dev);
6039         if (ret)
6040                 goto err_out_format;
6041
6042         if (!depth) {
6043                 ret = rbd_register_watch(rbd_dev);
6044                 if (ret) {
6045                         if (ret == -ENOENT)
6046                                 pr_info("image %s/%s does not exist\n",
6047                                         rbd_dev->spec->pool_name,
6048                                         rbd_dev->spec->image_name);
6049                         goto err_out_format;
6050                 }
6051         }
6052
6053         ret = rbd_dev_header_info(rbd_dev);
6054         if (ret)
6055                 goto err_out_watch;
6056
6057         /*
6058          * If this image is the one being mapped, we have pool name and
6059          * id, image name and id, and snap name - need to fill snap id.
6060          * Otherwise this is a parent image, identified by pool, image
6061          * and snap ids - need to fill in names for those ids.
6062          */
6063         if (!depth)
6064                 ret = rbd_spec_fill_snap_id(rbd_dev);
6065         else
6066                 ret = rbd_spec_fill_names(rbd_dev);
6067         if (ret) {
6068                 if (ret == -ENOENT)
6069                         pr_info("snap %s/%s@%s does not exist\n",
6070                                 rbd_dev->spec->pool_name,
6071                                 rbd_dev->spec->image_name,
6072                                 rbd_dev->spec->snap_name);
6073                 goto err_out_probe;
6074         }
6075
6076         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6077                 ret = rbd_dev_v2_parent_info(rbd_dev);
6078                 if (ret)
6079                         goto err_out_probe;
6080
6081                 /*
6082                  * Need to warn users if this image is the one being
6083                  * mapped and has a parent.
6084                  */
6085                 if (!depth && rbd_dev->parent_spec)
6086                         rbd_warn(rbd_dev,
6087                                  "WARNING: kernel layering is EXPERIMENTAL!");
6088         }
6089
6090         ret = rbd_dev_probe_parent(rbd_dev, depth);
6091         if (ret)
6092                 goto err_out_probe;
6093
6094         dout("discovered format %u image, header name is %s\n",
6095                 rbd_dev->image_format, rbd_dev->header_oid.name);
6096         return 0;
6097
6098 err_out_probe:
6099         rbd_dev_unprobe(rbd_dev);
6100 err_out_watch:
6101         if (!depth)
6102                 rbd_unregister_watch(rbd_dev);
6103 err_out_format:
6104         rbd_dev->image_format = 0;
6105         kfree(rbd_dev->spec->image_id);
6106         rbd_dev->spec->image_id = NULL;
6107         return ret;
6108 }
6109
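/*
 * Handle a write to /sys/bus/rbd/add (or add_single_major).  The
 * buffer is expected to look roughly like
 *
 *      <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 *
 * (see Documentation/ABI/testing/sysfs-bus-rbd for the exact format).
 * Parse it, create or reuse a ceph client, probe the image and
 * register the resulting block device.  On success the number of
 * bytes consumed (count) is returned.
 */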
6110 static ssize_t do_rbd_add(struct bus_type *bus,
6111                           const char *buf,
6112                           size_t count)
6113 {
6114         struct rbd_device *rbd_dev = NULL;
6115         struct ceph_options *ceph_opts = NULL;
6116         struct rbd_options *rbd_opts = NULL;
6117         struct rbd_spec *spec = NULL;
6118         struct rbd_client *rbdc;
6119         int rc;
6120
6121         if (!try_module_get(THIS_MODULE))
6122                 return -ENODEV;
6123
6124         /* parse add command */
6125         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6126         if (rc < 0)
6127                 goto out;
6128
6129         rbdc = rbd_get_client(ceph_opts);
6130         if (IS_ERR(rbdc)) {
6131                 rc = PTR_ERR(rbdc);
6132                 goto err_out_args;
6133         }
6134
6135         /* pick the pool */
6136         rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6137         if (rc < 0) {
6138                 if (rc == -ENOENT)
6139                         pr_info("pool %s does not exist\n", spec->pool_name);
6140                 goto err_out_client;
6141         }
6142         spec->pool_id = (u64)rc;
6143
6144         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6145         if (!rbd_dev) {
6146                 rc = -ENOMEM;
6147                 goto err_out_client;
6148         }
6149         rbdc = NULL;            /* rbd_dev now owns this */
6150         spec = NULL;            /* rbd_dev now owns this */
6151         rbd_opts = NULL;        /* rbd_dev now owns this */
6152
6153         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6154         if (!rbd_dev->config_info) {
6155                 rc = -ENOMEM;
6156                 goto err_out_rbd_dev;
6157         }
6158
6159         down_write(&rbd_dev->header_rwsem);
6160         rc = rbd_dev_image_probe(rbd_dev, 0);
6161         if (rc < 0) {
6162                 up_write(&rbd_dev->header_rwsem);
6163                 goto err_out_rbd_dev;
6164         }
6165
6166         /* If we are mapping a snapshot, it must be marked read-only */
6167         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6168                 rbd_dev->opts->read_only = true;
6169
6170         rc = rbd_dev_device_setup(rbd_dev);
6171         if (rc)
6172                 goto err_out_image_probe;
6173
6174         if (rbd_dev->opts->exclusive) {
6175                 rc = rbd_add_acquire_lock(rbd_dev);
6176                 if (rc)
6177                         goto err_out_device_setup;
6178         }
6179
6180         /* Everything's ready.  Announce the disk to the world. */
6181
6182         rc = device_add(&rbd_dev->dev);
6183         if (rc)
6184                 goto err_out_image_lock;
6185
6186         add_disk(rbd_dev->disk);
6187         /* see rbd_init_disk() */
6188         blk_put_queue(rbd_dev->disk->queue);
6189
6190         spin_lock(&rbd_dev_list_lock);
6191         list_add_tail(&rbd_dev->node, &rbd_dev_list);
6192         spin_unlock(&rbd_dev_list_lock);
6193
6194         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6195                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6196                 rbd_dev->header.features);
6197         rc = count;
6198 out:
6199         module_put(THIS_MODULE);
6200         return rc;
6201
6202 err_out_image_lock:
6203         rbd_dev_image_unlock(rbd_dev);
6204 err_out_device_setup:
6205         rbd_dev_device_release(rbd_dev);
6206 err_out_image_probe:
6207         rbd_dev_image_release(rbd_dev);
6208 err_out_rbd_dev:
6209         rbd_dev_destroy(rbd_dev);
6210 err_out_client:
6211         rbd_put_client(rbdc);
6212 err_out_args:
6213         rbd_spec_put(spec);
6214         kfree(rbd_opts);
6215         goto out;
6216 }
6217
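/*
 * /sys/bus/rbd/add is only available when each device gets its own
 * major number; with the single_major module parameter set,
 * add_single_major must be used instead.
 */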
6218 static ssize_t rbd_add(struct bus_type *bus,
6219                        const char *buf,
6220                        size_t count)
6221 {
6222         if (single_major)
6223                 return -EINVAL;
6224
6225         return do_rbd_add(bus, buf, count);
6226 }
6227
6228 static ssize_t rbd_add_single_major(struct bus_type *bus,
6229                                     const char *buf,
6230                                     size_t count)
6231 {
6232         return do_rbd_add(bus, buf, count);
6233 }
6234
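/*
 * Tear down the parent image chain, starting from the end of the
 * chain (the ancestor that has no parent of its own) and working
 * back towards rbd_dev.
 */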
6235 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6236 {
6237         while (rbd_dev->parent) {
6238                 struct rbd_device *first = rbd_dev;
6239                 struct rbd_device *second = first->parent;
6240                 struct rbd_device *third;
6241
6242                 /*
6243                  * Walk down to the parent at the end of the chain
6244                  * (the one with no grandparent) and remove it.
6245                  */
6246                 while (second && (third = second->parent)) {
6247                         first = second;
6248                         second = third;
6249                 }
6250                 rbd_assert(second);
6251                 rbd_dev_image_release(second);
6252                 rbd_dev_destroy(second);
6253                 first->parent = NULL;
6254                 first->parent_overlap = 0;
6255
6256                 rbd_assert(first->parent_spec);
6257                 rbd_spec_put(first->parent_spec);
6258                 first->parent_spec = NULL;
6259         }
6260 }
6261
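/*
 * Handle a write to /sys/bus/rbd/remove (or remove_single_major).
 * The expected format is "<dev_id> [force]"; without "force" the
 * request is rejected with -EBUSY while the device is still open.
 */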
6262 static ssize_t do_rbd_remove(struct bus_type *bus,
6263                              const char *buf,
6264                              size_t count)
6265 {
6266         struct rbd_device *rbd_dev = NULL;
6267         struct list_head *tmp;
6268         int dev_id;
6269         char opt_buf[6];
6270         bool already = false;
6271         bool force = false;
6272         int ret;
6273
6274         dev_id = -1;
6275         opt_buf[0] = '\0';
6276         sscanf(buf, "%d %5s", &dev_id, opt_buf);
6277         if (dev_id < 0) {
6278                 pr_err("dev_id out of range\n");
6279                 return -EINVAL;
6280         }
6281         if (opt_buf[0] != '\0') {
6282                 if (!strcmp(opt_buf, "force")) {
6283                         force = true;
6284                 } else {
6285                         pr_err("bad remove option at '%s'\n", opt_buf);
6286                         return -EINVAL;
6287                 }
6288         }
6289
6290         ret = -ENOENT;
6291         spin_lock(&rbd_dev_list_lock);
6292         list_for_each(tmp, &rbd_dev_list) {
6293                 rbd_dev = list_entry(tmp, struct rbd_device, node);
6294                 if (rbd_dev->dev_id == dev_id) {
6295                         ret = 0;
6296                         break;
6297                 }
6298         }
6299         if (!ret) {
6300                 spin_lock_irq(&rbd_dev->lock);
6301                 if (rbd_dev->open_count && !force)
6302                         ret = -EBUSY;
6303                 else
6304                         already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6305                                                         &rbd_dev->flags);
6306                 spin_unlock_irq(&rbd_dev->lock);
6307         }
6308         spin_unlock(&rbd_dev_list_lock);
6309         if (ret < 0 || already)
6310                 return ret;
6311
6312         if (force) {
6313                 /*
6314                  * Prevent new IO from being queued and wait for existing
6315                  * IO to complete/fail.
6316                  */
6317                 blk_mq_freeze_queue(rbd_dev->disk->queue);
6318                 blk_set_queue_dying(rbd_dev->disk->queue);
6319         }
6320
6321         del_gendisk(rbd_dev->disk);
6322         spin_lock(&rbd_dev_list_lock);
6323         list_del_init(&rbd_dev->node);
6324         spin_unlock(&rbd_dev_list_lock);
6325         device_del(&rbd_dev->dev);
6326
6327         rbd_dev_image_unlock(rbd_dev);
6328         rbd_dev_device_release(rbd_dev);
6329         rbd_dev_image_release(rbd_dev);
6330         rbd_dev_destroy(rbd_dev);
6331         return count;
6332 }
6333
6334 static ssize_t rbd_remove(struct bus_type *bus,
6335                           const char *buf,
6336                           size_t count)
6337 {
6338         if (single_major)
6339                 return -EINVAL;
6340
6341         return do_rbd_remove(bus, buf, count);
6342 }
6343
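/*
 * As with add/add_single_major above, the plain remove attribute is
 * rejected when the single_major module parameter is set.
 */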
6344 static ssize_t rbd_remove_single_major(struct bus_type *bus,
6345                                        const char *buf,
6346                                        size_t count)
6347 {
6348         return do_rbd_remove(bus, buf, count);
6349 }
6350
6351 /*
6352  * create control files in sysfs
6353  * /sys/bus/rbd/...
6354  */
6355 static int rbd_sysfs_init(void)
6356 {
6357         int ret;
6358
6359         ret = device_register(&rbd_root_dev);
6360         if (ret < 0)
6361                 return ret;
6362
6363         ret = bus_register(&rbd_bus_type);
6364         if (ret < 0)
6365                 device_unregister(&rbd_root_dev);
6366
6367         return ret;
6368 }
6369
6370 static void rbd_sysfs_cleanup(void)
6371 {
6372         bus_unregister(&rbd_bus_type);
6373         device_unregister(&rbd_root_dev);
6374 }
6375
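/*
 * Allocate the module-wide slab caches for image and object requests
 * and the bio set used for cloning bios.
 */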
6376 static int rbd_slab_init(void)
6377 {
6378         rbd_assert(!rbd_img_request_cache);
6379         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6380         if (!rbd_img_request_cache)
6381                 return -ENOMEM;
6382
6383         rbd_assert(!rbd_obj_request_cache);
6384         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6385         if (!rbd_obj_request_cache)
6386                 goto out_err;
6387
6388         rbd_assert(!rbd_bio_clone);
6389         rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6390         if (!rbd_bio_clone)
6391                 goto out_err_clone;
6392
6393         return 0;
6394
6395 out_err_clone:
6396         kmem_cache_destroy(rbd_obj_request_cache);
6397         rbd_obj_request_cache = NULL;
6398 out_err:
6399         kmem_cache_destroy(rbd_img_request_cache);
6400         rbd_img_request_cache = NULL;
6401         return -ENOMEM;
6402 }
6403
6404 static void rbd_slab_exit(void)
6405 {
6406         rbd_assert(rbd_obj_request_cache);
6407         kmem_cache_destroy(rbd_obj_request_cache);
6408         rbd_obj_request_cache = NULL;
6409
6410         rbd_assert(rbd_img_request_cache);
6411         kmem_cache_destroy(rbd_img_request_cache);
6412         rbd_img_request_cache = NULL;
6413
6414         rbd_assert(rbd_bio_clone);
6415         bioset_free(rbd_bio_clone);
6416         rbd_bio_clone = NULL;
6417 }
6418
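/*
 * Module init: set up the slab caches, the rbd_wq workqueue used for
 * request handling, an optional shared major number (single_major)
 * and the sysfs entries under /sys/bus/rbd/.
 */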
6419 static int __init rbd_init(void)
6420 {
6421         int rc;
6422
6423         if (!libceph_compatible(NULL)) {
6424                 rbd_warn(NULL, "libceph incompatibility (quitting)");
6425                 return -EINVAL;
6426         }
6427
6428         rc = rbd_slab_init();
6429         if (rc)
6430                 return rc;
6431
6432         /*
6433          * The number of active work items is limited by the number of
6434          * rbd devices * queue depth, so leave @max_active at default.
6435          */
6436         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6437         if (!rbd_wq) {
6438                 rc = -ENOMEM;
6439                 goto err_out_slab;
6440         }
6441
6442         if (single_major) {
6443                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6444                 if (rbd_major < 0) {
6445                         rc = rbd_major;
6446                         goto err_out_wq;
6447                 }
6448         }
6449
6450         rc = rbd_sysfs_init();
6451         if (rc)
6452                 goto err_out_blkdev;
6453
6454         if (single_major)
6455                 pr_info("loaded (major %d)\n", rbd_major);
6456         else
6457                 pr_info("loaded\n");
6458
6459         return 0;
6460
6461 err_out_blkdev:
6462         if (single_major)
6463                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6464 err_out_wq:
6465         destroy_workqueue(rbd_wq);
6466 err_out_slab:
6467         rbd_slab_exit();
6468         return rc;
6469 }
6470
6471 static void __exit rbd_exit(void)
6472 {
6473         ida_destroy(&rbd_dev_id_ida);
6474         rbd_sysfs_cleanup();
6475         if (single_major)
6476                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6477         destroy_workqueue(rbd_wq);
6478         rbd_slab_exit();
6479 }
6480
6481 module_init(rbd_init);
6482 module_exit(rbd_exit);
6483
6484 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6485 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6486 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6487 /* following authorship retained from original osdblk.c */
6488 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6489
6490 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6491 MODULE_LICENSE("GPL");