drivers/block/rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/decode.h>
36 #include <linux/parser.h>
37 #include <linux/bsearch.h>
38
39 #include <linux/kernel.h>
40 #include <linux/device.h>
41 #include <linux/module.h>
42 #include <linux/blk-mq.h>
43 #include <linux/fs.h>
44 #include <linux/blkdev.h>
45 #include <linux/slab.h>
46 #include <linux/idr.h>
47 #include <linux/workqueue.h>
48
49 #include "rbd_types.h"
50
51 #define RBD_DEBUG       /* Activate rbd_assert() calls */
52
53 /*
54  * Increment the given counter and return its updated value.
55  * If the counter is already 0 it will not be incremented.
56  * If the counter is already at its maximum value, -EINVAL is
57  * returned without updating it.
58  */
59 static int atomic_inc_return_safe(atomic_t *v)
60 {
61         unsigned int counter;
62
63         counter = (unsigned int)__atomic_add_unless(v, 1, 0);
64         if (counter <= (unsigned int)INT_MAX)
65                 return (int)counter;
66
67         atomic_dec(v);
68
69         return -EINVAL;
70 }
71
72 /* Decrement the counter.  Return the resulting value, or -EINVAL */
73 static int atomic_dec_return_safe(atomic_t *v)
74 {
75         int counter;
76
77         counter = atomic_dec_return(v);
78         if (counter >= 0)
79                 return counter;
80
81         atomic_inc(v);
82
83         return -EINVAL;
84 }
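/*
 * Usage sketch (illustrative only, not compiled): callers treat a
 * return value > 0 as having taken a reference, and 0 as "the count
 * already dropped to 0, do not resurrect".  rbd_dev->parent_ref
 * further below is managed this way; use_parent() and parent_gone()
 * are hypothetical consumers.
 */
#if 0
	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0)
		use_parent(rbd_dev->parent);	/* reference obtained */
	else
		parent_gone(rbd_dev);		/* counter was 0: already torn down */
#endif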
85
86 #define RBD_DRV_NAME "rbd"
87
88 #define RBD_MINORS_PER_MAJOR            256
89 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
90
91 #define RBD_MAX_PARENT_CHAIN_LEN        16
92
93 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
94 #define RBD_MAX_SNAP_NAME_LEN   \
95                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
96
97 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
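/*
 * A rough sketch of the rationale (an assumption, not stated in this
 * file): a snapshot context encodes as an 8-byte seq, a 4-byte count
 * and one __le64 per snapshot, so 510 snapshots need
 * 8 + 4 + 510 * 8 = 4092 bytes and still fit in a 4 KiB buffer,
 * whereas 511 would need 4100 bytes.
 */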
98
99 #define RBD_SNAP_HEAD_NAME      "-"
100
101 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
102
103 /* This allows a single page to hold an image name sent by OSD */
104 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
105 #define RBD_IMAGE_ID_LEN_MAX    64
106
107 #define RBD_OBJ_PREFIX_LEN_MAX  64
108
109 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
110 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
111
112 /* Feature bits */
113
114 #define RBD_FEATURE_LAYERING            (1ULL<<0)
115 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
116 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
117 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
118 #define RBD_FEATURE_OPERATIONS          (1ULL<<8)
119
120 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
121                                  RBD_FEATURE_STRIPINGV2 |       \
122                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
123                                  RBD_FEATURE_DATA_POOL |        \
124                                  RBD_FEATURE_OPERATIONS)
125
126 /* Features supported by this (client software) implementation. */
127
128 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
129
130 /*
131  * An RBD device name will be "rbd#", where the "rbd" comes from
132  * RBD_DRV_NAME above, and # is a unique integer identifier.
133  */
134 #define DEV_NAME_LEN            32
135
136 /*
137  * block device image metadata (in-memory version)
138  */
139 struct rbd_image_header {
140         /* These six fields never change for a given rbd image */
141         char *object_prefix;
142         __u8 obj_order;
143         u64 stripe_unit;
144         u64 stripe_count;
145         s64 data_pool_id;
146         u64 features;           /* Might be changeable someday? */
147
148         /* The remaining fields need to be updated occasionally */
149         u64 image_size;
150         struct ceph_snap_context *snapc;
151         char *snap_names;       /* format 1 only */
152         u64 *snap_sizes;        /* format 1 only */
153 };
154
155 /*
156  * An rbd image specification.
157  *
158  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
159  * identify an image.  Each rbd_dev structure includes a pointer to
160  * an rbd_spec structure that encapsulates this identity.
161  *
162  * Each of the ids in an rbd_spec has an associated name.  For a
163  * user-mapped image, the names are supplied and the ids associated
164  * with them are looked up.  For a layered image, a parent image is
165  * defined by the tuple, and the names are looked up.
166  *
167  * An rbd_dev structure contains a parent_spec pointer which is
168  * non-null if the image it represents is a child in a layered
169  * image.  This pointer will refer to the rbd_spec structure used
170  * by the parent rbd_dev for its own identity (i.e., the structure
171  * is shared between the parent and child).
172  *
173  * Since these structures are populated once, during the discovery
174  * phase of image construction, they are effectively immutable so
175  * we make no effort to synchronize access to them.
176  *
177  * Note that code herein does not assume the image name is known (it
178  * could be a null pointer).
179  */
180 struct rbd_spec {
181         u64             pool_id;
182         const char      *pool_name;
183
184         const char      *image_id;
185         const char      *image_name;
186
187         u64             snap_id;
188         const char      *snap_name;
189
190         struct kref     kref;
191 };
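/*
 * Illustrative example (hypothetical values): mapping snapshot "snap1"
 * of image "foo" in pool "rbd" might yield pool_id 2, image_id
 * "10074b0dc51d" and snap_id 4; mapping the image head instead uses
 * snap_id CEPH_NOSNAP and snap_name RBD_SNAP_HEAD_NAME ("-").
 */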
192
193 /*
194  * an instance of the client.  multiple devices may share an rbd client.
195  */
196 struct rbd_client {
197         struct ceph_client      *client;
198         struct kref             kref;
199         struct list_head        node;
200 };
201
202 struct rbd_img_request;
203 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
204
205 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
206
207 struct rbd_obj_request;
208 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
209
210 enum obj_request_type {
211         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
212 };
213
214 enum obj_operation_type {
215         OBJ_OP_WRITE,
216         OBJ_OP_READ,
217         OBJ_OP_DISCARD,
218 };
219
220 enum obj_req_flags {
221         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
222         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
223         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
224         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
225 };
226
227 struct rbd_obj_request {
228         u64                     object_no;
229         u64                     offset;         /* object start byte */
230         u64                     length;         /* bytes from offset */
231         unsigned long           flags;
232
233         /*
234          * An object request associated with an image will have its
235          * img_data flag set; a standalone object request will not.
236          *
237          * A standalone object request will have which == BAD_WHICH
238          * and a null obj_request pointer.
239          *
240          * An object request initiated in support of a layered image
241          * object (to check for its existence before a write) will
242          * have which == BAD_WHICH and a non-null obj_request pointer.
243          *
244          * Finally, an object request for rbd image data will have
245          * which != BAD_WHICH, and will have a non-null img_request
246          * pointer.  The value of which will be in the range
247          * 0..(img_request->obj_request_count-1).
248          */
249         union {
250                 struct rbd_obj_request  *obj_request;   /* STAT op */
251                 struct {
252                         struct rbd_img_request  *img_request;
253                         u64                     img_offset;
254                         /* links for img_request->obj_requests list */
255                         struct list_head        links;
256                 };
257         };
258         u32                     which;          /* position in image request list */
259
260         enum obj_request_type   type;
261         union {
262                 struct bio      *bio_list;
263                 struct {
264                         struct page     **pages;
265                         u32             page_count;
266                 };
267         };
268         struct page             **copyup_pages;
269         u32                     copyup_page_count;
270
271         struct ceph_osd_request *osd_req;
272
273         u64                     xferred;        /* bytes transferred */
274         int                     result;
275
276         rbd_obj_callback_t      callback;
277
278         struct kref             kref;
279 };
280
281 enum img_req_flags {
282         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
283         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
284         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
285         IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
286 };
287
288 struct rbd_img_request {
289         struct rbd_device       *rbd_dev;
290         u64                     offset; /* starting image byte offset */
291         u64                     length; /* byte count from offset */
292         unsigned long           flags;
293         union {
294                 u64                     snap_id;        /* for reads */
295                 struct ceph_snap_context *snapc;        /* for writes */
296         };
297         union {
298                 struct request          *rq;            /* block request */
299                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
300         };
301         struct page             **copyup_pages;
302         u32                     copyup_page_count;
303         spinlock_t              completion_lock;/* protects next_completion */
304         u32                     next_completion;
305         rbd_img_callback_t      callback;
306         u64                     xferred;/* aggregate bytes transferred */
307         int                     result; /* first nonzero obj_request result */
308
309         u32                     obj_request_count;
310         struct list_head        obj_requests;   /* rbd_obj_request structs */
311
312         struct kref             kref;
313 };
314
315 #define for_each_obj_request(ireq, oreq) \
316         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
317 #define for_each_obj_request_from(ireq, oreq) \
318         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
319 #define for_each_obj_request_safe(ireq, oreq, n) \
320         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
321
322 enum rbd_watch_state {
323         RBD_WATCH_STATE_UNREGISTERED,
324         RBD_WATCH_STATE_REGISTERED,
325         RBD_WATCH_STATE_ERROR,
326 };
327
328 enum rbd_lock_state {
329         RBD_LOCK_STATE_UNLOCKED,
330         RBD_LOCK_STATE_LOCKED,
331         RBD_LOCK_STATE_RELEASING,
332 };
333
334 /* WatchNotify::ClientId */
335 struct rbd_client_id {
336         u64 gid;
337         u64 handle;
338 };
339
340 struct rbd_mapping {
341         u64                     size;
342         u64                     features;
343 };
344
345 /*
346  * a single device
347  */
348 struct rbd_device {
349         int                     dev_id;         /* blkdev unique id */
350
351         int                     major;          /* blkdev assigned major */
352         int                     minor;
353         struct gendisk          *disk;          /* blkdev's gendisk and rq */
354
355         u32                     image_format;   /* Either 1 or 2 */
356         struct rbd_client       *rbd_client;
357
358         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
359
360         spinlock_t              lock;           /* queue, flags, open_count */
361
362         struct rbd_image_header header;
363         unsigned long           flags;          /* possibly lock protected */
364         struct rbd_spec         *spec;
365         struct rbd_options      *opts;
366         char                    *config_info;   /* add{,_single_major} string */
367
368         struct ceph_object_id   header_oid;
369         struct ceph_object_locator header_oloc;
370
371         struct ceph_file_layout layout;         /* used for all rbd requests */
372
373         struct mutex            watch_mutex;
374         enum rbd_watch_state    watch_state;
375         struct ceph_osd_linger_request *watch_handle;
376         u64                     watch_cookie;
377         struct delayed_work     watch_dwork;
378
379         struct rw_semaphore     lock_rwsem;
380         enum rbd_lock_state     lock_state;
381         char                    lock_cookie[32];
382         struct rbd_client_id    owner_cid;
383         struct work_struct      acquired_lock_work;
384         struct work_struct      released_lock_work;
385         struct delayed_work     lock_dwork;
386         struct work_struct      unlock_work;
387         wait_queue_head_t       lock_waitq;
388
389         struct workqueue_struct *task_wq;
390
391         struct rbd_spec         *parent_spec;
392         u64                     parent_overlap;
393         atomic_t                parent_ref;
394         struct rbd_device       *parent;
395
396         /* Block layer tags. */
397         struct blk_mq_tag_set   tag_set;
398
399         /* protects updating the header */
400         struct rw_semaphore     header_rwsem;
401
402         struct rbd_mapping      mapping;
403
404         struct list_head        node;
405
406         /* sysfs related */
407         struct device           dev;
408         unsigned long           open_count;     /* protected by lock */
409 };
410
411 /*
412  * Flag bits for rbd_dev->flags:
413  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
414  *   by rbd_dev->lock
415  * - BLACKLISTED is protected by rbd_dev->lock_rwsem
416  */
417 enum rbd_dev_flags {
418         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
419         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
420         RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
421 };
422
423 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
424
425 static LIST_HEAD(rbd_dev_list);    /* devices */
426 static DEFINE_SPINLOCK(rbd_dev_list_lock);
427
428 static LIST_HEAD(rbd_client_list);              /* clients */
429 static DEFINE_SPINLOCK(rbd_client_list_lock);
430
431 /* Slab caches for frequently-allocated structures */
432
433 static struct kmem_cache        *rbd_img_request_cache;
434 static struct kmem_cache        *rbd_obj_request_cache;
435
436 static struct bio_set           *rbd_bio_clone;
437
438 static int rbd_major;
439 static DEFINE_IDA(rbd_dev_id_ida);
440
441 static struct workqueue_struct *rbd_wq;
442
443 /*
444  * single-major requires version >= 0.75 of the userspace rbd utility.
445  */
446 static bool single_major = true;
447 module_param(single_major, bool, S_IRUGO);
448 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
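/*
 * Usage note (not part of the driver): the old one-major-per-device
 * scheme can still be requested with "modprobe rbd single_major=0",
 * or "rbd.single_major=0" on the kernel command line for a built-in
 * driver.
 */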
449
450 static int rbd_img_request_submit(struct rbd_img_request *img_request);
451
452 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
453                        size_t count);
454 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
455                           size_t count);
456 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
457                                     size_t count);
458 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
459                                        size_t count);
460 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
461 static void rbd_spec_put(struct rbd_spec *spec);
462
463 static int rbd_dev_id_to_minor(int dev_id)
464 {
465         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
466 }
467
468 static int minor_to_rbd_dev_id(int minor)
469 {
470         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
471 }
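/*
 * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT == 4 each device
 * owns 16 minors, so dev_id 3 gets base minor 3 << 4 == 48 and its
 * partitions occupy minors 48..63, all of which map back to dev_id 3.
 */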
472
473 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
474 {
475         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
476                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
477 }
478
479 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
480 {
481         bool is_lock_owner;
482
483         down_read(&rbd_dev->lock_rwsem);
484         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
485         up_read(&rbd_dev->lock_rwsem);
486         return is_lock_owner;
487 }
488
489 static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
490 {
491         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
492 }
493
494 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
495 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
496 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
497 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
498 static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);
499
500 static struct attribute *rbd_bus_attrs[] = {
501         &bus_attr_add.attr,
502         &bus_attr_remove.attr,
503         &bus_attr_add_single_major.attr,
504         &bus_attr_remove_single_major.attr,
505         &bus_attr_supported_features.attr,
506         NULL,
507 };
508
509 static umode_t rbd_bus_is_visible(struct kobject *kobj,
510                                   struct attribute *attr, int index)
511 {
512         if (!single_major &&
513             (attr == &bus_attr_add_single_major.attr ||
514              attr == &bus_attr_remove_single_major.attr))
515                 return 0;
516
517         return attr->mode;
518 }
519
520 static const struct attribute_group rbd_bus_group = {
521         .attrs = rbd_bus_attrs,
522         .is_visible = rbd_bus_is_visible,
523 };
524 __ATTRIBUTE_GROUPS(rbd_bus);
525
526 static struct bus_type rbd_bus_type = {
527         .name           = "rbd",
528         .bus_groups     = rbd_bus_groups,
529 };
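/*
 * The bus attributes above appear as write-only /sys/bus/rbd/add,
 * /sys/bus/rbd/remove, /sys/bus/rbd/add_single_major and
 * /sys/bus/rbd/remove_single_major, plus a read-only
 * /sys/bus/rbd/supported_features; the *_single_major pair is hidden
 * when single_major is disabled.
 */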
530
531 static void rbd_root_dev_release(struct device *dev)
532 {
533 }
534
535 static struct device rbd_root_dev = {
536         .init_name =    "rbd",
537         .release =      rbd_root_dev_release,
538 };
539
540 static __printf(2, 3)
541 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
542 {
543         struct va_format vaf;
544         va_list args;
545
546         va_start(args, fmt);
547         vaf.fmt = fmt;
548         vaf.va = &args;
549
550         if (!rbd_dev)
551                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
552         else if (rbd_dev->disk)
553                 printk(KERN_WARNING "%s: %s: %pV\n",
554                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
555         else if (rbd_dev->spec && rbd_dev->spec->image_name)
556                 printk(KERN_WARNING "%s: image %s: %pV\n",
557                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
558         else if (rbd_dev->spec && rbd_dev->spec->image_id)
559                 printk(KERN_WARNING "%s: id %s: %pV\n",
560                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
561         else    /* punt */
562                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
563                         RBD_DRV_NAME, rbd_dev, &vaf);
564         va_end(args);
565 }
566
567 #ifdef RBD_DEBUG
568 #define rbd_assert(expr)                                                \
569                 if (unlikely(!(expr))) {                                \
570                         printk(KERN_ERR "\nAssertion failure in %s() "  \
571                                                 "at line %d:\n\n"       \
572                                         "\trbd_assert(%s);\n\n",        \
573                                         __func__, __LINE__, #expr);     \
574                         BUG();                                          \
575                 }
576 #else /* !RBD_DEBUG */
577 #  define rbd_assert(expr)      ((void) 0)
578 #endif /* !RBD_DEBUG */
579
580 static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
581 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
582 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
583 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
584
585 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
586 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
587 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
588 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
589 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
590                                         u64 snap_id);
591 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
592                                 u8 *order, u64 *snap_size);
593 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
594                 u64 *snap_features);
595
596 static int rbd_open(struct block_device *bdev, fmode_t mode)
597 {
598         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
599         bool removing = false;
600
601         spin_lock_irq(&rbd_dev->lock);
602         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
603                 removing = true;
604         else
605                 rbd_dev->open_count++;
606         spin_unlock_irq(&rbd_dev->lock);
607         if (removing)
608                 return -ENOENT;
609
610         (void) get_device(&rbd_dev->dev);
611
612         return 0;
613 }
614
615 static void rbd_release(struct gendisk *disk, fmode_t mode)
616 {
617         struct rbd_device *rbd_dev = disk->private_data;
618         unsigned long open_count_before;
619
620         spin_lock_irq(&rbd_dev->lock);
621         open_count_before = rbd_dev->open_count--;
622         spin_unlock_irq(&rbd_dev->lock);
623         rbd_assert(open_count_before > 0);
624
625         put_device(&rbd_dev->dev);
626 }
627
628 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
629 {
630         int ro;
631
632         if (get_user(ro, (int __user *)arg))
633                 return -EFAULT;
634
635         /* Snapshots can't be marked read-write */
636         if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
637                 return -EROFS;
638
639         /* Let blkdev_roset() handle it */
640         return -ENOTTY;
641 }
642
643 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
644                         unsigned int cmd, unsigned long arg)
645 {
646         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
647         int ret;
648
649         switch (cmd) {
650         case BLKROSET:
651                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
652                 break;
653         default:
654                 ret = -ENOTTY;
655         }
656
657         return ret;
658 }
659
660 #ifdef CONFIG_COMPAT
661 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
662                                 unsigned int cmd, unsigned long arg)
663 {
664         return rbd_ioctl(bdev, mode, cmd, arg);
665 }
666 #endif /* CONFIG_COMPAT */
667
668 static const struct block_device_operations rbd_bd_ops = {
669         .owner                  = THIS_MODULE,
670         .open                   = rbd_open,
671         .release                = rbd_release,
672         .ioctl                  = rbd_ioctl,
673 #ifdef CONFIG_COMPAT
674         .compat_ioctl           = rbd_compat_ioctl,
675 #endif
676 };
677
678 /*
679  * Initialize an rbd client instance.  Success or not, this function
680  * consumes ceph_opts.  Caller holds client_mutex.
681  */
682 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
683 {
684         struct rbd_client *rbdc;
685         int ret = -ENOMEM;
686
687         dout("%s:\n", __func__);
688         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
689         if (!rbdc)
690                 goto out_opt;
691
692         kref_init(&rbdc->kref);
693         INIT_LIST_HEAD(&rbdc->node);
694
695         rbdc->client = ceph_create_client(ceph_opts, rbdc);
696         if (IS_ERR(rbdc->client))
697                 goto out_rbdc;
698         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
699
700         ret = ceph_open_session(rbdc->client);
701         if (ret < 0)
702                 goto out_client;
703
704         spin_lock(&rbd_client_list_lock);
705         list_add_tail(&rbdc->node, &rbd_client_list);
706         spin_unlock(&rbd_client_list_lock);
707
708         dout("%s: rbdc %p\n", __func__, rbdc);
709
710         return rbdc;
711 out_client:
712         ceph_destroy_client(rbdc->client);
713 out_rbdc:
714         kfree(rbdc);
715 out_opt:
716         if (ceph_opts)
717                 ceph_destroy_options(ceph_opts);
718         dout("%s: error %d\n", __func__, ret);
719
720         return ERR_PTR(ret);
721 }
722
723 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
724 {
725         kref_get(&rbdc->kref);
726
727         return rbdc;
728 }
729
730 /*
731  * Find a ceph client with specific addr and configuration.  If
732  * found, bump its reference count.
733  */
734 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
735 {
736         struct rbd_client *client_node;
737         bool found = false;
738
739         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
740                 return NULL;
741
742         spin_lock(&rbd_client_list_lock);
743         list_for_each_entry(client_node, &rbd_client_list, node) {
744                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
745                         __rbd_get_client(client_node);
746
747                         found = true;
748                         break;
749                 }
750         }
751         spin_unlock(&rbd_client_list_lock);
752
753         return found ? client_node : NULL;
754 }
755
756 /*
757  * (Per device) rbd map options
758  */
759 enum {
760         Opt_queue_depth,
761         Opt_last_int,
762         /* int args above */
763         Opt_last_string,
764         /* string args above */
765         Opt_read_only,
766         Opt_read_write,
767         Opt_lock_on_read,
768         Opt_exclusive,
769         Opt_err
770 };
771
772 static match_table_t rbd_opts_tokens = {
773         {Opt_queue_depth, "queue_depth=%d"},
774         /* int args above */
775         /* string args above */
776         {Opt_read_only, "read_only"},
777         {Opt_read_only, "ro"},          /* Alternate spelling */
778         {Opt_read_write, "read_write"},
779         {Opt_read_write, "rw"},         /* Alternate spelling */
780         {Opt_lock_on_read, "lock_on_read"},
781         {Opt_exclusive, "exclusive"},
782         {Opt_err, NULL}
783 };
784
785 struct rbd_options {
786         int     queue_depth;
787         bool    read_only;
788         bool    lock_on_read;
789         bool    exclusive;
790 };
791
792 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
793 #define RBD_READ_ONLY_DEFAULT   false
794 #define RBD_LOCK_ON_READ_DEFAULT false
795 #define RBD_EXCLUSIVE_DEFAULT   false
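/*
 * Example map request (hypothetical monitor address and image names),
 * following the format described in Documentation/ABI/testing/sysfs-bus-rbd;
 * options libceph does not recognize are handed to parse_rbd_opts_token()
 * below:
 *
 *   $ echo "192.168.0.1 name=admin,queue_depth=128,lock_on_read rbd foo" \
 *         > /sys/bus/rbd/add
 */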
796
797 static int parse_rbd_opts_token(char *c, void *private)
798 {
799         struct rbd_options *rbd_opts = private;
800         substring_t argstr[MAX_OPT_ARGS];
801         int token, intval, ret;
802
803         token = match_token(c, rbd_opts_tokens, argstr);
804         if (token < Opt_last_int) {
805                 ret = match_int(&argstr[0], &intval);
806                 if (ret < 0) {
807                         pr_err("bad mount option arg (not int) at '%s'\n", c);
808                         return ret;
809                 }
810                 dout("got int token %d val %d\n", token, intval);
811         } else if (token > Opt_last_int && token < Opt_last_string) {
812                 dout("got string token %d val %s\n", token, argstr[0].from);
813         } else {
814                 dout("got token %d\n", token);
815         }
816
817         switch (token) {
818         case Opt_queue_depth:
819                 if (intval < 1) {
820                         pr_err("queue_depth out of range\n");
821                         return -EINVAL;
822                 }
823                 rbd_opts->queue_depth = intval;
824                 break;
825         case Opt_read_only:
826                 rbd_opts->read_only = true;
827                 break;
828         case Opt_read_write:
829                 rbd_opts->read_only = false;
830                 break;
831         case Opt_lock_on_read:
832                 rbd_opts->lock_on_read = true;
833                 break;
834         case Opt_exclusive:
835                 rbd_opts->exclusive = true;
836                 break;
837         default:
838                 /* libceph prints "bad option" msg */
839                 return -EINVAL;
840         }
841
842         return 0;
843 }
844
845 static char *obj_op_name(enum obj_operation_type op_type)
846 {
847         switch (op_type) {
848         case OBJ_OP_READ:
849                 return "read";
850         case OBJ_OP_WRITE:
851                 return "write";
852         case OBJ_OP_DISCARD:
853                 return "discard";
854         default:
855                 return "???";
856         }
857 }
858
859 /*
860  * Get a ceph client with a specific addr and configuration; if one does
861  * not exist, create it.  Either way, ceph_opts is consumed by this
862  * function.
863  */
864 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
865 {
866         struct rbd_client *rbdc;
867
868         mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
869         rbdc = rbd_client_find(ceph_opts);
870         if (rbdc)       /* using an existing client */
871                 ceph_destroy_options(ceph_opts);
872         else
873                 rbdc = rbd_client_create(ceph_opts);
874         mutex_unlock(&client_mutex);
875
876         return rbdc;
877 }
878
879 /*
880  * Destroy ceph client
881  *
882  * Takes rbd_client_list_lock itself, so the caller must not hold it.
883  */
884 static void rbd_client_release(struct kref *kref)
885 {
886         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
887
888         dout("%s: rbdc %p\n", __func__, rbdc);
889         spin_lock(&rbd_client_list_lock);
890         list_del(&rbdc->node);
891         spin_unlock(&rbd_client_list_lock);
892
893         ceph_destroy_client(rbdc->client);
894         kfree(rbdc);
895 }
896
897 /*
898  * Drop reference to ceph client node. If it's not referenced anymore, release
899  * it.
900  */
901 static void rbd_put_client(struct rbd_client *rbdc)
902 {
903         if (rbdc)
904                 kref_put(&rbdc->kref, rbd_client_release);
905 }
906
907 static bool rbd_image_format_valid(u32 image_format)
908 {
909         return image_format == 1 || image_format == 2;
910 }
911
912 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
913 {
914         size_t size;
915         u32 snap_count;
916
917         /* The header has to start with the magic rbd header text */
918         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
919                 return false;
920
921         /* The bio layer requires at least sector-sized I/O */
922
923         if (ondisk->options.order < SECTOR_SHIFT)
924                 return false;
925
926         /* If we use u64 in a few spots we may be able to loosen this */
927
928         if (ondisk->options.order > 8 * sizeof (int) - 1)
929                 return false;
930
931         /*
932          * The size of a snapshot header has to fit in a size_t, and
933          * that limits the number of snapshots.
934          */
935         snap_count = le32_to_cpu(ondisk->snap_count);
936         size = SIZE_MAX - sizeof (struct ceph_snap_context);
937         if (snap_count > size / sizeof (__le64))
938                 return false;
939
940         /*
941          * Not only that, but the size of the entire snapshot
942          * header must also be representable in a size_t.
943          */
944         size -= snap_count * sizeof (__le64);
945         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
946                 return false;
947
948         return true;
949 }
950
951 /*
952  * returns the size of an object in the image
953  */
954 static u32 rbd_obj_bytes(struct rbd_image_header *header)
955 {
956         return 1U << header->obj_order;
957 }
958
959 static void rbd_init_layout(struct rbd_device *rbd_dev)
960 {
961         if (rbd_dev->header.stripe_unit == 0 ||
962             rbd_dev->header.stripe_count == 0) {
963                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
964                 rbd_dev->header.stripe_count = 1;
965         }
966
967         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
968         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
969         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
970         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
971                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
972         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
973 }
974
975 /*
976  * Fill an rbd image header with information from the given format 1
977  * on-disk header.
978  */
979 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
980                                  struct rbd_image_header_ondisk *ondisk)
981 {
982         struct rbd_image_header *header = &rbd_dev->header;
983         bool first_time = header->object_prefix == NULL;
984         struct ceph_snap_context *snapc;
985         char *object_prefix = NULL;
986         char *snap_names = NULL;
987         u64 *snap_sizes = NULL;
988         u32 snap_count;
989         int ret = -ENOMEM;
990         u32 i;
991
992         /* Allocate this now to avoid having to handle failure below */
993
994         if (first_time) {
995                 object_prefix = kstrndup(ondisk->object_prefix,
996                                          sizeof(ondisk->object_prefix),
997                                          GFP_KERNEL);
998                 if (!object_prefix)
999                         return -ENOMEM;
1000         }
1001
1002         /* Allocate the snapshot context and fill it in */
1003
1004         snap_count = le32_to_cpu(ondisk->snap_count);
1005         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1006         if (!snapc)
1007                 goto out_err;
1008         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1009         if (snap_count) {
1010                 struct rbd_image_snap_ondisk *snaps;
1011                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1012
1013                 /* We'll keep a copy of the snapshot names... */
1014
1015                 if (snap_names_len > (u64)SIZE_MAX)
1016                         goto out_2big;
1017                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1018                 if (!snap_names)
1019                         goto out_err;
1020
1021                 /* ...as well as the array of their sizes. */
1022                 snap_sizes = kmalloc_array(snap_count,
1023                                            sizeof(*header->snap_sizes),
1024                                            GFP_KERNEL);
1025                 if (!snap_sizes)
1026                         goto out_err;
1027
1028                 /*
1029                  * Copy the names, and fill in each snapshot's id
1030                  * and size.
1031                  *
1032                  * Note that rbd_dev_v1_header_info() guarantees the
1033                  * ondisk buffer we're working with has
1034                  * snap_names_len bytes beyond the end of the
1035                  * snapshot id array, so this memcpy() is safe.
1036                  */
1037                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1038                 snaps = ondisk->snaps;
1039                 for (i = 0; i < snap_count; i++) {
1040                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1041                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1042                 }
1043         }
1044
1045         /* We won't fail any more, fill in the header */
1046
1047         if (first_time) {
1048                 header->object_prefix = object_prefix;
1049                 header->obj_order = ondisk->options.order;
1050                 rbd_init_layout(rbd_dev);
1051         } else {
1052                 ceph_put_snap_context(header->snapc);
1053                 kfree(header->snap_names);
1054                 kfree(header->snap_sizes);
1055         }
1056
1057         /* The remaining fields always get updated (when we refresh) */
1058
1059         header->image_size = le64_to_cpu(ondisk->image_size);
1060         header->snapc = snapc;
1061         header->snap_names = snap_names;
1062         header->snap_sizes = snap_sizes;
1063
1064         return 0;
1065 out_2big:
1066         ret = -EIO;
1067 out_err:
1068         kfree(snap_sizes);
1069         kfree(snap_names);
1070         ceph_put_snap_context(snapc);
1071         kfree(object_prefix);
1072
1073         return ret;
1074 }
1075
1076 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1077 {
1078         const char *snap_name;
1079
1080         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1081
1082         /* Skip over names until we find the one we are looking for */
1083
1084         snap_name = rbd_dev->header.snap_names;
1085         while (which--)
1086                 snap_name += strlen(snap_name) + 1;
1087
1088         return kstrdup(snap_name, GFP_KERNEL);
1089 }
1090
1091 /*
1092  * Snapshot id comparison function for use with qsort()/bsearch().
1093  * Note that the result is for snapshots in *descending* order.
1094  */
1095 static int snapid_compare_reverse(const void *s1, const void *s2)
1096 {
1097         u64 snap_id1 = *(u64 *)s1;
1098         u64 snap_id2 = *(u64 *)s2;
1099
1100         if (snap_id1 < snap_id2)
1101                 return 1;
1102         return snap_id1 == snap_id2 ? 0 : -1;
1103 }
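/*
 * For example, comparing snap ids 5 and 3 returns -1, so 5 sorts
 * before 3 and the snapshot array ends up ordered highest id first.
 */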
1104
1105 /*
1106  * Search a snapshot context to see if the given snapshot id is
1107  * present.
1108  *
1109  * Returns the position of the snapshot id in the array if it's found,
1110  * or BAD_SNAP_INDEX otherwise.
1111  *
1112  * Note: The snapshot array is kept sorted (by the osd) in
1113  * reverse order, highest snapshot id first.
1114  */
1115 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1116 {
1117         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1118         u64 *found;
1119
1120         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1121                                 sizeof (snap_id), snapid_compare_reverse);
1122
1123         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1124 }
1125
1126 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1127                                         u64 snap_id)
1128 {
1129         u32 which;
1130         const char *snap_name;
1131
1132         which = rbd_dev_snap_index(rbd_dev, snap_id);
1133         if (which == BAD_SNAP_INDEX)
1134                 return ERR_PTR(-ENOENT);
1135
1136         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1137         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1138 }
1139
1140 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1141 {
1142         if (snap_id == CEPH_NOSNAP)
1143                 return RBD_SNAP_HEAD_NAME;
1144
1145         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1146         if (rbd_dev->image_format == 1)
1147                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1148
1149         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1150 }
1151
1152 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1153                                 u64 *snap_size)
1154 {
1155         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1156         if (snap_id == CEPH_NOSNAP) {
1157                 *snap_size = rbd_dev->header.image_size;
1158         } else if (rbd_dev->image_format == 1) {
1159                 u32 which;
1160
1161                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1162                 if (which == BAD_SNAP_INDEX)
1163                         return -ENOENT;
1164
1165                 *snap_size = rbd_dev->header.snap_sizes[which];
1166         } else {
1167                 u64 size = 0;
1168                 int ret;
1169
1170                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1171                 if (ret)
1172                         return ret;
1173
1174                 *snap_size = size;
1175         }
1176         return 0;
1177 }
1178
1179 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1180                         u64 *snap_features)
1181 {
1182         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1183         if (snap_id == CEPH_NOSNAP) {
1184                 *snap_features = rbd_dev->header.features;
1185         } else if (rbd_dev->image_format == 1) {
1186                 *snap_features = 0;     /* No features for format 1 */
1187         } else {
1188                 u64 features = 0;
1189                 int ret;
1190
1191                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1192                 if (ret)
1193                         return ret;
1194
1195                 *snap_features = features;
1196         }
1197         return 0;
1198 }
1199
1200 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1201 {
1202         u64 snap_id = rbd_dev->spec->snap_id;
1203         u64 size = 0;
1204         u64 features = 0;
1205         int ret;
1206
1207         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1208         if (ret)
1209                 return ret;
1210         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1211         if (ret)
1212                 return ret;
1213
1214         rbd_dev->mapping.size = size;
1215         rbd_dev->mapping.features = features;
1216
1217         return 0;
1218 }
1219
1220 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1221 {
1222         rbd_dev->mapping.size = 0;
1223         rbd_dev->mapping.features = 0;
1224 }
1225
1226 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1227 {
1228         u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1229
1230         return offset & (segment_size - 1);
1231 }
1232
1233 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1234                                 u64 offset, u64 length)
1235 {
1236         u64 segment_size = rbd_obj_bytes(&rbd_dev->header);
1237
1238         offset &= segment_size - 1;
1239
1240         rbd_assert(length <= U64_MAX - offset);
1241         if (offset + length > segment_size)
1242                 length = segment_size - offset;
1243
1244         return length;
1245 }
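/*
 * Worked example (assuming the common 4 MiB object size, i.e.
 * obj_order 22): image offset 6 MiB falls 2 MiB into its object, and a
 * 3 MiB request starting there is clipped by rbd_segment_length() to
 * 2 MiB so it does not cross the object boundary.
 */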
1246
1247 /*
1248  * bio helpers
1249  */
1250
1251 static void bio_chain_put(struct bio *chain)
1252 {
1253         struct bio *tmp;
1254
1255         while (chain) {
1256                 tmp = chain;
1257                 chain = chain->bi_next;
1258                 bio_put(tmp);
1259         }
1260 }
1261
1262 /*
1263  * zeros a bio chain, starting at specific offset
1264  */
1265 static void zero_bio_chain(struct bio *chain, int start_ofs)
1266 {
1267         struct bio_vec bv;
1268         struct bvec_iter iter;
1269         unsigned long flags;
1270         void *buf;
1271         int pos = 0;
1272
1273         while (chain) {
1274                 bio_for_each_segment(bv, chain, iter) {
1275                         if (pos + bv.bv_len > start_ofs) {
1276                                 int remainder = max(start_ofs - pos, 0);
1277                                 buf = bvec_kmap_irq(&bv, &flags);
1278                                 memset(buf + remainder, 0,
1279                                        bv.bv_len - remainder);
1280                                 flush_dcache_page(bv.bv_page);
1281                                 bvec_kunmap_irq(buf, &flags);
1282                         }
1283                         pos += bv.bv_len;
1284                 }
1285
1286                 chain = chain->bi_next;
1287         }
1288 }
1289
1290 /*
1291  * similar to zero_bio_chain(), zeros data defined by a page array,
1292  * starting at the given byte offset from the start of the array and
1293  * continuing up to the given end offset.  The pages array is
1294  * assumed to be big enough to hold all bytes up to the end.
1295  */
1296 static void zero_pages(struct page **pages, u64 offset, u64 end)
1297 {
1298         struct page **page = &pages[offset >> PAGE_SHIFT];
1299
1300         rbd_assert(end > offset);
1301         rbd_assert(end - offset <= (u64)SIZE_MAX);
1302         while (offset < end) {
1303                 size_t page_offset;
1304                 size_t length;
1305                 unsigned long flags;
1306                 void *kaddr;
1307
1308                 page_offset = offset & ~PAGE_MASK;
1309                 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1310                 local_irq_save(flags);
1311                 kaddr = kmap_atomic(*page);
1312                 memset(kaddr + page_offset, 0, length);
1313                 flush_dcache_page(*page);
1314                 kunmap_atomic(kaddr);
1315                 local_irq_restore(flags);
1316
1317                 offset += length;
1318                 page++;
1319         }
1320 }
1321
1322 /*
1323  * Clone a portion of a bio, starting at the given byte offset
1324  * and continuing for the number of bytes indicated.
1325  */
1326 static struct bio *bio_clone_range(struct bio *bio_src,
1327                                         unsigned int offset,
1328                                         unsigned int len,
1329                                         gfp_t gfpmask)
1330 {
1331         struct bio *bio;
1332
1333         bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
1334         if (!bio)
1335                 return NULL;    /* ENOMEM */
1336
1337         bio_advance(bio, offset);
1338         bio->bi_iter.bi_size = len;
1339
1340         return bio;
1341 }
1342
1343 /*
1344  * Clone a portion of a bio chain, starting at the given byte offset
1345  * into the first bio in the source chain and continuing for the
1346  * number of bytes indicated.  The result is another bio chain of
1347  * exactly the given length, or a null pointer on error.
1348  *
1349  * The bio_src and offset parameters are both in-out.  On entry they
1350  * refer to the first source bio and the offset into that bio where
1351  * the start of data to be cloned is located.
1352  *
1353  * On return, bio_src is updated to refer to the bio in the source
1354  * chain that contains the first un-cloned byte, and *offset will
1355  * contain the offset of that byte within that bio.
1356  */
1357 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1358                                         unsigned int *offset,
1359                                         unsigned int len,
1360                                         gfp_t gfpmask)
1361 {
1362         struct bio *bi = *bio_src;
1363         unsigned int off = *offset;
1364         struct bio *chain = NULL;
1365         struct bio **end;
1366
1367         /* Build up a chain of clone bios up to the limit */
1368
1369         if (!bi || off >= bi->bi_iter.bi_size || !len)
1370                 return NULL;            /* Nothing to clone */
1371
1372         end = &chain;
1373         while (len) {
1374                 unsigned int bi_size;
1375                 struct bio *bio;
1376
1377                 if (!bi) {
1378                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1379                         goto out_err;   /* EINVAL; ran out of bio's */
1380                 }
1381                 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1382                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1383                 if (!bio)
1384                         goto out_err;   /* ENOMEM */
1385
1386                 *end = bio;
1387                 end = &bio->bi_next;
1388
1389                 off += bi_size;
1390                 if (off == bi->bi_iter.bi_size) {
1391                         bi = bi->bi_next;
1392                         off = 0;
1393                 }
1394                 len -= bi_size;
1395         }
1396         *bio_src = bi;
1397         *offset = off;
1398
1399         return chain;
1400 out_err:
1401         bio_chain_put(chain);
1402
1403         return NULL;
1404 }
1405
1406 /*
1407  * The default/initial value for all object request flags is 0.  For
1408  * each flag, once its value is set to 1 it is never reset to 0
1409  * again.
1410  */
1411 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1412 {
1413         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1414                 struct rbd_device *rbd_dev;
1415
1416                 rbd_dev = obj_request->img_request->rbd_dev;
1417                 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1418                         obj_request);
1419         }
1420 }
1421
1422 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1423 {
1424         smp_mb();
1425         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1426 }
1427
1428 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1429 {
1430         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1431                 struct rbd_device *rbd_dev = NULL;
1432
1433                 if (obj_request_img_data_test(obj_request))
1434                         rbd_dev = obj_request->img_request->rbd_dev;
1435                 rbd_warn(rbd_dev, "obj_request %p already marked done",
1436                         obj_request);
1437         }
1438 }
1439
1440 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1441 {
1442         smp_mb();
1443         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1444 }
1445
1446 /*
1447  * This sets the KNOWN flag after (possibly) setting the EXISTS
1448  * flag.  The latter is set based on the "exists" value provided.
1449  *
1450  * Note that for our purposes once an object exists it never goes
1451  * away again.  It's possible that the responses from two existence
1452  * checks are separated by the creation of the target object, and
1453  * the first ("doesn't exist") response arrives *after* the second
1454  * ("does exist").  In that case we ignore the second one.
1455  */
1456 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1457                                 bool exists)
1458 {
1459         if (exists)
1460                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1461         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1462         smp_mb();
1463 }
1464
1465 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1466 {
1467         smp_mb();
1468         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1469 }
1470
1471 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1472 {
1473         smp_mb();
1474         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1475 }
1476
1477 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1478 {
1479         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1480
1481         return obj_request->img_offset <
1482             round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1483 }
1484
1485 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1486 {
1487         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1488                 kref_read(&obj_request->kref));
1489         kref_get(&obj_request->kref);
1490 }
1491
1492 static void rbd_obj_request_destroy(struct kref *kref);
1493 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1494 {
1495         rbd_assert(obj_request != NULL);
1496         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1497                 kref_read(&obj_request->kref));
1498         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1499 }
1500
1501 static void rbd_img_request_get(struct rbd_img_request *img_request)
1502 {
1503         dout("%s: img %p (was %d)\n", __func__, img_request,
1504              kref_read(&img_request->kref));
1505         kref_get(&img_request->kref);
1506 }
1507
1508 static bool img_request_child_test(struct rbd_img_request *img_request);
1509 static void rbd_parent_request_destroy(struct kref *kref);
1510 static void rbd_img_request_destroy(struct kref *kref);
1511 static void rbd_img_request_put(struct rbd_img_request *img_request)
1512 {
1513         rbd_assert(img_request != NULL);
1514         dout("%s: img %p (was %d)\n", __func__, img_request,
1515                 kref_read(&img_request->kref));
1516         if (img_request_child_test(img_request))
1517                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1518         else
1519                 kref_put(&img_request->kref, rbd_img_request_destroy);
1520 }
1521
1522 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1523                                         struct rbd_obj_request *obj_request)
1524 {
1525         rbd_assert(obj_request->img_request == NULL);
1526
1527         /* Image request now owns object's original reference */
1528         obj_request->img_request = img_request;
1529         obj_request->which = img_request->obj_request_count;
1530         rbd_assert(!obj_request_img_data_test(obj_request));
1531         obj_request_img_data_set(obj_request);
1532         rbd_assert(obj_request->which != BAD_WHICH);
1533         img_request->obj_request_count++;
1534         list_add_tail(&obj_request->links, &img_request->obj_requests);
1535         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1536                 obj_request->which);
1537 }
1538
1539 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1540                                         struct rbd_obj_request *obj_request)
1541 {
1542         rbd_assert(obj_request->which != BAD_WHICH);
1543
1544         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1545                 obj_request->which);
1546         list_del(&obj_request->links);
1547         rbd_assert(img_request->obj_request_count > 0);
1548         img_request->obj_request_count--;
1549         rbd_assert(obj_request->which == img_request->obj_request_count);
1550         obj_request->which = BAD_WHICH;
1551         rbd_assert(obj_request_img_data_test(obj_request));
1552         rbd_assert(obj_request->img_request == img_request);
1553         obj_request->img_request = NULL;
1554         obj_request->callback = NULL;
1555         rbd_obj_request_put(obj_request);
1556 }
1557
1558 static bool obj_request_type_valid(enum obj_request_type type)
1559 {
1560         switch (type) {
1561         case OBJ_REQUEST_NODATA:
1562         case OBJ_REQUEST_BIO:
1563         case OBJ_REQUEST_PAGES:
1564                 return true;
1565         default:
1566                 return false;
1567         }
1568 }
1569
1570 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1571
1572 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1573 {
1574         struct ceph_osd_request *osd_req = obj_request->osd_req;
1575
1576         dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1577              obj_request, obj_request->object_no, obj_request->offset,
1578              obj_request->length, osd_req);
1579         if (obj_request_img_data_test(obj_request)) {
1580                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1581                 rbd_img_request_get(obj_request->img_request);
1582         }
1583         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1584 }
1585
1586 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1587 {
1588
1589         dout("%s: img %p\n", __func__, img_request);
1590
1591         /*
1592          * If no error occurred, compute the aggregate transfer
1593          * count for the image request.  We could instead use
1594          * atomic64_cmpxchg() to update it as each object request
1595          * completes; it's not clear offhand which way is better.
1596          */
1597         if (!img_request->result) {
1598                 struct rbd_obj_request *obj_request;
1599                 u64 xferred = 0;
1600
1601                 for_each_obj_request(img_request, obj_request)
1602                         xferred += obj_request->xferred;
1603                 img_request->xferred = xferred;
1604         }
1605
1606         if (img_request->callback)
1607                 img_request->callback(img_request);
1608         else
1609                 rbd_img_request_put(img_request);
1610 }
1611
1612 /*
1613  * The default/initial value for all image request flags is 0.  Each
1614  * is conditionally set to 1 at image request initialization time
1615  * and currently never changes thereafter.
1616  */
1617 static void img_request_write_set(struct rbd_img_request *img_request)
1618 {
1619         set_bit(IMG_REQ_WRITE, &img_request->flags);
1620         smp_mb();
1621 }
1622
1623 static bool img_request_write_test(struct rbd_img_request *img_request)
1624 {
1625         smp_mb();
1626         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1627 }
1628
1629 /*
1630  * Set the discard flag when the img_request is a discard request
1631  */
1632 static void img_request_discard_set(struct rbd_img_request *img_request)
1633 {
1634         set_bit(IMG_REQ_DISCARD, &img_request->flags);
1635         smp_mb();
1636 }
1637
1638 static bool img_request_discard_test(struct rbd_img_request *img_request)
1639 {
1640         smp_mb();
1641         return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1642 }
1643
1644 static void img_request_child_set(struct rbd_img_request *img_request)
1645 {
1646         set_bit(IMG_REQ_CHILD, &img_request->flags);
1647         smp_mb();
1648 }
1649
1650 static void img_request_child_clear(struct rbd_img_request *img_request)
1651 {
1652         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1653         smp_mb();
1654 }
1655
1656 static bool img_request_child_test(struct rbd_img_request *img_request)
1657 {
1658         smp_mb();
1659         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1660 }
1661
1662 static void img_request_layered_set(struct rbd_img_request *img_request)
1663 {
1664         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1665         smp_mb();
1666 }
1667
1668 static void img_request_layered_clear(struct rbd_img_request *img_request)
1669 {
1670         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1671         smp_mb();
1672 }
1673
1674 static bool img_request_layered_test(struct rbd_img_request *img_request)
1675 {
1676         smp_mb();
1677         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1678 }
1679
1680 static enum obj_operation_type
1681 rbd_img_request_op_type(struct rbd_img_request *img_request)
1682 {
1683         if (img_request_write_test(img_request))
1684                 return OBJ_OP_WRITE;
1685         else if (img_request_discard_test(img_request))
1686                 return OBJ_OP_DISCARD;
1687         else
1688                 return OBJ_OP_READ;
1689 }
1690
1691 static void
1692 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1693 {
1694         u64 xferred = obj_request->xferred;
1695         u64 length = obj_request->length;
1696
1697         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1698                 obj_request, obj_request->img_request, obj_request->result,
1699                 xferred, length);
1700         /*
1701          * ENOENT means a hole in the image.  We zero-fill the entire
1702          * length of the request.  A short read also implies zero-fill
1703          * to the end of the request.  An error requires the whole
1704          * length of the request to be reported finished with an error
1705          * to the block layer.  In each case we update the xferred
1706          * count to indicate the whole request was satisfied.
1707          */
1708         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1709         if (obj_request->result == -ENOENT) {
1710                 if (obj_request->type == OBJ_REQUEST_BIO)
1711                         zero_bio_chain(obj_request->bio_list, 0);
1712                 else
1713                         zero_pages(obj_request->pages, 0, length);
1714                 obj_request->result = 0;
1715         } else if (xferred < length && !obj_request->result) {
1716                 if (obj_request->type == OBJ_REQUEST_BIO)
1717                         zero_bio_chain(obj_request->bio_list, xferred);
1718                 else
1719                         zero_pages(obj_request->pages, xferred, length);
1720         }
1721         obj_request->xferred = length;
1722         obj_request_done_set(obj_request);
1723 }
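
/*
 * Illustrative example (numbers are hypothetical): for a 4 MiB object read
 * that completes with result == -ENOENT, the whole 4 MiB is zero-filled and
 * reported as transferred; if the OSD returns only 1 MiB (a short read with
 * result 0), the remaining 3 MiB are zeroed and xferred is still reported
 * as the full 4 MiB, so the block layer sees the request as satisfied.
 */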
1724
1725 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1726 {
1727         dout("%s: obj %p cb %p\n", __func__, obj_request,
1728                 obj_request->callback);
1729         obj_request->callback(obj_request);
1730 }
1731
1732 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1733 {
1734         obj_request->result = err;
1735         obj_request->xferred = 0;
1736         /*
1737          * kludge - mirror rbd_obj_request_submit() to match a put in
1738          * rbd_img_obj_callback()
1739          */
1740         if (obj_request_img_data_test(obj_request)) {
1741                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1742                 rbd_img_request_get(obj_request->img_request);
1743         }
1744         obj_request_done_set(obj_request);
1745         rbd_obj_request_complete(obj_request);
1746 }
1747
1748 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1749 {
1750         struct rbd_img_request *img_request = NULL;
1751         struct rbd_device *rbd_dev = NULL;
1752         bool layered = false;
1753
1754         if (obj_request_img_data_test(obj_request)) {
1755                 img_request = obj_request->img_request;
1756                 layered = img_request && img_request_layered_test(img_request);
1757                 rbd_dev = img_request->rbd_dev;
1758         }
1759
1760         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1761                 obj_request, img_request, obj_request->result,
1762                 obj_request->xferred, obj_request->length);
1763         if (layered && obj_request->result == -ENOENT &&
1764                         obj_request->img_offset < rbd_dev->parent_overlap)
1765                 rbd_img_parent_read(obj_request);
1766         else if (img_request)
1767                 rbd_img_obj_request_read_callback(obj_request);
1768         else
1769                 obj_request_done_set(obj_request);
1770 }
1771
1772 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1773 {
1774         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1775                 obj_request->result, obj_request->length);
1776         /*
1777          * There is no such thing as a successful short write.  Set
1778          * it to our originally-requested length.
1779          */
1780         obj_request->xferred = obj_request->length;
1781         obj_request_done_set(obj_request);
1782 }
1783
1784 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1785 {
1786         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1787                 obj_request->result, obj_request->length);
1788         /*
1789          * There is no such thing as a successful short discard.  Set
1790          * it to our originally-requested length.
1791          */
1792         obj_request->xferred = obj_request->length;
1793         /* discarding a non-existent object is not a problem */
1794         if (obj_request->result == -ENOENT)
1795                 obj_request->result = 0;
1796         obj_request_done_set(obj_request);
1797 }
1798
1799 /*
1800  * For a simple stat call there's nothing to do.  We'll do more if
1801  * this is part of a write sequence for a layered image.
1802  */
1803 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1804 {
1805         dout("%s: obj %p\n", __func__, obj_request);
1806         obj_request_done_set(obj_request);
1807 }
1808
1809 static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1810 {
1811         dout("%s: obj %p\n", __func__, obj_request);
1812
1813         if (obj_request_img_data_test(obj_request))
1814                 rbd_osd_copyup_callback(obj_request);
1815         else
1816                 obj_request_done_set(obj_request);
1817 }
1818
1819 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1820 {
1821         struct rbd_obj_request *obj_request = osd_req->r_priv;
1822         u16 opcode;
1823
1824         dout("%s: osd_req %p\n", __func__, osd_req);
1825         rbd_assert(osd_req == obj_request->osd_req);
1826         if (obj_request_img_data_test(obj_request)) {
1827                 rbd_assert(obj_request->img_request);
1828                 rbd_assert(obj_request->which != BAD_WHICH);
1829         } else {
1830                 rbd_assert(obj_request->which == BAD_WHICH);
1831         }
1832
1833         if (osd_req->r_result < 0)
1834                 obj_request->result = osd_req->r_result;
1835
1836         /*
1837          * We support a 64-bit length, but ultimately it has to be
1838          * passed to the block layer, which just supports a 32-bit
1839          * length field.
1840          */
1841         obj_request->xferred = osd_req->r_ops[0].outdata_len;
1842         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1843
1844         opcode = osd_req->r_ops[0].op;
1845         switch (opcode) {
1846         case CEPH_OSD_OP_READ:
1847                 rbd_osd_read_callback(obj_request);
1848                 break;
1849         case CEPH_OSD_OP_SETALLOCHINT:
1850                 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1851                            osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1852                 /* fall through */
1853         case CEPH_OSD_OP_WRITE:
1854         case CEPH_OSD_OP_WRITEFULL:
1855                 rbd_osd_write_callback(obj_request);
1856                 break;
1857         case CEPH_OSD_OP_STAT:
1858                 rbd_osd_stat_callback(obj_request);
1859                 break;
1860         case CEPH_OSD_OP_DELETE:
1861         case CEPH_OSD_OP_TRUNCATE:
1862         case CEPH_OSD_OP_ZERO:
1863                 rbd_osd_discard_callback(obj_request);
1864                 break;
1865         case CEPH_OSD_OP_CALL:
1866                 rbd_osd_call_callback(obj_request);
1867                 break;
1868         default:
1869                 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1870                          obj_request->object_no, opcode);
1871                 break;
1872         }
1873
1874         if (obj_request_done_test(obj_request))
1875                 rbd_obj_request_complete(obj_request);
1876 }
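
/*
 * Note on the dispatch above: only r_ops[0] is inspected.  For an image data
 * write the first op is the allocation hint, so such requests land in the
 * CEPH_OSD_OP_SETALLOCHINT case and fall through to the write callback after
 * asserting that op 1 is WRITE or WRITEFULL.
 */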
1877
1878 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1879 {
1880         struct ceph_osd_request *osd_req = obj_request->osd_req;
1881
1882         rbd_assert(obj_request_img_data_test(obj_request));
1883         osd_req->r_snapid = obj_request->img_request->snap_id;
1884 }
1885
1886 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1887 {
1888         struct ceph_osd_request *osd_req = obj_request->osd_req;
1889
1890         ktime_get_real_ts(&osd_req->r_mtime);
1891         osd_req->r_data_offset = obj_request->offset;
1892 }
1893
1894 static struct ceph_osd_request *
1895 __rbd_osd_req_create(struct rbd_device *rbd_dev,
1896                      struct ceph_snap_context *snapc,
1897                      int num_ops, unsigned int flags,
1898                      struct rbd_obj_request *obj_request)
1899 {
1900         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1901         struct ceph_osd_request *req;
1902         const char *name_format = rbd_dev->image_format == 1 ?
1903                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1904
1905         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1906         if (!req)
1907                 return NULL;
1908
1909         req->r_flags = flags;
1910         req->r_callback = rbd_osd_req_callback;
1911         req->r_priv = obj_request;
1912
1913         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1914         if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1915                         rbd_dev->header.object_prefix, obj_request->object_no))
1916                 goto err_req;
1917
1918         if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1919                 goto err_req;
1920
1921         return req;
1922
1923 err_req:
1924         ceph_osdc_put_request(req);
1925         return NULL;
1926 }
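
/*
 * The object name is built from the image's object prefix and the object
 * number via RBD_V1_DATA_FORMAT/RBD_V2_DATA_FORMAT (defined in rbd_types.h).
 * As a rough illustration (the exact format strings live in rbd_types.h), a
 * format 2 image with object_prefix "rbd_data.<id>" and object_no 1 maps to
 * an OID along the lines of "rbd_data.<id>.0000000000000001".
 */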
1927
1928 /*
1929  * Create an osd request.  A read request has one osd op (read).
1930  * A write request has either one (watch) or two (hint+write) osd ops.
1931  * (All rbd data writes are prefixed with an allocation hint op, but
1932  * technically osd watch is a write request, hence this distinction.)
1933  */
1934 static struct ceph_osd_request *rbd_osd_req_create(
1935                                         struct rbd_device *rbd_dev,
1936                                         enum obj_operation_type op_type,
1937                                         unsigned int num_ops,
1938                                         struct rbd_obj_request *obj_request)
1939 {
1940         struct ceph_snap_context *snapc = NULL;
1941
1942         if (obj_request_img_data_test(obj_request) &&
1943                 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1944                 struct rbd_img_request *img_request = obj_request->img_request;
1945                 if (op_type == OBJ_OP_WRITE) {
1946                         rbd_assert(img_request_write_test(img_request));
1947                 } else {
1948                         rbd_assert(img_request_discard_test(img_request));
1949                 }
1950                 snapc = img_request->snapc;
1951         }
1952
1953         rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1954
1955         return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1956             (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
1957             CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
1958 }
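
/*
 * Illustrative call from the image I/O path (see rbd_img_request_fill()):
 *
 *      osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 2, obj_request);
 *
 * A data write asks for two ops (allocation hint + write), while reads and
 * discards use a single op, matching the num_ops assertion above.
 */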
1959
1960 /*
1961  * Create a copyup osd request based on the information in the object
1962  * request supplied.  A copyup request has two or three osd ops, a
1963  * copyup method call, potentially a hint op, and a write or truncate
1964  * or zero op.
1965  */
1966 static struct ceph_osd_request *
1967 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1968 {
1969         struct rbd_img_request *img_request;
1970         int num_osd_ops = 3;
1971
1972         rbd_assert(obj_request_img_data_test(obj_request));
1973         img_request = obj_request->img_request;
1974         rbd_assert(img_request);
1975         rbd_assert(img_request_write_test(img_request) ||
1976                         img_request_discard_test(img_request));
1977
1978         if (img_request_discard_test(img_request))
1979                 num_osd_ops = 2;
1980
1981         return __rbd_osd_req_create(img_request->rbd_dev,
1982                                     img_request->snapc, num_osd_ops,
1983                                     CEPH_OSD_FLAG_WRITE, obj_request);
1984 }
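
/*
 * The resulting copyup request is later populated as follows (see
 * rbd_img_obj_parent_read_full_callback()):
 *
 *      op 0:   CEPH_OSD_OP_CALL "rbd" "copyup", carrying the parent data
 *      op 1..: the original write ops (hint + write) or the single discard
 *              op, added via rbd_img_obj_request_fill(..., num_ops = 1)
 */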
1985
1986 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1987 {
1988         ceph_osdc_put_request(osd_req);
1989 }
1990
1991 static struct rbd_obj_request *
1992 rbd_obj_request_create(enum obj_request_type type)
1993 {
1994         struct rbd_obj_request *obj_request;
1995
1996         rbd_assert(obj_request_type_valid(type));
1997
1998         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1999         if (!obj_request)
2000                 return NULL;
2001
2002         obj_request->which = BAD_WHICH;
2003         obj_request->type = type;
2004         INIT_LIST_HEAD(&obj_request->links);
2005         kref_init(&obj_request->kref);
2006
2007         dout("%s %p\n", __func__, obj_request);
2008         return obj_request;
2009 }
2010
2011 static void rbd_obj_request_destroy(struct kref *kref)
2012 {
2013         struct rbd_obj_request *obj_request;
2014
2015         obj_request = container_of(kref, struct rbd_obj_request, kref);
2016
2017         dout("%s: obj %p\n", __func__, obj_request);
2018
2019         rbd_assert(obj_request->img_request == NULL);
2020         rbd_assert(obj_request->which == BAD_WHICH);
2021
2022         if (obj_request->osd_req)
2023                 rbd_osd_req_destroy(obj_request->osd_req);
2024
2025         rbd_assert(obj_request_type_valid(obj_request->type));
2026         switch (obj_request->type) {
2027         case OBJ_REQUEST_NODATA:
2028                 break;          /* Nothing to do */
2029         case OBJ_REQUEST_BIO:
2030                 if (obj_request->bio_list)
2031                         bio_chain_put(obj_request->bio_list);
2032                 break;
2033         case OBJ_REQUEST_PAGES:
2034                 /* img_data requests don't own their page array */
2035                 if (obj_request->pages &&
2036                     !obj_request_img_data_test(obj_request))
2037                         ceph_release_page_vector(obj_request->pages,
2038                                                 obj_request->page_count);
2039                 break;
2040         }
2041
2042         kmem_cache_free(rbd_obj_request_cache, obj_request);
2043 }
2044
2045 /* It's OK to call this for a device with no parent */
2046
2047 static void rbd_spec_put(struct rbd_spec *spec);
2048 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2049 {
2050         rbd_dev_remove_parent(rbd_dev);
2051         rbd_spec_put(rbd_dev->parent_spec);
2052         rbd_dev->parent_spec = NULL;
2053         rbd_dev->parent_overlap = 0;
2054 }
2055
2056 /*
2057  * Parent image reference counting is used to determine when an
2058  * image's parent fields can be safely torn down--after there are no
2059  * more in-flight requests to the parent image.  When the last
2060  * reference is dropped, cleaning them up is safe.
2061  */
2062 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2063 {
2064         int counter;
2065
2066         if (!rbd_dev->parent_spec)
2067                 return;
2068
2069         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2070         if (counter > 0)
2071                 return;
2072
2073         /* Last reference; clean up parent data structures */
2074
2075         if (!counter)
2076                 rbd_dev_unparent(rbd_dev);
2077         else
2078                 rbd_warn(rbd_dev, "parent reference underflow");
2079 }
2080
2081 /*
2082  * If an image has a non-zero parent overlap, get a reference to its
2083  * parent.
2084  *
2085  * Returns true if the rbd device has a parent with a non-zero
2086  * overlap and a reference for it was successfully taken, or
2087  * false otherwise.
2088  */
2089 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2090 {
2091         int counter = 0;
2092
2093         if (!rbd_dev->parent_spec)
2094                 return false;
2095
2096         down_read(&rbd_dev->header_rwsem);
2097         if (rbd_dev->parent_overlap)
2098                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2099         up_read(&rbd_dev->header_rwsem);
2100
2101         if (counter < 0)
2102                 rbd_warn(rbd_dev, "parent reference overflow");
2103
2104         return counter > 0;
2105 }
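
/*
 * Typical pairing: rbd_img_request_create() takes the parent reference when
 * it marks a request layered, and rbd_img_request_destroy() drops it again
 * once the layered flag is cleared, so the parent spec and overlap stay
 * valid for as long as any layered image request is in flight.
 */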
2106
2107 /*
2108  * Caller is responsible for filling in the list of object requests
2109  * that comprises the image request, and the Linux request pointer
2110  * (if there is one).
2111  */
2112 static struct rbd_img_request *rbd_img_request_create(
2113                                         struct rbd_device *rbd_dev,
2114                                         u64 offset, u64 length,
2115                                         enum obj_operation_type op_type,
2116                                         struct ceph_snap_context *snapc)
2117 {
2118         struct rbd_img_request *img_request;
2119
2120         img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
2121         if (!img_request)
2122                 return NULL;
2123
2124         img_request->rbd_dev = rbd_dev;
2125         img_request->offset = offset;
2126         img_request->length = length;
2127         if (op_type == OBJ_OP_DISCARD) {
2128                 img_request_discard_set(img_request);
2129                 img_request->snapc = snapc;
2130         } else if (op_type == OBJ_OP_WRITE) {
2131                 img_request_write_set(img_request);
2132                 img_request->snapc = snapc;
2133         } else {
2134                 img_request->snap_id = rbd_dev->spec->snap_id;
2135         }
2136         if (rbd_dev_parent_get(rbd_dev))
2137                 img_request_layered_set(img_request);
2138
2139         spin_lock_init(&img_request->completion_lock);
2140         INIT_LIST_HEAD(&img_request->obj_requests);
2141         kref_init(&img_request->kref);
2142
2143         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2144                 obj_op_name(op_type), offset, length, img_request);
2145
2146         return img_request;
2147 }
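
/*
 * Rough usage sketch of the image request API as used in this file (error
 * handling omitted; the parent-read path below uses the
 * rbd_parent_request_create() variant instead):
 *
 *      img_request = rbd_img_request_create(rbd_dev, offset, length,
 *                                           OBJ_OP_READ, NULL);
 *      rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, bio_list);
 *      img_request->callback = some_completion_callback;
 *      rbd_img_request_submit(img_request);
 *
 * The final reference is dropped with rbd_img_request_put() once the
 * completion callback has run.
 */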
2148
2149 static void rbd_img_request_destroy(struct kref *kref)
2150 {
2151         struct rbd_img_request *img_request;
2152         struct rbd_obj_request *obj_request;
2153         struct rbd_obj_request *next_obj_request;
2154
2155         img_request = container_of(kref, struct rbd_img_request, kref);
2156
2157         dout("%s: img %p\n", __func__, img_request);
2158
2159         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2160                 rbd_img_obj_request_del(img_request, obj_request);
2161         rbd_assert(img_request->obj_request_count == 0);
2162
2163         if (img_request_layered_test(img_request)) {
2164                 img_request_layered_clear(img_request);
2165                 rbd_dev_parent_put(img_request->rbd_dev);
2166         }
2167
2168         if (img_request_write_test(img_request) ||
2169                 img_request_discard_test(img_request))
2170                 ceph_put_snap_context(img_request->snapc);
2171
2172         kmem_cache_free(rbd_img_request_cache, img_request);
2173 }
2174
2175 static struct rbd_img_request *rbd_parent_request_create(
2176                                         struct rbd_obj_request *obj_request,
2177                                         u64 img_offset, u64 length)
2178 {
2179         struct rbd_img_request *parent_request;
2180         struct rbd_device *rbd_dev;
2181
2182         rbd_assert(obj_request->img_request);
2183         rbd_dev = obj_request->img_request->rbd_dev;
2184
2185         parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2186                                                 length, OBJ_OP_READ, NULL);
2187         if (!parent_request)
2188                 return NULL;
2189
2190         img_request_child_set(parent_request);
2191         rbd_obj_request_get(obj_request);
2192         parent_request->obj_request = obj_request;
2193
2194         return parent_request;
2195 }
2196
2197 static void rbd_parent_request_destroy(struct kref *kref)
2198 {
2199         struct rbd_img_request *parent_request;
2200         struct rbd_obj_request *orig_request;
2201
2202         parent_request = container_of(kref, struct rbd_img_request, kref);
2203         orig_request = parent_request->obj_request;
2204
2205         parent_request->obj_request = NULL;
2206         rbd_obj_request_put(orig_request);
2207         img_request_child_clear(parent_request);
2208
2209         rbd_img_request_destroy(kref);
2210 }
2211
2212 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2213 {
2214         struct rbd_img_request *img_request;
2215         unsigned int xferred;
2216         int result;
2217         bool more;
2218
2219         rbd_assert(obj_request_img_data_test(obj_request));
2220         img_request = obj_request->img_request;
2221
2222         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2223         xferred = (unsigned int)obj_request->xferred;
2224         result = obj_request->result;
2225         if (result) {
2226                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2227                 enum obj_operation_type op_type;
2228
2229                 if (img_request_discard_test(img_request))
2230                         op_type = OBJ_OP_DISCARD;
2231                 else if (img_request_write_test(img_request))
2232                         op_type = OBJ_OP_WRITE;
2233                 else
2234                         op_type = OBJ_OP_READ;
2235
2236                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2237                         obj_op_name(op_type), obj_request->length,
2238                         obj_request->img_offset, obj_request->offset);
2239                 rbd_warn(rbd_dev, "  result %d xferred %x",
2240                         result, xferred);
2241                 if (!img_request->result)
2242                         img_request->result = result;
2243                 /*
2244                  * Need to end I/O on the entire obj_request worth of
2245                  * bytes in case of error.
2246                  */
2247                 xferred = obj_request->length;
2248         }
2249
2250         if (img_request_child_test(img_request)) {
2251                 rbd_assert(img_request->obj_request != NULL);
2252                 more = obj_request->which < img_request->obj_request_count - 1;
2253         } else {
2254                 blk_status_t status = errno_to_blk_status(result);
2255
2256                 rbd_assert(img_request->rq != NULL);
2257
2258                 more = blk_update_request(img_request->rq, status, xferred);
2259                 if (!more)
2260                         __blk_mq_end_request(img_request->rq, status);
2261         }
2262
2263         return more;
2264 }
2265
2266 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2267 {
2268         struct rbd_img_request *img_request;
2269         u32 which = obj_request->which;
2270         bool more = true;
2271
2272         rbd_assert(obj_request_img_data_test(obj_request));
2273         img_request = obj_request->img_request;
2274
2275         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2276         rbd_assert(img_request != NULL);
2277         rbd_assert(img_request->obj_request_count > 0);
2278         rbd_assert(which != BAD_WHICH);
2279         rbd_assert(which < img_request->obj_request_count);
2280
2281         spin_lock_irq(&img_request->completion_lock);
2282         if (which != img_request->next_completion)
2283                 goto out;
2284
2285         for_each_obj_request_from(img_request, obj_request) {
2286                 rbd_assert(more);
2287                 rbd_assert(which < img_request->obj_request_count);
2288
2289                 if (!obj_request_done_test(obj_request))
2290                         break;
2291                 more = rbd_img_obj_end_request(obj_request);
2292                 which++;
2293         }
2294
2295         rbd_assert(more ^ (which == img_request->obj_request_count));
2296         img_request->next_completion = which;
2297 out:
2298         spin_unlock_irq(&img_request->completion_lock);
2299         rbd_img_request_put(img_request);
2300
2301         if (!more)
2302                 rbd_img_request_complete(img_request);
2303 }
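
/*
 * Completion ordering example: if object requests 0..2 finish in the order
 * 1, 2, 0, the callbacks for 1 and 2 each bail out early because
 * next_completion is still 0; when request 0 finally completes, the loop
 * above ends requests 0, 1 and 2 in order and advances next_completion to 3.
 */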
2304
2305 /*
2306  * Add individual osd ops to the given ceph_osd_request and prepare
2307  * them for submission. num_ops is the current number of
2308  * osd operations already added to the object request.
2309  */
2310 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2311                                 struct ceph_osd_request *osd_request,
2312                                 enum obj_operation_type op_type,
2313                                 unsigned int num_ops)
2314 {
2315         struct rbd_img_request *img_request = obj_request->img_request;
2316         struct rbd_device *rbd_dev = img_request->rbd_dev;
2317         u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2318         u64 offset = obj_request->offset;
2319         u64 length = obj_request->length;
2320         u64 img_end;
2321         u16 opcode;
2322
2323         if (op_type == OBJ_OP_DISCARD) {
2324                 if (!offset && length == object_size &&
2325                     (!img_request_layered_test(img_request) ||
2326                      !obj_request_overlaps_parent(obj_request))) {
2327                         opcode = CEPH_OSD_OP_DELETE;
2328                 } else if ((offset + length == object_size)) {
2329                         opcode = CEPH_OSD_OP_TRUNCATE;
2330                 } else {
2331                         down_read(&rbd_dev->header_rwsem);
2332                         img_end = rbd_dev->header.image_size;
2333                         up_read(&rbd_dev->header_rwsem);
2334
2335                         if (obj_request->img_offset + length == img_end)
2336                                 opcode = CEPH_OSD_OP_TRUNCATE;
2337                         else
2338                                 opcode = CEPH_OSD_OP_ZERO;
2339                 }
2340         } else if (op_type == OBJ_OP_WRITE) {
2341                 if (!offset && length == object_size)
2342                         opcode = CEPH_OSD_OP_WRITEFULL;
2343                 else
2344                         opcode = CEPH_OSD_OP_WRITE;
2345                 osd_req_op_alloc_hint_init(osd_request, num_ops,
2346                                         object_size, object_size);
2347                 num_ops++;
2348         } else {
2349                 opcode = CEPH_OSD_OP_READ;
2350         }
2351
2352         if (opcode == CEPH_OSD_OP_DELETE)
2353                 osd_req_op_init(osd_request, num_ops, opcode, 0);
2354         else
2355                 osd_req_op_extent_init(osd_request, num_ops, opcode,
2356                                        offset, length, 0, 0);
2357
2358         if (obj_request->type == OBJ_REQUEST_BIO)
2359                 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2360                                         obj_request->bio_list, length);
2361         else if (obj_request->type == OBJ_REQUEST_PAGES)
2362                 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2363                                         obj_request->pages, length,
2364                                         offset & ~PAGE_MASK, false, false);
2365
2366         /* Discards are also writes */
2367         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2368                 rbd_osd_req_format_write(obj_request);
2369         else
2370                 rbd_osd_req_format_read(obj_request);
2371 }
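
/*
 * Discard opcode selection in the function above, by example: discarding a
 * whole object (offset 0, length == object_size) with no parent data under
 * it becomes CEPH_OSD_OP_DELETE; a discard that runs to the end of the
 * object (or to the end of the image) becomes CEPH_OSD_OP_TRUNCATE; any
 * other range becomes CEPH_OSD_OP_ZERO.
 */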
2372
2373 /*
2374  * Split up an image request into one or more object requests, each
2375  * to a different object.  The "type" parameter indicates whether
2376  * "data_desc" is the pointer to the head of a list of bio
2377  * structures, or the base of a page array.  In either case this
2378  * function assumes data_desc describes memory sufficient to hold
2379  * all data described by the image request.
2380  */
2381 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2382                                         enum obj_request_type type,
2383                                         void *data_desc)
2384 {
2385         struct rbd_device *rbd_dev = img_request->rbd_dev;
2386         struct rbd_obj_request *obj_request = NULL;
2387         struct rbd_obj_request *next_obj_request;
2388         struct bio *bio_list = NULL;
2389         unsigned int bio_offset = 0;
2390         struct page **pages = NULL;
2391         enum obj_operation_type op_type;
2392         u64 img_offset;
2393         u64 resid;
2394
2395         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2396                 (int)type, data_desc);
2397
2398         img_offset = img_request->offset;
2399         resid = img_request->length;
2400         rbd_assert(resid > 0);
2401         op_type = rbd_img_request_op_type(img_request);
2402
2403         if (type == OBJ_REQUEST_BIO) {
2404                 bio_list = data_desc;
2405                 rbd_assert(img_offset ==
2406                            bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2407         } else if (type == OBJ_REQUEST_PAGES) {
2408                 pages = data_desc;
2409         }
2410
2411         while (resid) {
2412                 struct ceph_osd_request *osd_req;
2413                 u64 object_no = img_offset >> rbd_dev->header.obj_order;
2414                 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2415                 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
2416
2417                 obj_request = rbd_obj_request_create(type);
2418                 if (!obj_request)
2419                         goto out_unwind;
2420
2421                 obj_request->object_no = object_no;
2422                 obj_request->offset = offset;
2423                 obj_request->length = length;
2424
2425                 /*
2426                  * set obj_request->img_request before creating the
2427                  * osd_request so that it gets the right snapc
2428                  */
2429                 rbd_img_obj_request_add(img_request, obj_request);
2430
2431                 if (type == OBJ_REQUEST_BIO) {
2432                         unsigned int clone_size;
2433
2434                         rbd_assert(length <= (u64)UINT_MAX);
2435                         clone_size = (unsigned int)length;
2436                         obj_request->bio_list =
2437                                         bio_chain_clone_range(&bio_list,
2438                                                                 &bio_offset,
2439                                                                 clone_size,
2440                                                                 GFP_NOIO);
2441                         if (!obj_request->bio_list)
2442                                 goto out_unwind;
2443                 } else if (type == OBJ_REQUEST_PAGES) {
2444                         unsigned int page_count;
2445
2446                         obj_request->pages = pages;
2447                         page_count = (u32)calc_pages_for(offset, length);
2448                         obj_request->page_count = page_count;
2449                         if ((offset + length) & ~PAGE_MASK)
2450                                 page_count--;   /* more on last page */
2451                         pages += page_count;
2452                 }
2453
2454                 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2455                                         (op_type == OBJ_OP_WRITE) ? 2 : 1,
2456                                         obj_request);
2457                 if (!osd_req)
2458                         goto out_unwind;
2459
2460                 obj_request->osd_req = osd_req;
2461                 obj_request->callback = rbd_img_obj_callback;
2462                 obj_request->img_offset = img_offset;
2463
2464                 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2465
2466                 img_offset += length;
2467                 resid -= length;
2468         }
2469
2470         return 0;
2471
2472 out_unwind:
2473         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2474                 rbd_img_obj_request_del(img_request, obj_request);
2475
2476         return -ENOMEM;
2477 }
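
/*
 * Worked example (assuming the common 4 MiB object size, i.e. obj_order 22):
 * a 6 MiB write starting at image offset 3 MiB is split into three object
 * requests: 1 MiB at offset 3 MiB within object 0, all 4 MiB of object 1,
 * and 1 MiB at offset 0 of object 2.
 */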
2478
2479 static void
2480 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2481 {
2482         struct rbd_img_request *img_request;
2483         struct rbd_device *rbd_dev;
2484         struct page **pages;
2485         u32 page_count;
2486
2487         dout("%s: obj %p\n", __func__, obj_request);
2488
2489         rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2490                 obj_request->type == OBJ_REQUEST_NODATA);
2491         rbd_assert(obj_request_img_data_test(obj_request));
2492         img_request = obj_request->img_request;
2493         rbd_assert(img_request);
2494
2495         rbd_dev = img_request->rbd_dev;
2496         rbd_assert(rbd_dev);
2497
2498         pages = obj_request->copyup_pages;
2499         rbd_assert(pages != NULL);
2500         obj_request->copyup_pages = NULL;
2501         page_count = obj_request->copyup_page_count;
2502         rbd_assert(page_count);
2503         obj_request->copyup_page_count = 0;
2504         ceph_release_page_vector(pages, page_count);
2505
2506         /*
2507          * We want the transfer count to reflect the size of the
2508          * original write request.  There is no such thing as a
2509          * successful short write, so if the request was successful
2510          * we can just set it to the originally-requested length.
2511          */
2512         if (!obj_request->result)
2513                 obj_request->xferred = obj_request->length;
2514
2515         obj_request_done_set(obj_request);
2516 }
2517
2518 static void
2519 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2520 {
2521         struct rbd_obj_request *orig_request;
2522         struct ceph_osd_request *osd_req;
2523         struct rbd_device *rbd_dev;
2524         struct page **pages;
2525         enum obj_operation_type op_type;
2526         u32 page_count;
2527         int img_result;
2528         u64 parent_length;
2529
2530         rbd_assert(img_request_child_test(img_request));
2531
2532         /* First get what we need from the image request */
2533
2534         pages = img_request->copyup_pages;
2535         rbd_assert(pages != NULL);
2536         img_request->copyup_pages = NULL;
2537         page_count = img_request->copyup_page_count;
2538         rbd_assert(page_count);
2539         img_request->copyup_page_count = 0;
2540
2541         orig_request = img_request->obj_request;
2542         rbd_assert(orig_request != NULL);
2543         rbd_assert(obj_request_type_valid(orig_request->type));
2544         img_result = img_request->result;
2545         parent_length = img_request->length;
2546         rbd_assert(img_result || parent_length == img_request->xferred);
2547         rbd_img_request_put(img_request);
2548
2549         rbd_assert(orig_request->img_request);
2550         rbd_dev = orig_request->img_request->rbd_dev;
2551         rbd_assert(rbd_dev);
2552
2553         /*
2554          * If the overlap has become 0 (most likely because the
2555          * image has been flattened) we need to free the pages
2556          * and re-submit the original write request.
2557          */
2558         if (!rbd_dev->parent_overlap) {
2559                 ceph_release_page_vector(pages, page_count);
2560                 rbd_obj_request_submit(orig_request);
2561                 return;
2562         }
2563
2564         if (img_result)
2565                 goto out_err;
2566
2567         /*
2568          * The original osd request is of no use to us any more.
2569          * We need a new one that can hold the three ops in a copyup
2570          * request.  Allocate the new copyup osd request for the
2571          * original request, and release the old one.
2572          */
2573         img_result = -ENOMEM;
2574         osd_req = rbd_osd_req_create_copyup(orig_request);
2575         if (!osd_req)
2576                 goto out_err;
2577         rbd_osd_req_destroy(orig_request->osd_req);
2578         orig_request->osd_req = osd_req;
2579         orig_request->copyup_pages = pages;
2580         orig_request->copyup_page_count = page_count;
2581
2582         /* Initialize the copyup op */
2583
2584         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2585         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2586                                                 false, false);
2587
2588         /* Add the other op(s) */
2589
2590         op_type = rbd_img_request_op_type(orig_request->img_request);
2591         rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2592
2593         /* All set, send it off. */
2594
2595         rbd_obj_request_submit(orig_request);
2596         return;
2597
2598 out_err:
2599         ceph_release_page_vector(pages, page_count);
2600         rbd_obj_request_error(orig_request, img_result);
2601 }
2602
2603 /*
2604  * Read from the parent image the range of data that covers the
2605  * entire target of the given object request.  This is used for
2606  * satisfying a layered image write request when the target of an
2607  * object request from the image request does not exist.
2608  *
2609  * A page array big enough to hold the returned data is allocated
2610  * and supplied to rbd_img_request_fill() as the "data descriptor."
2611  * When the read completes, this page array will be transferred to
2612  * the original object request for the copyup operation.
2613  *
2614  * If an error occurs, it is recorded as the result of the original
2615  * object request in rbd_img_obj_exists_callback().
2616  */
2617 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2618 {
2619         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2620         struct rbd_img_request *parent_request = NULL;
2621         u64 img_offset;
2622         u64 length;
2623         struct page **pages = NULL;
2624         u32 page_count;
2625         int result;
2626
2627         rbd_assert(rbd_dev->parent != NULL);
2628
2629         /*
2630          * Determine the byte range covered by the object in the
2631          * child image to which the original request was to be sent.
2632          */
2633         img_offset = obj_request->img_offset - obj_request->offset;
2634         length = rbd_obj_bytes(&rbd_dev->header);
2635
2636         /*
2637          * There is no defined parent data beyond the parent
2638          * overlap, so limit what we read at that boundary if
2639          * necessary.
2640          */
2641         if (img_offset + length > rbd_dev->parent_overlap) {
2642                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2643                 length = rbd_dev->parent_overlap - img_offset;
2644         }
2645
2646         /*
2647          * Allocate a page array big enough to receive the data read
2648          * from the parent.
2649          */
2650         page_count = (u32)calc_pages_for(0, length);
2651         pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2652         if (IS_ERR(pages)) {
2653                 result = PTR_ERR(pages);
2654                 pages = NULL;
2655                 goto out_err;
2656         }
2657
2658         result = -ENOMEM;
2659         parent_request = rbd_parent_request_create(obj_request,
2660                                                 img_offset, length);
2661         if (!parent_request)
2662                 goto out_err;
2663
2664         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2665         if (result)
2666                 goto out_err;
2667
2668         parent_request->copyup_pages = pages;
2669         parent_request->copyup_page_count = page_count;
2670         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2671
2672         result = rbd_img_request_submit(parent_request);
2673         if (!result)
2674                 return 0;
2675
2676         parent_request->copyup_pages = NULL;
2677         parent_request->copyup_page_count = 0;
2678 out_err:
2679         if (pages)
2680                 ceph_release_page_vector(pages, page_count);
2681         if (parent_request)
2682                 rbd_img_request_put(parent_request);
2683         return result;
2684 }
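
/*
 * Overlap clamping example for the function above: with 4 MiB objects and a
 * parent overlap of 10 MiB, a copyup for the object covering image range
 * 8..12 MiB only reads (and only allocates pages for) the 2 MiB of defined
 * parent data in the 8..10 MiB range.
 */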
2685
2686 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2687 {
2688         struct rbd_obj_request *orig_request;
2689         struct rbd_device *rbd_dev;
2690         int result;
2691
2692         rbd_assert(!obj_request_img_data_test(obj_request));
2693
2694         /*
2695          * All we need from the object request is the original
2696          * request and the result of the STAT op.  Grab those, then
2697          * we're done with the request.
2698          */
2699         orig_request = obj_request->obj_request;
2700         obj_request->obj_request = NULL;
2701         rbd_obj_request_put(orig_request);
2702         rbd_assert(orig_request);
2703         rbd_assert(orig_request->img_request);
2704
2705         result = obj_request->result;
2706         obj_request->result = 0;
2707
2708         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2709                 obj_request, orig_request, result,
2710                 obj_request->xferred, obj_request->length);
2711         rbd_obj_request_put(obj_request);
2712
2713         /*
2714          * If the overlap has become 0 (most likely because the
2715          * image has been flattened) we need to re-submit the
2716          * original request.
2717          */
2718         rbd_dev = orig_request->img_request->rbd_dev;
2719         if (!rbd_dev->parent_overlap) {
2720                 rbd_obj_request_submit(orig_request);
2721                 return;
2722         }
2723
2724         /*
2725          * Our only purpose here is to determine whether the object
2726          * exists, and we don't want to treat the non-existence as
2727          * an error.  If something else comes back, transfer the
2728          * error to the original request and complete it now.
2729          */
2730         if (!result) {
2731                 obj_request_existence_set(orig_request, true);
2732         } else if (result == -ENOENT) {
2733                 obj_request_existence_set(orig_request, false);
2734         } else {
2735                 goto fail_orig_request;
2736         }
2737
2738         /*
2739          * Resubmit the original request now that we have recorded
2740          * whether the target object exists.
2741          */
2742         result = rbd_img_obj_request_submit(orig_request);
2743         if (result)
2744                 goto fail_orig_request;
2745
2746         return;
2747
2748 fail_orig_request:
2749         rbd_obj_request_error(orig_request, result);
2750 }
2751
2752 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2753 {
2754         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2755         struct rbd_obj_request *stat_request;
2756         struct page **pages;
2757         u32 page_count;
2758         size_t size;
2759         int ret;
2760
2761         stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
2762         if (!stat_request)
2763                 return -ENOMEM;
2764
2765         stat_request->object_no = obj_request->object_no;
2766
2767         stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2768                                                    stat_request);
2769         if (!stat_request->osd_req) {
2770                 ret = -ENOMEM;
2771                 goto fail_stat_request;
2772         }
2773
2774         /*
2775          * The response data for a STAT call consists of:
2776          *     le64 length;
2777          *     struct {
2778          *         le32 tv_sec;
2779          *         le32 tv_nsec;
2780          *     } mtime;
2781          */
2782         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2783         page_count = (u32)calc_pages_for(0, size);
2784         pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
2785         if (IS_ERR(pages)) {
2786                 ret = PTR_ERR(pages);
2787                 goto fail_stat_request;
2788         }
2789
2790         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2791         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2792                                      false, false);
2793
2794         rbd_obj_request_get(obj_request);
2795         stat_request->obj_request = obj_request;
2796         stat_request->pages = pages;
2797         stat_request->page_count = page_count;
2798         stat_request->callback = rbd_img_obj_exists_callback;
2799
2800         rbd_obj_request_submit(stat_request);
2801         return 0;
2802
2803 fail_stat_request:
2804         rbd_obj_request_put(stat_request);
2805         return ret;
2806 }
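
/*
 * The STAT reply described above is 8 + 4 + 4 = 16 bytes, so the receive
 * buffer allocated here is always a single page.
 */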
2807
2808 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2809 {
2810         struct rbd_img_request *img_request = obj_request->img_request;
2811         struct rbd_device *rbd_dev = img_request->rbd_dev;
2812
2813         /* Reads */
2814         if (!img_request_write_test(img_request) &&
2815             !img_request_discard_test(img_request))
2816                 return true;
2817
2818         /* Non-layered writes */
2819         if (!img_request_layered_test(img_request))
2820                 return true;
2821
2822         /*
2823          * Layered writes outside of the parent overlap range don't
2824          * share any data with the parent.
2825          */
2826         if (!obj_request_overlaps_parent(obj_request))
2827                 return true;
2828
2829         /*
2830          * Entire-object layered writes - we will overwrite whatever
2831          * parent data there is anyway.
2832          */
2833         if (!obj_request->offset &&
2834             obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2835                 return true;
2836
2837         /*
2838          * If the object is known to already exist, its parent data has
2839          * already been copied.
2840          */
2841         if (obj_request_known_test(obj_request) &&
2842             obj_request_exists_test(obj_request))
2843                 return true;
2844
2845         return false;
2846 }
2847
2848 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2849 {
2850         rbd_assert(obj_request_img_data_test(obj_request));
2851         rbd_assert(obj_request_type_valid(obj_request->type));
2852         rbd_assert(obj_request->img_request);
2853
2854         if (img_obj_request_simple(obj_request)) {
2855                 rbd_obj_request_submit(obj_request);
2856                 return 0;
2857         }
2858
2859         /*
2860          * It's a layered write.  The target object might exist but
2861          * we may not know that yet.  If we know it doesn't exist,
2862          * start by reading the data for the full target object from
2863          * the parent so we can use it for a copyup to the target.
2864          */
2865         if (obj_request_known_test(obj_request))
2866                 return rbd_img_obj_parent_read_full(obj_request);
2867
2868         /* We don't know whether the target exists.  Go find out. */
2869
2870         return rbd_img_obj_exists_submit(obj_request);
2871 }
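
/*
 * Summary of the decision above: "simple" requests (reads, non-layered
 * writes and discards, requests outside the parent overlap, whole-object
 * requests, and requests to objects already known to exist) go straight to
 * the OSD.  Otherwise, if existence is already known (which at this point
 * can only mean known not to exist), the parent data is read for a copyup;
 * if it is unknown, a STAT is issued first via rbd_img_obj_exists_submit().
 */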
2872
2873 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2874 {
2875         struct rbd_obj_request *obj_request;
2876         struct rbd_obj_request *next_obj_request;
2877         int ret = 0;
2878
2879         dout("%s: img %p\n", __func__, img_request);
2880
2881         rbd_img_request_get(img_request);
2882         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2883                 ret = rbd_img_obj_request_submit(obj_request);
2884                 if (ret)
2885                         goto out_put_ireq;
2886         }
2887
2888 out_put_ireq:
2889         rbd_img_request_put(img_request);
2890         return ret;
2891 }
2892
2893 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2894 {
2895         struct rbd_obj_request *obj_request;
2896         struct rbd_device *rbd_dev;
2897         u64 obj_end;
2898         u64 img_xferred;
2899         int img_result;
2900
2901         rbd_assert(img_request_child_test(img_request));
2902
2903         /* First get what we need from the image request and release it */
2904
2905         obj_request = img_request->obj_request;
2906         img_xferred = img_request->xferred;
2907         img_result = img_request->result;
2908         rbd_img_request_put(img_request);
2909
2910         /*
2911          * If the overlap has become 0 (most likely because the
2912          * image has been flattened) we need to re-submit the
2913          * original request.
2914          */
2915         rbd_assert(obj_request);
2916         rbd_assert(obj_request->img_request);
2917         rbd_dev = obj_request->img_request->rbd_dev;
2918         if (!rbd_dev->parent_overlap) {
2919                 rbd_obj_request_submit(obj_request);
2920                 return;
2921         }
2922
2923         obj_request->result = img_result;
2924         if (obj_request->result)
2925                 goto out;
2926
2927         /*
2928          * We need to zero anything beyond the parent overlap
2929          * boundary.  Since rbd_img_obj_request_read_callback()
2930          * will zero anything beyond the end of a short read, an
2931          * easy way to do this is to pretend the data from the
2932          * parent came up short--ending at the overlap boundary.
2933          */
2934         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2935         obj_end = obj_request->img_offset + obj_request->length;
2936         if (obj_end > rbd_dev->parent_overlap) {
2937                 u64 xferred = 0;
2938
2939                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2940                         xferred = rbd_dev->parent_overlap -
2941                                         obj_request->img_offset;
2942
2943                 obj_request->xferred = min(img_xferred, xferred);
2944         } else {
2945                 obj_request->xferred = img_xferred;
2946         }
2947 out:
2948         rbd_img_obj_request_read_callback(obj_request);
2949         rbd_obj_request_complete(obj_request);
2950 }
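
/*
 * Example of the short-read trick above: with a parent overlap of 10 MiB, a
 * child object read covering image range 8..12 MiB that was satisfied from
 * the parent has its xferred clamped to 2 MiB; the read callback then
 * zero-fills the remaining 2 MiB beyond the overlap boundary.
 */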
2951
2952 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2953 {
2954         struct rbd_img_request *img_request;
2955         int result;
2956
2957         rbd_assert(obj_request_img_data_test(obj_request));
2958         rbd_assert(obj_request->img_request != NULL);
2959         rbd_assert(obj_request->result == (s32) -ENOENT);
2960         rbd_assert(obj_request_type_valid(obj_request->type));
2961
2962         /* rbd_read_finish(obj_request, obj_request->length); */
2963         img_request = rbd_parent_request_create(obj_request,
2964                                                 obj_request->img_offset,
2965                                                 obj_request->length);
2966         result = -ENOMEM;
2967         if (!img_request)
2968                 goto out_err;
2969
2970         if (obj_request->type == OBJ_REQUEST_BIO)
2971                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2972                                                 obj_request->bio_list);
2973         else
2974                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2975                                                 obj_request->pages);
2976         if (result)
2977                 goto out_err;
2978
2979         img_request->callback = rbd_img_parent_read_callback;
2980         result = rbd_img_request_submit(img_request);
2981         if (result)
2982                 goto out_err;
2983
2984         return;
2985 out_err:
2986         if (img_request)
2987                 rbd_img_request_put(img_request);
2988         obj_request->result = result;
2989         obj_request->xferred = 0;
2990         obj_request_done_set(obj_request);
2991 }
2992
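     /*
      * Exclusive lock handling.  A lock client is identified by the
      * global id of its ceph client instance plus its watch cookie
      * (struct rbd_client_id); rbd_empty_cid (all zeroes) is used to
      * clear the recorded owner.
      */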
2993 static const struct rbd_client_id rbd_empty_cid;
2994
2995 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
2996                           const struct rbd_client_id *rhs)
2997 {
2998         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
2999 }
3000
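     /*
      * Return our own client id: the ceph client's global id plus the
      * current watch cookie, read under watch_mutex.
      */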
3001 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3002 {
3003         struct rbd_client_id cid;
3004
3005         mutex_lock(&rbd_dev->watch_mutex);
3006         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3007         cid.handle = rbd_dev->watch_cookie;
3008         mutex_unlock(&rbd_dev->watch_mutex);
3009         return cid;
3010 }
3011
3012 /*
3013  * lock_rwsem must be held for write
3014  */
3015 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3016                               const struct rbd_client_id *cid)
3017 {
3018         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3019              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3020              cid->gid, cid->handle);
3021         rbd_dev->owner_cid = *cid; /* struct */
3022 }
3023
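     /*
      * Build the lock cookie from RBD_LOCK_COOKIE_PREFIX and the
      * current watch cookie, under watch_mutex so it matches the watch
      * that is actually established.
      */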
3024 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3025 {
3026         mutex_lock(&rbd_dev->watch_mutex);
3027         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3028         mutex_unlock(&rbd_dev->watch_mutex);
3029 }
3030
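     /*
      * Record that we own the exclusive lock: remember the cookie it
      * was taken with, set ourselves as the owner and schedule the
      * "acquired lock" notification.
      */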
3031 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3032 {
3033         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3034
3035         strcpy(rbd_dev->lock_cookie, cookie);
3036         rbd_set_owner_cid(rbd_dev, &cid);
3037         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3038 }
3039
3040 /*
3041  * lock_rwsem must be held for write
3042  */
3043 static int rbd_lock(struct rbd_device *rbd_dev)
3044 {
3045         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3046         char cookie[32];
3047         int ret;
3048
3049         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3050                 rbd_dev->lock_cookie[0] != '\0');
3051
3052         format_lock_cookie(rbd_dev, cookie);
3053         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3054                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3055                             RBD_LOCK_TAG, "", 0);
3056         if (ret)
3057                 return ret;
3058
3059         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3060         __rbd_lock(rbd_dev, cookie);
3061         return 0;
3062 }
3063
3064 /*
3065  * lock_rwsem must be held for write
3066  */
3067 static void rbd_unlock(struct rbd_device *rbd_dev)
3068 {
3069         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3070         int ret;
3071
3072         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3073                 rbd_dev->lock_cookie[0] == '\0');
3074
3075         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3076                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
3077         if (ret && ret != -ENOENT)
3078                 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
3079
3080         /* treat errors as if the image were already unlocked */
3081         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3082         rbd_dev->lock_cookie[0] = '\0';
3083         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3084         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3085 }
3086
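     /*
      * Send a notify on the header object with a NotifyMessage payload
      * carrying the notify op and our client id.  If preply_pages/
      * preply_len are provided, the acks from the watchers are handed
      * back for the caller to decode and release.
      */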
3087 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3088                                 enum rbd_notify_op notify_op,
3089                                 struct page ***preply_pages,
3090                                 size_t *preply_len)
3091 {
3092         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3093         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3094         char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3095         int buf_size = sizeof(buf);
3096         void *p = buf;
3097
3098         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3099
3100         /* encode *LockPayload NotifyMessage (op + ClientId) */
3101         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3102         ceph_encode_32(&p, notify_op);
3103         ceph_encode_64(&p, cid.gid);
3104         ceph_encode_64(&p, cid.handle);
3105
3106         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3107                                 &rbd_dev->header_oloc, buf, buf_size,
3108                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3109 }
3110
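     /*
      * Notify watchers of a lock event, discarding any replies.
      */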
3111 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3112                                enum rbd_notify_op notify_op)
3113 {
3114         struct page **reply_pages;
3115         size_t reply_len;
3116
3117         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3118         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3119 }
3120
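     /*
      * Work functions (run off rbd_dev->task_wq) that broadcast the
      * corresponding lock notification to the other watchers.
      */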
3121 static void rbd_notify_acquired_lock(struct work_struct *work)
3122 {
3123         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3124                                                   acquired_lock_work);
3125
3126         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3127 }
3128
3129 static void rbd_notify_released_lock(struct work_struct *work)
3130 {
3131         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3132                                                   released_lock_work);
3133
3134         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3135 }
3136
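     /*
      * Ask the current lock owner to release the lock: send a
      * REQUEST_LOCK notify and decode the acks.  Exactly one watcher
      * (the owner) is expected to reply with a ResponseMessage whose
      * result we return; no response means -ETIMEDOUT, duplicate
      * responses mean -EIO.
      */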
3137 static int rbd_request_lock(struct rbd_device *rbd_dev)
3138 {
3139         struct page **reply_pages;
3140         size_t reply_len;
3141         bool lock_owner_responded = false;
3142         int ret;
3143
3144         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3145
3146         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3147                                    &reply_pages, &reply_len);
3148         if (ret && ret != -ETIMEDOUT) {
3149                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3150                 goto out;
3151         }
3152
3153         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3154                 void *p = page_address(reply_pages[0]);
3155                 void *const end = p + reply_len;
3156                 u32 n;
3157
3158                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3159                 while (n--) {
3160                         u8 struct_v;
3161                         u32 len;
3162
3163                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3164                         p += 8 + 8; /* skip gid and cookie */
3165
3166                         ceph_decode_32_safe(&p, end, len, e_inval);
3167                         if (!len)
3168                                 continue;
3169
3170                         if (lock_owner_responded) {
3171                                 rbd_warn(rbd_dev,
3172                                          "duplicate lock owners detected");
3173                                 ret = -EIO;
3174                                 goto out;
3175                         }
3176
3177                         lock_owner_responded = true;
3178                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3179                                                   &struct_v, &len);
3180                         if (ret) {
3181                                 rbd_warn(rbd_dev,
3182                                          "failed to decode ResponseMessage: %d",
3183                                          ret);
3184                                 goto e_inval;
3185                         }
3186
3187                         ret = ceph_decode_32(&p);
3188                 }
3189         }
3190
3191         if (!lock_owner_responded) {
3192                 rbd_warn(rbd_dev, "no lock owners detected");
3193                 ret = -ETIMEDOUT;
3194         }
3195
3196 out:
3197         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3198         return ret;
3199
3200 e_inval:
3201         ret = -EINVAL;
3202         goto out;
3203 }
3204
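     /*
      * Cancel the delayed lock work and wake waiters blocked on
      * lock_waitq (all of them, or just one).
      */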
3205 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3206 {
3207         dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3208
3209         cancel_delayed_work(&rbd_dev->lock_dwork);
3210         if (wake_all)
3211                 wake_up_all(&rbd_dev->lock_waitq);
3212         else
3213                 wake_up(&rbd_dev->lock_waitq);
3214 }
3215
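     /*
      * Look up who currently holds the exclusive lock on the header
      * object.  Locks taken by anything other than rbd (wrong tag or
      * cookie prefix) or taken shared are reported as -EBUSY.
      */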
3216 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3217                                struct ceph_locker **lockers, u32 *num_lockers)
3218 {
3219         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3220         u8 lock_type;
3221         char *lock_tag;
3222         int ret;
3223
3224         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3225
3226         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3227                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3228                                  &lock_type, &lock_tag, lockers, num_lockers);
3229         if (ret)
3230                 return ret;
3231
3232         if (*num_lockers == 0) {
3233                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3234                 goto out;
3235         }
3236
3237         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3238                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3239                          lock_tag);
3240                 ret = -EBUSY;
3241                 goto out;
3242         }
3243
3244         if (lock_type == CEPH_CLS_LOCK_SHARED) {
3245                 rbd_warn(rbd_dev, "shared lock type detected");
3246                 ret = -EBUSY;
3247                 goto out;
3248         }
3249
3250         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3251                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
3252                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3253                          (*lockers)[0].id.cookie);
3254                 ret = -EBUSY;
3255                 goto out;
3256         }
3257
3258 out:
3259         kfree(lock_tag);
3260         return ret;
3261 }
3262
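     /*
      * Check whether the given locker still has a watch established on
      * the header object.  Returns 1 (and records it as the owner) if
      * a matching watcher is found, 0 if not, or a negative error.
      */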
3263 static int find_watcher(struct rbd_device *rbd_dev,
3264                         const struct ceph_locker *locker)
3265 {
3266         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3267         struct ceph_watch_item *watchers;
3268         u32 num_watchers;
3269         u64 cookie;
3270         int i;
3271         int ret;
3272
3273         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3274                                       &rbd_dev->header_oloc, &watchers,
3275                                       &num_watchers);
3276         if (ret)
3277                 return ret;
3278
3279         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3280         for (i = 0; i < num_watchers; i++) {
3281                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3282                             sizeof(locker->info.addr)) &&
3283                     watchers[i].cookie == cookie) {
3284                         struct rbd_client_id cid = {
3285                                 .gid = le64_to_cpu(watchers[i].name.num),
3286                                 .handle = cookie,
3287                         };
3288
3289                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3290                              rbd_dev, cid.gid, cid.handle);
3291                         rbd_set_owner_cid(rbd_dev, &cid);
3292                         ret = 1;
3293                         goto out;
3294                 }
3295         }
3296
3297         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3298         ret = 0;
3299 out:
3300         kfree(watchers);
3301         return ret;
3302 }
3303
3304 /*
3305  * lock_rwsem must be held for write
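      *
      * Try to acquire the exclusive lock.  If it is held by a client
      * that no longer watches the header object, assume that client is
      * dead: blacklist it, break its lock and retry.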
3306  */
3307 static int rbd_try_lock(struct rbd_device *rbd_dev)
3308 {
3309         struct ceph_client *client = rbd_dev->rbd_client->client;
3310         struct ceph_locker *lockers;
3311         u32 num_lockers;
3312         int ret;
3313
3314         for (;;) {
3315                 ret = rbd_lock(rbd_dev);
3316                 if (ret != -EBUSY)
3317                         return ret;
3318
3319                 /* determine if the current lock holder is still alive */
3320                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3321                 if (ret)
3322                         return ret;
3323
3324                 if (num_lockers == 0)
3325                         goto again;
3326
3327                 ret = find_watcher(rbd_dev, lockers);
3328                 if (ret) {
3329                         if (ret > 0)
3330                                 ret = 0; /* have to request lock */
3331                         goto out;
3332                 }
3333
3334                 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3335                          ENTITY_NAME(lockers[0].id.name));
3336
3337                 ret = ceph_monc_blacklist_add(&client->monc,
3338                                               &lockers[0].info.addr);
3339                 if (ret) {
3340                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3341                                  ENTITY_NAME(lockers[0].id.name), ret);
3342                         goto out;
3343                 }
3344
3345                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3346                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
3347                                           lockers[0].id.cookie,
3348                                           &lockers[0].id.name);
3349                 if (ret && ret != -ENOENT)
3350                         goto out;
3351
3352 again:
3353                 ceph_free_lockers(lockers, num_lockers);
3354         }
3355
3356 out:
3357         ceph_free_lockers(lockers, num_lockers);
3358         return ret;
3359 }
3360
3361 /*
3362  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
3363  */
3364 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,