/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
        unsigned int counter;

        counter = (unsigned int)__atomic_add_unless(v, 1, 0);
        if (counter <= (unsigned int)INT_MAX)
                return (int)counter;

        atomic_dec(v);

        return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
        int counter;

        counter = atomic_dec_return(v);
        if (counter >= 0)
                return counter;

        atomic_inc(v);

        return -EINVAL;
}
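
/*
 * Illustrative note (assumption based on usage elsewhere in this
 * file): this pair suits reference counts such as
 * rbd_dev->parent_ref, where 0 means "no new references may be
 * taken" and a failed increment tells the caller the object is gone.
 */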

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR            256
#define RBD_SINGLE_MAJOR_PART_SHIFT     4

#define RBD_MAX_PARENT_CHAIN_LEN        16

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME      "-"

#define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */

/* This allows a single page to hold an image name sent by the OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

#define RBD_NOTIFY_TIMEOUT      5       /* seconds */
#define RBD_RETRY_DELAY         msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING            (1ULL<<0)
#define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
#define RBD_FEATURE_DATA_POOL           (1ULL<<7)

#define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
                                 RBD_FEATURE_STRIPINGV2 |       \
                                 RBD_FEATURE_EXCLUSIVE_LOCK |   \
                                 RBD_FEATURE_DATA_POOL)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN            32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These six fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        u64 stripe_unit;
        u64 stripe_count;
        s64 data_pool_id;
        u64 features;           /* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* format 1 only */
        u64 *snap_sizes;        /* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};
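
/*
 * Example (illustrative): mapping image "foo" in pool "rbd" at its
 * head (no snapshot) yields a spec with pool_name "rbd", image_name
 * "foo", snap_id CEPH_NOSNAP and snap_name "-" (RBD_SNAP_HEAD_NAME).
 */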

/*
 * An instance of the client; multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
        OBJ_OP_WRITE,
        OBJ_OP_READ,
        OBJ_OP_DISCARD,
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        u64                     object_no;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn in image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
        IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        u32                     copyup_page_count;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
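
/*
 * Usage sketch (illustrative, not part of the driver): iterating an
 * image request's object requests with the helper above:
 *
 *      struct rbd_obj_request *obj_request;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              rbd_obj_request_submit(obj_request);
 */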

enum rbd_watch_state {
        RBD_WATCH_STATE_UNREGISTERED,
        RBD_WATCH_STATE_REGISTERED,
        RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
        RBD_LOCK_STATE_UNLOCKED,
        RBD_LOCK_STATE_LOCKED,
        RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
        u64 gid;
        u64 handle;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        int                     minor;
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;
        struct rbd_options      *opts;
        char                    *config_info;   /* add{,_single_major} string */

        struct ceph_object_id   header_oid;
        struct ceph_object_locator header_oloc;

        struct ceph_file_layout layout;         /* used for all rbd requests */

        struct mutex            watch_mutex;
        enum rbd_watch_state    watch_state;
        struct ceph_osd_linger_request *watch_handle;
        u64                     watch_cookie;
        struct delayed_work     watch_dwork;

        struct rw_semaphore     lock_rwsem;
        enum rbd_lock_state     lock_state;
        struct rbd_client_id    owner_cid;
        struct work_struct      acquired_lock_work;
        struct work_struct      released_lock_work;
        struct delayed_work     lock_dwork;
        struct work_struct      unlock_work;
        wait_queue_head_t       lock_waitq;

        struct workqueue_struct *task_wq;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        atomic_t                parent_ref;
        struct rbd_device       *parent;

        /* Block layer tags. */
        struct blk_mq_tag_set   tag_set;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
        RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache        *rbd_img_request_cache;
static struct kmem_cache        *rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires version >= 0.75
 * of the userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
                                    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
                                       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);

static int rbd_dev_id_to_minor(int dev_id)
{
        return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
        return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
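
/*
 * Example (illustrative): with RBD_SINGLE_MAJOR_PART_SHIFT == 4,
 * dev_id 3 maps to minor 48 (3 << 4), leaving minors 48..63 for
 * rbd3 and its partitions.
 */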

static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
{
        return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
               rbd_dev->spec->snap_id == CEPH_NOSNAP &&
               !rbd_dev->mapping.read_only;
}

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
        return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
               rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
        bool is_lock_owner;

        down_read(&rbd_dev->lock_rwsem);
        is_lock_owner = __rbd_is_lock_owner(rbd_dev);
        up_read(&rbd_dev->lock_rwsem);
        return is_lock_owner;
}

static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
{
        return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}
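
/*
 * Example (illustrative): from userspace the mask is visible as
 *
 *      $ cat /sys/bus/rbd/supported_features
 *      0x87
 *
 * where 0x87 assumes RBD_FEATURES_SUPPORTED as defined above
 * (layering | striping-v2 | exclusive-lock | data-pool).
 */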

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);

static struct attribute *rbd_bus_attrs[] = {
        &bus_attr_add.attr,
        &bus_attr_remove.attr,
        &bus_attr_add_single_major.attr,
        &bus_attr_remove_single_major.attr,
        &bus_attr_supported_features.attr,
        NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
                                  struct attribute *attr, int index)
{
        if (!single_major &&
            (attr == &bus_attr_add_single_major.attr ||
             attr == &bus_attr_remove_single_major.attr))
                return 0;

        return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
        .attrs = rbd_bus_attrs,
        .is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_groups     = rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        (void) get_device(&rbd_dev->dev);

        return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
        int ret = 0;
        int val;
        bool ro;
        bool ro_changed = false;

        /* get_user() may sleep, so call it before taking rbd_dev->lock */
        if (get_user(val, (int __user *)(arg)))
                return -EFAULT;

        ro = val ? true : false;
        /* Snapshots don't allow writes */
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        /* prevent others from opening this device */
        if (rbd_dev->open_count > 1) {
                ret = -EBUSY;
                goto out;
        }

        if (rbd_dev->mapping.read_only != ro) {
                rbd_dev->mapping.read_only = ro;
                ro_changed = true;
        }

out:
        spin_unlock_irq(&rbd_dev->lock);
        /* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
        if (ret == 0 && ro_changed)
                set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

        return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
                        unsigned int cmd, unsigned long arg)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        int ret = 0;

        switch (cmd) {
        case BLKROSET:
                ret = rbd_ioctl_set_ro(rbd_dev, arg);
                break;
        default:
                ret = -ENOTTY;
        }

        return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
                                unsigned int cmd, unsigned long arg)
{
        return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
        .ioctl                  = rbd_ioctl,
#ifdef CONFIG_COMPAT
        .compat_ioctl           = rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_rbdc;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_client;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;
out_client:
        ceph_destroy_client(rbdc->client);
out_rbdc:
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
        Opt_queue_depth,
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        Opt_lock_on_read,
        Opt_err
};

static match_table_t rbd_opts_tokens = {
        {Opt_queue_depth, "queue_depth=%d"},
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        {Opt_lock_on_read, "lock_on_read"},
        {Opt_err, NULL}
};

struct rbd_options {
        int     queue_depth;
        bool    read_only;
        bool    lock_on_read;
};

#define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT   false
#define RBD_LOCK_ON_READ_DEFAULT false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token, argstr[0].from);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_queue_depth:
                if (intval < 1) {
                        pr_err("queue_depth out of range\n");
                        return -EINVAL;
                }
                rbd_opts->queue_depth = intval;
                break;
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        case Opt_lock_on_read:
                rbd_opts->lock_on_read = true;
                break;
        default:
                /* libceph prints "bad option" msg */
                return -EINVAL;
        }

        return 0;
}
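
/*
 * Example (illustrative): an options string such as
 * "queue_depth=128,ro,lock_on_read" is handed to the function above
 * one token at a time, setting queue_depth to 128 and both read_only
 * and lock_on_read to true.
 */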

static char *obj_op_name(enum obj_operation_type op_type)
{
        switch (op_type) {
        case OBJ_OP_READ:
                return "read";
        case OBJ_OP_WRITE:
                return "write";
        case OBJ_OP_DISCARD:
                return "discard";
        default:
                return "???";
        }
}

/*
 * Get a ceph client with specific addr and configuration; if one does
 * not exist, create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);
        mutex_unlock(&client_mutex);

        return rbdc;
}

/*
 * Destroy ceph client.  Note that this takes rbd_client_list_lock
 * itself to remove the client from the list, so the caller must not
 * hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1U << header->obj_order;
}
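
/*
 * Example (illustrative): the common default obj_order of 22 gives
 * 1U << 22 == 4 MiB objects.
 */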

static void rbd_init_layout(struct rbd_device *rbd_dev)
{
        if (rbd_dev->header.stripe_unit == 0 ||
            rbd_dev->header.stripe_count == 0) {
                rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
                rbd_dev->header.stripe_count = 1;
        }

        rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
        rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
        rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
        rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
                          rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
        RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                object_prefix = kstrndup(ondisk->object_prefix,
                                         sizeof(ondisk->object_prefix),
                                         GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */
                snap_sizes = kmalloc_array(snap_count,
                                           sizeof(*header->snap_sizes),
                                           GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                rbd_init_layout(rbd_dev);
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that the result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}
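
/*
 * Example (illustrative): for snap ids stored as { 12, 7, 3 }
 * (descending), a bsearch() for 7 with this comparator lands on
 * index 1.
 */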

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;
        const char *snap_name;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return ERR_PTR(-ENOENT);

        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;     /* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
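
/*
 * Example (illustrative): with 4 MiB objects, a request at image
 * offset 0x3ff000 for 0x2000 bytes has an in-object offset of
 * 0x3ff000 and is clipped to 0x1000 bytes so that it does not cross
 * the object boundary; the caller issues the rest against the next
 * object.
 */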

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec bv;
        struct bvec_iter iter;
        unsigned long flags;
        void *buf;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, iter) {
                        if (pos + bv.bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(&bv, &flags);
                                memset(buf + remainder, 0,
                                       bv.bv_len - remainder);
                                flush_dcache_page(bv.bv_page);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv.bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * Similar to zero_bio_chain(), this zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = offset & ~PAGE_MASK;
                length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                flush_dcache_page(*page);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bio;

        bio = bio_clone(bio_src, gfpmask);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio_advance(bio, offset);
        bio->bi_iter.bi_size = len;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_iter.bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_iter.bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
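
/*
 * Usage sketch (illustrative; rq_bio, first_len and total_len are
 * hypothetical): cloning an I/O that spans two objects.  The in-out
 * parameters advance past the cloned bytes, so the second call picks
 * up exactly where the first left off:
 *
 *      struct bio *bio = rq_bio;
 *      unsigned int off = 0;
 *      struct bio *first, *rest;
 *
 *      first = bio_chain_clone_range(&bio, &off, first_len, GFP_NOIO);
 *      rest = bio_chain_clone_range(&bio, &off, total_len - first_len,
 *                                   GFP_NOIO);
 */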

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
1488 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1489                                 bool exists)
1490 {
1491         if (exists)
1492                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1493         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1494         smp_mb();
1495 }
1496
1497 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1498 {
1499         smp_mb();
1500         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1501 }
1502
1503 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1504 {
1505         smp_mb();
1506         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1507 }
1508
1509 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1510 {
1511         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1512
1513         return obj_request->img_offset <
1514             round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1515 }
1516
1517 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1518 {
1519         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1520                 kref_read(&obj_request->kref));
1521         kref_get(&obj_request->kref);
1522 }
1523
1524 static void rbd_obj_request_destroy(struct kref *kref);
1525 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1526 {
1527         rbd_assert(obj_request != NULL);
1528         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1529                 kref_read(&obj_request->kref));
1530         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1531 }
1532
1533 static void rbd_img_request_get(struct rbd_img_request *img_request)
1534 {
1535         dout("%s: img %p (was %d)\n", __func__, img_request,
1536              kref_read(&img_request->kref));
1537         kref_get(&img_request->kref);
1538 }
1539
1540 static bool img_request_child_test(struct rbd_img_request *img_request);
1541 static void rbd_parent_request_destroy(struct kref *kref);
1542 static void rbd_img_request_destroy(struct kref *kref);
1543 static void rbd_img_request_put(struct rbd_img_request *img_request)
1544 {
1545         rbd_assert(img_request != NULL);
1546         dout("%s: img %p (was %d)\n", __func__, img_request,
1547                 kref_read(&img_request->kref));
1548         if (img_request_child_test(img_request))
1549                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1550         else
1551                 kref_put(&img_request->kref, rbd_img_request_destroy);
1552 }
1553
1554 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1555                                         struct rbd_obj_request *obj_request)
1556 {
1557         rbd_assert(obj_request->img_request == NULL);
1558
1559         /* Image request now owns object's original reference */
1560         obj_request->img_request = img_request;
1561         obj_request->which = img_request->obj_request_count;
1562         rbd_assert(!obj_request_img_data_test(obj_request));
1563         obj_request_img_data_set(obj_request);
1564         rbd_assert(obj_request->which != BAD_WHICH);
1565         img_request->obj_request_count++;
1566         list_add_tail(&obj_request->links, &img_request->obj_requests);
1567         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1568                 obj_request->which);
1569 }
1570
1571 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1572                                         struct rbd_obj_request *obj_request)
1573 {
1574         rbd_assert(obj_request->which != BAD_WHICH);
1575
1576         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1577                 obj_request->which);
1578         list_del(&obj_request->links);
1579         rbd_assert(img_request->obj_request_count > 0);
1580         img_request->obj_request_count--;
1581         rbd_assert(obj_request->which == img_request->obj_request_count);
1582         obj_request->which = BAD_WHICH;
1583         rbd_assert(obj_request_img_data_test(obj_request));
1584         rbd_assert(obj_request->img_request == img_request);
1585         obj_request->img_request = NULL;
1586         obj_request->callback = NULL;
1587         rbd_obj_request_put(obj_request);
1588 }
1589
1590 static bool obj_request_type_valid(enum obj_request_type type)
1591 {
1592         switch (type) {
1593         case OBJ_REQUEST_NODATA:
1594         case OBJ_REQUEST_BIO:
1595         case OBJ_REQUEST_PAGES:
1596                 return true;
1597         default:
1598                 return false;
1599         }
1600 }
1601
1602 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
1603
1604 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1605 {
1606         struct ceph_osd_request *osd_req = obj_request->osd_req;
1607
1608         dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1609              obj_request, obj_request->object_no, obj_request->offset,
1610              obj_request->length, osd_req);
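        /*
         * For object requests that are part of an image request, take
         * an extra reference on the image request; it is dropped in
         * rbd_img_obj_callback() when this object request is accounted
         * for (rbd_obj_request_error() mirrors this for the error path).
         */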
1611         if (obj_request_img_data_test(obj_request)) {
1612                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1613                 rbd_img_request_get(obj_request->img_request);
1614         }
1615         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1616 }
1617
1618 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1619 {
1620
1621         dout("%s: img %p\n", __func__, img_request);
1622
1623         /*
1624          * If no error occurred, compute the aggregate transfer
1625          * count for the image request.  We could instead use
1626          * atomic64_cmpxchg() to update it as each object request
1627          * completes; it's not clear offhand which way is better.
1628          */
1629         if (!img_request->result) {
1630                 struct rbd_obj_request *obj_request;
1631                 u64 xferred = 0;
1632
1633                 for_each_obj_request(img_request, obj_request)
1634                         xferred += obj_request->xferred;
1635                 img_request->xferred = xferred;
1636         }
1637
1638         if (img_request->callback)
1639                 img_request->callback(img_request);
1640         else
1641                 rbd_img_request_put(img_request);
1642 }
1643
1644 /*
1645  * The default/initial value for all image request flags is 0.  Each
1646  * is conditionally set to 1 at image request initialization time
1647  * and currently never changes thereafter.
1648  */
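/*
 * As with the object request flags above, each setter issues a full
 * memory barrier after updating its bit and each test issues one
 * before reading, so these flags can be checked without holding a
 * lock.
 */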
1649 static void img_request_write_set(struct rbd_img_request *img_request)
1650 {
1651         set_bit(IMG_REQ_WRITE, &img_request->flags);
1652         smp_mb();
1653 }
1654
1655 static bool img_request_write_test(struct rbd_img_request *img_request)
1656 {
1657         smp_mb();
1658         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1659 }
1660
1661 /*
1662  * Set the discard flag when the img_request is a discard request
1663  */
1664 static void img_request_discard_set(struct rbd_img_request *img_request)
1665 {
1666         set_bit(IMG_REQ_DISCARD, &img_request->flags);
1667         smp_mb();
1668 }
1669
1670 static bool img_request_discard_test(struct rbd_img_request *img_request)
1671 {
1672         smp_mb();
1673         return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1674 }
1675
1676 static void img_request_child_set(struct rbd_img_request *img_request)
1677 {
1678         set_bit(IMG_REQ_CHILD, &img_request->flags);
1679         smp_mb();
1680 }
1681
1682 static void img_request_child_clear(struct rbd_img_request *img_request)
1683 {
1684         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1685         smp_mb();
1686 }
1687
1688 static bool img_request_child_test(struct rbd_img_request *img_request)
1689 {
1690         smp_mb();
1691         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1692 }
1693
1694 static void img_request_layered_set(struct rbd_img_request *img_request)
1695 {
1696         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1697         smp_mb();
1698 }
1699
1700 static void img_request_layered_clear(struct rbd_img_request *img_request)
1701 {
1702         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1703         smp_mb();
1704 }
1705
1706 static bool img_request_layered_test(struct rbd_img_request *img_request)
1707 {
1708         smp_mb();
1709         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1710 }
1711
1712 static enum obj_operation_type
1713 rbd_img_request_op_type(struct rbd_img_request *img_request)
1714 {
1715         if (img_request_write_test(img_request))
1716                 return OBJ_OP_WRITE;
1717         else if (img_request_discard_test(img_request))
1718                 return OBJ_OP_DISCARD;
1719         else
1720                 return OBJ_OP_READ;
1721 }
1722
1723 static void
1724 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1725 {
1726         u64 xferred = obj_request->xferred;
1727         u64 length = obj_request->length;
1728
1729         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1730                 obj_request, obj_request->img_request, obj_request->result,
1731                 xferred, length);
1732         /*
1733          * ENOENT means a hole in the image.  We zero-fill the entire
1734          * length of the request.  A short read also implies zero-fill
1735          * to the end of the request.  An error requires the whole
1736          * length of the request to be reported finished with an error
1737          * to the block layer.  In each case we update the xferred
1738          * count to indicate the whole request was satisfied.
1739          */
1740         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1741         if (obj_request->result == -ENOENT) {
1742                 if (obj_request->type == OBJ_REQUEST_BIO)
1743                         zero_bio_chain(obj_request->bio_list, 0);
1744                 else
1745                         zero_pages(obj_request->pages, 0, length);
1746                 obj_request->result = 0;
1747         } else if (xferred < length && !obj_request->result) {
1748                 if (obj_request->type == OBJ_REQUEST_BIO)
1749                         zero_bio_chain(obj_request->bio_list, xferred);
1750                 else
1751                         zero_pages(obj_request->pages, xferred, length);
1752         }
1753         obj_request->xferred = length;
1754         obj_request_done_set(obj_request);
1755 }
1756
1757 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1758 {
1759         dout("%s: obj %p cb %p\n", __func__, obj_request,
1760                 obj_request->callback);
1761         if (obj_request->callback)
1762                 obj_request->callback(obj_request);
1763         else
1764                 complete_all(&obj_request->completion);
1765 }
1766
1767 static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1768 {
1769         obj_request->result = err;
1770         obj_request->xferred = 0;
1771         /*
1772          * kludge - mirror rbd_obj_request_submit() to match a put in
1773          * rbd_img_obj_callback()
1774          */
1775         if (obj_request_img_data_test(obj_request)) {
1776                 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1777                 rbd_img_request_get(obj_request->img_request);
1778         }
1779         obj_request_done_set(obj_request);
1780         rbd_obj_request_complete(obj_request);
1781 }
1782
1783 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1784 {
1785         struct rbd_img_request *img_request = NULL;
1786         struct rbd_device *rbd_dev = NULL;
1787         bool layered = false;
1788
1789         if (obj_request_img_data_test(obj_request)) {
1790                 img_request = obj_request->img_request;
1791                 layered = img_request && img_request_layered_test(img_request);
1792                 rbd_dev = img_request->rbd_dev;
1793         }
1794
1795         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1796                 obj_request, img_request, obj_request->result,
1797                 obj_request->xferred, obj_request->length);
1798         if (layered && obj_request->result == -ENOENT &&
1799                         obj_request->img_offset < rbd_dev->parent_overlap)
1800                 rbd_img_parent_read(obj_request);
1801         else if (img_request)
1802                 rbd_img_obj_request_read_callback(obj_request);
1803         else
1804                 obj_request_done_set(obj_request);
1805 }
1806
1807 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1808 {
1809         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1810                 obj_request->result, obj_request->length);
1811         /*
1812          * There is no such thing as a successful short write.  Set
1813          * it to our originally-requested length.
1814          */
1815         obj_request->xferred = obj_request->length;
1816         obj_request_done_set(obj_request);
1817 }
1818
1819 static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1820 {
1821         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1822                 obj_request->result, obj_request->length);
1823         /*
1824          * There is no such thing as a successful short discard.  Set
1825          * it to our originally-requested length.
1826          */
1827         obj_request->xferred = obj_request->length;
1828         /* discarding a non-existent object is not a problem */
1829         if (obj_request->result == -ENOENT)
1830                 obj_request->result = 0;
1831         obj_request_done_set(obj_request);
1832 }
1833
1834 /*
1835  * For a simple stat call there's nothing to do.  We'll do more if
1836  * this is part of a write sequence for a layered image.
1837  */
1838 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1839 {
1840         dout("%s: obj %p\n", __func__, obj_request);
1841         obj_request_done_set(obj_request);
1842 }
1843
1844 static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1845 {
1846         dout("%s: obj %p\n", __func__, obj_request);
1847
1848         if (obj_request_img_data_test(obj_request))
1849                 rbd_osd_copyup_callback(obj_request);
1850         else
1851                 obj_request_done_set(obj_request);
1852 }
1853
1854 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1855 {
1856         struct rbd_obj_request *obj_request = osd_req->r_priv;
1857         u16 opcode;
1858
1859         dout("%s: osd_req %p\n", __func__, osd_req);
1860         rbd_assert(osd_req == obj_request->osd_req);
1861         if (obj_request_img_data_test(obj_request)) {
1862                 rbd_assert(obj_request->img_request);
1863                 rbd_assert(obj_request->which != BAD_WHICH);
1864         } else {
1865                 rbd_assert(obj_request->which == BAD_WHICH);
1866         }
1867
1868         if (osd_req->r_result < 0)
1869                 obj_request->result = osd_req->r_result;
1870
1871         /*
1872          * We support a 64-bit length, but ultimately it has to be
1873          * passed to the block layer, which just supports a 32-bit
1874          * length field.
1875          */
1876         obj_request->xferred = osd_req->r_ops[0].outdata_len;
1877         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1878
1879         opcode = osd_req->r_ops[0].op;
1880         switch (opcode) {
1881         case CEPH_OSD_OP_READ:
1882                 rbd_osd_read_callback(obj_request);
1883                 break;
1884         case CEPH_OSD_OP_SETALLOCHINT:
1885                 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1886                            osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
1887                 /* fall through */
1888         case CEPH_OSD_OP_WRITE:
1889         case CEPH_OSD_OP_WRITEFULL:
1890                 rbd_osd_write_callback(obj_request);
1891                 break;
1892         case CEPH_OSD_OP_STAT:
1893                 rbd_osd_stat_callback(obj_request);
1894                 break;
1895         case CEPH_OSD_OP_DELETE:
1896         case CEPH_OSD_OP_TRUNCATE:
1897         case CEPH_OSD_OP_ZERO:
1898                 rbd_osd_discard_callback(obj_request);
1899                 break;
1900         case CEPH_OSD_OP_CALL:
1901                 rbd_osd_call_callback(obj_request);
1902                 break;
1903         default:
1904                 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1905                          obj_request->object_no, opcode);
1906                 break;
1907         }
1908
1909         if (obj_request_done_test(obj_request))
1910                 rbd_obj_request_complete(obj_request);
1911 }
1912
1913 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1914 {
1915         struct ceph_osd_request *osd_req = obj_request->osd_req;
1916
1917         rbd_assert(obj_request_img_data_test(obj_request));
1918         osd_req->r_snapid = obj_request->img_request->snap_id;
1919 }
1920
1921 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1922 {
1923         struct ceph_osd_request *osd_req = obj_request->osd_req;
1924
1925         osd_req->r_mtime = CURRENT_TIME;
1926         osd_req->r_data_offset = obj_request->offset;
1927 }
1928
1929 static struct ceph_osd_request *
1930 __rbd_osd_req_create(struct rbd_device *rbd_dev,
1931                      struct ceph_snap_context *snapc,
1932                      int num_ops, unsigned int flags,
1933                      struct rbd_obj_request *obj_request)
1934 {
1935         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1936         struct ceph_osd_request *req;
1937         const char *name_format = rbd_dev->image_format == 1 ?
1938                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1939
1940         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1941         if (!req)
1942                 return NULL;
1943
1944         req->r_flags = flags;
1945         req->r_callback = rbd_osd_req_callback;
1946         req->r_priv = obj_request;
1947
1948         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1949         if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1950                         rbd_dev->header.object_prefix, obj_request->object_no))
1951                 goto err_req;
1952
1953         if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1954                 goto err_req;
1955
1956         return req;
1957
1958 err_req:
1959         ceph_osdc_put_request(req);
1960         return NULL;
1961 }
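/*
 * name_format above builds the data object name from the image's
 * object prefix and the object number; format 1 and format 2 images
 * differ in the width of the hex object-number suffix (12 vs 16
 * digits, per RBD_V1_DATA_FORMAT and RBD_V2_DATA_FORMAT).
 */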
1962
1963 /*
1964  * Create an osd request.  A read request has one osd op (read).
1965  * A write request has either one (watch) or two (hint+write) osd ops.
1966  * (All rbd data writes are prefixed with an allocation hint op, but
1967  * technically osd watch is a write request, hence this distinction.)
1968  */
1969 static struct ceph_osd_request *rbd_osd_req_create(
1970                                         struct rbd_device *rbd_dev,
1971                                         enum obj_operation_type op_type,
1972                                         unsigned int num_ops,
1973                                         struct rbd_obj_request *obj_request)
1974 {
1975         struct ceph_snap_context *snapc = NULL;
1976
1977         if (obj_request_img_data_test(obj_request) &&
1978                 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1979                 struct rbd_img_request *img_request = obj_request->img_request;
1980                 if (op_type == OBJ_OP_WRITE) {
1981                         rbd_assert(img_request_write_test(img_request));
1982                 } else {
1983                         rbd_assert(img_request_discard_test(img_request));
1984                 }
1985                 snapc = img_request->snapc;
1986         }
1987
1988         rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1989
1990         return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1991             (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
1992             CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
1993 }
1994
1995 /*
1996  * Create a copyup osd request based on the information in the object
1997  * request supplied.  A copyup request has two or three osd ops, a
1998  * copyup method call, potentially a hint op, and a write or truncate
1999  * or zero op.
2000  */
2001 static struct ceph_osd_request *
2002 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
2003 {
2004         struct rbd_img_request *img_request;
2005         int num_osd_ops = 3;
2006
2007         rbd_assert(obj_request_img_data_test(obj_request));
2008         img_request = obj_request->img_request;
2009         rbd_assert(img_request);
2010         rbd_assert(img_request_write_test(img_request) ||
2011                         img_request_discard_test(img_request));
2012
2013         if (img_request_discard_test(img_request))
2014                 num_osd_ops = 2;
2015
2016         return __rbd_osd_req_create(img_request->rbd_dev,
2017                                     img_request->snapc, num_osd_ops,
2018                                     CEPH_OSD_FLAG_WRITE, obj_request);
2019 }
2020
2021 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2022 {
2023         ceph_osdc_put_request(osd_req);
2024 }
2025
2026 static struct rbd_obj_request *
2027 rbd_obj_request_create(enum obj_request_type type)
2028 {
2029         struct rbd_obj_request *obj_request;
2030
2031         rbd_assert(obj_request_type_valid(type));
2032
2033         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
2034         if (!obj_request)
2035                 return NULL;
2036
2037         obj_request->which = BAD_WHICH;
2038         obj_request->type = type;
2039         INIT_LIST_HEAD(&obj_request->links);
2040         init_completion(&obj_request->completion);
2041         kref_init(&obj_request->kref);
2042
2043         dout("%s %p\n", __func__, obj_request);
2044         return obj_request;
2045 }
2046
2047 static void rbd_obj_request_destroy(struct kref *kref)
2048 {
2049         struct rbd_obj_request *obj_request;
2050
2051         obj_request = container_of(kref, struct rbd_obj_request, kref);
2052
2053         dout("%s: obj %p\n", __func__, obj_request);
2054
2055         rbd_assert(obj_request->img_request == NULL);
2056         rbd_assert(obj_request->which == BAD_WHICH);
2057
2058         if (obj_request->osd_req)
2059                 rbd_osd_req_destroy(obj_request->osd_req);
2060
2061         rbd_assert(obj_request_type_valid(obj_request->type));
2062         switch (obj_request->type) {
2063         case OBJ_REQUEST_NODATA:
2064                 break;          /* Nothing to do */
2065         case OBJ_REQUEST_BIO:
2066                 if (obj_request->bio_list)
2067                         bio_chain_put(obj_request->bio_list);
2068                 break;
2069         case OBJ_REQUEST_PAGES:
2070                 /* img_data requests don't own their page array */
2071                 if (obj_request->pages &&
2072                     !obj_request_img_data_test(obj_request))
2073                         ceph_release_page_vector(obj_request->pages,
2074                                                 obj_request->page_count);
2075                 break;
2076         }
2077
2078         kmem_cache_free(rbd_obj_request_cache, obj_request);
2079 }
2080
2081 /* It's OK to call this for a device with no parent */
2082
2083 static void rbd_spec_put(struct rbd_spec *spec);
2084 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2085 {
2086         rbd_dev_remove_parent(rbd_dev);
2087         rbd_spec_put(rbd_dev->parent_spec);
2088         rbd_dev->parent_spec = NULL;
2089         rbd_dev->parent_overlap = 0;
2090 }
2091
2092 /*
2093  * Parent image reference counting is used to determine when an
2094  * image's parent fields can be safely torn down--after there are no
2095  * more in-flight requests to the parent image.  When the last
2096  * reference is dropped, cleaning them up is safe.
2097  */
2098 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2099 {
2100         int counter;
2101
2102         if (!rbd_dev->parent_spec)
2103                 return;
2104
2105         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2106         if (counter > 0)
2107                 return;
2108
2109         /* Last reference; clean up parent data structures */
2110
2111         if (!counter)
2112                 rbd_dev_unparent(rbd_dev);
2113         else
2114                 rbd_warn(rbd_dev, "parent reference underflow");
2115 }
2116
2117 /*
2118  * If an image has a non-zero parent overlap, get a reference to its
2119  * parent.
2120  *
2121  * Returns true if the rbd device has a parent with a non-zero
2122  * overlap and a reference for it was successfully taken, or
2123  * false otherwise.
2124  */
2125 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2126 {
2127         int counter = 0;
2128
2129         if (!rbd_dev->parent_spec)
2130                 return false;
2131
2132         down_read(&rbd_dev->header_rwsem);
2133         if (rbd_dev->parent_overlap)
2134                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2135         up_read(&rbd_dev->header_rwsem);
2136
2137         if (counter < 0)
2138                 rbd_warn(rbd_dev, "parent reference overflow");
2139
2140         return counter > 0;
2141 }
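/*
 * A successful rbd_dev_parent_get() must eventually be balanced by
 * rbd_dev_parent_put(); the final put is what lets the parent spec
 * and overlap be torn down once the image has been flattened.
 */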
2142
2143 /*
2144  * Caller is responsible for filling in the list of object requests
2145  * that comprises the image request, and the Linux request pointer
2146  * (if there is one).
2147  */
2148 static struct rbd_img_request *rbd_img_request_create(
2149                                         struct rbd_device *rbd_dev,
2150                                         u64 offset, u64 length,
2151                                         enum obj_operation_type op_type,
2152                                         struct ceph_snap_context *snapc)
2153 {
2154         struct rbd_img_request *img_request;
2155
2156         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2157         if (!img_request)
2158                 return NULL;
2159
2160         img_request->rq = NULL;
2161         img_request->rbd_dev = rbd_dev;
2162         img_request->offset = offset;
2163         img_request->length = length;
2164         img_request->flags = 0;
2165         if (op_type == OBJ_OP_DISCARD) {
2166                 img_request_discard_set(img_request);
2167                 img_request->snapc = snapc;
2168         } else if (op_type == OBJ_OP_WRITE) {
2169                 img_request_write_set(img_request);
2170                 img_request->snapc = snapc;
2171         } else {
2172                 img_request->snap_id = rbd_dev->spec->snap_id;
2173         }
2174         if (rbd_dev_parent_get(rbd_dev))
2175                 img_request_layered_set(img_request);
2176         spin_lock_init(&img_request->completion_lock);
2177         img_request->next_completion = 0;
2178         img_request->callback = NULL;
2179         img_request->result = 0;
2180         img_request->obj_request_count = 0;
2181         INIT_LIST_HEAD(&img_request->obj_requests);
2182         kref_init(&img_request->kref);
2183
2184         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2185                 obj_op_name(op_type), offset, length, img_request);
2186
2187         return img_request;
2188 }
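/*
 * Note that for writes and discards the image request takes over the
 * caller's snapc reference, which is dropped in
 * rbd_img_request_destroy(); reads capture the snapshot id to read
 * from instead.
 */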
2189
2190 static void rbd_img_request_destroy(struct kref *kref)
2191 {
2192         struct rbd_img_request *img_request;
2193         struct rbd_obj_request *obj_request;
2194         struct rbd_obj_request *next_obj_request;
2195
2196         img_request = container_of(kref, struct rbd_img_request, kref);
2197
2198         dout("%s: img %p\n", __func__, img_request);
2199
2200         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2201                 rbd_img_obj_request_del(img_request, obj_request);
2202         rbd_assert(img_request->obj_request_count == 0);
2203
2204         if (img_request_layered_test(img_request)) {
2205                 img_request_layered_clear(img_request);
2206                 rbd_dev_parent_put(img_request->rbd_dev);
2207         }
2208
2209         if (img_request_write_test(img_request) ||
2210                 img_request_discard_test(img_request))
2211                 ceph_put_snap_context(img_request->snapc);
2212
2213         kmem_cache_free(rbd_img_request_cache, img_request);
2214 }
2215
2216 static struct rbd_img_request *rbd_parent_request_create(
2217                                         struct rbd_obj_request *obj_request,
2218                                         u64 img_offset, u64 length)
2219 {
2220         struct rbd_img_request *parent_request;
2221         struct rbd_device *rbd_dev;
2222
2223         rbd_assert(obj_request->img_request);
2224         rbd_dev = obj_request->img_request->rbd_dev;
2225
2226         parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2227                                                 length, OBJ_OP_READ, NULL);
2228         if (!parent_request)
2229                 return NULL;
2230
2231         img_request_child_set(parent_request);
2232         rbd_obj_request_get(obj_request);
2233         parent_request->obj_request = obj_request;
2234
2235         return parent_request;
2236 }
2237
2238 static void rbd_parent_request_destroy(struct kref *kref)
2239 {
2240         struct rbd_img_request *parent_request;
2241         struct rbd_obj_request *orig_request;
2242
2243         parent_request = container_of(kref, struct rbd_img_request, kref);
2244         orig_request = parent_request->obj_request;
2245
2246         parent_request->obj_request = NULL;
2247         rbd_obj_request_put(orig_request);
2248         img_request_child_clear(parent_request);
2249
2250         rbd_img_request_destroy(kref);
2251 }
2252
2253 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2254 {
2255         struct rbd_img_request *img_request;
2256         unsigned int xferred;
2257         int result;
2258         bool more;
2259
2260         rbd_assert(obj_request_img_data_test(obj_request));
2261         img_request = obj_request->img_request;
2262
2263         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2264         xferred = (unsigned int)obj_request->xferred;
2265         result = obj_request->result;
2266         if (result) {
2267                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2268                 enum obj_operation_type op_type;
2269
2270                 if (img_request_discard_test(img_request))
2271                         op_type = OBJ_OP_DISCARD;
2272                 else if (img_request_write_test(img_request))
2273                         op_type = OBJ_OP_WRITE;
2274                 else
2275                         op_type = OBJ_OP_READ;
2276
2277                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2278                         obj_op_name(op_type), obj_request->length,
2279                         obj_request->img_offset, obj_request->offset);
2280                 rbd_warn(rbd_dev, "  result %d xferred %x",
2281                         result, xferred);
2282                 if (!img_request->result)
2283                         img_request->result = result;
2284                 /*
2285                  * Need to end I/O on the entire obj_request's worth
2286                  * of bytes in case of error.
2287                  */
2288                 xferred = obj_request->length;
2289         }
2290
2291         if (img_request_child_test(img_request)) {
2292                 rbd_assert(img_request->obj_request != NULL);
2293                 more = obj_request->which < img_request->obj_request_count - 1;
2294         } else {
2295                 rbd_assert(img_request->rq != NULL);
2296
2297                 more = blk_update_request(img_request->rq, result, xferred);
2298                 if (!more)
2299                         __blk_mq_end_request(img_request->rq, result);
2300         }
2301
2302         return more;
2303 }
2304
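/*
 * Completion callback for object requests belonging to an image
 * request.  Object requests may complete in any order, but the block
 * layer must be notified in order: completion only advances while the
 * request at next_completion is done, and an out-of-order completion
 * simply records itself and returns.
 */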
2305 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2306 {
2307         struct rbd_img_request *img_request;
2308         u32 which = obj_request->which;
2309         bool more = true;
2310
2311         rbd_assert(obj_request_img_data_test(obj_request));
2312         img_request = obj_request->img_request;
2313
2314         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2315         rbd_assert(img_request != NULL);
2316         rbd_assert(img_request->obj_request_count > 0);
2317         rbd_assert(which != BAD_WHICH);
2318         rbd_assert(which < img_request->obj_request_count);
2319
2320         spin_lock_irq(&img_request->completion_lock);
2321         if (which != img_request->next_completion)
2322                 goto out;
2323
2324         for_each_obj_request_from(img_request, obj_request) {
2325                 rbd_assert(more);
2326                 rbd_assert(which < img_request->obj_request_count);
2327
2328                 if (!obj_request_done_test(obj_request))
2329                         break;
2330                 more = rbd_img_obj_end_request(obj_request);
2331                 which++;
2332         }
2333
2334         rbd_assert(more ^ (which == img_request->obj_request_count));
2335         img_request->next_completion = which;
2336 out:
2337         spin_unlock_irq(&img_request->completion_lock);
2338         rbd_img_request_put(img_request);
2339
2340         if (!more)
2341                 rbd_img_request_complete(img_request);
2342 }
2343
2344 /*
2345  * Add individual osd ops to the given ceph_osd_request and prepare
2346  * them for submission. num_ops is the current number of
2347  * osd operations already added to the osd request.
2348  */
2349 static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2350                                 struct ceph_osd_request *osd_request,
2351                                 enum obj_operation_type op_type,
2352                                 unsigned int num_ops)
2353 {
2354         struct rbd_img_request *img_request = obj_request->img_request;
2355         struct rbd_device *rbd_dev = img_request->rbd_dev;
2356         u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2357         u64 offset = obj_request->offset;
2358         u64 length = obj_request->length;
2359         u64 img_end;
2360         u16 opcode;
2361
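        /*
         * Pick the osd op.  For a discard: delete the object outright
         * when the whole object is discarded and no parent data shows
         * through; truncate when the discard runs to the end of the
         * object (or of the image); otherwise zero the byte range.
         */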
2362         if (op_type == OBJ_OP_DISCARD) {
2363                 if (!offset && length == object_size &&
2364                     (!img_request_layered_test(img_request) ||
2365                      !obj_request_overlaps_parent(obj_request))) {
2366                         opcode = CEPH_OSD_OP_DELETE;
2367                 } else if ((offset + length == object_size)) {
2368                         opcode = CEPH_OSD_OP_TRUNCATE;
2369                 } else {
2370                         down_read(&rbd_dev->header_rwsem);
2371                         img_end = rbd_dev->header.image_size;
2372                         up_read(&rbd_dev->header_rwsem);
2373
2374                         if (obj_request->img_offset + length == img_end)
2375                                 opcode = CEPH_OSD_OP_TRUNCATE;
2376                         else
2377                                 opcode = CEPH_OSD_OP_ZERO;
2378                 }
2379         } else if (op_type == OBJ_OP_WRITE) {
2380                 if (!offset && length == object_size)
2381                         opcode = CEPH_OSD_OP_WRITEFULL;
2382                 else
2383                         opcode = CEPH_OSD_OP_WRITE;
2384                 osd_req_op_alloc_hint_init(osd_request, num_ops,
2385                                         object_size, object_size);
2386                 num_ops++;
2387         } else {
2388                 opcode = CEPH_OSD_OP_READ;
2389         }
2390
2391         if (opcode == CEPH_OSD_OP_DELETE)
2392                 osd_req_op_init(osd_request, num_ops, opcode, 0);
2393         else
2394                 osd_req_op_extent_init(osd_request, num_ops, opcode,
2395                                        offset, length, 0, 0);
2396
2397         if (obj_request->type == OBJ_REQUEST_BIO)
2398                 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2399                                         obj_request->bio_list, length);
2400         else if (obj_request->type == OBJ_REQUEST_PAGES)
2401                 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2402                                         obj_request->pages, length,
2403                                         offset & ~PAGE_MASK, false, false);
2404
2405         /* Discards are also writes */
2406         if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2407                 rbd_osd_req_format_write(obj_request);
2408         else
2409                 rbd_osd_req_format_read(obj_request);
2410 }
2411
2412 /*
2413  * Split up an image request into one or more object requests, each
2414  * to a different object.  The "type" parameter indicates whether
2415  * "data_desc" is the pointer to the head of a list of bio
2416  * structures, or the base of a page array.  In either case this
2417  * function assumes data_desc describes memory sufficient to hold
2418  * all data described by the image request.
2419  */
2420 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2421                                         enum obj_request_type type,
2422                                         void *data_desc)
2423 {
2424         struct rbd_device *rbd_dev = img_request->rbd_dev;
2425         struct rbd_obj_request *obj_request = NULL;
2426         struct rbd_obj_request *next_obj_request;
2427         struct bio *bio_list = NULL;
2428         unsigned int bio_offset = 0;
2429         struct page **pages = NULL;
2430         enum obj_operation_type op_type;
2431         u64 img_offset;
2432         u64 resid;
2433
2434         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2435                 (int)type, data_desc);
2436
2437         img_offset = img_request->offset;
2438         resid = img_request->length;
2439         rbd_assert(resid > 0);
2440         op_type = rbd_img_request_op_type(img_request);
2441
2442         if (type == OBJ_REQUEST_BIO) {
2443                 bio_list = data_desc;
2444                 rbd_assert(img_offset ==
2445                            bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2446         } else if (type == OBJ_REQUEST_PAGES) {
2447                 pages = data_desc;
2448         }
2449
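        /*
         * Walk the image extent one object at a time.  For example,
         * with the default 4 MiB objects (obj_order 22) an image
         * offset of 5 MiB maps to object 1 at offset 1 MiB, and
         * rbd_segment_length() clamps each piece at the object
         * boundary.
         */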
2450         while (resid) {
2451                 struct ceph_osd_request *osd_req;
2452                 u64 object_no = img_offset >> rbd_dev->header.obj_order;
2453                 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2454                 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
2455
2456                 obj_request = rbd_obj_request_create(type);
2457                 if (!obj_request)
2458                         goto out_unwind;
2459
2460                 obj_request->object_no = object_no;
2461                 obj_request->offset = offset;
2462                 obj_request->length = length;
2463
2464                 /*
2465                  * set obj_request->img_request before creating the
2466                  * osd_request so that it gets the right snapc
2467                  */
2468                 rbd_img_obj_request_add(img_request, obj_request);
2469
2470                 if (type == OBJ_REQUEST_BIO) {
2471                         unsigned int clone_size;
2472
2473                         rbd_assert(length <= (u64)UINT_MAX);
2474                         clone_size = (unsigned int)length;
2475                         obj_request->bio_list =
2476                                         bio_chain_clone_range(&bio_list,
2477                                                                 &bio_offset,
2478                                                                 clone_size,
2479                                                                 GFP_NOIO);
2480                         if (!obj_request->bio_list)
2481                                 goto out_unwind;
2482                 } else if (type == OBJ_REQUEST_PAGES) {
2483                         unsigned int page_count;
2484
2485                         obj_request->pages = pages;
2486                         page_count = (u32)calc_pages_for(offset, length);
2487                         obj_request->page_count = page_count;
2488                         if ((offset + length) & ~PAGE_MASK)
2489                                 page_count--;   /* more on last page */
2490                         pages += page_count;
2491                 }
2492
2493                 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2494                                         (op_type == OBJ_OP_WRITE) ? 2 : 1,
2495                                         obj_request);
2496                 if (!osd_req)
2497                         goto out_unwind;
2498
2499                 obj_request->osd_req = osd_req;
2500                 obj_request->callback = rbd_img_obj_callback;
2501                 obj_request->img_offset = img_offset;
2502
2503                 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2504
2505                 img_offset += length;
2506                 resid -= length;
2507         }
2508
2509         return 0;
2510
2511 out_unwind:
2512         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2513                 rbd_img_obj_request_del(img_request, obj_request);
2514
2515         return -ENOMEM;
2516 }
2517
2518 static void
2519 rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
2520 {
2521         struct rbd_img_request *img_request;
2522         struct rbd_device *rbd_dev;
2523         struct page **pages;
2524         u32 page_count;
2525
2526         dout("%s: obj %p\n", __func__, obj_request);
2527
2528         rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2529                 obj_request->type == OBJ_REQUEST_NODATA);
2530         rbd_assert(obj_request_img_data_test(obj_request));
2531         img_request = obj_request->img_request;
2532         rbd_assert(img_request);
2533
2534         rbd_dev = img_request->rbd_dev;
2535         rbd_assert(rbd_dev);
2536
2537         pages = obj_request->copyup_pages;
2538         rbd_assert(pages != NULL);
2539         obj_request->copyup_pages = NULL;
2540         page_count = obj_request->copyup_page_count;
2541         rbd_assert(page_count);
2542         obj_request->copyup_page_count = 0;
2543         ceph_release_page_vector(pages, page_count);
2544
2545         /*
2546          * We want the transfer count to reflect the size of the
2547          * original write request.  There is no such thing as a
2548          * successful short write, so if the request was successful
2549          * we can just set it to the originally-requested length.
2550          */
2551         if (!obj_request->result)
2552                 obj_request->xferred = obj_request->length;
2553
2554         obj_request_done_set(obj_request);
2555 }
2556
2557 static void
2558 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2559 {
2560         struct rbd_obj_request *orig_request;
2561         struct ceph_osd_request *osd_req;
2562         struct rbd_device *rbd_dev;
2563         struct page **pages;
2564         enum obj_operation_type op_type;
2565         u32 page_count;
2566         int img_result;
2567         u64 parent_length;
2568
2569         rbd_assert(img_request_child_test(img_request));
2570
2571         /* First get what we need from the image request */
2572
2573         pages = img_request->copyup_pages;
2574         rbd_assert(pages != NULL);
2575         img_request->copyup_pages = NULL;
2576         page_count = img_request->copyup_page_count;
2577         rbd_assert(page_count);
2578         img_request->copyup_page_count = 0;
2579
2580         orig_request = img_request->obj_request;
2581         rbd_assert(orig_request != NULL);
2582         rbd_assert(obj_request_type_valid(orig_request->type));
2583         img_result = img_request->result;
2584         parent_length = img_request->length;
2585         rbd_assert(img_result || parent_length == img_request->xferred);
2586         rbd_img_request_put(img_request);
2587
2588         rbd_assert(orig_request->img_request);
2589         rbd_dev = orig_request->img_request->rbd_dev;
2590         rbd_assert(rbd_dev);
2591
2592         /*
2593          * If the overlap has become 0 (most likely because the
2594          * image has been flattened) we need to free the pages
2595          * and re-submit the original write request.
2596          */
2597         if (!rbd_dev->parent_overlap) {
2598                 ceph_release_page_vector(pages, page_count);
2599                 rbd_obj_request_submit(orig_request);
2600                 return;
2601         }
2602
2603         if (img_result)
2604                 goto out_err;
2605
2606         /*
2607          * The original osd request is of no use to us any more.
2608          * We need a new one that can hold the two or three ops in
2609          * a copyup request.  Allocate the new copyup osd request for the
2610          * original request, and release the old one.
2611          */
2612         img_result = -ENOMEM;
2613         osd_req = rbd_osd_req_create_copyup(orig_request);
2614         if (!osd_req)
2615                 goto out_err;
2616         rbd_osd_req_destroy(orig_request->osd_req);
2617         orig_request->osd_req = osd_req;
2618         orig_request->copyup_pages = pages;
2619         orig_request->copyup_page_count = page_count;
2620
2621         /* Initialize the copyup op */
2622
2623         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2624         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2625                                                 false, false);
2626
2627         /* Add the other op(s) */
2628
2629         op_type = rbd_img_request_op_type(orig_request->img_request);
2630         rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2631
2632         /* All set, send it off. */
2633
2634         rbd_obj_request_submit(orig_request);
2635         return;
2636
2637 out_err:
2638         ceph_release_page_vector(pages, page_count);
2639         rbd_obj_request_error(orig_request, img_result);
2640 }
2641
2642 /*
2643  * Read from the parent image the range of data that covers the
2644  * entire target of the given object request.  This is used for
2645  * satisfying a layered image write request when the target of an
2646  * object request from the image request does not exist.
2647  *
2648  * A page array big enough to hold the returned data is allocated
2649  * and supplied to rbd_img_request_fill() as the "data descriptor."
2650  * When the read completes, this page array will be transferred to
2651  * the original object request for the copyup operation.
2652  *
2653  * If an error occurs, it is recorded as the result of the original
2654  * object request in rbd_img_obj_exists_callback().
2655  */
2656 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2657 {
2658         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2659         struct rbd_img_request *parent_request = NULL;
2660         u64 img_offset;
2661         u64 length;
2662         struct page **pages = NULL;
2663         u32 page_count;
2664         int result;
2665
2666         rbd_assert(rbd_dev->parent != NULL);
2667
2668         /*
2669          * Determine the byte range covered by the object in the
2670          * child image to which the original request was to be sent.
2671          */
2672         img_offset = obj_request->img_offset - obj_request->offset;
2673         length = rbd_obj_bytes(&rbd_dev->header);
2674
2675         /*
2676          * There is no defined parent data beyond the parent
2677          * overlap, so limit what we read at that boundary if
2678          * necessary.
2679          */
2680         if (img_offset + length > rbd_dev->parent_overlap) {
2681                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2682                 length = rbd_dev->parent_overlap - img_offset;
2683         }
2684
2685         /*
2686          * Allocate a page array big enough to receive the data read
2687          * from the parent.
2688          */
2689         page_count = (u32)calc_pages_for(0, length);
2690         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2691         if (IS_ERR(pages)) {
2692                 result = PTR_ERR(pages);
2693                 pages = NULL;
2694                 goto out_err;
2695         }
2696
2697         result = -ENOMEM;
2698         parent_request = rbd_parent_request_create(obj_request,
2699                                                 img_offset, length);
2700         if (!parent_request)
2701                 goto out_err;
2702
2703         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2704         if (result)
2705                 goto out_err;
2706
2707         parent_request->copyup_pages = pages;
2708         parent_request->copyup_page_count = page_count;
2709         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2710
2711         result = rbd_img_request_submit(parent_request);
2712         if (!result)
2713                 return 0;
2714
2715         parent_request->copyup_pages = NULL;
2716         parent_request->copyup_page_count = 0;
2717         parent_request->obj_request = NULL;
2718         rbd_obj_request_put(obj_request);
2719 out_err:
2720         if (pages)
2721                 ceph_release_page_vector(pages, page_count);
2722         if (parent_request)
2723                 rbd_img_request_put(parent_request);
2724         return result;
2725 }
2726
2727 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2728 {
2729         struct rbd_obj_request *orig_request;
2730         struct rbd_device *rbd_dev;
2731         int result;
2732
2733         rbd_assert(!obj_request_img_data_test(obj_request));
2734
2735         /*
2736          * All we need from the object request is the original
2737          * request and the result of the STAT op.  Grab those, then
2738          * we're done with the request.
2739          */
2740         orig_request = obj_request->obj_request;
2741         obj_request->obj_request = NULL;
2742         rbd_obj_request_put(orig_request);
2743         rbd_assert(orig_request);
2744         rbd_assert(orig_request->img_request);
2745
2746         result = obj_request->result;
2747         obj_request->result = 0;
2748
2749         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2750                 obj_request, orig_request, result,
2751                 obj_request->xferred, obj_request->length);
2752         rbd_obj_request_put(obj_request);
2753
2754         /*
2755          * If the overlap has become 0 (most likely because the
2756          * image has been flattened) we need to re-submit the
2757          * original request.
2758          */
2759         rbd_dev = orig_request->img_request->rbd_dev;
2760         if (!rbd_dev->parent_overlap) {
2761                 rbd_obj_request_submit(orig_request);
2762                 return;
2763         }
2764
2765         /*
2766          * Our only purpose here is to determine whether the object
2767          * exists, and we don't want to treat the non-existence as
2768          * an error.  If something else comes back, transfer the
2769          * error to the original request and complete it now.
2770          */
2771         if (!result) {
2772                 obj_request_existence_set(orig_request, true);
2773         } else if (result == -ENOENT) {
2774                 obj_request_existence_set(orig_request, false);
2775         } else {
2776                 goto fail_orig_request;
2777         }
2778
2779         /*
2780          * Resubmit the original request now that we have recorded
2781          * whether the target object exists.
2782          */
2783         result = rbd_img_obj_request_submit(orig_request);
2784         if (result)
2785                 goto fail_orig_request;
2786
2787         return;
2788
2789 fail_orig_request:
2790         rbd_obj_request_error(orig_request, result);
2791 }
2792
2793 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2794 {
2795         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
2796         struct rbd_obj_request *stat_request;
2797         struct page **pages;
2798         u32 page_count;
2799         size_t size;
2800         int ret;
2801
2802         stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
2803         if (!stat_request)
2804                 return -ENOMEM;
2805
2806         stat_request->object_no = obj_request->object_no;
2807
2808         stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2809                                                    stat_request);
2810         if (!stat_request->osd_req) {
2811                 ret = -ENOMEM;
2812                 goto fail_stat_request;
2813         }
2814
2815         /*
2816          * The response data for a STAT call consists of:
2817          *     le64 length;
2818          *     struct {
2819          *         le32 tv_sec;
2820          *         le32 tv_nsec;
2821          *     } mtime;
2822          */
2823         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2824         page_count = (u32)calc_pages_for(0, size);
2825         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2826         if (IS_ERR(pages)) {
2827                 ret = PTR_ERR(pages);
2828                 goto fail_stat_request;
2829         }
2830
2831         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2832         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2833                                      false, false);
2834
2835         rbd_obj_request_get(obj_request);
2836         stat_request->obj_request = obj_request;
2837         stat_request->pages = pages;
2838         stat_request->page_count = page_count;
2839         stat_request->callback = rbd_img_obj_exists_callback;
2840
2841         rbd_obj_request_submit(stat_request);
2842         return 0;
2843
2844 fail_stat_request:
2845         rbd_obj_request_put(stat_request);
2846         return ret;
2847 }
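/*
 * The stat request above holds a reference on the original object
 * request (stat_request->obj_request); it is dropped in
 * rbd_img_obj_exists_callback() once the STAT result has been
 * recorded.
 */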
2848
2849 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2850 {
2851         struct rbd_img_request *img_request = obj_request->img_request;
2852         struct rbd_device *rbd_dev = img_request->rbd_dev;
2853
2854         /* Reads */
2855         if (!img_request_write_test(img_request) &&
2856             !img_request_discard_test(img_request))
2857                 return true;
2858
2859         /* Non-layered writes */
2860         if (!img_request_layered_test(img_request))
2861                 return true;
2862
2863         /*
2864          * Layered writes outside of the parent overlap range don't
2865          * share any data with the parent.
2866          */
2867         if (!obj_request_overlaps_parent(obj_request))
2868                 return true;
2869
2870         /*
2871          * Entire-object layered writes - we will overwrite whatever
2872          * parent data there is anyway.
2873          */
2874         if (!obj_request->offset &&
2875             obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2876                 return true;
2877
2878         /*
2879          * If the object is known to already exist, its parent data has
2880          * already been copied.
2881          */
2882         if (obj_request_known_test(obj_request) &&
2883             obj_request_exists_test(obj_request))
2884                 return true;
2885
2886         return false;
2887 }
2888
2889 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2890 {
2891         rbd_assert(obj_request_img_data_test(obj_request));
2892         rbd_assert(obj_request_type_valid(obj_request->type));
2893         rbd_assert(obj_request->img_request);
2894
2895         if (img_obj_request_simple(obj_request)) {
2896                 rbd_obj_request_submit(obj_request);
2897                 return 0;
2898         }
2899
2900         /*
2901          * It's a layered write.  The target object might exist but
2902          * we may not know that yet.  If we know it doesn't exist,
2903          * start by reading the data for the full target object from
2904          * the parent so we can use it for a copyup to the target.
2905          */
2906         if (obj_request_known_test(obj_request))
2907                 return rbd_img_obj_parent_read_full(obj_request);
2908
2909         /* We don't know whether the target exists.  Go find out. */
2910
2911         return rbd_img_obj_exists_submit(obj_request);
2912 }
2913
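/*
 * Submit each object request comprising the image request.  If a
 * submission fails, the remaining object requests are left
 * unsubmitted and the error is returned; requests already submitted
 * complete on their own.
 */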
2914 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2915 {
2916         struct rbd_obj_request *obj_request;
2917         struct rbd_obj_request *next_obj_request;
2918         int ret = 0;
2919
2920         dout("%s: img %p\n", __func__, img_request);
2921
2922         rbd_img_request_get(img_request);
2923         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2924                 ret = rbd_img_obj_request_submit(obj_request);
2925                 if (ret)
2926                         goto out_put_ireq;
2927         }
2928
2929 out_put_ireq:
2930         rbd_img_request_put(img_request);
2931         return ret;
2932 }
2933
2934 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2935 {
2936         struct rbd_obj_request *obj_request;
2937         struct rbd_device *rbd_dev;
2938         u64 obj_end;
2939         u64 img_xferred;
2940         int img_result;
2941
2942         rbd_assert(img_request_child_test(img_request));
2943
2944         /* First get what we need from the image request and release it */
2945
2946         obj_request = img_request->obj_request;
2947         img_xferred = img_request->xferred;
2948         img_result = img_request->result;
2949         rbd_img_request_put(img_request);
2950
2951         /*
2952          * If the overlap has become 0 (most likely because the
2953          * image has been flattened) we need to re-submit the
2954          * original request.
2955          */
2956         rbd_assert(obj_request);
2957         rbd_assert(obj_request->img_request);
2958         rbd_dev = obj_request->img_request->rbd_dev;
2959         if (!rbd_dev->parent_overlap) {
2960                 rbd_obj_request_submit(obj_request);
2961                 return;
2962         }
2963
2964         obj_request->result = img_result;
2965         if (obj_request->result)
2966                 goto out;
2967
2968         /*
2969          * We need to zero anything beyond the parent overlap
2970          * boundary.  Since rbd_img_obj_request_read_callback()
2971          * will zero anything beyond the end of a short read, an
2972          * easy way to do this is to pretend the data from the
2973          * parent came up short--ending at the overlap boundary.
2974          */
2975         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2976         obj_end = obj_request->img_offset + obj_request->length;
2977         if (obj_end > rbd_dev->parent_overlap) {
2978                 u64 xferred = 0;
2979
2980                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2981                         xferred = rbd_dev->parent_overlap -
2982                                         obj_request->img_offset;
2983
2984                 obj_request->xferred = min(img_xferred, xferred);
2985         } else {
2986                 obj_request->xferred = img_xferred;
2987         }
2988 out:
2989         rbd_img_obj_request_read_callback(obj_request);
2990         rbd_obj_request_complete(obj_request);
2991 }
2992
2993 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2994 {
2995         struct rbd_img_request *img_request;
2996         int result;
2997
2998         rbd_assert(obj_request_img_data_test(obj_request));
2999         rbd_assert(obj_request->img_request != NULL);
3000         rbd_assert(obj_request->result == (s32) -ENOENT);
3001         rbd_assert(obj_request_type_valid(obj_request->type));
3002
3004         img_request = rbd_parent_request_create(obj_request,
3005                                                 obj_request->img_offset,
3006                                                 obj_request->length);
3007         result = -ENOMEM;
3008         if (!img_request)
3009                 goto out_err;
3010
3011         if (obj_request->type == OBJ_REQUEST_BIO)
3012                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3013                                                 obj_request->bio_list);
3014         else
3015                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3016                                                 obj_request->pages);
3017         if (result)
3018                 goto out_err;
3019
3020         img_request->callback = rbd_img_parent_read_callback;
3021         result = rbd_img_request_submit(img_request);
3022         if (result)
3023                 goto out_err;
3024
3025         return;
3026 out_err:
3027         if (img_request)
3028                 rbd_img_request_put(img_request);
3029         obj_request->result = result;
3030         obj_request->xferred = 0;
3031         obj_request_done_set(obj_request);
3032 }
3033
3034 static const struct rbd_client_id rbd_empty_cid;
3035
3036 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3037                           const struct rbd_client_id *rhs)
3038 {
3039         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3040 }
3041
3042 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3043 {
3044         struct rbd_client_id cid;
3045
3046         mutex_lock(&rbd_dev->watch_mutex);
3047         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3048         cid.handle = rbd_dev->watch_cookie;
3049         mutex_unlock(&rbd_dev->watch_mutex);
3050         return cid;
3051 }
3052
3053 /*
3054  * lock_rwsem must be held for write
3055  */
3056 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3057                               const struct rbd_client_id *cid)
3058 {
3059         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3060              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3061              cid->gid, cid->handle);
3062         rbd_dev->owner_cid = *cid; /* struct */
3063 }
3064
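/*
 * Render the lock cookie as "<RBD_LOCK_COOKIE_PREFIX> <watch_cookie>".
 * Callers pass a 32-byte buffer, which is assumed to leave room for
 * the prefix, a space, up to 20 decimal digits of a u64 and the
 * terminating NUL.
 */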
3065 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3066 {
3067         mutex_lock(&rbd_dev->watch_mutex);
3068         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3069         mutex_unlock(&rbd_dev->watch_mutex);
3070 }
3071
3072 /*
3073  * lock_rwsem must be held for write
3074  */
3075 static int rbd_lock(struct rbd_device *rbd_dev)
3076 {
3077         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3078         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3079         char cookie[32];
3080         int ret;
3081
3082         WARN_ON(__rbd_is_lock_owner(rbd_dev));
3083
3084         format_lock_cookie(rbd_dev, cookie);
3085         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3086                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3087                             RBD_LOCK_TAG, "", 0);
3088         if (ret)
3089                 return ret;
3090
3091         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3092         rbd_set_owner_cid(rbd_dev, &cid);
3093         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3094         return 0;
3095 }
3096
3097 /*
3098  * lock_rwsem must be held for write
3099  */
3100 static int rbd_unlock(struct rbd_device *rbd_dev)
3101 {
3102         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3103         char cookie[32];
3104         int ret;
3105
3106         WARN_ON(!__rbd_is_lock_owner(rbd_dev));
3107
3108         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3109
3110         format_lock_cookie(rbd_dev, cookie);
3111         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3112                               RBD_LOCK_NAME, cookie);
3113         if (ret && ret != -ENOENT) {
3114                 rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
3115                 return ret;
3116         }
3117
3118         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3119         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3120         return 0;
3121 }
3122
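/*
 * Send a lock-related NotifyMessage on the header object and hand any
 * acks back to the caller (who must release the reply pages).  The
 * payload built below is, in outline:
 *
 *	<encoding start block>	CEPH_ENCODING_START_BLK_LEN bytes,
 *				presumably version/compat/length from
 *				ceph_start_encoding()
 *	__le32 notify_op
 *	__le64 cid.gid
 *	__le64 cid.handle
 */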
3123 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3124                                 enum rbd_notify_op notify_op,
3125                                 struct page ***preply_pages,
3126                                 size_t *preply_len)
3127 {
3128         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3129         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3130         int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3131         char buf[buf_size];
3132         void *p = buf;
3133
3134         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3135
3136         /* encode *LockPayload NotifyMessage (op + ClientId) */
3137         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3138         ceph_encode_32(&p, notify_op);
3139         ceph_encode_64(&p, cid.gid);
3140         ceph_encode_64(&p, cid.handle);
3141
3142         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3143                                 &rbd_dev->header_oloc, buf, buf_size,
3144                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3145 }
3146
3147 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3148                                enum rbd_notify_op notify_op)
3149 {
3150         struct page **reply_pages;
3151         size_t reply_len;
3152
3153         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3154         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3155 }
3156
3157 static void rbd_notify_acquired_lock(struct work_struct *work)
3158 {
3159         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3160                                                   acquired_lock_work);
3161
3162         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3163 }
3164
3165 static void rbd_notify_released_lock(struct work_struct *work)
3166 {
3167         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3168                                                   released_lock_work);
3169
3170         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3171 }
3172
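/*
 * Ask the current lock owner to release the lock.  Each ack in the
 * notify reply carries the responder's gid and cookie (8 + 8 bytes,
 * skipped below) followed by a length-prefixed payload; only the
 * owner is expected to answer with a non-empty ResponseMessage, whose
 * s32 result we return.  Duplicate non-empty acks mean duplicate
 * owners (-EIO), and no non-empty ack at all is treated as a dead or
 * missing owner (-ETIMEDOUT) so that the caller can retry.
 */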
3173 static int rbd_request_lock(struct rbd_device *rbd_dev)
3174 {
3175         struct page **reply_pages;
3176         size_t reply_len;
3177         bool lock_owner_responded = false;
3178         int ret;
3179
3180         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3181
3182         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3183                                    &reply_pages, &reply_len);
3184         if (ret && ret != -ETIMEDOUT) {
3185                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3186                 goto out;
3187         }
3188
3189         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3190                 void *p = page_address(reply_pages[0]);
3191                 void *const end = p + reply_len;
3192                 u32 n;
3193
3194                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3195                 while (n--) {
3196                         u8 struct_v;
3197                         u32 len;
3198
3199                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3200                         p += 8 + 8; /* skip gid and cookie */
3201
3202                         ceph_decode_32_safe(&p, end, len, e_inval);
3203                         if (!len)
3204                                 continue;
3205
3206                         if (lock_owner_responded) {
3207                                 rbd_warn(rbd_dev,
3208                                          "duplicate lock owners detected");
3209                                 ret = -EIO;
3210                                 goto out;
3211                         }
3212
3213                         lock_owner_responded = true;
3214                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3215                                                   &struct_v, &len);
3216                         if (ret) {
3217                                 rbd_warn(rbd_dev,
3218                                          "failed to decode ResponseMessage: %d",
3219                                          ret);
3220                                 goto e_inval;
3221                         }
3222
3223                         ret = ceph_decode_32(&p);
3224                 }
3225         }
3226
3227         if (!lock_owner_responded) {
3228                 rbd_warn(rbd_dev, "no lock owners detected");
3229                 ret = -ETIMEDOUT;
3230         }
3231
3232 out:
3233         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3234         return ret;
3235
3236 e_inval:
3237         ret = -EINVAL;
3238         goto out;
3239 }
3240
3241 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3242 {
3243         dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3244
3245         cancel_delayed_work(&rbd_dev->lock_dwork);
3246         if (wake_all)
3247                 wake_up_all(&rbd_dev->lock_waitq);
3248         else
3249                 wake_up(&rbd_dev->lock_waitq);
3250 }
3251
3252 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3253                                struct ceph_locker **lockers, u32 *num_lockers)
3254 {
3255         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3256         u8 lock_type;
3257         char *lock_tag;
3258         int ret;
3259
3260         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3261
3262         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3263                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3264                                  &lock_type, &lock_tag, lockers, num_lockers);
3265         if (ret)
3266                 return ret;
3267
3268         if (*num_lockers == 0) {
3269                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3270                 goto out;
3271         }
3272
3273         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3274                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3275                          lock_tag);
3276                 ret = -EBUSY;
3277                 goto out;
3278         }
3279
3280         if (lock_type == CEPH_CLS_LOCK_SHARED) {
3281                 rbd_warn(rbd_dev, "shared lock type detected");
3282                 ret = -EBUSY;
3283                 goto out;
3284         }
3285
3286         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3287                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
3288                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3289                          (*lockers)[0].id.cookie);
3290                 ret = -EBUSY;
3291                 goto out;
3292         }
3293
3294 out:
3295         kfree(lock_tag);
3296         return ret;
3297 }
3298
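/*
 * Determine whether the owner recorded in @locker still has a watch
 * established on the header object.  Returns 1 (and records the
 * owner's cid) if a watcher with a matching address and cookie is
 * found, 0 if there is none, or a negative error code.
 */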
3299 static int find_watcher(struct rbd_device *rbd_dev,
3300                         const struct ceph_locker *locker)
3301 {
3302         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3303         struct ceph_watch_item *watchers;
3304         u32 num_watchers;
3305         u64 cookie;
3306         int i;
3307         int ret;
3308
3309         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3310                                       &rbd_dev->header_oloc, &watchers,
3311                                       &num_watchers);
3312         if (ret)
3313                 return ret;
3314
3315         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3316         for (i = 0; i < num_watchers; i++) {
3317                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3318                             sizeof(locker->info.addr)) &&
3319                     watchers[i].cookie == cookie) {
3320                         struct rbd_client_id cid = {
3321                                 .gid = le64_to_cpu(watchers[i].name.num),
3322                                 .handle = cookie,
3323                         };
3324
3325                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3326                              rbd_dev, cid.gid, cid.handle);
3327                         rbd_set_owner_cid(rbd_dev, &cid);
3328                         ret = 1;
3329                         goto out;
3330                 }
3331         }
3332
3333         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3334         ret = 0;
3335 out:
3336         kfree(watchers);
3337         return ret;
3338 }
3339
3340 /*
3341  * lock_rwsem must be held for write
3342  */
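/*
 * A sketch of the recovery loop below:
 *
 *	rbd_lock() ------------------------ acquired (or error): done
 *	  '-- -EBUSY: get_lock_owner_info()
 *	        '-- owner still has a watcher: give up, must request
 *	            the lock from it instead
 *	        '-- no watcher: owner presumed dead, so blacklist it,
 *	            ceph_cls_break_lock() and retry
 */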
3343 static int rbd_try_lock(struct rbd_device *rbd_dev)
3344 {
3345         struct ceph_client *client = rbd_dev->rbd_client->client;
3346         struct ceph_locker *lockers;
3347         u32 num_lockers;
3348         int ret;
3349
3350         for (;;) {
3351                 ret = rbd_lock(rbd_dev);
3352                 if (ret != -EBUSY)
3353                         return ret;
3354
3355                 /* determine if the current lock holder is still alive */
3356                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3357                 if (ret)
3358                         return ret;
3359
3360                 if (num_lockers == 0)
3361                         goto again;
3362
3363                 ret = find_watcher(rbd_dev, lockers);
3364                 if (ret) {
3365                         if (ret > 0)
3366                                 ret = 0; /* have to request lock */
3367                         goto out;
3368                 }
3369
3370                 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3371                          ENTITY_NAME(lockers[0].id.name));
3372
3373                 ret = ceph_monc_blacklist_add(&client->monc,
3374                                               &lockers[0].info.addr);
3375                 if (ret) {
3376                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3377                                  ENTITY_NAME(lockers[0].id.name), ret);
3378                         goto out;
3379                 }
3380
3381                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3382                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
3383                                           lockers[0].id.cookie,
3384                                           &lockers[0].id.name);
3385                 if (ret && ret != -ENOENT)
3386                         goto out;
3387
3388 again:
3389                 ceph_free_lockers(lockers, num_lockers);
3390         }
3391
3392 out:
3393         ceph_free_lockers(lockers, num_lockers);
3394         return ret;
3395 }
3396
3397 /*
3398  * *pret is meaningful only if the returned lock_state is RBD_LOCK_STATE_UNLOCKED
3399  */
3400 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3401                                                 int *pret)
3402 {
3403         enum rbd_lock_state lock_state;
3404
3405         down_read(&rbd_dev->lock_rwsem);
3406         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3407              rbd_dev->lock_state);
3408         if (__rbd_is_lock_owner(rbd_dev)) {
3409                 lock_state = rbd_dev->lock_state;
3410                 up_read(&rbd_dev->lock_rwsem);
3411                 return lock_state;
3412         }
3413
3414         up_read(&rbd_dev->lock_rwsem);
3415         down_write(&rbd_dev->lock_rwsem);
3416         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3417              rbd_dev->lock_state);
3418         if (!__rbd_is_lock_owner(rbd_dev)) {
3419                 *pret = rbd_try_lock(rbd_dev);
3420                 if (*pret)
3421                         rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3422         }
3423
3424         lock_state = rbd_dev->lock_state;
3425         up_write(&rbd_dev->lock_rwsem);
3426         return lock_state;
3427 }
3428
3429 static void rbd_acquire_lock(struct work_struct *work)
3430 {
3431         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3432                                             struct rbd_device, lock_dwork);
3433         enum rbd_lock_state lock_state;
3434         int ret;
3435
3436         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3437 again:
3438         lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3439         if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3440                 if (lock_state == RBD_LOCK_STATE_LOCKED)
3441                         wake_requests(rbd_dev, true);
3442                 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3443                      rbd_dev, lock_state, ret);
3444                 return;
3445         }
3446
3447         ret = rbd_request_lock(rbd_dev);
3448         if (ret == -ETIMEDOUT) {
3449                 goto again; /* treat this as a dead client */
3450         } else if (ret < 0) {
3451                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3452                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3453                                  RBD_RETRY_DELAY);
3454         } else {
3455                 /*
3456                  * lock owner acked, but resend if we don't see them
3457                  * release the lock
3458                  */
3459                 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3460                      rbd_dev);
3461                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3462                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3463         }
3464 }
3465
3466 /*
3467  * lock_rwsem must be held for write
3468  */
3469 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3470 {
3471         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3472              rbd_dev->lock_state);
3473         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3474                 return false;
3475
3476         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3477         downgrade_write(&rbd_dev->lock_rwsem);
3478         /*
3479          * Ensure that all in-flight IO is flushed.
3480          *
3481          * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3482          * may be shared with other devices.
3483          */
3484         ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3485         up_read(&rbd_dev->lock_rwsem);
3486
3487         down_write(&rbd_dev->lock_rwsem);
3488         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3489              rbd_dev->lock_state);
3490         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3491                 return false;
3492
3493         if (!rbd_unlock(rbd_dev))
3494                 /*
3495                  * Give others a chance to grab the lock - we would re-acquire
3496                  * almost immediately if we got new IO during ceph_osdc_sync()
3497                  * otherwise.  We need to ack our own notifications, so this
3498                  * lock_dwork will be requeued from rbd_wait_state_locked()
3499                  * after wake_requests() in rbd_handle_released_lock().
3500                  */
3501                 cancel_delayed_work(&rbd_dev->lock_dwork);
3502
3503         return true;
3504 }
3505
3506 static void rbd_release_lock_work(struct work_struct *work)
3507 {
3508         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3509                                                   unlock_work);
3510
3511         down_write(&rbd_dev->lock_rwsem);
3512         rbd_release_lock(rbd_dev);
3513         up_write(&rbd_dev->lock_rwsem);
3514 }
3515
3516 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3517                                      void **p)
3518 {
3519         struct rbd_client_id cid = { 0 };
3520
3521         if (struct_v >= 2) {
3522                 cid.gid = ceph_decode_64(p);
3523                 cid.handle = ceph_decode_64(p);
3524         }
3525
3526         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3527              cid.handle);
3528         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3529                 down_write(&rbd_dev->lock_rwsem);
3530                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3531                         /*
3532                          * we already know that the remote client is
3533                          * the owner
3534                          */
3535                         up_write(&rbd_dev->lock_rwsem);
3536                         return;
3537                 }
3538
3539                 rbd_set_owner_cid(rbd_dev, &cid);
3540                 downgrade_write(&rbd_dev->lock_rwsem);
3541         } else {
3542                 down_read(&rbd_dev->lock_rwsem);
3543         }
3544
3545         if (!__rbd_is_lock_owner(rbd_dev))
3546                 wake_requests(rbd_dev, false);
3547         up_read(&rbd_dev->lock_rwsem);
3548 }
3549
3550 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3551                                      void **p)
3552 {
3553         struct rbd_client_id cid = { 0 };
3554
3555         if (struct_v >= 2) {
3556                 cid.gid = ceph_decode_64(p);
3557                 cid.handle = ceph_decode_64(p);
3558         }
3559
3560         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3561              cid.handle);
3562         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3563                 down_write(&rbd_dev->lock_rwsem);
3564                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3565                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3566                              __func__, rbd_dev, cid.gid, cid.handle,
3567                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3568                         up_write(&rbd_dev->lock_rwsem);
3569                         return;
3570                 }
3571
3572                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3573                 downgrade_write(&rbd_dev->lock_rwsem);
3574         } else {
3575                 down_read(&rbd_dev->lock_rwsem);
3576         }
3577
3578         if (!__rbd_is_lock_owner(rbd_dev))
3579                 wake_requests(rbd_dev, false);
3580         up_read(&rbd_dev->lock_rwsem);
3581 }
3582
3583 static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3584                                     void **p)
3585 {
3586         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3587         struct rbd_client_id cid = { 0 };
3588         bool need_to_send;
3589
3590         if (struct_v >= 2) {
3591                 cid.gid = ceph_decode_64(p);
3592                 cid.handle = ceph_decode_64(p);
3593         }
3594
3595         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3596              cid.handle);
3597         if (rbd_cid_equal(&cid, &my_cid))
3598                 return false;
3599
3600         down_read(&rbd_dev->lock_rwsem);
3601         need_to_send = __rbd_is_lock_owner(rbd_dev);
3602         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3603                 if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
3604                         dout("%s rbd_dev %p queueing unlock_work\n", __func__,
3605                              rbd_dev);
3606                         queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
3607                 }
3608         }
3609         up_read(&rbd_dev->lock_rwsem);
3610         return need_to_send;
3611 }
3612
3613 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3614                                      u64 notify_id, u64 cookie, s32 *result)
3615 {
3616         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3617         int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3618         char buf[buf_size];
3619         int ret;
3620
3621         if (result) {
3622                 void *p = buf;
3623
3624                 /* encode ResponseMessage */
3625                 ceph_start_encoding(&p, 1, 1,
3626                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
3627                 ceph_encode_32(&p, *result);
3628         } else {
3629                 buf_size = 0;
3630         }
3631
3632         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3633                                    &rbd_dev->header_oloc, notify_id, cookie,
3634                                    buf, buf_size);
3635         if (ret)
3636                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3637 }
3638
3639 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3640                                    u64 cookie)
3641 {
3642         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3643         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3644 }
3645
3646 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3647                                           u64 notify_id, u64 cookie, s32 result)
3648 {
3649         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3650         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3651 }
3652
3653 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3654                          u64 notifier_id, void *data, size_t data_len)
3655 {
3656         struct rbd_device *rbd_dev = arg;
3657         void *p = data;
3658         void *const end = p + data_len;
3659         u8 struct_v = 0;
3660         u32 len;
3661         u32 notify_op;
3662         int ret;
3663
3664         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3665              __func__, rbd_dev, cookie, notify_id, data_len);
3666         if (data_len) {
3667                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3668                                           &struct_v, &len);
3669                 if (ret) {
3670                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3671                                  ret);
3672                         return;
3673                 }
3674
3675                 notify_op = ceph_decode_32(&p);
3676         } else {
3677                 /* legacy notification for header updates */
3678                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3679                 len = 0;
3680         }
3681
3682         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3683         switch (notify_op) {
3684         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3685                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3686                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3687                 break;
3688         case RBD_NOTIFY_OP_RELEASED_LOCK:
3689                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3690                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3691                 break;
3692         case RBD_NOTIFY_OP_REQUEST_LOCK:
3693                 if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
3694                         /*
3695                          * send ResponseMessage(0) back so the client
3696                          * can detect a missing owner
3697                          */
3698                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3699                                                       cookie, 0);
3700                 else
3701                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3702                 break;
3703         case RBD_NOTIFY_OP_HEADER_UPDATE:
3704                 ret = rbd_dev_refresh(rbd_dev);
3705                 if (ret)
3706                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
3707
3708                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3709                 break;
3710         default:
3711                 if (rbd_is_lock_owner(rbd_dev))
3712                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3713                                                       cookie, -EOPNOTSUPP);
3714                 else
3715                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3716                 break;
3717         }
3718 }
3719
3720 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3721
3722 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3723 {
3724         struct rbd_device *rbd_dev = arg;
3725
3726         rbd_warn(rbd_dev, "encountered watch error: %d", err);
3727
3728         down_write(&rbd_dev->lock_rwsem);
3729         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3730         up_write(&rbd_dev->lock_rwsem);
3731
3732         mutex_lock(&rbd_dev->watch_mutex);
3733         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3734                 __rbd_unregister_watch(rbd_dev);
3735                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3736
3737                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3738         }
3739         mutex_unlock(&rbd_dev->watch_mutex);
3740 }
3741
3742 /*
3743  * watch_mutex must be locked
3744  */
3745 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3746 {
3747         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3748         struct ceph_osd_linger_request *handle;
3749
3750         rbd_assert(!rbd_dev->watch_handle);
3751         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3752
3753         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3754                                  &rbd_dev->header_oloc, rbd_watch_cb,
3755                                  rbd_watch_errcb, rbd_dev);
3756         if (IS_ERR(handle))
3757                 return PTR_ERR(handle);
3758
3759         rbd_dev->watch_handle = handle;
3760         return 0;
3761 }
3762
3763 /*
3764  * watch_mutex must be locked
3765  */
3766 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3767 {
3768         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3769         int ret;
3770
3771         rbd_assert(rbd_dev->watch_handle);
3772         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3773
3774         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3775         if (ret)
3776                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3777
3778         rbd_dev->watch_handle = NULL;
3779 }
3780
3781 static int rbd_register_watch(struct rbd_device *rbd_dev)
3782 {
3783         int ret;
3784
3785         mutex_lock(&rbd_dev->watch_mutex);
3786         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3787         ret = __rbd_register_watch(rbd_dev);
3788         if (ret)
3789                 goto out;
3790
3791         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3792         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3793
3794 out:
3795         mutex_unlock(&rbd_dev->watch_mutex);
3796         return ret;
3797 }
3798
3799 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3800 {
3801         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3802
3803         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3804         cancel_work_sync(&rbd_dev->acquired_lock_work);
3805         cancel_work_sync(&rbd_dev->released_lock_work);
3806         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3807         cancel_work_sync(&rbd_dev->unlock_work);
3808 }
3809
3810 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3811 {
3812         WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3813         cancel_tasks_sync(rbd_dev);
3814
3815         mutex_lock(&rbd_dev->watch_mutex);
3816         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3817                 __rbd_unregister_watch(rbd_dev);
3818         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3819         mutex_unlock(&rbd_dev->watch_mutex);
3820
3821         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3822 }
3823
3824 static void rbd_reregister_watch(struct work_struct *work)
3825 {
3826         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3827                                             struct rbd_device, watch_dwork);
3828         bool was_lock_owner = false;
3829         bool need_to_wake = false;
3830         int ret;
3831
3832         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3833
3834         down_write(&rbd_dev->lock_rwsem);
3835         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3836                 was_lock_owner = rbd_release_lock(rbd_dev);
3837
3838         mutex_lock(&rbd_dev->watch_mutex);
3839         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3840                 mutex_unlock(&rbd_dev->watch_mutex);
3841                 goto out;
3842         }
3843
3844         ret = __rbd_register_watch(rbd_dev);
3845         if (ret) {
3846                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3847                 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3848                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3849                         need_to_wake = true;
3850                 } else {
3851                         queue_delayed_work(rbd_dev->task_wq,
3852                                            &rbd_dev->watch_dwork,
3853                                            RBD_RETRY_DELAY);
3854                 }
3855                 mutex_unlock(&rbd_dev->watch_mutex);
3856                 goto out;
3857         }
3858
3859         need_to_wake = true;
3860         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3861         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3862         mutex_unlock(&rbd_dev->watch_mutex);
3863
3864         ret = rbd_dev_refresh(rbd_dev);
3865         if (ret)
3866                 rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
3867
3868         if (was_lock_owner) {
3869                 ret = rbd_try_lock(rbd_dev);
3870                 if (ret)
3871                         rbd_warn(rbd_dev, "reregisteration lock failed: %d",
3872                                  ret);
3873         }
3874
3875 out:
3876         up_write(&rbd_dev->lock_rwsem);
3877         if (need_to_wake)
3878                 wake_requests(rbd_dev, true);
3879 }
3880
3881 /*
3882  * Synchronous osd object method call.  Returns the number of bytes
3883  * returned in the inbound buffer, or a negative error code.
3884  */
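/*
 * A minimal usage sketch (hypothetical; the "get_size" class method
 * exists, but the reply layout shown is illustrative only -- see the
 * actual callers for the authoritative encodings):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	__le64 size;
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *				  &rbd_dev->header_oloc, "get_size",
 *				  &snapid, sizeof(snapid),
 *				  &size, sizeof(size));
 *	if (ret < 0)
 *		return ret;
 */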
3885 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3886                              struct ceph_object_id *oid,
3887                              struct ceph_object_locator *oloc,
3888                              const char *method_name,
3889                              const void *outbound,
3890                              size_t outbound_size,
3891                              void *inbound,
3892                              size_t inbound_size)
3893 {
3894         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3895         struct page *req_page = NULL;
3896         struct page *reply_page;
3897         int ret;
3898
3899         /*
2900          * Method calls are ultimately read operations.  The result
2901          * should be placed into the inbound buffer provided.  Callers
2902          * may also supply outbound data--parameters for the object
2903          * method.  Currently, if this is present, it will be a
2904          * snapshot id.
3905          */
3906         if (outbound) {
3907                 if (outbound_size > PAGE_SIZE)
3908                         return -E2BIG;
3909
3910                 req_page = alloc_page(GFP_KERNEL);
3911                 if (!req_page)
3912                         return -ENOMEM;
3913
3914                 memcpy(page_address(req_page), outbound, outbound_size);
3915         }
3916
3917         reply_page = alloc_page(GFP_KERNEL);
3918         if (!reply_page) {
3919                 if (req_page)
3920                         __free_page(req_page);
3921                 return -ENOMEM;
3922         }
3923
3924         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3925                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
3926                              reply_page, &inbound_size);
3927         if (!ret) {
3928                 memcpy(inbound, page_address(reply_page), inbound_size);
3929                 ret = inbound_size;
3930         }
3931
3932         if (req_page)
3933                 __free_page(req_page);
3934         __free_page(reply_page);
3935         return ret;
3936 }
3937
3938 /*
3939  * lock_rwsem must be held for read
3940  */
3941 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3942 {
3943         DEFINE_WAIT(wait);
3944
3945         do {
3946                 /*
3947                  * Note the use of mod_delayed_work() in rbd_acquire_lock()
3948                  * and cancel_delayed_work() in wake_requests().
3949                  */
3950                 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3951                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3952                 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
3953                                           TASK_UNINTERRUPTIBLE);
3954                 up_read(&rbd_dev->lock_rwsem);
3955                 schedule();
3956                 down_read(&rbd_dev->lock_rwsem);
3957         } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
3958                  !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
3959
3960         finish_wait(&rbd_dev->lock_waitq, &wait);
3961 }
3962
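/*
 * Per-request worker, run for each block request queued by
 * rbd_queue_rq().  Roughly: classify the op (read/write/discard),
 * sanity-check the range against the mapped size, take a snap
 * context reference and the exclusive lock where required, then
 * build, fill and submit the image request.  Any failure completes
 * the block request from here.
 */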
3963 static void rbd_queue_workfn(struct work_struct *work)
3964 {
3965         struct request *rq = blk_mq_rq_from_pdu(work);
3966         struct rbd_device *rbd_dev = rq->q->queuedata;
3967         struct rbd_img_request *img_request;
3968         struct ceph_snap_context *snapc = NULL;
3969         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3970         u64 length = blk_rq_bytes(rq);
3971         enum obj_operation_type op_type;
3972         u64 mapping_size;
3973         bool must_be_locked;
3974         int result;
3975
3976         switch (req_op(rq)) {
3977         case REQ_OP_DISCARD:
3978                 op_type = OBJ_OP_DISCARD;
3979                 break;
3980         case REQ_OP_WRITE:
3981                 op_type = OBJ_OP_WRITE;
3982                 break;
3983         case REQ_OP_READ:
3984                 op_type = OBJ_OP_READ;
3985                 break;
3986         default:
3987                 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
3988                 result = -EIO;
3989                 goto err;
3990         }
3991
3992         /* Ignore/skip any zero-length requests */
3993
3994         if (!length) {
3995                 dout("%s: zero-length request\n", __func__);
3996                 result = 0;
3997                 goto err_rq;
3998         }
3999
4000         /* Only reads are allowed to a read-only device */
4001
4002         if (op_type != OBJ_OP_READ) {
4003                 if (rbd_dev->mapping.read_only) {
4004                         result = -EROFS;
4005                         goto err_rq;
4006                 }
4007                 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4008         }
4009
4010         /*
4011          * Quit early if the mapped snapshot no longer exists.  It's
4012          * still possible the snapshot will have disappeared by the
4013          * time our request arrives at the osd, but there's no sense in
4014          * sending it if we already know.
4015          */
4016         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4017                 dout("request for non-existent snapshot");
4018                 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4019                 result = -ENXIO;
4020                 goto err_rq;
4021         }
4022
4023         if (offset && length > U64_MAX - offset + 1) {
4024                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4025                          length);
4026                 result = -EINVAL;
4027                 goto err_rq;    /* Shouldn't happen */
4028         }
4029
4030         blk_mq_start_request(rq);
4031
4032         down_read(&rbd_dev->header_rwsem);
4033         mapping_size = rbd_dev->mapping.size;
4034         if (op_type != OBJ_OP_READ) {
4035                 snapc = rbd_dev->header.snapc;
4036                 ceph_get_snap_context(snapc);
4037                 must_be_locked = rbd_is_lock_supported(rbd_dev);
4038         } else {
4039                 must_be_locked = rbd_dev->opts->lock_on_read &&
4040                                         rbd_is_lock_supported(rbd_dev);
4041         }
4042         up_read(&rbd_dev->header_rwsem);
4043
4044         if (offset + length > mapping_size) {
4045                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4046                          length, mapping_size);
4047                 result = -EIO;
4048                 goto err_rq;
4049         }
4050
4051         if (must_be_locked) {
4052                 down_read(&rbd_dev->lock_rwsem);
4053                 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4054                     !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
4055                         rbd_wait_state_locked(rbd_dev);
4056
4057                 WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
4058                         !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4059                 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4060                         result = -EBLACKLISTED;
4061                         goto err_unlock;
4062                 }
4063         }
4064
4065         img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4066                                              snapc);
4067         if (!img_request) {
4068                 result = -ENOMEM;
4069                 goto err_unlock;
4070         }
4071         img_request->rq = rq;
4072         snapc = NULL; /* img_request consumes a ref */
4073
4074         if (op_type == OBJ_OP_DISCARD)
4075                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4076                                               NULL);
4077         else
4078                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4079                                               rq->bio);
4080         if (result)
4081                 goto err_img_request;
4082
4083         result = rbd_img_request_submit(img_request);
4084         if (result)
4085                 goto err_img_request;
4086
4087         if (must_be_locked)
4088                 up_read(&rbd_dev->lock_rwsem);
4089         return;
4090
4091 err_img_request:
4092         rbd_img_request_put(img_request);
4093 err_unlock:
4094         if (must_be_locked)
4095                 up_read(&rbd_dev->lock_rwsem);
4096 err_rq:
4097         if (result)
4098                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4099                          obj_op_name(op_type), length, offset, result);
4100         ceph_put_snap_context(snapc);
4101 err:
4102         blk_mq_end_request(rq, result);
4103 }
4104
4105 static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4106                 const struct blk_mq_queue_data *bd)
4107 {
4108         struct request *rq = bd->rq;
4109         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4110
4111         queue_work(rbd_wq, work);
4112         return BLK_MQ_RQ_QUEUE_OK;
4113 }
4114
4115 static void rbd_free_disk(struct rbd_device *rbd_dev)
4116 {
4117         struct gendisk *disk = rbd_dev->disk;
4118
4119         if (!disk)
4120                 return;
4121
4122         rbd_dev->disk = NULL;
4123         if (disk->flags & GENHD_FL_UP) {
4124                 del_gendisk(disk);
4125                 if (disk->queue)
4126                         blk_cleanup_queue(disk->queue);
4127                 blk_mq_free_tag_set(&rbd_dev->tag_set);
4128         }
4129         put_disk(disk);
4130 }
4131
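/*
 * Synchronously read up to @buf_len bytes from offset 0 of the given
 * object into @buf.  Returns the number of bytes read or a negative
 * error code.
 */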
4132 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4133                              struct ceph_object_id *oid,
4134                              struct ceph_object_locator *oloc,
4135                              void *buf, int buf_len)
4137 {
4138         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4139         struct ceph_osd_request *req;
4140         struct page **pages;
4141         int num_pages = calc_pages_for(0, buf_len);
4142         int ret;
4143
4144         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4145         if (!req)
4146                 return -ENOMEM;
4147
4148         ceph_oid_copy(&req->r_base_oid, oid);
4149         ceph_oloc_copy(&req->r_base_oloc, oloc);
4150         req->r_flags = CEPH_OSD_FLAG_READ;
4151
4152         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4153         if (ret)
4154                 goto out_req;
4155
4156         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4157         if (IS_ERR(pages)) {
4158                 ret = PTR_ERR(pages);
4159                 goto out_req;
4160         }
4161
4162         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4163         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4164                                          true);
4165
4166         ceph_osdc_start_request(osdc, req, false);
4167         ret = ceph_osdc_wait_request(osdc, req);
4168         if (ret >= 0)
4169                 ceph_copy_from_page_vector(pages, buf, 0, ret);
4170
4171 out_req:
4172         ceph_osdc_put_request(req);
4173         return ret;
4174 }
4175
4176 /*
4177  * Read the complete header for the given rbd device.  On successful
4178  * return, the rbd_dev->header field will contain up-to-date
4179  * information about the image.
4180  */
4181 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4182 {
4183         struct rbd_image_header_ondisk *ondisk = NULL;
4184         u32 snap_count = 0;
4185         u64 names_size = 0;
4186         u32 want_count;
4187         int ret;
4188
4189         /*
4190          * The complete header will include an array of its 64-bit
4191          * snapshot ids, followed by the names of those snapshots as
4192          * a contiguous block of NUL-terminated strings.  Note that
4193          * the number of snapshots could change by the time we read
4194          * it in, in which case we re-read it.
4195          */
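        /*
         * A sketch of what comes back, sized by the loop below:
         *
         *      struct rbd_image_header_ondisk          fixed-size part
         *      struct rbd_image_snap_ondisk[snap_count]
         *      char snap_names[names_size]             NUL-terminated
         */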
4196         do {
4197                 size_t size;
4198
4199                 kfree(ondisk);
4200
4201                 size = sizeof (*ondisk);
4202                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4203                 size += names_size;
4204                 ondisk = kmalloc(size, GFP_KERNEL);
4205                 if (!ondisk)
4206                         return -ENOMEM;
4207
4208                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4209                                         &rbd_dev->header_oloc, ondisk, size);
4210                 if (ret < 0)
4211                         goto out;
4212                 if ((size_t)ret < size) {
4213                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
4214                                 size, ret);
4215                         ret = -ENXIO;
4216                         goto out;
4217                 }
4218                 if (!rbd_dev_ondisk_valid(ondisk)) {
4219                         ret = -ENXIO;
4220                         rbd_warn(rbd_dev, "invalid header");
4221                         goto out;
4222                 }
4223
4224                 names_size = le64_to_cpu(ondisk->snap_names_len);
4225                 want_count = snap_count;
4226                 snap_count = le32_to_cpu(ondisk->snap_count);
4227         } while (snap_count != want_count);
4228
4229         ret = rbd_header_from_disk(rbd_dev, ondisk);
4230 out:
4231         kfree(ondisk);
4232
4233         return ret;
4234 }
4235
4236 /*
4237  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4238  * has disappeared from the (just updated) snapshot context.
4239  */
4240 static void rbd_exists_validate(struct rbd_device *rbd_dev)
4241 {
4242         u64 snap_id;
4243
4244         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4245                 return;
4246
4247         snap_id = rbd_dev->spec->snap_id;
4248         if (snap_id == CEPH_NOSNAP)
4249                 return;
4250
4251         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4252                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4253 }
4254
4255 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4256 {
4257         sector_t size;
4258
4259         /*
4260          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4261          * try to update its size.  If REMOVING is set, updating size
4262          * is just useless work since the device can't be opened.
4263          */
4264         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4265             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4266                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4267                 dout("setting size to %llu sectors", (unsigned long long)size);
4268                 set_capacity(rbd_dev->disk, size);
4269                 revalidate_disk(rbd_dev->disk);
4270         }
4271 }
4272
4273 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4274 {
4275         u64 mapping_size;
4276         int ret;
4277
4278         down_write(&rbd_dev->header_rwsem);
4279         mapping_size = rbd_dev->mapping.size;
4280
4281         ret = rbd_dev_header_info(rbd_dev);
4282         if (ret)
4283                 goto out;
4284
4285         /*
4286          * If there is a parent, see if it has disappeared due to the
4287          * mapped image getting flattened.
4288          */
4289         if (rbd_dev->parent) {
4290                 ret = rbd_dev_v2_parent_info(rbd_dev);
4291                 if (ret)
4292                         goto out;
4293         }
4294
4295         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
4296                 rbd_dev->mapping.size = rbd_dev->header.image_size;
4297         } else {
4298                 /* validate mapped snapshot's EXISTS flag */
4299                 rbd_exists_validate(rbd_dev);
4300         }
4301
4302 out:
4303         up_write(&rbd_dev->header_rwsem);
4304         if (!ret && mapping_size != rbd_dev->mapping.size)
4305                 rbd_dev_update_size(rbd_dev);
4306
4307         return ret;
4308 }
4309
4310 static int rbd_init_request(void *data, struct request *rq,
4311                 unsigned int hctx_idx, unsigned int request_idx,
4312                 unsigned int numa_node)
4313 {
4314         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4315
4316         INIT_WORK(work, rbd_queue_workfn);
4317         return 0;
4318 }
4319
4320 static struct blk_mq_ops rbd_mq_ops = {
4321         .queue_rq       = rbd_queue_rq,
4322         .init_request   = rbd_init_request,
4323 };
4324
4325 static int rbd_init_disk(struct rbd_device *rbd_dev)
4326 {
4327         struct gendisk *disk;
4328         struct request_queue *q;
4329         u64 segment_size;
4330         int err;
4331
4332         /* create gendisk info */
4333         disk = alloc_disk(single_major ?
4334                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4335                           RBD_MINORS_PER_MAJOR);
4336         if (!disk)
4337                 return -ENOMEM;
4338
4339         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4340                  rbd_dev->dev_id);
4341         disk->major = rbd_dev->major;
4342         disk->first_minor = rbd_dev->minor;
4343         if (single_major)
4344                 disk->flags |= GENHD_FL_EXT_DEVT;
4345         disk->fops = &rbd_bd_ops;
4346         disk->private_data = rbd_dev;
4347
4348         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4349         rbd_dev->tag_set.ops = &rbd_mq_ops;
4350         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4351         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4352         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4353         rbd_dev->tag_set.nr_hw_queues = 1;
4354         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4355
4356         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4357         if (err)
4358                 goto out_disk;
4359
4360         q = blk_mq_init_queue(&rbd_dev->tag_set);
4361         if (IS_ERR(q)) {
4362                 err = PTR_ERR(q);
4363                 goto out_tag_set;
4364         }
4365
4366         queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4367         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4368
4369         /* set io sizes to object size */
4370         segment_size = rbd_obj_bytes(&rbd_dev->header);
4371         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
4372         q->limits.max_sectors = queue_max_hw_sectors(q);
4373         blk_queue_max_segments(q, segment_size / SECTOR_SIZE);
4374         blk_queue_max_segment_size(q, segment_size);
4375         blk_queue_io_min(q, segment_size);
4376         blk_queue_io_opt(q, segment_size);
4377
4378         /* enable the discard support */
4379         queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4380         q->limits.discard_granularity = segment_size;
4381         q->limits.discard_alignment = segment_size;
4382         blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
4383         q->limits.discard_zeroes_data = 1;
4384
4385         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4386                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4387
4388         disk->queue = q;
4389
4390         q->queuedata = rbd_dev;
4391
4392         rbd_dev->disk = disk;
4393
4394         return 0;
4395 out_tag_set:
4396         blk_mq_free_tag_set(&rbd_dev->tag_set);
4397 out_disk:
4398         put_disk(disk);
4399         return err;
4400 }
4401
4402 /*
4403  * sysfs
4404  */
4405
4406 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4407 {
4408         return container_of(dev, struct rbd_device, dev);
4409 }
4410
4411 static ssize_t rbd_size_show(struct device *dev,
4412                              struct device_attribute *attr, char *buf)
4413 {
4414         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4415
4416         return sprintf(buf, "%llu\n",
4417                 (unsigned long long)rbd_dev->mapping.size);
4418 }
4419
4420 /*
4421  * Note this shows the features for whatever's mapped, which is not
4422  * necessarily the base image.
4423  */
4424 static ssize_t rbd_features_show(struct device *dev,
4425                              struct device_attribute *attr, char *buf)
4426 {
4427         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4428
4429         return sprintf(buf, "0x%016llx\n",
4430                         (unsigned long long)rbd_dev->mapping.features);
4431 }
4432
4433 static ssize_t rbd_major_show(struct device *dev,
4434                               struct device_attribute *attr, char *buf)
4435 {
4436         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4437
4438         if (rbd_dev->major)
4439                 return sprintf(buf, "%d\n", rbd_dev->major);
4440
4441         return sprintf(buf, "(none)\n");
4442 }
4443
4444 static ssize_t rbd_minor_show(struct device *dev,
4445                               struct device_attribute *attr, char *buf)
4446 {
4447         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4448
4449         return sprintf(buf, "%d\n", rbd_dev->minor);
4450 }
4451
4452 static ssize_t rbd_client_addr_show(struct device *dev,
4453                                     struct device_attribute *attr, char *buf)
4454 {
4455         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4456         struct ceph_entity_addr *client_addr =
4457             ceph_client_addr(rbd_dev->rbd_client->client);
4458
4459         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4460                        le32_to_cpu(client_addr->nonce));
4461 }
4462
4463 static ssize_t rbd_client_id_show(struct device *dev,
4464                                   struct device_attribute *attr, char *buf)
4465 {
4466         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4467
4468         return sprintf(buf, "client%lld\n",
4469                        ceph_client_gid(rbd_dev->rbd_client->client));
4470 }
4471
4472 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4473                                      struct device_attribute *attr, char *buf)
4474 {
4475         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4476
4477         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4478 }
4479
4480 static ssize_t rbd_config_info_show(struct device *dev,
4481                                     struct device_attribute *attr, char *buf)
4482 {
4483         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4484
4485         return sprintf(buf, "%s\n", rbd_dev->config_info);
4486 }
4487
4488 static ssize_t rbd_pool_show(struct device *dev,
4489                              struct device_attribute *attr, char *buf)
4490 {
4491         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4492
4493         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4494 }
4495
4496 static ssize_t rbd_pool_id_show(struct device *dev,
4497                              struct device_attribute *attr, char *buf)
4498 {
4499         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4500
4501         return sprintf(buf, "%llu\n",
4502                         (unsigned long long) rbd_dev->spec->pool_id);
4503 }
4504
4505 static ssize_t rbd_name_show(struct device *dev,
4506                              struct device_attribute *attr, char *buf)
4507 {
4508         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4509
4510         if (rbd_dev->spec->image_name)
4511                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4512
4513         return sprintf(buf, "(unknown)\n");
4514 }
4515
4516 static ssize_t rbd_image_id_show(struct device *dev,
4517                              struct device_attribute *attr, char *buf)
4518 {
4519         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4520
4521         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4522 }
4523
4524 /*
4525  * Shows the name of the currently-mapped snapshot (or
4526  * RBD_SNAP_HEAD_NAME for the base image).
4527  */
4528 static ssize_t rbd_snap_show(struct device *dev,
4529                              struct device_attribute *attr,
4530                              char *buf)
4531 {
4532         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4533
4534         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4535 }
4536
4537 static ssize_t rbd_snap_id_show(struct device *dev,
4538                                 struct device_attribute *attr, char *buf)
4539 {
4540         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4541
4542         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4543 }
4544
4545 /*
4546  * For a v2 image, shows the chain of parent images, separated by empty
4547  * lines.  For v1 images or if there is no parent, shows "(no parent
4548  * image)".
4549  */
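/*
 * An illustrative (hypothetical) one-level chain might read:
 *
 *   pool_id 2
 *   pool_name mypool
 *   image_id 1014b2aed031
 *   image_name base-image
 *   snap_id 10
 *   snap_name snap1
 *   overlap 4194304
 */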
4550 static ssize_t rbd_parent_show(struct device *dev,
4551                                struct device_attribute *attr,
4552                                char *buf)
4553 {
4554         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4555         ssize_t count = 0;
4556
4557         if (!rbd_dev->parent)
4558                 return sprintf(buf, "(no parent image)\n");
4559
4560         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4561                 struct rbd_spec *spec = rbd_dev->parent_spec;
4562
4563                 count += sprintf(&buf[count], "%s"
4564                             "pool_id %llu\npool_name %s\n"
4565                             "image_id %s\nimage_name %s\n"
4566                             "snap_id %llu\nsnap_name %s\n"
4567                             "overlap %llu\n",
4568                             !count ? "" : "\n", /* first? */
4569                             spec->pool_id, spec->pool_name,
4570                             spec->image_id, spec->image_name ?: "(unknown)",
4571                             spec->snap_id, spec->snap_name,
4572                             rbd_dev->parent_overlap);
4573         }
4574
4575         return count;
4576 }
4577
4578 static ssize_t rbd_image_refresh(struct device *dev,
4579                                  struct device_attribute *attr,
4580                                  const char *buf,
4581                                  size_t size)
4582 {
4583         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4584         int ret;
4585
4586         ret = rbd_dev_refresh(rbd_dev);
4587         if (ret)
4588                 return ret;
4589
4590         return size;
4591 }
4592
4593 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4594 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4595 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4596 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4597 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4598 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4599 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4600 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4601 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4602 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4603 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4604 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4605 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4606 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4607 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4608 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4609
4610 static struct attribute *rbd_attrs[] = {
4611         &dev_attr_size.attr,
4612         &dev_attr_features.attr,
4613         &dev_attr_major.attr,
4614         &dev_attr_minor.attr,
4615         &dev_attr_client_addr.attr,
4616         &dev_attr_client_id.attr,
4617         &dev_attr_cluster_fsid.attr,
4618         &dev_attr_config_info.attr,
4619         &dev_attr_pool.attr,
4620         &dev_attr_pool_id.attr,
4621         &dev_attr_name.attr,
4622         &dev_attr_image_id.attr,
4623         &dev_attr_current_snap.attr,
4624         &dev_attr_snap_id.attr,
4625         &dev_attr_parent.attr,
4626         &dev_attr_refresh.attr,
4627         NULL
4628 };
4629
4630 static struct attribute_group rbd_attr_group = {
4631         .attrs = rbd_attrs,
4632 };
4633
4634 static const struct attribute_group *rbd_attr_groups[] = {
4635         &rbd_attr_group,
4636         NULL
4637 };
4638
4639 static void rbd_dev_release(struct device *dev);
4640
4641 static const struct device_type rbd_device_type = {
4642         .name           = "rbd",
4643         .groups         = rbd_attr_groups,
4644         .release        = rbd_dev_release,
4645 };
4646
4647 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4648 {
4649         kref_get(&spec->kref);
4650
4651         return spec;
4652 }
4653
4654 static void rbd_spec_free(struct kref *kref);
4655 static void rbd_spec_put(struct rbd_spec *spec)
4656 {
4657         if (spec)
4658                 kref_put(&spec->kref, rbd_spec_free);
4659 }
4660
4661 static struct rbd_spec *rbd_spec_alloc(void)
4662 {
4663         struct rbd_spec *spec;
4664
4665         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4666         if (!spec)
4667                 return NULL;
4668
4669         spec->pool_id = CEPH_NOPOOL;
4670         spec->snap_id = CEPH_NOSNAP;
4671         kref_init(&spec->kref);
4672
4673         return spec;
4674 }
4675
4676 static void rbd_spec_free(struct kref *kref)
4677 {
4678         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4679
4680         kfree(spec->pool_name);
4681         kfree(spec->image_id);
4682         kfree(spec->image_name);
4683         kfree(spec->snap_name);
4684         kfree(spec);
4685 }
4686
4687 static void rbd_dev_free(struct rbd_device *rbd_dev)
4688 {
4689         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4690         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4691
4692         ceph_oid_destroy(&rbd_dev->header_oid);
4693         ceph_oloc_destroy(&rbd_dev->header_oloc);
4694         kfree(rbd_dev->config_info);
4695
4696         rbd_put_client(rbd_dev->rbd_client);
4697         rbd_spec_put(rbd_dev->spec);
4698         kfree(rbd_dev->opts);
4699         kfree(rbd_dev);
4700 }
4701
4702 static void rbd_dev_release(struct device *dev)
4703 {
4704         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4705         bool need_put = !!rbd_dev->opts;
4706
4707         if (need_put) {
4708                 destroy_workqueue(rbd_dev->task_wq);
4709                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4710         }
4711
4712         rbd_dev_free(rbd_dev);
4713
4714         /*
4715          * This is racy, but far better than dropping the module
4716          * reference outside the release callback.  The race window is
4717          * pretty small, so doing something similar to dm (dm-builtin.c) is overkill.
4718          */
4719         if (need_put)
4720                 module_put(THIS_MODULE);
4721 }
4722
4723 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4724                                            struct rbd_spec *spec)
4725 {
4726         struct rbd_device *rbd_dev;
4727
4728         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4729         if (!rbd_dev)
4730                 return NULL;
4731
4732         spin_lock_init(&rbd_dev->lock);
4733         INIT_LIST_HEAD(&rbd_dev->node);
4734         init_rwsem(&rbd_dev->header_rwsem);
4735
4736         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4737         ceph_oid_init(&rbd_dev->header_oid);
4738         rbd_dev->header_oloc.pool = spec->pool_id;
4739
4740         mutex_init(&rbd_dev->watch_mutex);
4741         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4742         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4743
4744         init_rwsem(&rbd_dev->lock_rwsem);
4745         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4746         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4747         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4748         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4749         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4750         init_waitqueue_head(&rbd_dev->lock_waitq);
4751
4752         rbd_dev->dev.bus = &rbd_bus_type;
4753         rbd_dev->dev.type = &rbd_device_type;
4754         rbd_dev->dev.parent = &rbd_root_dev;
4755         device_initialize(&rbd_dev->dev);
4756
4757         rbd_dev->rbd_client = rbdc;
4758         rbd_dev->spec = spec;
4759
4760         return rbd_dev;
4761 }
4762
4763 /*
4764  * Create a mapping rbd_dev.
4765  */
4766 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4767                                          struct rbd_spec *spec,
4768                                          struct rbd_options *opts)
4769 {
4770         struct rbd_device *rbd_dev;
4771
4772         rbd_dev = __rbd_dev_create(rbdc, spec);
4773         if (!rbd_dev)
4774                 return NULL;
4775
4776         rbd_dev->opts = opts;
4777
4778         /* get an id and fill in device name */
4779         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4780                                          minor_to_rbd_dev_id(1 << MINORBITS),
4781                                          GFP_KERNEL);
4782         if (rbd_dev->dev_id < 0)
4783                 goto fail_rbd_dev;
4784
4785         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4786         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4787                                                    rbd_dev->name);
4788         if (!rbd_dev->task_wq)
4789                 goto fail_dev_id;
4790
4791         /* we have a ref from do_rbd_add() */
4792         __module_get(THIS_MODULE);
4793
4794         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4795         return rbd_dev;
4796
4797 fail_dev_id:
4798         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4799 fail_rbd_dev:
4800         rbd_dev_free(rbd_dev);
4801         return NULL;
4802 }
4803
4804 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4805 {
4806         if (rbd_dev)
4807                 put_device(&rbd_dev->dev);
4808 }
4809
4810 /*
4811  * Get the size and object order for an image snapshot, or if
4812  * snap_id is CEPH_NOSNAP, get this information for the base
4813  * image.
4814  */
4815 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4816                                 u8 *order, u64 *snap_size)
4817 {
4818         __le64 snapid = cpu_to_le64(snap_id);
4819         int ret;
4820         struct {
4821                 u8 order;
4822                 __le64 size;
4823         } __attribute__ ((packed)) size_buf = { 0 };
4824
4825         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4826                                   &rbd_dev->header_oloc, "get_size",
4827                                   &snapid, sizeof(snapid),
4828                                   &size_buf, sizeof(size_buf));
4829         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4830         if (ret < 0)
4831                 return ret;
4832         if (ret < sizeof (size_buf))
4833                 return -ERANGE;
4834
4835         if (order) {
4836                 *order = size_buf.order;
4837                 dout("  order %u", (unsigned int)*order);
4838         }
4839         *snap_size = le64_to_cpu(size_buf.size);
4840
4841         dout("  snap_id 0x%016llx snap_size = %llu\n",
4842                 (unsigned long long)snap_id,
4843                 (unsigned long long)*snap_size);
4844
4845         return 0;
4846 }
4847
4848 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4849 {
4850         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4851                                         &rbd_dev->header.obj_order,
4852                                         &rbd_dev->header.image_size);
4853 }
4854
4855 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4856 {
4857         void *reply_buf;
4858         int ret;
4859         void *p;
4860
4861         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4862         if (!reply_buf)
4863                 return -ENOMEM;
4864
4865         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4866                                   &rbd_dev->header_oloc, "get_object_prefix",
4867                                   NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4868         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4869         if (ret < 0)
4870                 goto out;
4871
4872         p = reply_buf;
4873         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4874                                                 p + ret, NULL, GFP_NOIO);
4875         ret = 0;
4876
4877         if (IS_ERR(rbd_dev->header.object_prefix)) {
4878                 ret = PTR_ERR(rbd_dev->header.object_prefix);
4879                 rbd_dev->header.object_prefix = NULL;
4880         } else {
4881                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4882         }
4883 out:
4884         kfree(reply_buf);
4885
4886         return ret;
4887 }
4888
4889 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4890                 u64 *snap_features)
4891 {
4892         __le64 snapid = cpu_to_le64(snap_id);
4893         struct {
4894                 __le64 features;
4895                 __le64 incompat;
4896         } __attribute__ ((packed)) features_buf = { 0 };
4897         u64 unsup;
4898         int ret;
4899
4900         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4901                                   &rbd_dev->header_oloc, "get_features",
4902                                   &snapid, sizeof(snapid),
4903                                   &features_buf, sizeof(features_buf));
4904         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4905         if (ret < 0)
4906                 return ret;
4907         if (ret < sizeof (features_buf))
4908                 return -ERANGE;
4909
4910         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4911         if (unsup) {
4912                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4913                          unsup);
4914                 return -ENXIO;
4915         }
4916
4917         *snap_features = le64_to_cpu(features_buf.features);
4918
4919         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4920                 (unsigned long long)snap_id,
4921                 (unsigned long long)*snap_features,
4922                 (unsigned long long)le64_to_cpu(features_buf.incompat));
4923
4924         return 0;
4925 }
4926
4927 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4928 {
4929         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4930                                                 &rbd_dev->header.features);
4931 }
4932
4933 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4934 {
4935         struct rbd_spec *parent_spec;
4936         size_t size;
4937         void *reply_buf = NULL;
4938         __le64 snapid;
4939         void *p;
4940         void *end;
4941         u64 pool_id;
4942         char *image_id;
4943         u64 snap_id;
4944         u64 overlap;
4945         int ret;
4946
4947         parent_spec = rbd_spec_alloc();
4948         if (!parent_spec)
4949                 return -ENOMEM;
4950
4951         size = sizeof (__le64) +                                /* pool_id */
4952                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
4953                 sizeof (__le64) +                               /* snap_id */
4954                 sizeof (__le64);                                /* overlap */
4955         reply_buf = kmalloc(size, GFP_KERNEL);
4956         if (!reply_buf) {
4957                 ret = -ENOMEM;
4958                 goto out_err;
4959         }
4960
4961         snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4962         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4963                                   &rbd_dev->header_oloc, "get_parent",
4964                                   &snapid, sizeof(snapid), reply_buf, size);
4965         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4966         if (ret < 0)
4967                 goto out_err;
4968
4969         p = reply_buf;
4970         end = reply_buf + ret;
4971         ret = -ERANGE;
4972         ceph_decode_64_safe(&p, end, pool_id, out_err);
4973         if (pool_id == CEPH_NOPOOL) {
4974                 /*
4975                  * Either the parent never existed, or we have a
4976                  * record of it but the image got flattened so it no
4977                  * longer has a parent.  When the parent of a
4978                  * layered image disappears we immediately set the
4979                  * overlap to 0.  The effect of this is that all new
4980                  * requests will be treated as if the image had no
4981                  * parent.
4982                  */
4983                 if (rbd_dev->parent_overlap) {
4984                         rbd_dev->parent_overlap = 0;
4985                         rbd_dev_parent_put(rbd_dev);
4986                         pr_info("%s: clone image has been flattened\n",
4987                                 rbd_dev->disk->disk_name);
4988                 }
4989
4990                 goto out;       /* No parent?  No problem. */
4991         }
4992
4993         /* The ceph file layout needs to fit pool id in 32 bits */
4994
4995         ret = -EIO;
4996         if (pool_id > (u64)U32_MAX) {
4997                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
4998                         (unsigned long long)pool_id, U32_MAX);
4999                 goto out_err;
5000         }
5001
5002         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5003         if (IS_ERR(image_id)) {
5004                 ret = PTR_ERR(image_id);
5005                 goto out_err;
5006         }
5007         ceph_decode_64_safe(&p, end, snap_id, out_err);
5008         ceph_decode_64_safe(&p, end, overlap, out_err);
5009
5010         /*
5011          * The parent won't change (except when the clone is
5012          * flattened, which is handled above).  So we only need to
5013          * record the parent spec if we have not already done so.
5014          */
5015         if (!rbd_dev->parent_spec) {
5016                 parent_spec->pool_id = pool_id;
5017                 parent_spec->image_id = image_id;
5018                 parent_spec->snap_id = snap_id;
5019                 rbd_dev->parent_spec = parent_spec;
5020                 parent_spec = NULL;     /* rbd_dev now owns this */
5021         } else {
5022                 kfree(image_id);
5023         }
5024
5025         /*
5026          * We always update the parent overlap.  If it's zero we issue
5027          * a warning, as we will proceed as if there was no parent.
5028          */
5029         if (!overlap) {
5030                 if (parent_spec) {
5031                         /* refresh, careful to warn just once */
5032                         if (rbd_dev->parent_overlap)
5033                                 rbd_warn(rbd_dev,
5034                                     "clone now standalone (overlap became 0)");
5035                 } else {
5036                         /* initial probe */
5037                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5038                 }
5039         }
5040         rbd_dev->parent_overlap = overlap;
5041
5042 out:
5043         ret = 0;
5044 out_err:
5045         kfree(reply_buf);
5046         rbd_spec_put(parent_spec);
5047
5048         return ret;
5049 }
5050
5051 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5052 {
5053         struct {
5054                 __le64 stripe_unit;
5055                 __le64 stripe_count;
5056         } __attribute__ ((packed)) striping_info_buf = { 0 };
5057         size_t size = sizeof (striping_info_buf);
5058         void *p;
5059         u64 obj_size;
5060         u64 stripe_unit;
5061         u64 stripe_count;
5062         int ret;
5063
5064         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5065                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
5066                                 NULL, 0, &striping_info_buf, size);
5067         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5068         if (ret < 0)
5069                 return ret;
5070         if (ret < size)
5071                 return -ERANGE;
5072
5073         /*
5074          * We don't actually support the "fancy striping" feature
5075          * (STRIPINGV2) yet, but if the striping sizes are the
5076          * defaults the behavior is the same as before.  So find
5077          * out, and only fail if the image has non-default values.
5078          */
5079         ret = -EINVAL;
5080         obj_size = rbd_obj_bytes(&rbd_dev->header);
5081         p = &striping_info_buf;
5082         stripe_unit = ceph_decode_64(&p);
5083         if (stripe_unit != obj_size) {
5084                 rbd_warn(rbd_dev,
5085                          "unsupported stripe unit (got %llu want %llu)",
5086                          stripe_unit, obj_size);
5087                 return -EINVAL;
5088         }
5089         stripe_count = ceph_decode_64(&p);
5090         if (stripe_count != 1) {
5091                 rbd_warn(rbd_dev,
5092                          "unsupported stripe count (got %llu want 1)", stripe_count);
5093                 return -EINVAL;
5094         }
5095         rbd_dev->header.stripe_unit = stripe_unit;
5096         rbd_dev->header.stripe_count = stripe_count;
5097
5098         return 0;
5099 }
5100
5101 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5102 {
5103         __le64 data_pool_id;
5104         int ret;
5105
5106         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5107                                   &rbd_dev->header_oloc, "get_data_pool",
5108                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
5109         if (ret < 0)
5110                 return ret;
5111         if (ret < sizeof(data_pool_id))
5112                 return -EBADMSG;
5113
5114         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5115         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5116         return 0;
5117 }
5118
5119 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5120 {
5121         CEPH_DEFINE_OID_ONSTACK(oid);
5122         size_t image_id_size;
5123         char *image_id;
5124         void *p;
5125         void *end;
5126         size_t size;
5127         void *reply_buf = NULL;
5128         size_t len = 0;
5129         char *image_name = NULL;
5130         int ret;
5131
5132         rbd_assert(!rbd_dev->spec->image_name);
5133
5134         len = strlen(rbd_dev->spec->image_id);
5135         image_id_size = sizeof (__le32) + len;
5136         image_id = kmalloc(image_id_size, GFP_KERNEL);
5137         if (!image_id)
5138                 return NULL;
5139
5140         p = image_id;
5141         end = image_id + image_id_size;
5142         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5143
5144         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5145         reply_buf = kmalloc(size, GFP_KERNEL);
5146         if (!reply_buf)
5147                 goto out;
5148
5149         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5150         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5151                                   "dir_get_name", image_id, image_id_size,
5152                                   reply_buf, size);
5153         if (ret < 0)
5154                 goto out;
5155         p = reply_buf;
5156         end = reply_buf + ret;
5157
5158         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5159         if (IS_ERR(image_name))
5160                 image_name = NULL;
5161         else
5162                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5163 out:
5164         kfree(reply_buf);
5165         kfree(image_id);
5166
5167         return image_name;
5168 }
5169
5170 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5171 {
5172         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5173         const char *snap_name;
5174         u32 which = 0;
5175
5176         /* Skip over names until we find the one we are looking for */
5177
5178         snap_name = rbd_dev->header.snap_names;
5179         while (which < snapc->num_snaps) {
5180                 if (!strcmp(name, snap_name))
5181                         return snapc->snaps[which];
5182                 snap_name += strlen(snap_name) + 1;
5183                 which++;
5184         }
5185         return CEPH_NOSNAP;
5186 }
5187
5188 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5189 {
5190         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5191         u32 which;
5192         bool found = false;
5193         u64 snap_id;
5194
5195         for (which = 0; !found && which < snapc->num_snaps; which++) {
5196                 const char *snap_name;
5197
5198                 snap_id = snapc->snaps[which];
5199                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5200                 if (IS_ERR(snap_name)) {
5201                         /* ignore no-longer existing snapshots */
5202                         if (PTR_ERR(snap_name) == -ENOENT)
5203                                 continue;
5204                         else
5205                                 break;
5206                 }
5207                 found = !strcmp(name, snap_name);
5208                 kfree(snap_name);
5209         }
5210         return found ? snap_id : CEPH_NOSNAP;
5211 }
5212
5213 /*
5214  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5215  * no snapshot by that name is found, or if an error occurs.
5216  */
5217 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5218 {
5219         if (rbd_dev->image_format == 1)
5220                 return rbd_v1_snap_id_by_name(rbd_dev, name);
5221
5222         return rbd_v2_snap_id_by_name(rbd_dev, name);
5223 }
5224
5225 /*
5226  * An image being mapped will have everything but the snap id.
5227  */
5228 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5229 {
5230         struct rbd_spec *spec = rbd_dev->spec;
5231
5232         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5233         rbd_assert(spec->image_id && spec->image_name);
5234         rbd_assert(spec->snap_name);
5235
5236         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5237                 u64 snap_id;
5238
5239                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5240                 if (snap_id == CEPH_NOSNAP)
5241                         return -ENOENT;
5242
5243                 spec->snap_id = snap_id;
5244         } else {
5245                 spec->snap_id = CEPH_NOSNAP;
5246         }
5247
5248         return 0;
5249 }
5250
5251 /*
5252  * A parent image will have all ids but none of the names.
5253  *
5254  * All names in an rbd spec are dynamically allocated.  It's OK if we
5255  * can't figure out the name for an image id.
5256  */
5257 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5258 {
5259         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5260         struct rbd_spec *spec = rbd_dev->spec;
5261         const char *pool_name;
5262         const char *image_name;
5263         const char *snap_name;
5264         int ret;
5265
5266         rbd_assert(spec->pool_id != CEPH_NOPOOL);
5267         rbd_assert(spec->image_id);
5268         rbd_assert(spec->snap_id != CEPH_NOSNAP);
5269
5270         /* Get the pool name; we have to make our own copy of this */
5271
5272         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5273         if (!pool_name) {
5274                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5275                 return -EIO;
5276         }
5277         pool_name = kstrdup(pool_name, GFP_KERNEL);
5278         if (!pool_name)
5279                 return -ENOMEM;
5280
5281         /* Fetch the image name; tolerate failure here */
5282
5283         image_name = rbd_dev_image_name(rbd_dev);
5284         if (!image_name)
5285                 rbd_warn(rbd_dev, "unable to get image name");
5286
5287         /* Fetch the snapshot name */
5288
5289         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5290         if (IS_ERR(snap_name)) {
5291                 ret = PTR_ERR(snap_name);
5292                 goto out_err;
5293         }
5294
5295         spec->pool_name = pool_name;
5296         spec->image_name = image_name;
5297         spec->snap_name = snap_name;
5298
5299         return 0;
5300
5301 out_err:
5302         kfree(image_name);
5303         kfree(pool_name);
5304         return ret;
5305 }
5306
5307 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5308 {
5309         size_t size;
5310         int ret;
5311         void *reply_buf;
5312         void *p;
5313         void *end;
5314         u64 seq;
5315         u32 snap_count;
5316         struct ceph_snap_context *snapc;
5317         u32 i;
5318
5319         /*
5320          * We'll need room for the seq value (maximum snapshot id),
5321          * snapshot count, and an array of that many snapshot ids.
5322          * For now we have a fixed upper limit on the number we're
5323          * prepared to receive.
5324          */
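        /*
         * Expected reply layout, inferred from the decode calls below
         * (a sketch, not authoritative wire-format documentation):
         *
         *   __le64 seq;                  // highest snapshot id
         *   __le32 snap_count;
         *   __le64 snaps[snap_count];
         */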
5325         size = sizeof (__le64) + sizeof (__le32) +
5326                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
5327         reply_buf = kzalloc(size, GFP_KERNEL);
5328         if (!reply_buf)
5329                 return -ENOMEM;
5330
5331         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5332                                   &rbd_dev->header_oloc, "get_snapcontext",
5333                                   NULL, 0, reply_buf, size);
5334         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5335         if (ret < 0)
5336                 goto out;
5337
5338         p = reply_buf;
5339         end = reply_buf + ret;
5340         ret = -ERANGE;
5341         ceph_decode_64_safe(&p, end, seq, out);
5342         ceph_decode_32_safe(&p, end, snap_count, out);
5343
5344         /*
5345          * Make sure the reported number of snapshot ids wouldn't go
5346          * beyond the end of our buffer.  But before checking that,
5347          * make sure the computed size of the snapshot context we
5348          * allocate is representable in a size_t.
5349          */
5350         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5351                                  / sizeof (u64)) {
5352                 ret = -EINVAL;
5353                 goto out;
5354         }
5355         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5356                 goto out;
5357         ret = 0;
5358
5359         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5360         if (!snapc) {
5361                 ret = -ENOMEM;
5362                 goto out;
5363         }
5364         snapc->seq = seq;
5365         for (i = 0; i < snap_count; i++)
5366                 snapc->snaps[i] = ceph_decode_64(&p);
5367
5368         ceph_put_snap_context(rbd_dev->header.snapc);
5369         rbd_dev->header.snapc = snapc;
5370
5371         dout("  snap context seq = %llu, snap_count = %u\n",
5372                 (unsigned long long)seq, (unsigned int)snap_count);
5373 out:
5374         kfree(reply_buf);
5375
5376         return ret;
5377 }
5378
5379 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5380                                         u64 snap_id)
5381 {
5382         size_t size;
5383         void *reply_buf;
5384         __le64 snapid;
5385         int ret;
5386         void *p;
5387         void *end;
5388         char *snap_name;
5389
5390         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5391         reply_buf = kmalloc(size, GFP_KERNEL);
5392         if (!reply_buf)
5393                 return ERR_PTR(-ENOMEM);
5394
5395         snapid = cpu_to_le64(snap_id);
5396         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5397                                   &rbd_dev->header_oloc, "get_snapshot_name",
5398                                   &snapid, sizeof(snapid), reply_buf, size);
5399         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5400         if (ret < 0) {
5401                 snap_name = ERR_PTR(ret);
5402                 goto out;
5403         }
5404
5405         p = reply_buf;
5406         end = reply_buf + ret;
5407         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5408         if (IS_ERR(snap_name))
5409                 goto out;
5410
5411         dout("  snap_id 0x%016llx snap_name = %s\n",
5412                 (unsigned long long)snap_id, snap_name);
5413 out:
5414         kfree(reply_buf);
5415
5416         return snap_name;
5417 }
5418
5419 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5420 {
5421         bool first_time = rbd_dev->header.object_prefix == NULL;
5422         int ret;
5423
5424         ret = rbd_dev_v2_image_size(rbd_dev);
5425         if (ret)
5426                 return ret;
5427
5428         if (first_time) {
5429                 ret = rbd_dev_v2_header_onetime(rbd_dev);
5430                 if (ret)
5431                         return ret;
5432         }
5433
5434         ret = rbd_dev_v2_snap_context(rbd_dev);
5435         if (ret && first_time) {
5436                 kfree(rbd_dev->header.object_prefix);
5437                 rbd_dev->header.object_prefix = NULL;
5438         }
5439
5440         return ret;
5441 }
5442
5443 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5444 {
5445         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5446
5447         if (rbd_dev->image_format == 1)
5448                 return rbd_dev_v1_header_info(rbd_dev);
5449
5450         return rbd_dev_v2_header_info(rbd_dev);
5451 }
5452
5453 /*
5454  * Skips over white space at *buf, and updates *buf to point to the
5455  * first found non-space character (if any). Returns the length of
5456  * the token (string of non-white space characters) found.  Note
5457  * that *buf must be terminated with '\0'.
5458  */
5459 static inline size_t next_token(const char **buf)
5460 {
5461         /*
5462          * These are the characters that produce nonzero for
5463          * isspace() in the "C" and "POSIX" locales.
5464          */
5465         const char *spaces = " \f\n\r\t\v";
5466
5467         *buf += strspn(*buf, spaces);   /* Find start of token */
5468
5469         return strcspn(*buf, spaces);   /* Return token length */
5470 }
5471
5472 /*
5473  * Finds the next token in *buf, dynamically allocates a buffer big
5474  * enough to hold a copy of it, and copies the token into the new
5475  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
5476  * that a duplicate buffer is created even for a zero-length token.
5477  *
5478  * Returns a pointer to the newly-allocated duplicate, or a null
5479  * pointer if memory for the duplicate was not available.  If
5480  * the lenp argument is a non-null pointer, the length of the token
5481  * (not including the '\0') is returned in *lenp.
5482  *
5483  * If successful, the *buf pointer will be updated to point beyond
5484  * the end of the found token.
5485  *
5486  * Note: uses GFP_KERNEL for allocation.
5487  */
5488 static inline char *dup_token(const char **buf, size_t *lenp)
5489 {
5490         char *dup;
5491         size_t len;
5492
5493         len = next_token(buf);
5494         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5495         if (!dup)
5496                 return NULL;
5497         *(dup + len) = '\0';
5498         *buf += len;
5499
5500         if (lenp)
5501                 *lenp = len;
5502
5503         return dup;
5504 }
5505
5506 /*
5507  * Parse the options provided for an "rbd add" (i.e., rbd image
5508  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
5509  * and the data written is passed here via a NUL-terminated buffer.
5510  * Returns 0 if successful or an error code otherwise.
5511  *
5512  * The information extracted from these options is recorded in
5513  * the other parameters which return dynamically-allocated
5514  * structures:
5515  *  ceph_opts
5516  *      The address of a pointer that will refer to a ceph options
5517  *      structure.  Caller must release the returned pointer using
5518  *      ceph_destroy_options() when it is no longer needed.
5519  *  rbd_opts
5520  *      Address of an rbd options pointer.  Fully initialized by
5521  *      this function; caller must release with kfree().
5522  *  spec
5523  *      Address of an rbd image specification pointer.  Fully
5524  *      initialized by this function based on parsed options.
5525  *      Caller must release with rbd_spec_put().
5526  *
5527  * The options passed take this form:
5528  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5529  * where:
5530  *  <mon_addrs>
5531  *      A comma-separated list of one or more monitor addresses.
5532  *      A monitor address is an ip address, optionally followed
5533  *      by a port number (separated by a colon).
5534  *        I.e.:  ip1[:port1][,ip2[:port2]...]
5535  *  <options>
5536  *      A comma-separated list of ceph and/or rbd options.
5537  *  <pool_name>
5538  *      The name of the rados pool containing the rbd image.
5539  *  <image_name>
5540  *      The name of the image in that pool to map.
5541  *  <snap_name>
5542  *      An optional snapshot name.  If provided, the mapping will
5543  *      present data from the image at the time that snapshot was
5544  *      created.  The image head is used if no snapshot name is
5545  *      provided.  Snapshot mappings are always read-only.
5546  */
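/*
 * A hypothetical example (monitor address, user name, and key are
 * illustrative only; "-" maps the image head):
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo -" \
 *         > /sys/bus/rbd/add
 */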
5547 static int rbd_add_parse_args(const char *buf,
5548                                 struct ceph_options **ceph_opts,
5549                                 struct rbd_options **opts,
5550                                 struct rbd_spec **rbd_spec)
5551 {
5552         size_t len;
5553         char *options;
5554         const char *mon_addrs;
5555         char *snap_name;
5556         size_t mon_addrs_size;
5557         struct rbd_spec *spec = NULL;
5558         struct rbd_options *rbd_opts = NULL;
5559         struct ceph_options *copts;
5560         int ret;
5561
5562         /* The first four tokens are required */
5563
5564         len = next_token(&buf);
5565         if (!len) {
5566                 rbd_warn(NULL, "no monitor address(es) provided");
5567                 return -EINVAL;
5568         }
5569         mon_addrs = buf;
5570         mon_addrs_size = len + 1;
5571         buf += len;
5572
5573         ret = -EINVAL;
5574         options = dup_token(&buf, NULL);
5575         if (!options)
5576                 return -ENOMEM;
5577         if (!*options) {
5578                 rbd_warn(NULL, "no options provided");
5579                 goto out_err;
5580         }
5581
5582         spec = rbd_spec_alloc();
5583         if (!spec)
5584                 goto out_mem;
5585
5586         spec->pool_name = dup_token(&buf, NULL);
5587         if (!spec->pool_name)
5588                 goto out_mem;
5589         if (!*spec->pool_name) {
5590                 rbd_warn(NULL, "no pool name provided");
5591                 goto out_err;
5592         }
5593
5594         spec->image_name = dup_token(&buf, NULL);
5595         if (!spec->image_name)
5596                 goto out_mem;
5597         if (!*spec->image_name) {
5598                 rbd_warn(NULL, "no image name provided");
5599                 goto out_err;
5600         }
5601
5602         /*
5603          * Snapshot name is optional; default is to use "-"
5604          * (indicating the head/no snapshot).
5605          */
5606         len = next_token(&buf);
5607         if (!len) {
5608                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5609                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5610         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5611                 ret = -ENAMETOOLONG;
5612                 goto out_err;
5613         }
5614         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5615         if (!snap_name)
5616                 goto out_mem;
5617         *(snap_name + len) = '\0';
5618         spec->snap_name = snap_name;
5619
5620         /* Initialize all rbd options to the defaults */
5621
5622         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5623         if (!rbd_opts)
5624                 goto out_mem;
5625
5626         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5627         rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5628         rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5629
5630         copts = ceph_parse_options(options, mon_addrs,
5631                                         mon_addrs + mon_addrs_size - 1,
5632                                         parse_rbd_opts_token, rbd_opts);
5633         if (IS_ERR(copts)) {
5634                 ret = PTR_ERR(copts);
5635                 goto out_err;
5636         }
5637         kfree(options);
5638
5639         *ceph_opts = copts;
5640         *opts = rbd_opts;
5641         *rbd_spec = spec;
5642
5643         return 0;
5644 out_mem:
5645         ret = -ENOMEM;
5646 out_err:
5647         kfree(rbd_opts);
5648         rbd_spec_put(spec);
5649         kfree(options);
5650
5651         return ret;
5652 }
5653
5654 /*
5655  * Return pool id (>= 0) or a negative error code.
5656  */
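/*
 * If the pool is not in our cached osdmap, the map may simply be
 * stale: ask the monitors for the newest osdmap epoch, wait for that
 * map, and retry the lookup once before returning -ENOENT.
 */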
5657 static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5658 {
5659         struct ceph_options *opts = rbdc->client->options;
5660         u64 newest_epoch;
5661         int tries = 0;
5662         int ret;
5663
5664 again:
5665         ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5666         if (ret == -ENOENT && tries++ < 1) {
5667                 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5668                                             &newest_epoch);
5669                 if (ret < 0)
5670                         return ret;
5671
5672                 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5673                         ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5674                         (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5675                                                      newest_epoch,
5676                                                      opts->mount_timeout);
5677                         goto again;
5678                 } else {
5679                         /* the osdmap we have is new enough */
5680                         return -ENOENT;
5681                 }
5682         }
5683
5684         return ret;
5685 }
5686
5687 /*
5688  * An rbd format 2 image has a unique identifier, distinct from the
5689  * name given to it by the user.  Internally, that identifier is
5690  * what's used to specify the names of objects related to the image.
5691  *
5692  * A special "rbd id" object is used to map an rbd image name to its
5693  * id.  If that object doesn't exist, then there is no v2 rbd image
5694  * with the supplied name.
5695  *
5696  * This function will record the given rbd_dev's image_id field if
5697  * it can be determined, and in that case will return 0.  If any
5698  * errors occur a negative errno will be returned and the rbd_dev's
5699  * image_id field will be unchanged (and should be NULL).
5700  */
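/*
 * For example, assuming the "rbd_id." value of RBD_ID_PREFIX from
 * rbd_types.h, an image named "foo" has its id stored in an object
 * named "rbd_id.foo".
 */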
5701 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5702 {
5703         int ret;
5704         size_t size;
5705         CEPH_DEFINE_OID_ONSTACK(oid);
5706         void *response;
5707         char *image_id;
5708
5709         /*
5710          * When probing a parent image, the image id is already
5711          * known (and the image name likely is not).  There's no
5712          * need to fetch the image id again in this case.  We
5713          * do still need to set the image format though.
5714          */
5715         if (rbd_dev->spec->image_id) {
5716                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5717
5718                 return 0;
5719         }
5720
5721         /*
5722          * First, see if the format 2 image id file exists, and if
5723          * so, get the image's persistent id from it.
5724          */
5725         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5726                                rbd_dev->spec->image_name);
5727         if (ret)
5728                 return ret;
5729
5730         dout("rbd id object name is %s\n", oid.name);
5731
5732         /* Response will be an encoded string, which includes a length */
5733
5734         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5735         response = kzalloc(size, GFP_NOIO);
5736         if (!response) {
5737                 ret = -ENOMEM;
5738                 goto out;
5739         }
5740
5741         /* If it doesn't exist we'll assume it's a format 1 image */
5742
5743         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5744                                   "get_id", NULL, 0,
5745                                   response, RBD_IMAGE_ID_LEN_MAX);
5746         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5747         if (ret == -ENOENT) {
5748                 image_id = kstrdup("", GFP_KERNEL);
5749                 ret = image_id ? 0 : -ENOMEM;
5750                 if (!ret)
5751                         rbd_dev->image_format = 1;
5752         } else if (ret >= 0) {
5753                 void *p = response;
5754
5755                 image_id = ceph_extract_encoded_string(&p, p + ret,
5756                                                 NULL, GFP_NOIO);
5757                 ret = PTR_ERR_OR_ZERO(image_id);
5758                 if (!ret)
5759                         rbd_dev->image_format = 2;
5760         }
5761
5762         if (!ret) {
5763                 rbd_dev->spec->image_id = image_id;
5764                 dout("image_id is %s\n", image_id);
5765         }
5766 out:
5767         kfree(response);
5768         ceph_oid_destroy(&oid);
5769         return ret;
5770 }
5771
5772 /*
5773  * Undo whatever state changes are made by a v1 or v2 header info
5774  * call.
5775  */
5776 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5777 {
5778         struct rbd_image_header *header;
5779
5780         rbd_dev_parent_put(rbd_dev);
5781
5782         /* Free dynamic fields from the header, then zero it out */
5783
5784         header = &rbd_dev->header;
5785         ceph_put_snap_context(header->snapc);
5786         kfree(header->snap_sizes);
5787         kfree(header->snap_names);
5788         kfree(header->object_prefix);
5789         memset(header, 0, sizeof (*header));
5790 }
5791
5792 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5793 {
5794         int ret;
5795
5796         ret = rbd_dev_v2_object_prefix(rbd_dev);
5797         if (ret)
5798                 goto out_err;
5799
5800         /*
5801          * Get and check the features for the image.  Currently the
5802          * features are assumed to never change.
5803          */
5804         ret = rbd_dev_v2_features(rbd_dev);
5805         if (ret)
5806                 goto out_err;
5807
5808         /* If the image supports fancy striping, get its parameters */
5809
5810         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5811                 ret = rbd_dev_v2_striping_info(rbd_dev);
5812                 if (ret < 0)
5813                         goto out_err;
5814         }
5815
5816         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5817                 ret = rbd_dev_v2_data_pool(rbd_dev);
5818                 if (ret)
5819                         goto out_err;
5820         }
5821
5822         rbd_init_layout(rbd_dev);
5823         return 0;
5824
5825 out_err:
5826         rbd_dev->header.features = 0;
5827         kfree(rbd_dev->header.object_prefix);
5828         rbd_dev->header.object_prefix = NULL;
5829         return ret;
5830 }
5831
5832 /*
5833  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5834  * rbd_dev_image_probe() recursion depth, which means it's also the
5835  * length of the already discovered part of the parent chain.
5836  */
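/*
 * For example, mapping a clone of a clone probes the chain as
 * mapped image (depth 0) -> parent (depth 1) -> grandparent (depth 2),
 * and fails with -EINVAL once depth exceeds RBD_MAX_PARENT_CHAIN_LEN.
 */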
5837 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5838 {
5839         struct rbd_device *parent = NULL;
5840         int ret;
5841
5842         if (!rbd_dev->parent_spec)
5843                 return 0;
5844
5845         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5846                 pr_info("parent chain is too long (%d)\n", depth);
5847                 ret = -EINVAL;
5848                 goto out_err;
5849         }
5850
5851         parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5852         if (!parent) {
5853                 ret = -ENOMEM;
5854                 goto out_err;
5855         }
5856
5857         /*
5858          * Images related by parent/child relationships always share
5859          * rbd_client and spec/parent_spec, so bump their refcounts.
5860          */
5861         __rbd_get_client(rbd_dev->rbd_client);
5862         rbd_spec_get(rbd_dev->parent_spec);
5863
5864         ret = rbd_dev_image_probe(parent, depth);
5865         if (ret < 0)
5866                 goto out_err;
5867
5868         rbd_dev->parent = parent;
5869         atomic_set(&rbd_dev->parent_ref, 1);
5870         return 0;
5871
5872 out_err:
5873         rbd_dev_unparent(rbd_dev);
5874         rbd_dev_destroy(parent);
5875         return ret;
5876 }
5877
5878 /*
5879  * rbd_dev->header_rwsem must be locked for write and will be unlocked
5880  * upon return.
5881  */
5882 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5883 {
5884         int ret;
5885
5886         /* Record our major and minor device numbers. */
5887
5888         if (!single_major) {
5889                 ret = register_blkdev(0, rbd_dev->name);
5890                 if (ret < 0)
5891                         goto err_out_unlock;
5892
5893                 rbd_dev->major = ret;
5894                 rbd_dev->minor = 0;
5895         } else {
5896                 rbd_dev->major = rbd_major;
5897                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5898         }
5899
5900         /* Set up the blkdev mapping. */
5901
5902         ret = rbd_init_disk(rbd_dev);
5903         if (ret)
5904                 goto err_out_blkdev;
5905
5906         ret = rbd_dev_mapping_set(rbd_dev);
5907         if (ret)
5908                 goto err_out_disk;
5909
5910         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5911         set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5912
5913         dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
5914         ret = device_add(&rbd_dev->dev);
5915         if (ret)
5916                 goto err_out_mapping;
5917
5918         /* Everything's ready.  Announce the disk to the world. */
5919
5920         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5921         up_write(&rbd_dev->header_rwsem);
5922
5923         spin_lock(&rbd_dev_list_lock);
5924         list_add_tail(&rbd_dev->node, &rbd_dev_list);
5925         spin_unlock(&rbd_dev_list_lock);
5926
5927         add_disk(rbd_dev->disk);
5928         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
5929                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
5930                 rbd_dev->header.features);
5931
5932         return ret;
5933
5934 err_out_mapping:
5935         rbd_dev_mapping_clear(rbd_dev);
5936 err_out_disk:
5937         rbd_free_disk(rbd_dev);
5938 err_out_blkdev:
5939         if (!single_major)
5940                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5941 err_out_unlock:
5942         up_write(&rbd_dev->header_rwsem);
5943         return ret;
5944 }
5945
5946 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5947 {
5948         struct rbd_spec *spec = rbd_dev->spec;
5949         int ret;
5950
5951         /* Record the header object name for this rbd image. */
5952
5953         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5954         if (rbd_dev->image_format == 1)
5955                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5956                                        spec->image_name, RBD_SUFFIX);
5957         else
5958                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5959                                        RBD_HEADER_PREFIX, spec->image_id);
5960
5961         return ret;
5962 }

static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
        rbd_dev_unprobe(rbd_dev);
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;

        rbd_dev_destroy(rbd_dev);
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  If this image is the one being mapped (i.e., not a
 * parent), initiate a watch on its header object before using that
 * object to get detailed information about the rbd image.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
{
        int ret;

        /*
         * Get the id from the image id object.  Unless there's an
         * error, rbd_dev->spec->image_id will be filled in with
         * a dynamically-allocated string, and rbd_dev->image_format
         * will be set to either 1 or 2.
         */
        ret = rbd_dev_image_id(rbd_dev);
        if (ret)
                return ret;

        ret = rbd_dev_header_name(rbd_dev);
        if (ret)
                goto err_out_format;

        if (!depth) {
                ret = rbd_register_watch(rbd_dev);
                if (ret) {
                        if (ret == -ENOENT)
                                pr_info("image %s/%s does not exist\n",
                                        rbd_dev->spec->pool_name,
                                        rbd_dev->spec->image_name);
                        goto err_out_format;
                }
        }

        ret = rbd_dev_header_info(rbd_dev);
        if (ret)
                goto err_out_watch;

        /*
         * If this image is the one being mapped, we have pool name and
         * id, image name and id, and snap name - need to fill snap id.
         * Otherwise this is a parent image, identified by pool, image
         * and snap ids - need to fill in names for those ids.
         */
        if (!depth)
                ret = rbd_spec_fill_snap_id(rbd_dev);
        else
                ret = rbd_spec_fill_names(rbd_dev);
        if (ret) {
                if (ret == -ENOENT)
                        pr_info("snap %s/%s@%s does not exist\n",
                                rbd_dev->spec->pool_name,
                                rbd_dev->spec->image_name,
                                rbd_dev->spec->snap_name);
                goto err_out_probe;
        }

        if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
                        goto err_out_probe;

                /*
                 * Need to warn users if this image is the one being
                 * mapped and has a parent.
                 */
                if (!depth && rbd_dev->parent_spec)
                        rbd_warn(rbd_dev,
                                 "WARNING: kernel layering is EXPERIMENTAL!");
        }

        ret = rbd_dev_probe_parent(rbd_dev, depth);
        if (ret)
                goto err_out_probe;

        dout("discovered format %u image, header name is %s\n",
                rbd_dev->image_format, rbd_dev->header_oid.name);
        return 0;

err_out_probe:
        rbd_dev_unprobe(rbd_dev);
err_out_watch:
        if (!depth)
                rbd_unregister_watch(rbd_dev);
err_out_format:
        rbd_dev->image_format = 0;
        kfree(rbd_dev->spec->image_id);
        rbd_dev->spec->image_id = NULL;
        return ret;
}
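
/*
 * The probe error labels above unwind the setup steps in reverse:
 * header and parent state first (rbd_dev_unprobe()), then the watch,
 * which exists only for the image being mapped (depth == 0), and
 * finally the image id/format established by rbd_dev_image_id().
 */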

static ssize_t do_rbd_add(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct ceph_options *ceph_opts = NULL;
        struct rbd_options *rbd_opts = NULL;
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        bool read_only;
        int rc;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
                goto out;

        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
                rc = PTR_ERR(rbdc);
                goto err_out_args;
        }

        /* pick the pool */
        rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
        if (rc < 0) {
                if (rc == -ENOENT)
                        pr_info("pool %s does not exist\n", spec->pool_name);
                goto err_out_client;
        }
        spec->pool_id = (u64)rc;

        rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
        if (!rbd_dev) {
                rc = -ENOMEM;
                goto err_out_client;
        }
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */
        rbd_opts = NULL;        /* rbd_dev now owns this */

        rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
        if (!rbd_dev->config_info) {
                rc = -ENOMEM;
                goto err_out_rbd_dev;
        }

        down_write(&rbd_dev->header_rwsem);
        rc = rbd_dev_image_probe(rbd_dev, 0);
        if (rc < 0) {
                up_write(&rbd_dev->header_rwsem);
                goto err_out_rbd_dev;
        }

        /* If we are mapping a snapshot it must be marked read-only */

        read_only = rbd_dev->opts->read_only;
        if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
                read_only = true;
        rbd_dev->mapping.read_only = read_only;

        rc = rbd_dev_device_setup(rbd_dev);
        if (rc) {
                /*
                 * rbd_unregister_watch() can't be moved into
                 * rbd_dev_image_release() without refactoring, see
                 * commit 1f3ef78861ac.
                 */
                rbd_unregister_watch(rbd_dev);
                rbd_dev_image_release(rbd_dev);
                goto out;
        }

        rc = count;
out:
        module_put(THIS_MODULE);
        return rc;

err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
err_out_client:
        rbd_put_client(rbdc);
err_out_args:
        rbd_spec_put(spec);
        kfree(rbd_opts);
        goto out;
}
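
/*
 * Mapping an image goes through the sysfs bus file, e.g. (monitor
 * address, credentials, pool and image names below are illustrative):
 *
 *   $ echo "192.168.0.1:6789 name=admin,secret=AQB... rbd foo" \
 *       > /sys/bus/rbd/add
 *
 * rbd_add_parse_args() splits the buffer into ceph options (monitors,
 * auth), rbd options, and an rbd_spec naming the pool, image and
 * optional snapshot; see the sysfs ABI documentation for the full
 * format.
 */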

static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        if (single_major)
                return -EINVAL;

        return do_rbd_add(bus, buf, count);
}

static ssize_t rbd_add_single_major(struct bus_type *bus,
                                    const char *buf,
                                    size_t count)
{
        return do_rbd_add(bus, buf, count);
}
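
/*
 * With the single_major module parameter set, mappings must use the
 * add_single_major/remove_single_major bus files; the plain add and
 * remove files refuse with -EINVAL so that every mapping shares the
 * one registered major.
 */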

static void rbd_dev_device_release(struct rbd_device *rbd_dev)
{
        rbd_free_disk(rbd_dev);

        spin_lock(&rbd_dev_list_lock);
        list_del_init(&rbd_dev->node);
        spin_unlock(&rbd_dev_list_lock);

        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        device_del(&rbd_dev->dev);
        rbd_dev_mapping_clear(rbd_dev);
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
        while (rbd_dev->parent) {
                struct rbd_device *first = rbd_dev;
                struct rbd_device *second = first->parent;
                struct rbd_device *third;

                /*
                 * Follow to the parent with no grandparent and
                 * remove it.
                 */
                while (second && (third = second->parent)) {
                        first = second;
                        second = third;
                }
                rbd_assert(second);
                rbd_dev_image_release(second);
                first->parent = NULL;
                first->parent_overlap = 0;

                rbd_assert(first->parent_spec);
                rbd_spec_put(first->parent_spec);
                first->parent_spec = NULL;
        }
}
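
/*
 * Parent teardown is deepest-first.  For a chain
 *
 *   mapped image -> parent -> grandparent
 *
 * each pass of the outer loop walks to the image that has no parent
 * of its own and releases it, so the grandparent goes first and the
 * mapped image's direct parent goes last.
 */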

static ssize_t do_rbd_remove(struct bus_type *bus,
                             const char *buf,
                             size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        struct list_head *tmp;
        int dev_id;
        char opt_buf[6];
        bool already = false;
        bool force = false;
        int ret;

        dev_id = -1;
        opt_buf[0] = '\0';
        sscanf(buf, "%d %5s", &dev_id, opt_buf);
        if (dev_id < 0) {
                pr_err("dev_id out of range\n");
                return -EINVAL;
        }
        if (opt_buf[0] != '\0') {
                if (!strcmp(opt_buf, "force")) {
                        force = true;
                } else {
                        pr_err("bad remove option at '%s'\n", opt_buf);
                        return -EINVAL;
                }
        }

        ret = -ENOENT;
        spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id == dev_id) {
                        ret = 0;
                        break;
                }
        }
        if (!ret) {
                spin_lock_irq(&rbd_dev->lock);
                if (rbd_dev->open_count && !force)
                        ret = -EBUSY;
                else
                        already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
                                                        &rbd_dev->flags);
                spin_unlock_irq(&rbd_dev->lock);
        }
        spin_unlock(&rbd_dev_list_lock);
        if (ret < 0 || already)
                return ret;

        if (force) {
                /*
                 * Prevent new IO from being queued and wait for existing
                 * IO to complete/fail.
                 */
                blk_mq_freeze_queue(rbd_dev->disk->queue);
                blk_set_queue_dying(rbd_dev->disk->queue);
        }

        down_write(&rbd_dev->lock_rwsem);
        if (__rbd_is_lock_owner(rbd_dev))
                rbd_unlock(rbd_dev);
        up_write(&rbd_dev->lock_rwsem);
        rbd_unregister_watch(rbd_dev);

        /*
         * Don't free anything from rbd_dev->disk until after all
         * notifies are completely processed.  Otherwise releasing the
         * device will race with rbd_watch_cb(), resulting in a
         * potential use after free of rbd_dev->disk or rbd_dev.
         */
        rbd_dev_device_release(rbd_dev);
        rbd_dev_image_release(rbd_dev);

        return count;
}
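
/*
 * Unmapping, e.g. for the device with id 0:
 *
 *   $ echo "0" > /sys/bus/rbd/remove        # -EBUSY if still open
 *   $ echo "0 force" > /sys/bus/rbd/remove  # overrides open_count
 *
 * "force" aborts in-flight I/O by freezing and killing the request
 * queue before the device is torn down.
 */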

static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        if (single_major)
                return -EINVAL;

        return do_rbd_remove(bus, buf, count);
}

static ssize_t rbd_remove_single_major(struct bus_type *bus,
                                       const char *buf,
                                       size_t count)
{
        return do_rbd_remove(bus, buf, count);
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                return ret;

        ret = bus_register(&rbd_bus_type);
        if (ret < 0)
                device_unregister(&rbd_root_dev);

        return ret;
}

static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}
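
/*
 * Resulting sysfs layout (a sketch, assuming the default bus name
 * "rbd"): a root device plus /sys/bus/rbd/ holding the add/remove
 * control files and a devices/ directory with one entry per mapped
 * image, named after its dev_id.
 */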

static int rbd_slab_init(void)
{
        rbd_assert(!rbd_img_request_cache);
        rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
        if (!rbd_img_request_cache)
                return -ENOMEM;

        rbd_assert(!rbd_obj_request_cache);
        rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
        if (!rbd_obj_request_cache)
                goto out_err;

        return 0;

out_err:
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
        return -ENOMEM;
}

static void rbd_slab_exit(void)
{
        rbd_assert(rbd_obj_request_cache);
        kmem_cache_destroy(rbd_obj_request_cache);
        rbd_obj_request_cache = NULL;

        rbd_assert(rbd_img_request_cache);
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
        int rc;

        if (!libceph_compatible(NULL)) {
                rbd_warn(NULL, "libceph incompatibility (quitting)");
                return -EINVAL;
        }

        rc = rbd_slab_init();
        if (rc)
                return rc;

        /*
         * The number of active work items is limited by the number of
         * rbd devices * queue depth, so leave @max_active at default.
         */
        rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
        if (!rbd_wq) {
                rc = -ENOMEM;
                goto err_out_slab;
        }

        if (single_major) {
                rbd_major = register_blkdev(0, RBD_DRV_NAME);
                if (rbd_major < 0) {
                        rc = rbd_major;
                        goto err_out_wq;
                }
        }

        rc = rbd_sysfs_init();
        if (rc)
                goto err_out_blkdev;

        if (single_major)
                pr_info("loaded (major %d)\n", rbd_major);
        else
                pr_info("loaded\n");

        return 0;

err_out_blkdev:
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);
err_out_wq:
        destroy_workqueue(rbd_wq);
err_out_slab:
        rbd_slab_exit();
        return rc;
}
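
/*
 * Load order matters: slab caches first (needed by every request),
 * then the I/O workqueue, then the shared major when single_major is
 * set, and sysfs last so that no control file exists before the
 * driver can service it.  Init errors unwind in exact reverse, and
 * rbd_exit() below tears down in the same reverse order while also
 * destroying the device-id IDA.
 */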

static void __exit rbd_exit(void)
{
        ida_destroy(&rbd_dev_id_ida);
        rbd_sysfs_cleanup();
        if (single_major)
                unregister_blkdev(rbd_major, RBD_DRV_NAME);
        destroy_workqueue(rbd_wq);
        rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
MODULE_LICENSE("GPL");