Merge tag 'trace-v5.0-rc3' of git://git.kernel.org/pub/scm/linux/kernel/git/rostedt...
[sfrench/cifs-2.6.git] / drivers / block / rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/striper.h>
36 #include <linux/ceph/decode.h>
37 #include <linux/parser.h>
38 #include <linux/bsearch.h>
39
40 #include <linux/kernel.h>
41 #include <linux/device.h>
42 #include <linux/module.h>
43 #include <linux/blk-mq.h>
44 #include <linux/fs.h>
45 #include <linux/blkdev.h>
46 #include <linux/slab.h>
47 #include <linux/idr.h>
48 #include <linux/workqueue.h>
49
50 #include "rbd_types.h"
51
52 #define RBD_DEBUG       /* Activate rbd_assert() calls */
53
54 /*
55  * Increment the given counter and return its updated value.
56  * If the counter is already 0 it will not be incremented.
57  * If the counter is already at its maximum value returns
58  * -EINVAL without updating it.
59  */
60 static int atomic_inc_return_safe(atomic_t *v)
61 {
62         unsigned int counter;
63
64         counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
65         if (counter <= (unsigned int)INT_MAX)
66                 return (int)counter;
67
68         atomic_dec(v);
69
70         return -EINVAL;
71 }
72
73 /* Decrement the counter.  Return the resulting value, or -EINVAL */
74 static int atomic_dec_return_safe(atomic_t *v)
75 {
76         int counter;
77
78         counter = atomic_dec_return(v);
79         if (counter >= 0)
80                 return counter;
81
82         atomic_inc(v);
83
84         return -EINVAL;
85 }
86
/* Driver name; also used as the prefix when logging (see rbd_warn()). */
#define RBD_DRV_NAME "rbd"

/*
 * RBD_SINGLE_MAJOR_PART_SHIFT is the shift applied when converting
 * between a device id and a minor number (see rbd_dev_id_to_minor()
 * and minor_to_rbd_dev_id()).
 */
#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

/* NOTE(review): presumably bounds the depth of a layered parent chain -- confirm */
#define RBD_MAX_PARENT_CHAIN_LEN	16

/*
 * Maximum snapshot name length: NAME_MAX minus the length of the
 * "snap_" prefix (sizeof - 1 drops the prefix's terminating NUL).
 */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)	/* 1000 ms, in jiffies */
112
/* Feature bits (each is a single bit in a 64-bit feature mask) */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

/* Union of every feature bit defined above. */
#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/*
 * Features supported by this (client software) implementation.
 * This mask is what rbd_supported_features_show() reports via sysfs.
 */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * DEV_NAME_LEN sizes the name[] buffer in struct rbd_device.
 */
#define DEV_NAME_LEN		32
136
/*
 * block device image metadata (in-memory version)
 *
 * The fields fall into two groups: those that are fixed for a given
 * image and those that have to be refreshed occasionally.
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};
155
156 /*
157  * An rbd image specification.
158  *
159  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
160  * identify an image.  Each rbd_dev structure includes a pointer to
161  * an rbd_spec structure that encapsulates this identity.
162  *
163  * Each of the id's in an rbd_spec has an associated name.  For a
164  * user-mapped image, the names are supplied and the id's associated
165  * with them are looked up.  For a layered image, a parent image is
166  * defined by the tuple, and the names are looked up.
167  *
168  * An rbd_dev structure contains a parent_spec pointer which is
169  * non-null if the image it represents is a child in a layered
170  * image.  This pointer will refer to the rbd_spec structure used
171  * by the parent rbd_dev for its own identity (i.e., the structure
172  * is shared between the parent and child).
173  *
174  * Since these structures are populated once, during the discovery
175  * phase of image construction, they are effectively immutable so
176  * we make no effort to synchronize access to them.
177  *
178  * Note that code herein does not assume the image name is known (it
179  * could be a null pointer).
180  */
/*
 * Identity of an image: the (pool_id, image_id, snap_id) tuple plus the
 * name associated with each id.  Reference counted through kref.
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;
	const char	*pool_ns;	/* NULL if default, never "" */

	const char	*image_id;
	const char	*image_name;	/* may be NULL (name not known) */

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
194
/*
 * an instance of the client.  multiple devices may share an rbd client.
 *
 * Instances are reference counted (kref) and linked into
 * rbd_client_list through node.
 */
struct rbd_client {
	struct ceph_client	*client;	/* underlying libceph client */
	struct kref		kref;
	struct list_head	node;		/* entry in rbd_client_list */
};
203
204 struct rbd_img_request;
205
/* How the data buffer of a request is described; values start at 1. */
enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};
212
/* Kind of operation a request performs; obj_op_name() maps these to text. */
enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
};
218
219 /*
220  * Writes go through the following state machine to deal with
221  * layering:
222  *
223  *                       need copyup
224  * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
225  *        |     ^                              |
226  *        v     \------------------------------/
227  *      done
228  *        ^
229  *        |
230  * RBD_OBJ_WRITE_FLAT
231  *
232  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
233  * there is a parent or not.
234  */
/* States of the write state machine; values start at 1. */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_FLAT = 1,
	RBD_OBJ_WRITE_GUARD,
	RBD_OBJ_WRITE_COPYUP,
};
240
/*
 * A request against a single object.  It belongs to an image request
 * (img_request) and is linked into that request's object_extents list
 * through ex.oe_item (see for_each_obj_request()).
 */
struct rbd_obj_request {
	struct ceph_object_extent ex;
	/* Per-direction state; which member is valid depends on the op. */
	union {
		bool			tried_parent;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	struct rbd_img_request	*img_request;
	struct ceph_file_extent	*img_extents;
	u32			num_img_extents;

	/* Data position: either a bio iterator or a bio_vec iterator. */
	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
			u32			bvec_idx;
		};
	};
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	struct kref		kref;
};
270
/* Bit numbers for the flags word of struct rbd_img_request. */
enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};
275
/*
 * A request against an image.  Its object requests are kept on the
 * object_extents list.  The initiator is either a block layer request
 * (rq) or an object request (obj_request); the IMG_REQ_CHILD flag
 * comment above describes that distinction.
 */
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	enum obj_request_type	data_type;
	unsigned long		flags;		/* enum img_req_flags bits */
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	struct list_head	object_extents;	/* obj_req.ex structs */
	u32			obj_request_count;
	u32			pending_count;

	struct kref		kref;
};
299
/*
 * Iterate over the object requests of image request @ireq, with @oreq
 * as the cursor.  The _safe variant takes an extra cursor @n and maps
 * to list_for_each_entry_safe().
 */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
304
/* Values of rbd_device.watch_state. */
enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};
310
/*
 * Values of rbd_device.lock_state.  __rbd_is_lock_owner() treats both
 * LOCKED and RELEASING as "we own the lock".
 */
enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};
316
/* WatchNotify::ClientId: a (gid, handle) pair identifying a client */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};
322
/* Size and feature mask recorded for a mapping (rbd_device.mapping). */
struct rbd_mapping {
	u64			size;
	u64			features;
};
327
/*
 * a single device
 *
 * Groups the block-device identity, the image header and spec, the
 * watch and exclusive-lock state, the parent (layering) linkage and
 * the sysfs device for one rbd device.
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	/* Watch state */
	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	/* Exclusive lock state; lock_state is read under lock_rwsem */
	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	/* Layering: non-NULL parent_spec means this image is a child */
	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};
393
394 /*
395  * Flag bits for rbd_dev->flags:
396  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
397  *   by rbd_dev->lock
398  * - BLACKLISTED is protected by rbd_dev->lock_rwsem
399  */
/* Bit numbers within rbd_dev->flags (used with test_bit() and friends). */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};
405
/* File-scope driver state. */

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);	/* guards rbd_client_list */

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 *
 * Read-only module parameter (mode 0444).  When it is false the
 * *_single_major bus attributes are hidden (see rbd_bus_is_visible()).
 */
static bool single_major = true;
module_param(single_major, bool, 0444);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
430
/*
 * Forward declarations: the sysfs store handlers referenced by the
 * BUS_ATTR() definitions below, and the image probe routine.
 */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
440
441 static int rbd_dev_id_to_minor(int dev_id)
442 {
443         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
444 }
445
446 static int minor_to_rbd_dev_id(int minor)
447 {
448         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
449 }
450
451 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
452 {
453         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
454                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
455 }
456
457 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
458 {
459         bool is_lock_owner;
460
461         down_read(&rbd_dev->lock_rwsem);
462         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
463         up_read(&rbd_dev->lock_rwsem);
464         return is_lock_owner;
465 }
466
467 static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
468 {
469         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
470 }
471
/*
 * Bus attributes.  The four add/remove attributes are write-only
 * (mode 0200, store handler only); supported_features is read-only
 * (mode 0444, show handler only).
 */
static BUS_ATTR(add, 0200, NULL, rbd_add);
static BUS_ATTR(remove, 0200, NULL, rbd_remove);
static BUS_ATTR(add_single_major, 0200, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, 0200, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, 0444, rbd_supported_features_show, NULL);

/* NULL-terminated list of the attributes above, used by rbd_bus_group. */
static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};
486
487 static umode_t rbd_bus_is_visible(struct kobject *kobj,
488                                   struct attribute *attr, int index)
489 {
490         if (!single_major &&
491             (attr == &bus_attr_add_single_major.attr ||
492              attr == &bus_attr_remove_single_major.attr))
493                 return 0;
494
495         return attr->mode;
496 }
497
/* Attribute group for the rbd bus; visibility is filtered per attribute. */
static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
/* Presumably defines rbd_bus_groups, which is referenced just below -- confirm */
__ATTRIBUTE_GROUPS(rbd_bus);

/* The "rbd" bus, carrying the attribute groups defined above. */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};
508
/*
 * Release callback for rbd_root_dev.  Intentionally empty:
 * rbd_root_dev is a statically defined object, so there is nothing
 * to free here.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
512
/* Statically defined root device named "rbd"; its release hook is a no-op. */
static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
517
518 static __printf(2, 3)
519 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
520 {
521         struct va_format vaf;
522         va_list args;
523
524         va_start(args, fmt);
525         vaf.fmt = fmt;
526         vaf.va = &args;
527
528         if (!rbd_dev)
529                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
530         else if (rbd_dev->disk)
531                 printk(KERN_WARNING "%s: %s: %pV\n",
532                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
533         else if (rbd_dev->spec && rbd_dev->spec->image_name)
534                 printk(KERN_WARNING "%s: image %s: %pV\n",
535                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
536         else if (rbd_dev->spec && rbd_dev->spec->image_id)
537                 printk(KERN_WARNING "%s: id %s: %pV\n",
538                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
539         else    /* punt */
540                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
541                         RBD_DRV_NAME, rbd_dev, &vaf);
542         va_end(args);
543 }
544
#ifdef RBD_DEBUG
/*
 * rbd_assert(expr): if @expr is false, log the failing expression with
 * its function and line, then BUG().
 *
 * The body is wrapped in do { } while (0) so the macro expands to a
 * single statement.  A bare "if (...) { }" expansion would capture a
 * following "else" or break when used as the body of an unbraced
 * if/else.  Every use must be terminated with a semicolon.
 */
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
557
/* Forward declarations of helpers defined further down in this file. */
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
570
571 static int rbd_open(struct block_device *bdev, fmode_t mode)
572 {
573         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
574         bool removing = false;
575
576         spin_lock_irq(&rbd_dev->lock);
577         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
578                 removing = true;
579         else
580                 rbd_dev->open_count++;
581         spin_unlock_irq(&rbd_dev->lock);
582         if (removing)
583                 return -ENOENT;
584
585         (void) get_device(&rbd_dev->dev);
586
587         return 0;
588 }
589
590 static void rbd_release(struct gendisk *disk, fmode_t mode)
591 {
592         struct rbd_device *rbd_dev = disk->private_data;
593         unsigned long open_count_before;
594
595         spin_lock_irq(&rbd_dev->lock);
596         open_count_before = rbd_dev->open_count--;
597         spin_unlock_irq(&rbd_dev->lock);
598         rbd_assert(open_count_before > 0);
599
600         put_device(&rbd_dev->dev);
601 }
602
603 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
604 {
605         int ro;
606
607         if (get_user(ro, (int __user *)arg))
608                 return -EFAULT;
609
610         /* Snapshots can't be marked read-write */
611         if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
612                 return -EROFS;
613
614         /* Let blkdev_roset() handle it */
615         return -ENOTTY;
616 }
617
618 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
619                         unsigned int cmd, unsigned long arg)
620 {
621         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
622         int ret;
623
624         switch (cmd) {
625         case BLKROSET:
626                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
627                 break;
628         default:
629                 ret = -ENOTTY;
630         }
631
632         return ret;
633 }
634
#ifdef CONFIG_COMPAT
/* Compat ioctl entry point: forwards unchanged to rbd_ioctl(). */
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	int ret = rbd_ioctl(bdev, mode, cmd, arg);

	return ret;
}
#endif /* CONFIG_COMPAT */
642
/*
 * Block device operations for rbd devices.  The compat ioctl hook is
 * only present when CONFIG_COMPAT is enabled.
 */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};
652
653 /*
654  * Initialize an rbd client instance.  Success or not, this function
655  * consumes ceph_opts.  Caller holds client_mutex.
656  */
657 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
658 {
659         struct rbd_client *rbdc;
660         int ret = -ENOMEM;
661
662         dout("%s:\n", __func__);
663         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
664         if (!rbdc)
665                 goto out_opt;
666
667         kref_init(&rbdc->kref);
668         INIT_LIST_HEAD(&rbdc->node);
669
670         rbdc->client = ceph_create_client(ceph_opts, rbdc);
671         if (IS_ERR(rbdc->client))
672                 goto out_rbdc;
673         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
674
675         ret = ceph_open_session(rbdc->client);
676         if (ret < 0)
677                 goto out_client;
678
679         spin_lock(&rbd_client_list_lock);
680         list_add_tail(&rbdc->node, &rbd_client_list);
681         spin_unlock(&rbd_client_list_lock);
682
683         dout("%s: rbdc %p\n", __func__, rbdc);
684
685         return rbdc;
686 out_client:
687         ceph_destroy_client(rbdc->client);
688 out_rbdc:
689         kfree(rbdc);
690 out_opt:
691         if (ceph_opts)
692                 ceph_destroy_options(ceph_opts);
693         dout("%s: error %d\n", __func__, ret);
694
695         return ERR_PTR(ret);
696 }
697
698 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
699 {
700         kref_get(&rbdc->kref);
701
702         return rbdc;
703 }
704
705 /*
706  * Find a ceph client with specific addr and configuration.  If
707  * found, bump its reference count.
708  */
709 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
710 {
711         struct rbd_client *client_node;
712         bool found = false;
713
714         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
715                 return NULL;
716
717         spin_lock(&rbd_client_list_lock);
718         list_for_each_entry(client_node, &rbd_client_list, node) {
719                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
720                         __rbd_get_client(client_node);
721
722                         found = true;
723                         break;
724                 }
725         }
726         spin_unlock(&rbd_client_list_lock);
727
728         return found ? client_node : NULL;
729 }
730
/*
 * (Per device) rbd map options
 *
 * The enum is laid out in three bands separated by the Opt_last_int
 * and Opt_last_string markers: integer-valued options, string-valued
 * options, then flag options without an argument.
 * parse_rbd_opts_token() relies on this ordering to decide how to
 * decode the argument.
 */
enum {
	Opt_queue_depth,
	Opt_lock_timeout,
	Opt_last_int,
	/* int args above */
	Opt_pool_ns,
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_notrim,
	Opt_err
};

/* Token table handed to match_token(); terminated by the Opt_err entry. */
static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	{Opt_lock_timeout, "lock_timeout=%d"},
	/* int args above */
	{Opt_pool_ns, "_pool_ns=%s"},
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_notrim, "notrim"},
	{Opt_err, NULL}
};
765
/* Parsed per-device map options; filled in by parse_rbd_opts_token(). */
struct rbd_options {
	int	queue_depth;		/* must be >= 1 */
	unsigned long	lock_timeout;	/* in jiffies; 0 means wait forever */
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
	bool	trim;			/* cleared by the "notrim" option */
};
774
/* Default values, one per struct rbd_options field. */
#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false
#define RBD_TRIM_DEFAULT	true
781
/*
 * Context passed (as the opaque "private" pointer) to
 * parse_rbd_opts_token(): the spec and options structures that the
 * parsed values are stored into.
 */
struct parse_rbd_opts_ctx {
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
};
786
787 static int parse_rbd_opts_token(char *c, void *private)
788 {
789         struct parse_rbd_opts_ctx *pctx = private;
790         substring_t argstr[MAX_OPT_ARGS];
791         int token, intval, ret;
792
793         token = match_token(c, rbd_opts_tokens, argstr);
794         if (token < Opt_last_int) {
795                 ret = match_int(&argstr[0], &intval);
796                 if (ret < 0) {
797                         pr_err("bad option arg (not int) at '%s'\n", c);
798                         return ret;
799                 }
800                 dout("got int token %d val %d\n", token, intval);
801         } else if (token > Opt_last_int && token < Opt_last_string) {
802                 dout("got string token %d val %s\n", token, argstr[0].from);
803         } else {
804                 dout("got token %d\n", token);
805         }
806
807         switch (token) {
808         case Opt_queue_depth:
809                 if (intval < 1) {
810                         pr_err("queue_depth out of range\n");
811                         return -EINVAL;
812                 }
813                 pctx->opts->queue_depth = intval;
814                 break;
815         case Opt_lock_timeout:
816                 /* 0 is "wait forever" (i.e. infinite timeout) */
817                 if (intval < 0 || intval > INT_MAX / 1000) {
818                         pr_err("lock_timeout out of range\n");
819                         return -EINVAL;
820                 }
821                 pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
822                 break;
823         case Opt_pool_ns:
824                 kfree(pctx->spec->pool_ns);
825                 pctx->spec->pool_ns = match_strdup(argstr);
826                 if (!pctx->spec->pool_ns)
827                         return -ENOMEM;
828                 break;
829         case Opt_read_only:
830                 pctx->opts->read_only = true;
831                 break;
832         case Opt_read_write:
833                 pctx->opts->read_only = false;
834                 break;
835         case Opt_lock_on_read:
836                 pctx->opts->lock_on_read = true;
837                 break;
838         case Opt_exclusive:
839                 pctx->opts->exclusive = true;
840                 break;
841         case Opt_notrim:
842                 pctx->opts->trim = false;
843                 break;
844         default:
845                 /* libceph prints "bad option" msg */
846                 return -EINVAL;
847         }
848
849         return 0;
850 }
851
852 static char* obj_op_name(enum obj_operation_type op_type)
853 {
854         switch (op_type) {
855         case OBJ_OP_READ:
856                 return "read";
857         case OBJ_OP_WRITE:
858                 return "write";
859         case OBJ_OP_DISCARD:
860                 return "discard";
861         default:
862                 return "???";
863         }
864 }
865
866 /*
867  * Destroy ceph client
868  *
869  * Caller must hold rbd_client_list_lock.
870  */
871 static void rbd_client_release(struct kref *kref)
872 {
873         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
874
875         dout("%s: rbdc %p\n", __func__, rbdc);
876         spin_lock(&rbd_client_list_lock);
877         list_del(&rbdc->node);
878         spin_unlock(&rbd_client_list_lock);
879
880         ceph_destroy_client(rbdc->client);
881         kfree(rbdc);
882 }
883
884 /*
885  * Drop reference to ceph client node. If it's not referenced anymore, release
886  * it.
887  */
888 static void rbd_put_client(struct rbd_client *rbdc)
889 {
890         if (rbdc)
891                 kref_put(&rbdc->kref, rbd_client_release);
892 }
893
894 static int wait_for_latest_osdmap(struct ceph_client *client)
895 {
896         u64 newest_epoch;
897         int ret;
898
899         ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
900         if (ret)
901                 return ret;
902
903         if (client->osdc.osdmap->epoch >= newest_epoch)
904                 return 0;
905
906         ceph_osdc_maybe_request_map(&client->osdc);
907         return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
908                                      client->options->mount_timeout);
909 }
910
911 /*
912  * Get a ceph client with specific addr and configuration, if one does
913  * not exist create it.  Either way, ceph_opts is consumed by this
914  * function.
915  */
916 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
917 {
918         struct rbd_client *rbdc;
919         int ret;
920
921         mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
922         rbdc = rbd_client_find(ceph_opts);
923         if (rbdc) {
924                 ceph_destroy_options(ceph_opts);
925
926                 /*
927                  * Using an existing client.  Make sure ->pg_pools is up to
928                  * date before we look up the pool id in do_rbd_add().
929                  */
930                 ret = wait_for_latest_osdmap(rbdc->client);
931                 if (ret) {
932                         rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
933                         rbd_put_client(rbdc);
934                         rbdc = ERR_PTR(ret);
935                 }
936         } else {
937                 rbdc = rbd_client_create(ceph_opts);
938         }
939         mutex_unlock(&client_mutex);
940
941         return rbdc;
942 }
943
944 static bool rbd_image_format_valid(u32 image_format)
945 {
946         return image_format == 1 || image_format == 2;
947 }
948
949 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
950 {
951         size_t size;
952         u32 snap_count;
953
954         /* The header has to start with the magic rbd header text */
955         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
956                 return false;
957
958         /* The bio layer requires at least sector-sized I/O */
959
960         if (ondisk->options.order < SECTOR_SHIFT)
961                 return false;
962
963         /* If we use u64 in a few spots we may be able to loosen this */
964
965         if (ondisk->options.order > 8 * sizeof (int) - 1)
966                 return false;
967
968         /*
969          * The size of a snapshot header has to fit in a size_t, and
970          * that limits the number of snapshots.
971          */
972         snap_count = le32_to_cpu(ondisk->snap_count);
973         size = SIZE_MAX - sizeof (struct ceph_snap_context);
974         if (snap_count > size / sizeof (__le64))
975                 return false;
976
977         /*
978          * Not only that, but the size of the entire the snapshot
979          * header must also be representable in a size_t.
980          */
981         size -= snap_count * sizeof (__le64);
982         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
983                 return false;
984
985         return true;
986 }
987
988 /*
989  * returns the size of an object in the image
990  */
991 static u32 rbd_obj_bytes(struct rbd_image_header *header)
992 {
993         return 1U << header->obj_order;
994 }
995
996 static void rbd_init_layout(struct rbd_device *rbd_dev)
997 {
998         if (rbd_dev->header.stripe_unit == 0 ||
999             rbd_dev->header.stripe_count == 0) {
1000                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
1001                 rbd_dev->header.stripe_count = 1;
1002         }
1003
1004         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1005         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1006         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1007         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1008                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1009         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1010 }
1011
1012 /*
1013  * Fill an rbd image header with information from the given format 1
1014  * on-disk header.
1015  */
1016 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1017                                  struct rbd_image_header_ondisk *ondisk)
1018 {
1019         struct rbd_image_header *header = &rbd_dev->header;
1020         bool first_time = header->object_prefix == NULL;
1021         struct ceph_snap_context *snapc;
1022         char *object_prefix = NULL;
1023         char *snap_names = NULL;
1024         u64 *snap_sizes = NULL;
1025         u32 snap_count;
1026         int ret = -ENOMEM;
1027         u32 i;
1028
1029         /* Allocate this now to avoid having to handle failure below */
1030
1031         if (first_time) {
1032                 object_prefix = kstrndup(ondisk->object_prefix,
1033                                          sizeof(ondisk->object_prefix),
1034                                          GFP_KERNEL);
1035                 if (!object_prefix)
1036                         return -ENOMEM;
1037         }
1038
1039         /* Allocate the snapshot context and fill it in */
1040
1041         snap_count = le32_to_cpu(ondisk->snap_count);
1042         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1043         if (!snapc)
1044                 goto out_err;
1045         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1046         if (snap_count) {
1047                 struct rbd_image_snap_ondisk *snaps;
1048                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1049
1050                 /* We'll keep a copy of the snapshot names... */
1051
1052                 if (snap_names_len > (u64)SIZE_MAX)
1053                         goto out_2big;
1054                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1055                 if (!snap_names)
1056                         goto out_err;
1057
1058                 /* ...as well as the array of their sizes. */
1059                 snap_sizes = kmalloc_array(snap_count,
1060                                            sizeof(*header->snap_sizes),
1061                                            GFP_KERNEL);
1062                 if (!snap_sizes)
1063                         goto out_err;
1064
1065                 /*
1066                  * Copy the names, and fill in each snapshot's id
1067                  * and size.
1068                  *
1069                  * Note that rbd_dev_v1_header_info() guarantees the
1070                  * ondisk buffer we're working with has
1071                  * snap_names_len bytes beyond the end of the
1072                  * snapshot id array, this memcpy() is safe.
1073                  */
1074                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1075                 snaps = ondisk->snaps;
1076                 for (i = 0; i < snap_count; i++) {
1077                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1078                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1079                 }
1080         }
1081
1082         /* We won't fail any more, fill in the header */
1083
1084         if (first_time) {
1085                 header->object_prefix = object_prefix;
1086                 header->obj_order = ondisk->options.order;
1087                 rbd_init_layout(rbd_dev);
1088         } else {
1089                 ceph_put_snap_context(header->snapc);
1090                 kfree(header->snap_names);
1091                 kfree(header->snap_sizes);
1092         }
1093
1094         /* The remaining fields always get updated (when we refresh) */
1095
1096         header->image_size = le64_to_cpu(ondisk->image_size);
1097         header->snapc = snapc;
1098         header->snap_names = snap_names;
1099         header->snap_sizes = snap_sizes;
1100
1101         return 0;
1102 out_2big:
1103         ret = -EIO;
1104 out_err:
1105         kfree(snap_sizes);
1106         kfree(snap_names);
1107         ceph_put_snap_context(snapc);
1108         kfree(object_prefix);
1109
1110         return ret;
1111 }
1112
1113 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1114 {
1115         const char *snap_name;
1116
1117         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1118
1119         /* Skip over names until we find the one we are looking for */
1120
1121         snap_name = rbd_dev->header.snap_names;
1122         while (which--)
1123                 snap_name += strlen(snap_name) + 1;
1124
1125         return kstrdup(snap_name, GFP_KERNEL);
1126 }
1127
1128 /*
1129  * Snapshot id comparison function for use with qsort()/bsearch().
1130  * Note that result is for snapshots in *descending* order.
1131  */
1132 static int snapid_compare_reverse(const void *s1, const void *s2)
1133 {
1134         u64 snap_id1 = *(u64 *)s1;
1135         u64 snap_id2 = *(u64 *)s2;
1136
1137         if (snap_id1 < snap_id2)
1138                 return 1;
1139         return snap_id1 == snap_id2 ? 0 : -1;
1140 }
1141
1142 /*
1143  * Search a snapshot context to see if the given snapshot id is
1144  * present.
1145  *
1146  * Returns the position of the snapshot id in the array if it's found,
1147  * or BAD_SNAP_INDEX otherwise.
1148  *
1149  * Note: The snapshot array is in kept sorted (by the osd) in
1150  * reverse order, highest snapshot id first.
1151  */
1152 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1153 {
1154         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1155         u64 *found;
1156
1157         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1158                                 sizeof (snap_id), snapid_compare_reverse);
1159
1160         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1161 }
1162
1163 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1164                                         u64 snap_id)
1165 {
1166         u32 which;
1167         const char *snap_name;
1168
1169         which = rbd_dev_snap_index(rbd_dev, snap_id);
1170         if (which == BAD_SNAP_INDEX)
1171                 return ERR_PTR(-ENOENT);
1172
1173         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1174         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1175 }
1176
1177 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1178 {
1179         if (snap_id == CEPH_NOSNAP)
1180                 return RBD_SNAP_HEAD_NAME;
1181
1182         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1183         if (rbd_dev->image_format == 1)
1184                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1185
1186         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1187 }
1188
1189 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1190                                 u64 *snap_size)
1191 {
1192         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1193         if (snap_id == CEPH_NOSNAP) {
1194                 *snap_size = rbd_dev->header.image_size;
1195         } else if (rbd_dev->image_format == 1) {
1196                 u32 which;
1197
1198                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1199                 if (which == BAD_SNAP_INDEX)
1200                         return -ENOENT;
1201
1202                 *snap_size = rbd_dev->header.snap_sizes[which];
1203         } else {
1204                 u64 size = 0;
1205                 int ret;
1206
1207                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1208                 if (ret)
1209                         return ret;
1210
1211                 *snap_size = size;
1212         }
1213         return 0;
1214 }
1215
1216 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1217                         u64 *snap_features)
1218 {
1219         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1220         if (snap_id == CEPH_NOSNAP) {
1221                 *snap_features = rbd_dev->header.features;
1222         } else if (rbd_dev->image_format == 1) {
1223                 *snap_features = 0;     /* No features for format 1 */
1224         } else {
1225                 u64 features = 0;
1226                 int ret;
1227
1228                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1229                 if (ret)
1230                         return ret;
1231
1232                 *snap_features = features;
1233         }
1234         return 0;
1235 }
1236
1237 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1238 {
1239         u64 snap_id = rbd_dev->spec->snap_id;
1240         u64 size = 0;
1241         u64 features = 0;
1242         int ret;
1243
1244         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1245         if (ret)
1246                 return ret;
1247         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1248         if (ret)
1249                 return ret;
1250
1251         rbd_dev->mapping.size = size;
1252         rbd_dev->mapping.features = features;
1253
1254         return 0;
1255 }
1256
1257 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1258 {
1259         rbd_dev->mapping.size = 0;
1260         rbd_dev->mapping.features = 0;
1261 }
1262
1263 static void zero_bvec(struct bio_vec *bv)
1264 {
1265         void *buf;
1266         unsigned long flags;
1267
1268         buf = bvec_kmap_irq(bv, &flags);
1269         memset(buf, 0, bv->bv_len);
1270         flush_dcache_page(bv->bv_page);
1271         bvec_kunmap_irq(buf, &flags);
1272 }
1273
1274 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1275 {
1276         struct ceph_bio_iter it = *bio_pos;
1277
1278         ceph_bio_iter_advance(&it, off);
1279         ceph_bio_iter_advance_step(&it, bytes, ({
1280                 zero_bvec(&bv);
1281         }));
1282 }
1283
1284 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1285 {
1286         struct ceph_bvec_iter it = *bvec_pos;
1287
1288         ceph_bvec_iter_advance(&it, off);
1289         ceph_bvec_iter_advance_step(&it, bytes, ({
1290                 zero_bvec(&bv);
1291         }));
1292 }
1293
1294 /*
1295  * Zero a range in @obj_req data buffer defined by a bio (list) or
1296  * (private) bio_vec array.
1297  *
1298  * @off is relative to the start of the data buffer.
1299  */
1300 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1301                                u32 bytes)
1302 {
1303         switch (obj_req->img_request->data_type) {
1304         case OBJ_REQUEST_BIO:
1305                 zero_bios(&obj_req->bio_pos, off, bytes);
1306                 break;
1307         case OBJ_REQUEST_BVECS:
1308         case OBJ_REQUEST_OWN_BVECS:
1309                 zero_bvecs(&obj_req->bvec_pos, off, bytes);
1310                 break;
1311         default:
1312                 rbd_assert(0);
1313         }
1314 }
1315
1316 static void rbd_obj_request_destroy(struct kref *kref);
1317 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1318 {
1319         rbd_assert(obj_request != NULL);
1320         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1321                 kref_read(&obj_request->kref));
1322         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1323 }
1324
1325 static void rbd_img_request_get(struct rbd_img_request *img_request)
1326 {
1327         dout("%s: img %p (was %d)\n", __func__, img_request,
1328              kref_read(&img_request->kref));
1329         kref_get(&img_request->kref);
1330 }
1331
1332 static void rbd_img_request_destroy(struct kref *kref);
1333 static void rbd_img_request_put(struct rbd_img_request *img_request)
1334 {
1335         rbd_assert(img_request != NULL);
1336         dout("%s: img %p (was %d)\n", __func__, img_request,
1337                 kref_read(&img_request->kref));
1338         kref_put(&img_request->kref, rbd_img_request_destroy);
1339 }
1340
1341 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1342                                         struct rbd_obj_request *obj_request)
1343 {
1344         rbd_assert(obj_request->img_request == NULL);
1345
1346         /* Image request now owns object's original reference */
1347         obj_request->img_request = img_request;
1348         img_request->obj_request_count++;
1349         img_request->pending_count++;
1350         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1351 }
1352
1353 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1354                                         struct rbd_obj_request *obj_request)
1355 {
1356         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1357         list_del(&obj_request->ex.oe_item);
1358         rbd_assert(img_request->obj_request_count > 0);
1359         img_request->obj_request_count--;
1360         rbd_assert(obj_request->img_request == img_request);
1361         rbd_obj_request_put(obj_request);
1362 }
1363
1364 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
1365 {
1366         struct ceph_osd_request *osd_req = obj_request->osd_req;
1367
1368         dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
1369              obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
1370              obj_request->ex.oe_len, osd_req);
1371         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1372 }
1373
1374 /*
1375  * The default/initial value for all image request flags is 0.  Each
1376  * is conditionally set to 1 at image request initialization time
1377  * and currently never change thereafter.
1378  */
1379 static void img_request_layered_set(struct rbd_img_request *img_request)
1380 {
1381         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1382         smp_mb();
1383 }
1384
1385 static void img_request_layered_clear(struct rbd_img_request *img_request)
1386 {
1387         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1388         smp_mb();
1389 }
1390
1391 static bool img_request_layered_test(struct rbd_img_request *img_request)
1392 {
1393         smp_mb();
1394         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1395 }
1396
1397 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1398 {
1399         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1400
1401         return !obj_req->ex.oe_off &&
1402                obj_req->ex.oe_len == rbd_dev->layout.object_size;
1403 }
1404
1405 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1406 {
1407         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1408
1409         return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1410                                         rbd_dev->layout.object_size;
1411 }
1412
1413 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1414 {
1415         return ceph_file_extents_bytes(obj_req->img_extents,
1416                                        obj_req->num_img_extents);
1417 }
1418
1419 static bool rbd_img_is_write(struct rbd_img_request *img_req)
1420 {
1421         switch (img_req->op_type) {
1422         case OBJ_OP_READ:
1423                 return false;
1424         case OBJ_OP_WRITE:
1425         case OBJ_OP_DISCARD:
1426                 return true;
1427         default:
1428                 BUG();
1429         }
1430 }
1431
1432 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
1433
1434 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1435 {
1436         struct rbd_obj_request *obj_req = osd_req->r_priv;
1437
1438         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1439              osd_req->r_result, obj_req);
1440         rbd_assert(osd_req == obj_req->osd_req);
1441
1442         obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
1443         if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
1444                 obj_req->xferred = osd_req->r_result;
1445         else
1446                 /*
1447                  * Writes aren't allowed to return a data payload.  In some
1448                  * guarded write cases (e.g. stat + zero on an empty object)
1449                  * a stat response makes it through, but we don't care.
1450                  */
1451                 obj_req->xferred = 0;
1452
1453         rbd_obj_handle_request(obj_req);
1454 }
1455
1456 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1457 {
1458         struct ceph_osd_request *osd_req = obj_request->osd_req;
1459
1460         osd_req->r_flags = CEPH_OSD_FLAG_READ;
1461         osd_req->r_snapid = obj_request->img_request->snap_id;
1462 }
1463
1464 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1465 {
1466         struct ceph_osd_request *osd_req = obj_request->osd_req;
1467
1468         osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1469         ktime_get_real_ts64(&osd_req->r_mtime);
1470         osd_req->r_data_offset = obj_request->ex.oe_off;
1471 }
1472
1473 static struct ceph_osd_request *
1474 rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
1475 {
1476         struct rbd_img_request *img_req = obj_req->img_request;
1477         struct rbd_device *rbd_dev = img_req->rbd_dev;
1478         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1479         struct ceph_osd_request *req;
1480         const char *name_format = rbd_dev->image_format == 1 ?
1481                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1482
1483         req = ceph_osdc_alloc_request(osdc,
1484                         (rbd_img_is_write(img_req) ? img_req->snapc : NULL),
1485                         num_ops, false, GFP_NOIO);
1486         if (!req)
1487                 return NULL;
1488
1489         req->r_callback = rbd_osd_req_callback;
1490         req->r_priv = obj_req;
1491
1492         /*
1493          * Data objects may be stored in a separate pool, but always in
1494          * the same namespace in that pool as the header in its pool.
1495          */
1496         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1497         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1498
1499         if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1500                         rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
1501                 goto err_req;
1502
1503         return req;
1504
1505 err_req:
1506         ceph_osdc_put_request(req);
1507         return NULL;
1508 }
1509
/* Drop our reference on an OSD request. */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
        ceph_osdc_put_request(osd_req);
}
1514
1515 static struct rbd_obj_request *rbd_obj_request_create(void)
1516 {
1517         struct rbd_obj_request *obj_request;
1518
1519         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1520         if (!obj_request)
1521                 return NULL;
1522
1523         ceph_object_extent_init(&obj_request->ex);
1524         kref_init(&obj_request->kref);
1525
1526         dout("%s %p\n", __func__, obj_request);
1527         return obj_request;
1528 }
1529
1530 static void rbd_obj_request_destroy(struct kref *kref)
1531 {
1532         struct rbd_obj_request *obj_request;
1533         u32 i;
1534
1535         obj_request = container_of(kref, struct rbd_obj_request, kref);
1536
1537         dout("%s: obj %p\n", __func__, obj_request);
1538
1539         if (obj_request->osd_req)
1540                 rbd_osd_req_destroy(obj_request->osd_req);
1541
1542         switch (obj_request->img_request->data_type) {
1543         case OBJ_REQUEST_NODATA:
1544         case OBJ_REQUEST_BIO:
1545         case OBJ_REQUEST_BVECS:
1546                 break;          /* Nothing to do */
1547         case OBJ_REQUEST_OWN_BVECS:
1548                 kfree(obj_request->bvec_pos.bvecs);
1549                 break;
1550         default:
1551                 rbd_assert(0);
1552         }
1553
1554         kfree(obj_request->img_extents);
1555         if (obj_request->copyup_bvecs) {
1556                 for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1557                         if (obj_request->copyup_bvecs[i].bv_page)
1558                                 __free_page(obj_request->copyup_bvecs[i].bv_page);
1559                 }
1560                 kfree(obj_request->copyup_bvecs);
1561         }
1562
1563         kmem_cache_free(rbd_obj_request_cache, obj_request);
1564 }
1565
1566 /* It's OK to call this for a device with no parent */
1567
1568 static void rbd_spec_put(struct rbd_spec *spec);
1569 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1570 {
1571         rbd_dev_remove_parent(rbd_dev);
1572         rbd_spec_put(rbd_dev->parent_spec);
1573         rbd_dev->parent_spec = NULL;
1574         rbd_dev->parent_overlap = 0;
1575 }
1576
1577 /*
1578  * Parent image reference counting is used to determine when an
1579  * image's parent fields can be safely torn down--after there are no
1580  * more in-flight requests to the parent image.  When the last
1581  * reference is dropped, cleaning them up is safe.
1582  */
1583 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1584 {
1585         int counter;
1586
1587         if (!rbd_dev->parent_spec)
1588                 return;
1589
1590         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1591         if (counter > 0)
1592                 return;
1593
1594         /* Last reference; clean up parent data structures */
1595
1596         if (!counter)
1597                 rbd_dev_unparent(rbd_dev);
1598         else
1599                 rbd_warn(rbd_dev, "parent reference underflow");
1600 }
1601
1602 /*
1603  * If an image has a non-zero parent overlap, get a reference to its
1604  * parent.
1605  *
1606  * Returns true if the rbd device has a parent with a non-zero
1607  * overlap and a reference for it was successfully taken, or
1608  * false otherwise.
1609  */
1610 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1611 {
1612         int counter = 0;
1613
1614         if (!rbd_dev->parent_spec)
1615                 return false;
1616
1617         down_read(&rbd_dev->header_rwsem);
1618         if (rbd_dev->parent_overlap)
1619                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1620         up_read(&rbd_dev->header_rwsem);
1621
1622         if (counter < 0)
1623                 rbd_warn(rbd_dev, "parent reference overflow");
1624
1625         return counter > 0;
1626 }
1627
1628 /*
1629  * Caller is responsible for filling in the list of object requests
1630  * that comprises the image request, and the Linux request pointer
1631  * (if there is one).
1632  */
1633 static struct rbd_img_request *rbd_img_request_create(
1634                                         struct rbd_device *rbd_dev,
1635                                         enum obj_operation_type op_type,
1636                                         struct ceph_snap_context *snapc)
1637 {
1638         struct rbd_img_request *img_request;
1639
1640         img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
1641         if (!img_request)
1642                 return NULL;
1643
1644         img_request->rbd_dev = rbd_dev;
1645         img_request->op_type = op_type;
1646         if (!rbd_img_is_write(img_request))
1647                 img_request->snap_id = rbd_dev->spec->snap_id;
1648         else
1649                 img_request->snapc = snapc;
1650
1651         if (rbd_dev_parent_get(rbd_dev))
1652                 img_request_layered_set(img_request);
1653
1654         spin_lock_init(&img_request->completion_lock);
1655         INIT_LIST_HEAD(&img_request->object_extents);
1656         kref_init(&img_request->kref);
1657
1658         dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
1659              obj_op_name(op_type), img_request);
1660         return img_request;
1661 }
1662
1663 static void rbd_img_request_destroy(struct kref *kref)
1664 {
1665         struct rbd_img_request *img_request;
1666         struct rbd_obj_request *obj_request;
1667         struct rbd_obj_request *next_obj_request;
1668
1669         img_request = container_of(kref, struct rbd_img_request, kref);
1670
1671         dout("%s: img %p\n", __func__, img_request);
1672
1673         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1674                 rbd_img_obj_request_del(img_request, obj_request);
1675         rbd_assert(img_request->obj_request_count == 0);
1676
1677         if (img_request_layered_test(img_request)) {
1678                 img_request_layered_clear(img_request);
1679                 rbd_dev_parent_put(img_request->rbd_dev);
1680         }
1681
1682         if (rbd_img_is_write(img_request))
1683                 ceph_put_snap_context(img_request->snapc);
1684
1685         kmem_cache_free(rbd_img_request_cache, img_request);
1686 }
1687
1688 static void prune_extents(struct ceph_file_extent *img_extents,
1689                           u32 *num_img_extents, u64 overlap)
1690 {
1691         u32 cnt = *num_img_extents;
1692
1693         /* drop extents completely beyond the overlap */
1694         while (cnt && img_extents[cnt - 1].fe_off >= overlap)
1695                 cnt--;
1696
1697         if (cnt) {
1698                 struct ceph_file_extent *ex = &img_extents[cnt - 1];
1699
1700                 /* trim final overlapping extent */
1701                 if (ex->fe_off + ex->fe_len > overlap)
1702                         ex->fe_len = overlap - ex->fe_off;
1703         }
1704
1705         *num_img_extents = cnt;
1706 }
1707
1708 /*
1709  * Determine the byte range(s) covered by either just the object extent
1710  * or the entire object in the parent image.
1711  */
1712 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
1713                                     bool entire)
1714 {
1715         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1716         int ret;
1717
1718         if (!rbd_dev->parent_overlap)
1719                 return 0;
1720
1721         ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
1722                                   entire ? 0 : obj_req->ex.oe_off,
1723                                   entire ? rbd_dev->layout.object_size :
1724                                                         obj_req->ex.oe_len,
1725                                   &obj_req->img_extents,
1726                                   &obj_req->num_img_extents);
1727         if (ret)
1728                 return ret;
1729
1730         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
1731                       rbd_dev->parent_overlap);
1732         return 0;
1733 }
1734
1735 static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
1736 {
1737         switch (obj_req->img_request->data_type) {
1738         case OBJ_REQUEST_BIO:
1739                 osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
1740                                                &obj_req->bio_pos,
1741                                                obj_req->ex.oe_len);
1742                 break;
1743         case OBJ_REQUEST_BVECS:
1744         case OBJ_REQUEST_OWN_BVECS:
1745                 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
1746                                                         obj_req->ex.oe_len);
1747                 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
1748                 osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
1749                                                     &obj_req->bvec_pos);
1750                 break;
1751         default:
1752                 rbd_assert(0);
1753         }
1754 }
1755
1756 static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
1757 {
1758         obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
1759         if (!obj_req->osd_req)
1760                 return -ENOMEM;
1761
1762         osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
1763                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1764         rbd_osd_req_setup_data(obj_req, 0);
1765
1766         rbd_osd_req_format_read(obj_req);
1767         return 0;
1768 }
1769
1770 static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
1771                                 unsigned int which)
1772 {
1773         struct page **pages;
1774
1775         /*
1776          * The response data for a STAT call consists of:
1777          *     le64 length;
1778          *     struct {
1779          *         le32 tv_sec;
1780          *         le32 tv_nsec;
1781          *     } mtime;
1782          */
1783         pages = ceph_alloc_page_vector(1, GFP_NOIO);
1784         if (IS_ERR(pages))
1785                 return PTR_ERR(pages);
1786
1787         osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
1788         osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
1789                                      8 + sizeof(struct ceph_timespec),
1790                                      0, false, true);
1791         return 0;
1792 }
1793
1794 static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
1795                                   unsigned int which)
1796 {
1797         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1798         u16 opcode;
1799
1800         osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
1801                                    rbd_dev->layout.object_size,
1802                                    rbd_dev->layout.object_size);
1803
1804         if (rbd_obj_is_entire(obj_req))
1805                 opcode = CEPH_OSD_OP_WRITEFULL;
1806         else
1807                 opcode = CEPH_OSD_OP_WRITE;
1808
1809         osd_req_op_extent_init(obj_req->osd_req, which, opcode,
1810                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
1811         rbd_osd_req_setup_data(obj_req, which++);
1812
1813         rbd_assert(which == obj_req->osd_req->r_num_ops);
1814         rbd_osd_req_format_write(obj_req);
1815 }
1816
1817 static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
1818 {
1819         unsigned int num_osd_ops, which = 0;
1820         int ret;
1821
1822         /* reverse map the entire object onto the parent */
1823         ret = rbd_obj_calc_img_extents(obj_req, true);
1824         if (ret)
1825                 return ret;
1826
1827         if (obj_req->num_img_extents) {
1828                 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1829                 num_osd_ops = 3; /* stat + setallochint + write/writefull */
1830         } else {
1831                 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1832                 num_osd_ops = 2; /* setallochint + write/writefull */
1833         }
1834
1835         obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1836         if (!obj_req->osd_req)
1837                 return -ENOMEM;
1838
1839         if (obj_req->num_img_extents) {
1840                 ret = __rbd_obj_setup_stat(obj_req, which++);
1841                 if (ret)
1842                         return ret;
1843         }
1844
1845         __rbd_obj_setup_write(obj_req, which);
1846         return 0;
1847 }
1848
1849 static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
1850                                     unsigned int which)
1851 {
1852         u16 opcode;
1853
1854         if (rbd_obj_is_entire(obj_req)) {
1855                 if (obj_req->num_img_extents) {
1856                         osd_req_op_init(obj_req->osd_req, which++,
1857                                         CEPH_OSD_OP_CREATE, 0);
1858                         opcode = CEPH_OSD_OP_TRUNCATE;
1859                 } else {
1860                         osd_req_op_init(obj_req->osd_req, which++,
1861                                         CEPH_OSD_OP_DELETE, 0);
1862                         opcode = 0;
1863                 }
1864         } else if (rbd_obj_is_tail(obj_req)) {
1865                 opcode = CEPH_OSD_OP_TRUNCATE;
1866         } else {
1867                 opcode = CEPH_OSD_OP_ZERO;
1868         }
1869
1870         if (opcode)
1871                 osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
1872                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
1873                                        0, 0);
1874
1875         rbd_assert(which == obj_req->osd_req->r_num_ops);
1876         rbd_osd_req_format_write(obj_req);
1877 }
1878
1879 static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
1880 {
1881         unsigned int num_osd_ops, which = 0;
1882         int ret;
1883
1884         /* reverse map the entire object onto the parent */
1885         ret = rbd_obj_calc_img_extents(obj_req, true);
1886         if (ret)
1887                 return ret;
1888
1889         if (rbd_obj_is_entire(obj_req)) {
1890                 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1891                 if (obj_req->num_img_extents)
1892                         num_osd_ops = 2; /* create + truncate */
1893                 else
1894                         num_osd_ops = 1; /* delete */
1895         } else {
1896                 if (obj_req->num_img_extents) {
1897                         obj_req->write_state = RBD_OBJ_WRITE_GUARD;
1898                         num_osd_ops = 2; /* stat + truncate/zero */
1899                 } else {
1900                         obj_req->write_state = RBD_OBJ_WRITE_FLAT;
1901                         num_osd_ops = 1; /* truncate/zero */
1902                 }
1903         }
1904
1905         obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
1906         if (!obj_req->osd_req)
1907                 return -ENOMEM;
1908
1909         if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
1910                 ret = __rbd_obj_setup_stat(obj_req, which++);
1911                 if (ret)
1912                         return ret;
1913         }
1914
1915         __rbd_obj_setup_discard(obj_req, which);
1916         return 0;
1917 }
1918
1919 /*
1920  * For each object request in @img_req, allocate an OSD request, add
1921  * individual OSD ops and prepare them for submission.  The number of
1922  * OSD ops depends on op_type and the overlap point (if any).
1923  */
1924 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
1925 {
1926         struct rbd_obj_request *obj_req;
1927         int ret;
1928
1929         for_each_obj_request(img_req, obj_req) {
1930                 switch (img_req->op_type) {
1931                 case OBJ_OP_READ:
1932                         ret = rbd_obj_setup_read(obj_req);
1933                         break;
1934                 case OBJ_OP_WRITE:
1935                         ret = rbd_obj_setup_write(obj_req);
1936                         break;
1937                 case OBJ_OP_DISCARD:
1938                         ret = rbd_obj_setup_discard(obj_req);
1939                         break;
1940                 default:
1941                         rbd_assert(0);
1942                 }
1943                 if (ret)
1944                         return ret;
1945
1946                 ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
1947                 if (ret)
1948                         return ret;
1949         }
1950
1951         return 0;
1952 }
1953
1954 union rbd_img_fill_iter {
1955         struct ceph_bio_iter    bio_iter;
1956         struct ceph_bvec_iter   bvec_iter;
1957 };
1958
1959 struct rbd_img_fill_ctx {
1960         enum obj_request_type   pos_type;
1961         union rbd_img_fill_iter *pos;
1962         union rbd_img_fill_iter iter;
1963         ceph_object_extent_fn_t set_pos_fn;
1964         ceph_object_extent_fn_t count_fn;
1965         ceph_object_extent_fn_t copy_fn;
1966 };
1967
1968 static struct ceph_object_extent *alloc_object_extent(void *arg)
1969 {
1970         struct rbd_img_request *img_req = arg;
1971         struct rbd_obj_request *obj_req;
1972
1973         obj_req = rbd_obj_request_create();
1974         if (!obj_req)
1975                 return NULL;
1976
1977         rbd_img_obj_request_add(img_req, obj_req);
1978         return &obj_req->ex;
1979 }
1980
1981 /*
1982  * While su != os && sc == 1 is technically not fancy (it's the same
1983  * layout as su == os && sc == 1), we can't use the nocopy path for it
1984  * because ->set_pos_fn() should be called only once per object.
1985  * ceph_file_to_extents() invokes action_fn once per stripe unit, so
1986  * treat su != os && sc == 1 as fancy.
1987  */
1988 static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
1989 {
1990         return l->stripe_unit != l->object_size;
1991 }
1992
1993 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
1994                                        struct ceph_file_extent *img_extents,
1995                                        u32 num_img_extents,
1996                                        struct rbd_img_fill_ctx *fctx)
1997 {
1998         u32 i;
1999         int ret;
2000
2001         img_req->data_type = fctx->pos_type;
2002
2003         /*
2004          * Create object requests and set each object request's starting
2005          * position in the provided bio (list) or bio_vec array.
2006          */
2007         fctx->iter = *fctx->pos;
2008         for (i = 0; i < num_img_extents; i++) {
2009                 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2010                                            img_extents[i].fe_off,
2011                                            img_extents[i].fe_len,
2012                                            &img_req->object_extents,
2013                                            alloc_object_extent, img_req,
2014                                            fctx->set_pos_fn, &fctx->iter);
2015                 if (ret)
2016                         return ret;
2017         }
2018
2019         return __rbd_img_fill_request(img_req);
2020 }
2021
2022 /*
2023  * Map a list of image extents to a list of object extents, create the
2024  * corresponding object requests (normally each to a different object,
2025  * but not always) and add them to @img_req.  For each object request,
2026  * set up its data descriptor to point to the corresponding chunk(s) of
2027  * @fctx->pos data buffer.
2028  *
2029  * Because ceph_file_to_extents() will merge adjacent object extents
2030  * together, each object request's data descriptor may point to multiple
2031  * different chunks of @fctx->pos data buffer.
2032  *
2033  * @fctx->pos data buffer is assumed to be large enough.
2034  */
2035 static int rbd_img_fill_request(struct rbd_img_request *img_req,
2036                                 struct ceph_file_extent *img_extents,
2037                                 u32 num_img_extents,
2038                                 struct rbd_img_fill_ctx *fctx)
2039 {
2040         struct rbd_device *rbd_dev = img_req->rbd_dev;
2041         struct rbd_obj_request *obj_req;
2042         u32 i;
2043         int ret;
2044
2045         if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2046             !rbd_layout_is_fancy(&rbd_dev->layout))
2047                 return rbd_img_fill_request_nocopy(img_req, img_extents,
2048                                                    num_img_extents, fctx);
2049
2050         img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2051
2052         /*
2053          * Create object requests and determine ->bvec_count for each object
2054          * request.  Note that ->bvec_count sum over all object requests may
2055          * be greater than the number of bio_vecs in the provided bio (list)
2056          * or bio_vec array because when mapped, those bio_vecs can straddle
2057          * stripe unit boundaries.
2058          */
2059         fctx->iter = *fctx->pos;
2060         for (i = 0; i < num_img_extents; i++) {
2061                 ret = ceph_file_to_extents(&rbd_dev->layout,
2062                                            img_extents[i].fe_off,
2063                                            img_extents[i].fe_len,
2064                                            &img_req->object_extents,
2065                                            alloc_object_extent, img_req,
2066                                            fctx->count_fn, &fctx->iter);
2067                 if (ret)
2068                         return ret;
2069         }
2070
2071         for_each_obj_request(img_req, obj_req) {
2072                 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2073                                               sizeof(*obj_req->bvec_pos.bvecs),
2074                                               GFP_NOIO);
2075                 if (!obj_req->bvec_pos.bvecs)
2076                         return -ENOMEM;
2077         }
2078
2079         /*
2080          * Fill in each object request's private bio_vec array, splitting and
2081          * rearranging the provided bio_vecs in stripe unit chunks as needed.
2082          */
2083         fctx->iter = *fctx->pos;
2084         for (i = 0; i < num_img_extents; i++) {
2085                 ret = ceph_iterate_extents(&rbd_dev->layout,
2086                                            img_extents[i].fe_off,
2087                                            img_extents[i].fe_len,
2088                                            &img_req->object_extents,
2089                                            fctx->copy_fn, &fctx->iter);
2090                 if (ret)
2091                         return ret;
2092         }
2093
2094         return __rbd_img_fill_request(img_req);
2095 }
2096
2097 static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2098                                u64 off, u64 len)
2099 {
2100         struct ceph_file_extent ex = { off, len };
2101         union rbd_img_fill_iter dummy;
2102         struct rbd_img_fill_ctx fctx = {
2103                 .pos_type = OBJ_REQUEST_NODATA,
2104                 .pos = &dummy,
2105         };
2106
2107         return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2108 }
2109
2110 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2111 {
2112         struct rbd_obj_request *obj_req =
2113             container_of(ex, struct rbd_obj_request, ex);
2114         struct ceph_bio_iter *it = arg;
2115
2116         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2117         obj_req->bio_pos = *it;
2118         ceph_bio_iter_advance(it, bytes);
2119 }
2120
2121 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2122 {
2123         struct rbd_obj_request *obj_req =
2124             container_of(ex, struct rbd_obj_request, ex);
2125         struct ceph_bio_iter *it = arg;
2126
2127         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2128         ceph_bio_iter_advance_step(it, bytes, ({
2129                 obj_req->bvec_count++;
2130         }));
2131
2132 }
2133
2134 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2135 {
2136         struct rbd_obj_request *obj_req =
2137             container_of(ex, struct rbd_obj_request, ex);
2138         struct ceph_bio_iter *it = arg;
2139
2140         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2141         ceph_bio_iter_advance_step(it, bytes, ({
2142                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2143                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2144         }));
2145 }
2146
2147 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2148                                    struct ceph_file_extent *img_extents,
2149                                    u32 num_img_extents,
2150                                    struct ceph_bio_iter *bio_pos)
2151 {
2152         struct rbd_img_fill_ctx fctx = {
2153                 .pos_type = OBJ_REQUEST_BIO,
2154                 .pos = (union rbd_img_fill_iter *)bio_pos,
2155                 .set_pos_fn = set_bio_pos,
2156                 .count_fn = count_bio_bvecs,
2157                 .copy_fn = copy_bio_bvecs,
2158         };
2159
2160         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2161                                     &fctx);
2162 }
2163
2164 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2165                                  u64 off, u64 len, struct bio *bio)
2166 {
2167         struct ceph_file_extent ex = { off, len };
2168         struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2169
2170         return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2171 }
2172
2173 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2174 {
2175         struct rbd_obj_request *obj_req =
2176             container_of(ex, struct rbd_obj_request, ex);
2177         struct ceph_bvec_iter *it = arg;
2178
2179         obj_req->bvec_pos = *it;
2180         ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2181         ceph_bvec_iter_advance(it, bytes);
2182 }
2183
2184 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2185 {
2186         struct rbd_obj_request *obj_req =
2187             container_of(ex, struct rbd_obj_request, ex);
2188         struct ceph_bvec_iter *it = arg;
2189
2190         ceph_bvec_iter_advance_step(it, bytes, ({
2191                 obj_req->bvec_count++;
2192         }));
2193 }
2194
2195 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2196 {
2197         struct rbd_obj_request *obj_req =
2198             container_of(ex, struct rbd_obj_request, ex);
2199         struct ceph_bvec_iter *it = arg;
2200
2201         ceph_bvec_iter_advance_step(it, bytes, ({
2202                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2203                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2204         }));
2205 }
2206
2207 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2208                                      struct ceph_file_extent *img_extents,
2209                                      u32 num_img_extents,
2210                                      struct ceph_bvec_iter *bvec_pos)
2211 {
2212         struct rbd_img_fill_ctx fctx = {
2213                 .pos_type = OBJ_REQUEST_BVECS,
2214                 .pos = (union rbd_img_fill_iter *)bvec_pos,
2215                 .set_pos_fn = set_bvec_pos,
2216                 .count_fn = count_bvecs,
2217                 .copy_fn = copy_bvecs,
2218         };
2219
2220         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2221                                     &fctx);
2222 }
2223
2224 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2225                                    struct ceph_file_extent *img_extents,
2226                                    u32 num_img_extents,
2227                                    struct bio_vec *bvecs)
2228 {
2229         struct ceph_bvec_iter it = {
2230                 .bvecs = bvecs,
2231                 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2232                                                              num_img_extents) },
2233         };
2234
2235         return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2236                                          &it);
2237 }
2238
2239 static void rbd_img_request_submit(struct rbd_img_request *img_request)
2240 {
2241         struct rbd_obj_request *obj_request;
2242
2243         dout("%s: img %p\n", __func__, img_request);
2244
2245         rbd_img_request_get(img_request);
2246         for_each_obj_request(img_request, obj_request)
2247                 rbd_obj_request_submit(obj_request);
2248
2249         rbd_img_request_put(img_request);
2250 }
2251
2252 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2253 {
2254         struct rbd_img_request *img_req = obj_req->img_request;
2255         struct rbd_img_request *child_img_req;
2256         int ret;
2257
2258         child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2259                                                OBJ_OP_READ, NULL);
2260         if (!child_img_req)
2261                 return -ENOMEM;
2262
2263         __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2264         child_img_req->obj_request = obj_req;
2265
2266         if (!rbd_img_is_write(img_req)) {
2267                 switch (img_req->data_type) {
2268                 case OBJ_REQUEST_BIO:
2269                         ret = __rbd_img_fill_from_bio(child_img_req,
2270                                                       obj_req->img_extents,
2271                                                       obj_req->num_img_extents,
2272                                                       &obj_req->bio_pos);
2273                         break;
2274                 case OBJ_REQUEST_BVECS:
2275                 case OBJ_REQUEST_OWN_BVECS:
2276                         ret = __rbd_img_fill_from_bvecs(child_img_req,
2277                                                       obj_req->img_extents,
2278                                                       obj_req->num_img_extents,
2279                                                       &obj_req->bvec_pos);
2280                         break;
2281                 default:
2282                         rbd_assert(0);
2283                 }
2284         } else {
2285                 ret = rbd_img_fill_from_bvecs(child_img_req,
2286                                               obj_req->img_extents,
2287                                               obj_req->num_img_extents,
2288                                               obj_req->copyup_bvecs);
2289         }
2290         if (ret) {
2291                 rbd_img_request_put(child_img_req);
2292                 return ret;
2293         }
2294
2295         rbd_img_request_submit(child_img_req);
2296         return 0;
2297 }
2298
2299 static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
2300 {
2301         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2302         int ret;
2303
2304         if (obj_req->result == -ENOENT &&
2305             rbd_dev->parent_overlap && !obj_req->tried_parent) {
2306                 /* reverse map this object extent onto the parent */
2307                 ret = rbd_obj_calc_img_extents(obj_req, false);
2308                 if (ret) {
2309                         obj_req->result = ret;
2310                         return true;
2311                 }
2312
2313                 if (obj_req->num_img_extents) {
2314                         obj_req->tried_parent = true;
2315                         ret = rbd_obj_read_from_parent(obj_req);
2316                         if (ret) {
2317                                 obj_req->result = ret;
2318                                 return true;
2319                         }
2320                         return false;
2321                 }
2322         }
2323
2324         /*
2325          * -ENOENT means a hole in the image -- zero-fill the entire
2326          * length of the request.  A short read also implies zero-fill
2327          * to the end of the request.  In both cases we update xferred
2328          * count to indicate the whole request was satisfied.
2329          */
2330         if (obj_req->result == -ENOENT ||
2331             (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
2332                 rbd_assert(!obj_req->xferred || !obj_req->result);
2333                 rbd_obj_zero_range(obj_req, obj_req->xferred,
2334                                    obj_req->ex.oe_len - obj_req->xferred);
2335                 obj_req->result = 0;
2336                 obj_req->xferred = obj_req->ex.oe_len;
2337         }
2338
2339         return true;
2340 }
2341
2342 /*
2343  * copyup_bvecs pages are never highmem pages
2344  */
2345 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2346 {
2347         struct ceph_bvec_iter it = {
2348                 .bvecs = bvecs,
2349                 .iter = { .bi_size = bytes },
2350         };
2351
2352         ceph_bvec_iter_advance_step(&it, bytes, ({
2353                 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
2354                                bv.bv_len))
2355                         return false;
2356         }));
2357         return true;
2358 }
2359
2360 static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
2361 {
2362         unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
2363         int ret;
2364
2365         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
2366         rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
2367         rbd_osd_req_destroy(obj_req->osd_req);
2368
2369         /*
2370          * Create a copyup request with the same number of OSD ops as
2371          * the original request.  The original request was stat + op(s),
2372          * the new copyup request will be copyup + the same op(s).
2373          */
2374         obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
2375         if (!obj_req->osd_req)
2376                 return -ENOMEM;
2377
2378         ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup");
2379         if (ret)
2380                 return ret;
2381
2382         /*
2383          * Only send non-zero copyup data to save some I/O and network
2384          * bandwidth -- zero copyup data is equivalent to the object not
2385          * existing.
2386          */
2387         if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
2388                 dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
2389                 bytes = 0;
2390         }
2391         osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
2392                                           obj_req->copyup_bvecs,
2393                                           obj_req->copyup_bvec_count,
2394                                           bytes);
2395
2396         switch (obj_req->img_request->op_type) {
2397         case OBJ_OP_WRITE:
2398                 __rbd_obj_setup_write(obj_req, 1);
2399                 break;
2400         case OBJ_OP_DISCARD:
2401                 rbd_assert(!rbd_obj_is_entire(obj_req));
2402                 __rbd_obj_setup_discard(obj_req, 1);
2403                 break;
2404         default:
2405                 rbd_assert(0);
2406         }
2407
2408         ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
2409         if (ret)
2410                 return ret;
2411
2412         rbd_obj_request_submit(obj_req);
2413         return 0;
2414 }
2415
2416 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
2417 {
2418         u32 i;
2419
2420         rbd_assert(!obj_req->copyup_bvecs);
2421         obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
2422         obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
2423                                         sizeof(*obj_req->copyup_bvecs),
2424                                         GFP_NOIO);
2425         if (!obj_req->copyup_bvecs)
2426                 return -ENOMEM;
2427
2428         for (i = 0; i < obj_req->copyup_bvec_count; i++) {
2429                 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
2430
2431                 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
2432                 if (!obj_req->copyup_bvecs[i].bv_page)
2433                         return -ENOMEM;
2434
2435                 obj_req->copyup_bvecs[i].bv_offset = 0;
2436                 obj_req->copyup_bvecs[i].bv_len = len;
2437                 obj_overlap -= len;
2438         }
2439
2440         rbd_assert(!obj_overlap);
2441         return 0;
2442 }
2443
2444 static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
2445 {
2446         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2447         int ret;
2448
2449         rbd_assert(obj_req->num_img_extents);
2450         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2451                       rbd_dev->parent_overlap);
2452         if (!obj_req->num_img_extents) {
2453                 /*
2454                  * The overlap has become 0 (most likely because the
2455                  * image has been flattened).  Use rbd_obj_issue_copyup()
2456                  * to re-submit the original write request -- the copyup
2457                  * operation itself will be a no-op, since someone must
2458                  * have populated the child object while we weren't
2459                  * looking.  Move to WRITE_FLAT state as we'll be done
2460                  * with the operation once the null copyup completes.
2461                  */
2462                 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2463                 return rbd_obj_issue_copyup(obj_req, 0);
2464         }
2465
2466         ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
2467         if (ret)
2468                 return ret;
2469
2470         obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
2471         return rbd_obj_read_from_parent(obj_req);
2472 }
2473
2474 static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
2475 {
2476         int ret;
2477
2478 again:
2479         switch (obj_req->write_state) {
2480         case RBD_OBJ_WRITE_GUARD:
2481                 rbd_assert(!obj_req->xferred);
2482                 if (obj_req->result == -ENOENT) {
2483                         /*
2484                          * The target object doesn't exist.  Read the data for
2485                          * the entire target object up to the overlap point (if
2486                          * any) from the parent, so we can use it for a copyup.
2487                          */
2488                         ret = rbd_obj_handle_write_guard(obj_req);
2489                         if (ret) {
2490                                 obj_req->result = ret;
2491                                 return true;
2492                         }
2493                         return false;
2494                 }
2495                 /* fall through */
2496         case RBD_OBJ_WRITE_FLAT:
2497                 if (!obj_req->result)
2498                         /*
2499                          * There is no such thing as a successful short
2500                          * write -- indicate the whole request was satisfied.
2501                          */
2502                         obj_req->xferred = obj_req->ex.oe_len;
2503                 return true;
2504         case RBD_OBJ_WRITE_COPYUP:
2505                 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2506                 if (obj_req->result)
2507                         goto again;
2508
2509                 rbd_assert(obj_req->xferred);
2510                 ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
2511                 if (ret) {
2512                         obj_req->result = ret;
2513                         return true;
2514                 }
2515                 return false;
2516         default:
2517                 BUG();
2518         }
2519 }
2520
2521 /*
2522  * Returns true if @obj_req is completed, or false otherwise.
2523  */
2524 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2525 {
2526         switch (obj_req->img_request->op_type) {
2527         case OBJ_OP_READ:
2528                 return rbd_obj_handle_read(obj_req);
2529         case OBJ_OP_WRITE:
2530                 return rbd_obj_handle_write(obj_req);
2531         case OBJ_OP_DISCARD:
2532                 if (rbd_obj_handle_write(obj_req)) {
2533                         /*
2534                          * Hide -ENOENT from delete/truncate/zero -- discarding
2535                          * a non-existent object is not a problem.
2536                          */
2537                         if (obj_req->result == -ENOENT) {
2538                                 obj_req->result = 0;
2539                                 obj_req->xferred = obj_req->ex.oe_len;
2540                         }
2541                         return true;
2542                 }
2543                 return false;
2544         default:
2545                 BUG();
2546         }
2547 }
2548
2549 static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
2550 {
2551         struct rbd_img_request *img_req = obj_req->img_request;
2552
2553         rbd_assert((!obj_req->result &&
2554                     obj_req->xferred == obj_req->ex.oe_len) ||
2555                    (obj_req->result < 0 && !obj_req->xferred));
2556         if (!obj_req->result) {
2557                 img_req->xferred += obj_req->xferred;
2558                 return;
2559         }
2560
2561         rbd_warn(img_req->rbd_dev,
2562                  "%s at objno %llu %llu~%llu result %d xferred %llu",
2563                  obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
2564                  obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
2565                  obj_req->xferred);
2566         if (!img_req->result) {
2567                 img_req->result = obj_req->result;
2568                 img_req->xferred = 0;
2569         }
2570 }
2571
2572 static void rbd_img_end_child_request(struct rbd_img_request *img_req)
2573 {
2574         struct rbd_obj_request *obj_req = img_req->obj_request;
2575
2576         rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
2577         rbd_assert((!img_req->result &&
2578                     img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
2579                    (img_req->result < 0 && !img_req->xferred));
2580
2581         obj_req->result = img_req->result;
2582         obj_req->xferred = img_req->xferred;
2583         rbd_img_request_put(img_req);
2584 }
2585
2586 static void rbd_img_end_request(struct rbd_img_request *img_req)
2587 {
2588         rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
2589         rbd_assert((!img_req->result &&
2590                     img_req->xferred == blk_rq_bytes(img_req->rq)) ||
2591                    (img_req->result < 0 && !img_req->xferred));
2592
2593         blk_mq_end_request(img_req->rq,
2594                            errno_to_blk_status(img_req->result));
2595         rbd_img_request_put(img_req);
2596 }
2597
2598 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2599 {
2600         struct rbd_img_request *img_req;
2601
2602 again:
2603         if (!__rbd_obj_handle_request(obj_req))
2604                 return;
2605
2606         img_req = obj_req->img_request;
2607         spin_lock(&img_req->completion_lock);
2608         rbd_obj_end_request(obj_req);
2609         rbd_assert(img_req->pending_count);
2610         if (--img_req->pending_count) {
2611                 spin_unlock(&img_req->completion_lock);
2612                 return;
2613         }
2614
2615         spin_unlock(&img_req->completion_lock);
2616         if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
2617                 obj_req = img_req->obj_request;
2618                 rbd_img_end_child_request(img_req);
2619                 goto again;
2620         }
2621         rbd_img_end_request(img_req);
2622 }
2623
2624 static const struct rbd_client_id rbd_empty_cid;
2625
2626 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
2627                           const struct rbd_client_id *rhs)
2628 {
2629         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
2630 }
2631
2632 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
2633 {
2634         struct rbd_client_id cid;
2635
2636         mutex_lock(&rbd_dev->watch_mutex);
2637         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
2638         cid.handle = rbd_dev->watch_cookie;
2639         mutex_unlock(&rbd_dev->watch_mutex);
2640         return cid;
2641 }
2642
2643 /*
2644  * lock_rwsem must be held for write
2645  */
2646 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
2647                               const struct rbd_client_id *cid)
2648 {
2649         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
2650              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
2651              cid->gid, cid->handle);
2652         rbd_dev->owner_cid = *cid; /* struct */
2653 }
2654
2655 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
2656 {
2657         mutex_lock(&rbd_dev->watch_mutex);
2658         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
2659         mutex_unlock(&rbd_dev->watch_mutex);
2660 }
2661
2662 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
2663 {
2664         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2665
2666         strcpy(rbd_dev->lock_cookie, cookie);
2667         rbd_set_owner_cid(rbd_dev, &cid);
2668         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
2669 }
2670
2671 /*
2672  * lock_rwsem must be held for write
2673  */
2674 static int rbd_lock(struct rbd_device *rbd_dev)
2675 {
2676         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2677         char cookie[32];
2678         int ret;
2679
2680         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
2681                 rbd_dev->lock_cookie[0] != '\0');
2682
2683         format_lock_cookie(rbd_dev, cookie);
2684         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2685                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
2686                             RBD_LOCK_TAG, "", 0);
2687         if (ret)
2688                 return ret;
2689
2690         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
2691         __rbd_lock(rbd_dev, cookie);
2692         return 0;
2693 }
2694
2695 /*
2696  * lock_rwsem must be held for write
2697  */
2698 static void rbd_unlock(struct rbd_device *rbd_dev)
2699 {
2700         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2701         int ret;
2702
2703         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
2704                 rbd_dev->lock_cookie[0] == '\0');
2705
2706         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2707                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
2708         if (ret && ret != -ENOENT)
2709                 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
2710
2711         /* treat errors as the image is unlocked */
2712         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
2713         rbd_dev->lock_cookie[0] = '\0';
2714         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
2715         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
2716 }
2717
2718 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
2719                                 enum rbd_notify_op notify_op,
2720                                 struct page ***preply_pages,
2721                                 size_t *preply_len)
2722 {
2723         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2724         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2725         char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
2726         int buf_size = sizeof(buf);
2727         void *p = buf;
2728
2729         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
2730
2731         /* encode *LockPayload NotifyMessage (op + ClientId) */
2732         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
2733         ceph_encode_32(&p, notify_op);
2734         ceph_encode_64(&p, cid.gid);
2735         ceph_encode_64(&p, cid.handle);
2736
2737         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
2738                                 &rbd_dev->header_oloc, buf, buf_size,
2739                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
2740 }
2741
2742 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
2743                                enum rbd_notify_op notify_op)
2744 {
2745         struct page **reply_pages;
2746         size_t reply_len;
2747
2748         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
2749         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2750 }
2751
2752 static void rbd_notify_acquired_lock(struct work_struct *work)
2753 {
2754         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2755                                                   acquired_lock_work);
2756
2757         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
2758 }
2759
2760 static void rbd_notify_released_lock(struct work_struct *work)
2761 {
2762         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2763                                                   released_lock_work);
2764
2765         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
2766 }
2767
2768 static int rbd_request_lock(struct rbd_device *rbd_dev)
2769 {
2770         struct page **reply_pages;
2771         size_t reply_len;
2772         bool lock_owner_responded = false;
2773         int ret;
2774
2775         dout("%s rbd_dev %p\n", __func__, rbd_dev);
2776
2777         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
2778                                    &reply_pages, &reply_len);
2779         if (ret && ret != -ETIMEDOUT) {
2780                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
2781                 goto out;
2782         }
2783
2784         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
2785                 void *p = page_address(reply_pages[0]);
2786                 void *const end = p + reply_len;
2787                 u32 n;
2788
2789                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
2790                 while (n--) {
2791                         u8 struct_v;
2792                         u32 len;
2793
2794                         ceph_decode_need(&p, end, 8 + 8, e_inval);
2795                         p += 8 + 8; /* skip gid and cookie */
2796
2797                         ceph_decode_32_safe(&p, end, len, e_inval);
2798                         if (!len)
2799                                 continue;
2800
2801                         if (lock_owner_responded) {
2802                                 rbd_warn(rbd_dev,
2803                                          "duplicate lock owners detected");
2804                                 ret = -EIO;
2805                                 goto out;
2806                         }
2807
2808                         lock_owner_responded = true;
2809                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
2810                                                   &struct_v, &len);
2811                         if (ret) {
2812                                 rbd_warn(rbd_dev,
2813                                          "failed to decode ResponseMessage: %d",
2814                                          ret);
2815                                 goto e_inval;
2816                         }
2817
2818                         ret = ceph_decode_32(&p);
2819                 }
2820         }
2821
2822         if (!lock_owner_responded) {
2823                 rbd_warn(rbd_dev, "no lock owners detected");
2824                 ret = -ETIMEDOUT;
2825         }
2826
2827 out:
2828         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2829         return ret;
2830
2831 e_inval:
2832         ret = -EINVAL;
2833         goto out;
2834 }
2835
2836 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
2837 {
2838         dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
2839
2840         cancel_delayed_work(&rbd_dev->lock_dwork);
2841         if (wake_all)
2842                 wake_up_all(&rbd_dev->lock_waitq);
2843         else
2844                 wake_up(&rbd_dev->lock_waitq);
2845 }
2846
2847 static int get_lock_owner_info(struct rbd_device *rbd_dev,
2848                                struct ceph_locker **lockers, u32 *num_lockers)
2849 {
2850         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2851         u8 lock_type;
2852         char *lock_tag;
2853         int ret;
2854
2855         dout("%s rbd_dev %p\n", __func__, rbd_dev);
2856
2857         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
2858                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
2859                                  &lock_type, &lock_tag, lockers, num_lockers);
2860         if (ret)
2861                 return ret;
2862
2863         if (*num_lockers == 0) {
2864                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
2865                 goto out;
2866         }
2867
2868         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
2869                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
2870                          lock_tag);
2871                 ret = -EBUSY;
2872                 goto out;
2873         }
2874
2875         if (lock_type == CEPH_CLS_LOCK_SHARED) {
2876                 rbd_warn(rbd_dev, "shared lock type detected");
2877                 ret = -EBUSY;
2878                 goto out;
2879         }
2880
2881         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
2882                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
2883                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
2884                          (*lockers)[0].id.cookie);
2885                 ret = -EBUSY;
2886                 goto out;
2887         }
2888
2889 out:
2890         kfree(lock_tag);
2891         return ret;
2892 }
2893
2894 static int find_watcher(struct rbd_device *rbd_dev,
2895                         const struct ceph_locker *locker)
2896 {
2897         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2898         struct ceph_watch_item *watchers;
2899         u32 num_watchers;
2900         u64 cookie;
2901         int i;
2902         int ret;
2903
2904         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
2905                                       &rbd_dev->header_oloc, &watchers,
2906                                       &num_watchers);
2907         if (ret)
2908                 return ret;
2909
2910         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
2911         for (i = 0; i < num_watchers; i++) {
2912                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
2913                             sizeof(locker->info.addr)) &&
2914                     watchers[i].cookie == cookie) {
2915                         struct rbd_client_id cid = {
2916                                 .gid = le64_to_cpu(watchers[i].name.num),
2917                                 .handle = cookie,
2918                         };
2919
2920                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
2921                              rbd_dev, cid.gid, cid.handle);
2922                         rbd_set_owner_cid(rbd_dev, &cid);
2923                         ret = 1;
2924                         goto out;
2925                 }
2926         }
2927
2928         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
2929         ret = 0;
2930 out:
2931         kfree(watchers);
2932         return ret;
2933 }
2934
2935 /*
2936  * lock_rwsem must be held for write
2937  */
2938 static int rbd_try_lock(struct rbd_device *rbd_dev)
2939 {
2940         struct ceph_client *client = rbd_dev->rbd_client->client;
2941         struct ceph_locker *lockers;
2942         u32 num_lockers;
2943         int ret;
2944
2945         for (;;) {
2946                 ret = rbd_lock(rbd_dev);
2947                 if (ret != -EBUSY)
2948                         return ret;
2949
2950                 /* determine if the current lock holder is still alive */
2951                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
2952                 if (ret)
2953                         return ret;
2954
2955                 if (num_lockers == 0)
2956                         goto again;
2957
2958                 ret = find_watcher(rbd_dev, lockers);
2959                 if (ret) {
2960                         if (ret > 0)
2961                                 ret = 0; /* have to request lock */
2962                         goto out;
2963                 }
2964
2965                 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
2966                          ENTITY_NAME(lockers[0].id.name));
2967
2968                 ret = ceph_monc_blacklist_add(&client->monc,
2969                                               &lockers[0].info.addr);
2970                 if (ret) {
2971                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
2972                                  ENTITY_NAME(lockers[0].id.name), ret);
2973                         goto out;
2974                 }
2975
2976                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
2977                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
2978                                           lockers[0].id.cookie,
2979                                           &lockers[0].id.name);
2980                 if (ret && ret != -ENOENT)
2981                         goto out;
2982
2983 again:
2984                 ceph_free_lockers(lockers, num_lockers);
2985         }
2986
2987 out:
2988         ceph_free_lockers(lockers, num_lockers);
2989         return ret;
2990 }
2991
2992 /*
2993  * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
2994  */
2995 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
2996                                                 int *pret)
2997 {
2998         enum rbd_lock_state lock_state;
2999
3000         down_read(&rbd_dev->lock_rwsem);
3001         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3002              rbd_dev->lock_state);
3003         if (__rbd_is_lock_owner(rbd_dev)) {
3004                 lock_state = rbd_dev->lock_state;
3005                 up_read(&rbd_dev->lock_rwsem);
3006                 return lock_state;
3007         }
3008
3009         up_read(&rbd_dev->lock_rwsem);
3010         down_write(&rbd_dev->lock_rwsem);
3011         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3012              rbd_dev->lock_state);
3013         if (!__rbd_is_lock_owner(rbd_dev)) {
3014                 *pret = rbd_try_lock(rbd_dev);
3015                 if (*pret)
3016                         rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3017         }
3018
3019         lock_state = rbd_dev->lock_state;
3020         up_write(&rbd_dev->lock_rwsem);
3021         return lock_state;
3022 }
3023
3024 static void rbd_acquire_lock(struct work_struct *work)
3025 {
3026         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3027                                             struct rbd_device, lock_dwork);
3028         enum rbd_lock_state lock_state;
3029         int ret = 0;
3030
3031         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3032 again:
3033         lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3034         if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3035                 if (lock_state == RBD_LOCK_STATE_LOCKED)
3036                         wake_requests(rbd_dev, true);
3037                 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3038                      rbd_dev, lock_state, ret);
3039                 return;
3040         }
3041
3042         ret = rbd_request_lock(rbd_dev);
3043         if (ret == -ETIMEDOUT) {
3044                 goto again; /* treat this as a dead client */
3045         } else if (ret == -EROFS) {
3046                 rbd_warn(rbd_dev, "peer will not release lock");
3047                 /*
3048                  * If this is rbd_add_acquire_lock(), we want to fail
3049                  * immediately -- reuse BLACKLISTED flag.  Otherwise we
3050                  * want to block.
3051                  */
3052                 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3053                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3054                         /* wake "rbd map --exclusive" process */
3055                         wake_requests(rbd_dev, false);
3056                 }
3057         } else if (ret < 0) {
3058                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3059                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3060                                  RBD_RETRY_DELAY);
3061         } else {
3062                 /*
3063                  * lock owner acked, but resend if we don't see them
3064                  * release the lock
3065                  */
3066                 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3067                      rbd_dev);
3068                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3069                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3070         }
3071 }
3072
3073 /*
3074  * lock_rwsem must be held for write
3075  */
3076 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3077 {
3078         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3079              rbd_dev->lock_state);
3080         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3081                 return false;
3082
3083         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3084         downgrade_write(&rbd_dev->lock_rwsem);
3085         /*
3086          * Ensure that all in-flight IO is flushed.
3087          *
3088          * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3089          * may be shared with other devices.
3090          */
3091         ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3092         up_read(&rbd_dev->lock_rwsem);
3093
3094         down_write(&rbd_dev->lock_rwsem);
3095         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3096              rbd_dev->lock_state);
3097         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3098                 return false;
3099
3100         rbd_unlock(rbd_dev);
3101         /*
3102          * Give others a chance to grab the lock - we would re-acquire
3103          * almost immediately if we got new IO during ceph_osdc_sync()
3104          * otherwise.  We need to ack our own notifications, so this
3105          * lock_dwork will be requeued from rbd_wait_state_locked()
3106          * after wake_requests() in rbd_handle_released_lock().
3107          */
3108         cancel_delayed_work(&rbd_dev->lock_dwork);
3109         return true;
3110 }
3111
3112 static void rbd_release_lock_work(struct work_struct *work)
3113 {
3114         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3115                                                   unlock_work);
3116
3117         down_write(&rbd_dev->lock_rwsem);
3118         rbd_release_lock(rbd_dev);
3119         up_write(&rbd_dev->lock_rwsem);
3120 }
3121
3122 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3123                                      void **p)
3124 {
3125         struct rbd_client_id cid = { 0 };
3126
3127         if (struct_v >= 2) {
3128                 cid.gid = ceph_decode_64(p);
3129                 cid.handle = ceph_decode_64(p);
3130         }
3131
3132         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3133              cid.handle);
3134         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3135                 down_write(&rbd_dev->lock_rwsem);
3136                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3137                         /*
3138                          * we already know that the remote client is
3139                          * the owner
3140                          */
3141                         up_write(&rbd_dev->lock_rwsem);
3142                         return;
3143                 }
3144
3145                 rbd_set_owner_cid(rbd_dev, &cid);
3146                 downgrade_write(&rbd_dev->lock_rwsem);
3147         } else {
3148                 down_read(&rbd_dev->lock_rwsem);
3149         }
3150
3151         if (!__rbd_is_lock_owner(rbd_dev))
3152                 wake_requests(rbd_dev, false);
3153         up_read(&rbd_dev->lock_rwsem);
3154 }
3155
3156 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3157                                      void **p)
3158 {
3159         struct rbd_client_id cid = { 0 };
3160
3161         if (struct_v >= 2) {
3162                 cid.gid = ceph_decode_64(p);
3163                 cid.handle = ceph_decode_64(p);
3164         }
3165
3166         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3167              cid.handle);
3168         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3169                 down_write(&rbd_dev->lock_rwsem);
3170                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3171                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3172                              __func__, rbd_dev, cid.gid, cid.handle,
3173                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3174                         up_write(&rbd_dev->lock_rwsem);
3175                         return;
3176                 }
3177
3178                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3179                 downgrade_write(&rbd_dev->lock_rwsem);
3180         } else {
3181                 down_read(&rbd_dev->lock_rwsem);
3182         }
3183
3184         if (!__rbd_is_lock_owner(rbd_dev))
3185                 wake_requests(rbd_dev, false);
3186         up_read(&rbd_dev->lock_rwsem);
3187 }
3188
3189 /*
3190  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3191  * ResponseMessage is needed.
3192  */
3193 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3194                                    void **p)
3195 {
3196         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3197         struct rbd_client_id cid = { 0 };
3198         int result = 1;
3199
3200         if (struct_v >= 2) {
3201                 cid.gid = ceph_decode_64(p);
3202                 cid.handle = ceph_decode_64(p);
3203         }
3204
3205         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3206              cid.handle);
3207         if (rbd_cid_equal(&cid, &my_cid))
3208                 return result;
3209
3210         down_read(&rbd_dev->lock_rwsem);
3211         if (__rbd_is_lock_owner(rbd_dev)) {
3212                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3213                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3214                         goto out_unlock;
3215
3216                 /*
3217                  * encode ResponseMessage(0) so the peer can detect
3218                  * a missing owner
3219                  */
3220                 result = 0;
3221
3222                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3223                         if (!rbd_dev->opts->exclusive) {
3224                                 dout("%s rbd_dev %p queueing unlock_work\n",
3225                                      __func__, rbd_dev);
3226                                 queue_work(rbd_dev->task_wq,
3227                                            &rbd_dev->unlock_work);
3228                         } else {
3229                                 /* refuse to release the lock */
3230                                 result = -EROFS;
3231                         }
3232                 }
3233         }
3234
3235 out_unlock:
3236         up_read(&rbd_dev->lock_rwsem);
3237         return result;
3238 }
3239
3240 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3241                                      u64 notify_id, u64 cookie, s32 *result)
3242 {
3243         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3244         char buf[4 + CEPH_ENCODING_START_BLK_LEN];
3245         int buf_size = sizeof(buf);
3246         int ret;
3247
3248         if (result) {
3249                 void *p = buf;
3250
3251                 /* encode ResponseMessage */
3252                 ceph_start_encoding(&p, 1, 1,
3253                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
3254                 ceph_encode_32(&p, *result);
3255         } else {
3256                 buf_size = 0;
3257         }
3258
3259         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3260                                    &rbd_dev->header_oloc, notify_id, cookie,
3261                                    buf, buf_size);
3262         if (ret)
3263                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3264 }
3265
3266 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3267                                    u64 cookie)
3268 {
3269         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3270         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3271 }
3272
3273 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3274                                           u64 notify_id, u64 cookie, s32 result)
3275 {
3276         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3277         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3278 }
3279
3280 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3281                          u64 notifier_id, void *data, size_t data_len)
3282 {
3283         struct rbd_device *rbd_dev = arg;
3284         void *p = data;
3285         void *const end = p + data_len;
3286         u8 struct_v = 0;
3287         u32 len;
3288         u32 notify_op;
3289         int ret;
3290
3291         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3292              __func__, rbd_dev, cookie, notify_id, data_len);
3293         if (data_len) {
3294                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3295                                           &struct_v, &len);
3296                 if (ret) {
3297                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3298                                  ret);
3299                         return;
3300                 }
3301
3302                 notify_op = ceph_decode_32(&p);
3303         } else {
3304                 /* legacy notification for header updates */
3305                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3306                 len = 0;
3307         }
3308
3309         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3310         switch (notify_op) {
3311         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3312                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3313                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3314                 break;
3315         case RBD_NOTIFY_OP_RELEASED_LOCK:
3316                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3317                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3318                 break;
3319         case RBD_NOTIFY_OP_REQUEST_LOCK:
3320                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3321                 if (ret <= 0)
3322                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3323                                                       cookie, ret);
3324                 else
3325                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3326                 break;
3327         case RBD_NOTIFY_OP_HEADER_UPDATE:
3328                 ret = rbd_dev_refresh(rbd_dev);
3329                 if (ret)
3330                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
3331
3332                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3333                 break;
3334         default:
3335                 if (rbd_is_lock_owner(rbd_dev))
3336                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
3337                                                       cookie, -EOPNOTSUPP);
3338                 else
3339                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3340                 break;
3341         }
3342 }
3343
3344 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3345
3346 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3347 {
3348         struct rbd_device *rbd_dev = arg;
3349
3350         rbd_warn(rbd_dev, "encountered watch error: %d", err);
3351
3352         down_write(&rbd_dev->lock_rwsem);
3353         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3354         up_write(&rbd_dev->lock_rwsem);
3355
3356         mutex_lock(&rbd_dev->watch_mutex);
3357         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3358                 __rbd_unregister_watch(rbd_dev);
3359                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3360
3361                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3362         }
3363         mutex_unlock(&rbd_dev->watch_mutex);
3364 }
3365
3366 /*
3367  * watch_mutex must be locked
3368  */
3369 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3370 {
3371         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3372         struct ceph_osd_linger_request *handle;
3373
3374         rbd_assert(!rbd_dev->watch_handle);
3375         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3376
3377         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3378                                  &rbd_dev->header_oloc, rbd_watch_cb,
3379                                  rbd_watch_errcb, rbd_dev);
3380         if (IS_ERR(handle))
3381                 return PTR_ERR(handle);
3382
3383         rbd_dev->watch_handle = handle;
3384         return 0;
3385 }
3386
3387 /*
3388  * watch_mutex must be locked
3389  */
3390 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3391 {
3392         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3393         int ret;
3394
3395         rbd_assert(rbd_dev->watch_handle);
3396         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3397
3398         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3399         if (ret)
3400                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3401
3402         rbd_dev->watch_handle = NULL;
3403 }
3404
3405 static int rbd_register_watch(struct rbd_device *rbd_dev)
3406 {
3407         int ret;
3408
3409         mutex_lock(&rbd_dev->watch_mutex);
3410         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3411         ret = __rbd_register_watch(rbd_dev);
3412         if (ret)
3413                 goto out;
3414
3415         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3416         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3417
3418 out:
3419         mutex_unlock(&rbd_dev->watch_mutex);
3420         return ret;
3421 }
3422
/*
 * Synchronously cancel the lock-related work items of an rbd device:
 * acquired_lock_work, released_lock_work, lock_dwork and unlock_work.
 * watch_dwork is not touched here.
 */
static void cancel_tasks_sync(struct rbd_device *rbd_dev)
{
	dout("%s rbd_dev %p\n", __func__, rbd_dev);

	cancel_work_sync(&rbd_dev->acquired_lock_work);
	cancel_work_sync(&rbd_dev->released_lock_work);
	cancel_delayed_work_sync(&rbd_dev->lock_dwork);
	cancel_work_sync(&rbd_dev->unlock_work);
}
3432
3433 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3434 {
3435         WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3436         cancel_tasks_sync(rbd_dev);
3437
3438         mutex_lock(&rbd_dev->watch_mutex);
3439         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3440                 __rbd_unregister_watch(rbd_dev);
3441         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3442         mutex_unlock(&rbd_dev->watch_mutex);
3443
3444         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3445         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3446 }
3447
3448 /*
3449  * lock_rwsem must be held for write
3450  */
3451 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3452 {
3453         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3454         char cookie[32];
3455         int ret;
3456
3457         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3458
3459         format_lock_cookie(rbd_dev, cookie);
3460         ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3461                                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
3462                                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3463                                   RBD_LOCK_TAG, cookie);
3464         if (ret) {
3465                 if (ret != -EOPNOTSUPP)
3466                         rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3467                                  ret);
3468
3469                 /*
3470                  * Lock cookie cannot be updated on older OSDs, so do
3471                  * a manual release and queue an acquire.
3472                  */
3473                 if (rbd_release_lock(rbd_dev))
3474                         queue_delayed_work(rbd_dev->task_wq,
3475                                            &rbd_dev->lock_dwork, 0);
3476         } else {
3477                 __rbd_lock(rbd_dev, cookie);
3478         }
3479 }
3480
3481 static void rbd_reregister_watch(struct work_struct *work)
3482 {
3483         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3484                                             struct rbd_device, watch_dwork);
3485         int ret;
3486
3487         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3488
3489         mutex_lock(&rbd_dev->watch_mutex);
3490         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3491                 mutex_unlock(&rbd_dev->watch_mutex);
3492                 return;
3493         }
3494
3495         ret = __rbd_register_watch(rbd_dev);
3496         if (ret) {
3497                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3498                 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3499                         set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3500                         wake_requests(rbd_dev, true);
3501                 } else {
3502                         queue_delayed_work(rbd_dev->task_wq,
3503                                            &rbd_dev->watch_dwork,
3504                                            RBD_RETRY_DELAY);
3505                 }
3506                 mutex_unlock(&rbd_dev->watch_mutex);
3507                 return;
3508         }
3509
3510         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3511         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3512         mutex_unlock(&rbd_dev->watch_mutex);
3513
3514         down_write(&rbd_dev->lock_rwsem);
3515         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3516                 rbd_reacquire_lock(rbd_dev);
3517         up_write(&rbd_dev->lock_rwsem);
3518
3519         ret = rbd_dev_refresh(rbd_dev);
3520         if (ret)
3521                 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3522 }
3523
3524 /*
3525  * Synchronous osd object method call.  Returns the number of bytes
3526  * returned in the outbound buffer, or a negative error code.
3527  */
3528 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3529                              struct ceph_object_id *oid,
3530                              struct ceph_object_locator *oloc,
3531                              const char *method_name,
3532                              const void *outbound,
3533                              size_t outbound_size,
3534                              void *inbound,
3535                              size_t inbound_size)
3536 {
3537         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3538         struct page *req_page = NULL;
3539         struct page *reply_page;
3540         int ret;
3541
3542         /*
3543          * Method calls are ultimately read operations.  The result
3544          * should placed into the inbound buffer provided.  They
3545          * also supply outbound data--parameters for the object
3546          * method.  Currently if this is present it will be a
3547          * snapshot id.
3548          */
3549         if (outbound) {
3550                 if (outbound_size > PAGE_SIZE)
3551                         return -E2BIG;
3552
3553                 req_page = alloc_page(GFP_KERNEL);
3554                 if (!req_page)
3555                         return -ENOMEM;
3556
3557                 memcpy(page_address(req_page), outbound, outbound_size);
3558         }
3559
3560         reply_page = alloc_page(GFP_KERNEL);
3561         if (!reply_page) {
3562                 if (req_page)
3563                         __free_page(req_page);
3564                 return -ENOMEM;
3565         }
3566
3567         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3568                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
3569                              reply_page, &inbound_size);
3570         if (!ret) {
3571                 memcpy(inbound, page_address(reply_page), inbound_size);
3572                 ret = inbound_size;
3573         }
3574
3575         if (req_page)
3576                 __free_page(req_page);
3577         __free_page(reply_page);
3578         return ret;
3579 }
3580
/*
 * Wait until rbd_dev->lock_state is RBD_LOCK_STATE_LOCKED.
 *
 * lock_rwsem must be held for read.  It is dropped around each sleep
 * and re-taken afterwards, so it is held for read again on return.
 *
 * Returns 0 if the state is (or becomes) LOCKED, -EBLACKLISTED if
 * RBD_DEV_FLAG_BLACKLISTED is set, -EROFS if the lock is not held and
 * @may_acquire is false, or -ETIMEDOUT if the sleep timed out.
 */
static int rbd_wait_state_locked(struct rbd_device *rbd_dev, bool may_acquire)
{
	DEFINE_WAIT(wait);
	unsigned long timeout;
	int ret = 0;

	if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
		return -EBLACKLISTED;

	/* fast path: nothing to wait for */
	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
		return 0;

	if (!may_acquire) {
		rbd_warn(rbd_dev, "exclusive lock required");
		return -EROFS;
	}

	do {
		/*
		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
		 * and cancel_delayed_work() in wake_requests().
		 */
		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
		/* get on the wait queue before dropping lock_rwsem */
		prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
					  TASK_UNINTERRUPTIBLE);
		up_read(&rbd_dev->lock_rwsem);
		timeout = schedule_timeout(ceph_timeout_jiffies(
						rbd_dev->opts->lock_timeout));
		down_read(&rbd_dev->lock_rwsem);
		/* blacklisting is checked before the timeout */
		if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
			ret = -EBLACKLISTED;
			break;
		}
		/* a zero return from schedule_timeout() is treated as a timeout */
		if (!timeout) {
			rbd_warn(rbd_dev, "timed out waiting for lock");
			ret = -ETIMEDOUT;
			break;
		}
	} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);

	finish_wait(&rbd_dev->lock_waitq, &wait);
	return ret;
}
3628
/*
 * Work function that services one block layer request (the work_struct
 * is the request's pdu).
 *
 * Maps the request op to an object operation type, validates the
 * requested range, waits for the exclusive lock when one is required,
 * then builds and submits an image request.  On any failure, and for
 * zero-length requests, the request is ended here with the block status
 * derived from the result code.
 */
static void rbd_queue_workfn(struct work_struct *work)
{
	struct request *rq = blk_mq_rq_from_pdu(work);
	struct rbd_device *rbd_dev = rq->q->queuedata;
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;
	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
	u64 length = blk_rq_bytes(rq);
	enum obj_operation_type op_type;
	u64 mapping_size;
	bool must_be_locked;
	int result;

	/* discard and write-zeroes are both handled as OBJ_OP_DISCARD */
	switch (req_op(rq)) {
	case REQ_OP_DISCARD:
	case REQ_OP_WRITE_ZEROES:
		op_type = OBJ_OP_DISCARD;
		break;
	case REQ_OP_WRITE:
		op_type = OBJ_OP_WRITE;
		break;
	case REQ_OP_READ:
		op_type = OBJ_OP_READ;
		break;
	default:
		dout("%s: non-fs request type %d\n", __func__, req_op(rq));
		result = -EIO;
		goto err;
	}

	/* Ignore/skip any zero-length requests */

	if (!length) {
		dout("%s: zero-length request\n", __func__);
		result = 0;
		goto err_rq;
	}

	/* anything other than a read requires the mapping to be the head */
	rbd_assert(op_type == OBJ_OP_READ ||
		   rbd_dev->spec->snap_id == CEPH_NOSNAP);

	/*
	 * Quit early if the mapped snapshot no longer exists.  It's
	 * still possible the snapshot will have disappeared by the
	 * time our request arrives at the osd, but there's no sense in
	 * sending it if we already know.
	 */
	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
		dout("request for non-existent snapshot");
		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
		result = -ENXIO;
		goto err_rq;
	}

	/* reject ranges where offset + length would wrap around u64 */
	if (offset && length > U64_MAX - offset + 1) {
		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
			 length);
		result = -EINVAL;
		goto err_rq;	/* Shouldn't happen */
	}

	blk_mq_start_request(rq);

	/*
	 * Sample the mapping size and, for anything but a read, take a
	 * reference on the current snapshot context under header_rwsem.
	 */
	down_read(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;
	if (op_type != OBJ_OP_READ) {
		snapc = rbd_dev->header.snapc;
		ceph_get_snap_context(snapc);
	}
	up_read(&rbd_dev->header_rwsem);

	if (offset + length > mapping_size) {
		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
			 length, mapping_size);
		result = -EIO;
		goto err_rq;
	}

	/*
	 * The exclusive lock is needed when the image has the feature
	 * enabled and the request is not a read, or lock_on_read is set.
	 */
	must_be_locked =
	    (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
	    (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
	if (must_be_locked) {
		down_read(&rbd_dev->lock_rwsem);
		/* with the "exclusive" option set, may_acquire is false */
		result = rbd_wait_state_locked(rbd_dev,
					       !rbd_dev->opts->exclusive);
		if (result)
			goto err_unlock;
	}

	img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
	if (!img_request) {
		result = -ENOMEM;
		goto err_unlock;
	}
	img_request->rq = rq;
	snapc = NULL; /* img_request consumes a ref */

	if (op_type == OBJ_OP_DISCARD)
		result = rbd_img_fill_nodata(img_request, offset, length);
	else
		result = rbd_img_fill_from_bio(img_request, offset, length,
					       rq->bio);
	if (result)
		goto err_img_request;

	/*
	 * Success: the request is not ended here.
	 * NOTE(review): presumably it is ended when img_request
	 * completes -- confirm against the completion path.
	 */
	rbd_img_request_submit(img_request);
	if (must_be_locked)
		up_read(&rbd_dev->lock_rwsem);
	return;

err_img_request:
	rbd_img_request_put(img_request);
err_unlock:
	if (must_be_locked)
		up_read(&rbd_dev->lock_rwsem);
err_rq:
	if (result)
		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
			 obj_op_name(op_type), length, offset, result);
	/* snapc is NULL here unless a reference is still owned locally */
	ceph_put_snap_context(snapc);
err:
	blk_mq_end_request(rq, errno_to_blk_status(result));
}
3752
3753 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
3754                 const struct blk_mq_queue_data *bd)
3755 {
3756         struct request *rq = bd->rq;
3757         struct work_struct *work = blk_mq_rq_to_pdu(rq);
3758
3759         queue_work(rbd_wq, work);
3760         return BLK_STS_OK;
3761 }
3762
3763 static void rbd_free_disk(struct rbd_device *rbd_dev)
3764 {
3765         blk_cleanup_queue(rbd_dev->disk->queue);
3766         blk_mq_free_tag_set(&rbd_dev->tag_set);
3767         put_disk(rbd_dev->disk);
3768         rbd_dev->disk = NULL;
3769 }
3770
3771 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3772                              struct ceph_object_id *oid,
3773                              struct ceph_object_locator *oloc,
3774                              void *buf, int buf_len)
3775
3776 {
3777         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3778         struct ceph_osd_request *req;
3779         struct page **pages;
3780         int num_pages = calc_pages_for(0, buf_len);
3781         int ret;
3782
3783         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
3784         if (!req)
3785                 return -ENOMEM;
3786
3787         ceph_oid_copy(&req->r_base_oid, oid);
3788         ceph_oloc_copy(&req->r_base_oloc, oloc);
3789         req->r_flags = CEPH_OSD_FLAG_READ;
3790
3791         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
3792         if (IS_ERR(pages)) {
3793                 ret = PTR_ERR(pages);
3794                 goto out_req;
3795         }
3796
3797         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
3798         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
3799                                          true);
3800
3801         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
3802         if (ret)
3803                 goto out_req;
3804
3805         ceph_osdc_start_request(osdc, req, false);
3806         ret = ceph_osdc_wait_request(osdc, req);
3807         if (ret >= 0)
3808                 ceph_copy_from_page_vector(pages, buf, 0, ret);
3809
3810 out_req:
3811         ceph_osdc_put_request(req);
3812         return ret;
3813 }
3814
3815 /*
3816  * Read the complete header for the given rbd device.  On successful
3817  * return, the rbd_dev->header field will contain up-to-date
3818  * information about the image.
3819  */
3820 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3821 {
3822         struct rbd_image_header_ondisk *ondisk = NULL;
3823         u32 snap_count = 0;
3824         u64 names_size = 0;
3825         u32 want_count;
3826         int ret;
3827
3828         /*
3829          * The complete header will include an array of its 64-bit
3830          * snapshot ids, followed by the names of those snapshots as
3831          * a contiguous block of NUL-terminated strings.  Note that
3832          * the number of snapshots could change by the time we read
3833          * it in, in which case we re-read it.
3834          */
3835         do {
3836                 size_t size;
3837
3838                 kfree(ondisk);
3839
3840                 size = sizeof (*ondisk);
3841                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3842                 size += names_size;
3843                 ondisk = kmalloc(size, GFP_KERNEL);
3844                 if (!ondisk)
3845                         return -ENOMEM;
3846
3847                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
3848                                         &rbd_dev->header_oloc, ondisk, size);
3849                 if (ret < 0)
3850                         goto out;
3851                 if ((size_t)ret < size) {
3852                         ret = -ENXIO;
3853                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3854                                 size, ret);
3855                         goto out;
3856                 }
3857                 if (!rbd_dev_ondisk_valid(ondisk)) {
3858                         ret = -ENXIO;
3859                         rbd_warn(rbd_dev, "invalid header");
3860                         goto out;
3861                 }
3862
3863                 names_size = le64_to_cpu(ondisk->snap_names_len);
3864                 want_count = snap_count;
3865                 snap_count = le32_to_cpu(ondisk->snap_count);
3866         } while (snap_count != want_count);
3867
3868         ret = rbd_header_from_disk(rbd_dev, ondisk);
3869 out:
3870         kfree(ondisk);
3871
3872         return ret;
3873 }
3874
3875 /*
3876  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3877  * has disappeared from the (just updated) snapshot context.
3878  */
3879 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3880 {
3881         u64 snap_id;
3882
3883         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3884                 return;
3885
3886         snap_id = rbd_dev->spec->snap_id;
3887         if (snap_id == CEPH_NOSNAP)
3888                 return;
3889
3890         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3891                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3892 }
3893
3894 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3895 {
3896         sector_t size;
3897
3898         /*
3899          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
3900          * try to update its size.  If REMOVING is set, updating size
3901          * is just useless work since the device can't be opened.
3902          */
3903         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
3904             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
3905                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3906                 dout("setting size to %llu sectors", (unsigned long long)size);
3907                 set_capacity(rbd_dev->disk, size);
3908                 revalidate_disk(rbd_dev->disk);
3909         }
3910 }
3911
3912 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3913 {
3914         u64 mapping_size;
3915         int ret;
3916
3917         down_write(&rbd_dev->header_rwsem);
3918         mapping_size = rbd_dev->mapping.size;
3919
3920         ret = rbd_dev_header_info(rbd_dev);
3921         if (ret)
3922                 goto out;
3923
3924         /*
3925          * If there is a parent, see if it has disappeared due to the
3926          * mapped image getting flattened.
3927          */
3928         if (rbd_dev->parent) {
3929                 ret = rbd_dev_v2_parent_info(rbd_dev);
3930                 if (ret)
3931                         goto out;
3932         }
3933
3934         if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3935                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3936         } else {
3937                 /* validate mapped snapshot's EXISTS flag */
3938                 rbd_exists_validate(rbd_dev);
3939         }
3940
3941 out:
3942         up_write(&rbd_dev->header_rwsem);
3943         if (!ret && mapping_size != rbd_dev->mapping.size)
3944                 rbd_dev_update_size(rbd_dev);
3945
3946         return ret;
3947 }
3948
3949 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
3950                 unsigned int hctx_idx, unsigned int numa_node)
3951 {
3952         struct work_struct *work = blk_mq_rq_to_pdu(rq);
3953
3954         INIT_WORK(work, rbd_queue_workfn);
3955         return 0;
3956 }
3957
/* blk-mq operations for rbd devices; installed in the tag set by rbd_init_disk() */
static const struct blk_mq_ops rbd_mq_ops = {
	.queue_rq	= rbd_queue_rq,		/* defers the request to rbd_wq */
	.init_request	= rbd_init_request,	/* sets up the per-request work item */
};
3962
3963 static int rbd_init_disk(struct rbd_device *rbd_dev)
3964 {
3965         struct gendisk *disk;
3966         struct request_queue *q;
3967         unsigned int objset_bytes =
3968             rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
3969         int err;
3970
3971         /* create gendisk info */
3972         disk = alloc_disk(single_major ?
3973                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3974                           RBD_MINORS_PER_MAJOR);
3975         if (!disk)
3976                 return -ENOMEM;
3977
3978         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3979                  rbd_dev->dev_id);
3980         disk->major = rbd_dev->major;
3981         disk->first_minor = rbd_dev->minor;
3982         if (single_major)
3983                 disk->flags |= GENHD_FL_EXT_DEVT;
3984         disk->fops = &rbd_bd_ops;
3985         disk->private_data = rbd_dev;
3986
3987         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
3988         rbd_dev->tag_set.ops = &rbd_mq_ops;
3989         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
3990         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
3991         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
3992         rbd_dev->tag_set.nr_hw_queues = 1;
3993         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
3994
3995         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
3996         if (err)
3997                 goto out_disk;
3998
3999         q = blk_mq_init_queue(&rbd_dev->tag_set);
4000         if (IS_ERR(q)) {
4001                 err = PTR_ERR(q);
4002                 goto out_tag_set;
4003         }
4004
4005         blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
4006         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4007
4008         blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
4009         q->limits.max_sectors = queue_max_hw_sectors(q);
4010         blk_queue_max_segments(q, USHRT_MAX);
4011         blk_queue_max_segment_size(q, UINT_MAX);
4012         blk_queue_io_min(q, objset_bytes);
4013         blk_queue_io_opt(q, objset_bytes);
4014
4015         if (rbd_dev->opts->trim) {
4016                 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
4017                 q->limits.discard_granularity = objset_bytes;
4018                 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
4019                 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
4020         }
4021
4022         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4023                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
4024
4025         /*
4026          * disk_release() expects a queue ref from add_disk() and will
4027          * put it.  Hold an extra ref until add_disk() is called.
4028          */
4029         WARN_ON(!blk_get_queue(q));
4030         disk->queue = q;
4031         q->queuedata = rbd_dev;
4032
4033         rbd_dev->disk = disk;
4034
4035         return 0;
4036 out_tag_set:
4037         blk_mq_free_tag_set(&rbd_dev->tag_set);
4038 out_disk:
4039         put_disk(disk);
4040         return err;
4041 }
4042
4043 /*
4044   sysfs
4045 */
4046
4047 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4048 {
4049         return container_of(dev, struct rbd_device, dev);
4050 }
4051
4052 static ssize_t rbd_size_show(struct device *dev,
4053                              struct device_attribute *attr, char *buf)
4054 {
4055         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4056
4057         return sprintf(buf, "%llu\n",
4058                 (unsigned long long)rbd_dev->mapping.size);
4059 }
4060
4061 /*
4062  * Note this shows the features for whatever's mapped, which is not
4063  * necessarily the base image.
4064  */
4065 static ssize_t rbd_features_show(struct device *dev,
4066                              struct device_attribute *attr, char *buf)
4067 {
4068         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4069
4070         return sprintf(buf, "0x%016llx\n",
4071                         (unsigned long long)rbd_dev->mapping.features);
4072 }
4073
4074 static ssize_t rbd_major_show(struct device *dev,
4075                               struct device_attribute *attr, char *buf)
4076 {
4077         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4078
4079         if (rbd_dev->major)
4080                 return sprintf(buf, "%d\n", rbd_dev->major);
4081
4082         return sprintf(buf, "(none)\n");
4083 }
4084
4085 static ssize_t rbd_minor_show(struct device *dev,
4086                               struct device_attribute *attr, char *buf)
4087 {
4088         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4089
4090         return sprintf(buf, "%d\n", rbd_dev->minor);
4091 }
4092
4093 static ssize_t rbd_client_addr_show(struct device *dev,
4094                                     struct device_attribute *attr, char *buf)
4095 {
4096         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4097         struct ceph_entity_addr *client_addr =
4098             ceph_client_addr(rbd_dev->rbd_client->client);
4099
4100         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4101                        le32_to_cpu(client_addr->nonce));
4102 }
4103
4104 static ssize_t rbd_client_id_show(struct device *dev,
4105                                   struct device_attribute *attr, char *buf)
4106 {
4107         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4108
4109         return sprintf(buf, "client%lld\n",
4110                        ceph_client_gid(rbd_dev->rbd_client->client));
4111 }
4112
4113 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4114                                      struct device_attribute *attr, char *buf)
4115 {
4116         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4117
4118         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4119 }
4120
4121 static ssize_t rbd_config_info_show(struct device *dev,
4122                                     struct device_attribute *attr, char *buf)
4123 {
4124         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4125
4126         return sprintf(buf, "%s\n", rbd_dev->config_info);
4127 }
4128
4129 static ssize_t rbd_pool_show(struct device *dev,
4130                              struct device_attribute *attr, char *buf)
4131 {
4132         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4133
4134         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4135 }
4136
4137 static ssize_t rbd_pool_id_show(struct device *dev,
4138                              struct device_attribute *attr, char *buf)
4139 {
4140         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4141
4142         return sprintf(buf, "%llu\n",
4143                         (unsigned long long) rbd_dev->spec->pool_id);
4144 }
4145
4146 static ssize_t rbd_pool_ns_show(struct device *dev,
4147                                 struct device_attribute *attr, char *buf)
4148 {
4149         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4150
4151         return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
4152 }
4153
4154 static ssize_t rbd_name_show(struct device *dev,
4155                              struct device_attribute *attr, char *buf)
4156 {
4157         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4158
4159         if (rbd_dev->spec->image_name)
4160                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4161
4162         return sprintf(buf, "(unknown)\n");
4163 }
4164
4165 static ssize_t rbd_image_id_show(struct device *dev,
4166                              struct device_attribute *attr, char *buf)
4167 {
4168         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4169
4170         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4171 }
4172
4173 /*
4174  * Shows the name of the currently-mapped snapshot (or
4175  * RBD_SNAP_HEAD_NAME for the base image).
4176  */
4177 static ssize_t rbd_snap_show(struct device *dev,
4178                              struct device_attribute *attr,
4179                              char *buf)
4180 {
4181         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4182
4183         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4184 }
4185
4186 static ssize_t rbd_snap_id_show(struct device *dev,
4187                                 struct device_attribute *attr, char *buf)
4188 {
4189         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4190
4191         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4192 }
4193
4194 /*
4195  * For a v2 image, shows the chain of parent images, separated by empty
4196  * lines.  For v1 images or if there is no parent, shows "(no parent
4197  * image)".
4198  */
4199 static ssize_t rbd_parent_show(struct device *dev,
4200                                struct device_attribute *attr,
4201                                char *buf)
4202 {
4203         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4204         ssize_t count = 0;
4205
4206         if (!rbd_dev->parent)
4207                 return sprintf(buf, "(no parent image)\n");
4208
4209         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4210                 struct rbd_spec *spec = rbd_dev->parent_spec;
4211
4212                 count += sprintf(&buf[count], "%s"
4213                             "pool_id %llu\npool_name %s\n"
4214                             "pool_ns %s\n"
4215                             "image_id %s\nimage_name %s\n"
4216                             "snap_id %llu\nsnap_name %s\n"
4217                             "overlap %llu\n",
4218                             !count ? "" : "\n", /* first? */
4219                             spec->pool_id, spec->pool_name,
4220                             spec->pool_ns ?: "",
4221                             spec->image_id, spec->image_name ?: "(unknown)",
4222                             spec->snap_id, spec->snap_name,
4223                             rbd_dev->parent_overlap);
4224         }
4225
4226         return count;
4227 }
4228
4229 static ssize_t rbd_image_refresh(struct device *dev,
4230                                  struct device_attribute *attr,
4231                                  const char *buf,
4232                                  size_t size)
4233 {
4234         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4235         int ret;
4236
4237         ret = rbd_dev_refresh(rbd_dev);
4238         if (ret)
4239                 return ret;
4240
4241         return size;
4242 }
4243
4244 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
4245 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
4246 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
4247 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
4248 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
4249 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
4250 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
4251 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
4252 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
4253 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
4254 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
4255 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
4256 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
4257 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
4258 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
4259 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
4260 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
4261
/*
 * NULL-terminated list of the attributes declared with DEVICE_ATTR()
 * above; it is referenced by rbd_attr_group.
 */
static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_minor.attr,
        &dev_attr_client_addr.attr,
        &dev_attr_client_id.attr,
        &dev_attr_cluster_fsid.attr,
        &dev_attr_config_info.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_pool_ns.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_parent.attr,
        &dev_attr_refresh.attr,
        NULL
};
4282
4283 static struct attribute_group rbd_attr_group = {
4284         .attrs = rbd_attrs,
4285 };
4286
4287 static const struct attribute_group *rbd_attr_groups[] = {
4288         &rbd_attr_group,
4289         NULL
4290 };
4291
4292 static void rbd_dev_release(struct device *dev);
4293
4294 static const struct device_type rbd_device_type = {
4295         .name           = "rbd",
4296         .groups         = rbd_attr_groups,
4297         .release        = rbd_dev_release,
4298 };
4299
4300 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4301 {
4302         kref_get(&spec->kref);
4303
4304         return spec;
4305 }
4306
4307 static void rbd_spec_free(struct kref *kref);
4308 static void rbd_spec_put(struct rbd_spec *spec)
4309 {
4310         if (spec)
4311                 kref_put(&spec->kref, rbd_spec_free);
4312 }
4313
4314 static struct rbd_spec *rbd_spec_alloc(void)
4315 {
4316         struct rbd_spec *spec;
4317
4318         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4319         if (!spec)
4320                 return NULL;
4321
4322         spec->pool_id = CEPH_NOPOOL;
4323         spec->snap_id = CEPH_NOSNAP;
4324         kref_init(&spec->kref);
4325
4326         return spec;
4327 }
4328
4329 static void rbd_spec_free(struct kref *kref)
4330 {
4331         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4332
4333         kfree(spec->pool_name);
4334         kfree(spec->pool_ns);
4335         kfree(spec->image_id);
4336         kfree(spec->image_name);
4337         kfree(spec->snap_name);
4338         kfree(spec);
4339 }
4340
4341 static void rbd_dev_free(struct rbd_device *rbd_dev)
4342 {
4343         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4344         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4345
4346         ceph_oid_destroy(&rbd_dev->header_oid);
4347         ceph_oloc_destroy(&rbd_dev->header_oloc);
4348         kfree(rbd_dev->config_info);
4349
4350         rbd_put_client(rbd_dev->rbd_client);
4351         rbd_spec_put(rbd_dev->spec);
4352         kfree(rbd_dev->opts);
4353         kfree(rbd_dev);
4354 }
4355
4356 static void rbd_dev_release(struct device *dev)
4357 {
4358         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4359         bool need_put = !!rbd_dev->opts;
4360
4361         if (need_put) {
4362                 destroy_workqueue(rbd_dev->task_wq);
4363                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4364         }
4365
4366         rbd_dev_free(rbd_dev);
4367
4368         /*
4369          * This is racy, but way better than putting module outside of
4370          * the release callback.  The race window is pretty small, so
4371          * doing something similar to dm (dm-builtin.c) is overkill.
4372          */
4373         if (need_put)
4374                 module_put(THIS_MODULE);
4375 }
4376
4377 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4378                                            struct rbd_spec *spec)
4379 {
4380         struct rbd_device *rbd_dev;
4381
4382         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4383         if (!rbd_dev)
4384                 return NULL;
4385
4386         spin_lock_init(&rbd_dev->lock);
4387         INIT_LIST_HEAD(&rbd_dev->node);
4388         init_rwsem(&rbd_dev->header_rwsem);
4389
4390         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4391         ceph_oid_init(&rbd_dev->header_oid);
4392         rbd_dev->header_oloc.pool = spec->pool_id;
4393         if (spec->pool_ns) {
4394                 WARN_ON(!*spec->pool_ns);
4395                 rbd_dev->header_oloc.pool_ns =
4396                     ceph_find_or_create_string(spec->pool_ns,
4397                                                strlen(spec->pool_ns));
4398         }
4399
4400         mutex_init(&rbd_dev->watch_mutex);
4401         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4402         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4403
4404         init_rwsem(&rbd_dev->lock_rwsem);
4405         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4406         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4407         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4408         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4409         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4410         init_waitqueue_head(&rbd_dev->lock_waitq);
4411
4412         rbd_dev->dev.bus = &rbd_bus_type;
4413         rbd_dev->dev.type = &rbd_device_type;
4414         rbd_dev->dev.parent = &rbd_root_dev;
4415         device_initialize(&rbd_dev->dev);
4416
4417         rbd_dev->rbd_client = rbdc;
4418         rbd_dev->spec = spec;
4419
4420         return rbd_dev;
4421 }
4422
4423 /*
4424  * Create a mapping rbd_dev.
4425  */
4426 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4427                                          struct rbd_spec *spec,
4428                                          struct rbd_options *opts)
4429 {
4430         struct rbd_device *rbd_dev;
4431
4432         rbd_dev = __rbd_dev_create(rbdc, spec);
4433         if (!rbd_dev)
4434                 return NULL;
4435
4436         rbd_dev->opts = opts;
4437
4438         /* get an id and fill in device name */
4439         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4440                                          minor_to_rbd_dev_id(1 << MINORBITS),
4441                                          GFP_KERNEL);
4442         if (rbd_dev->dev_id < 0)
4443                 goto fail_rbd_dev;
4444
4445         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4446         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4447                                                    rbd_dev->name);
4448         if (!rbd_dev->task_wq)
4449                 goto fail_dev_id;
4450
4451         /* we have a ref from do_rbd_add() */
4452         __module_get(THIS_MODULE);
4453
4454         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4455         return rbd_dev;
4456
4457 fail_dev_id:
4458         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4459 fail_rbd_dev:
4460         rbd_dev_free(rbd_dev);
4461         return NULL;
4462 }
4463
4464 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4465 {
4466         if (rbd_dev)
4467                 put_device(&rbd_dev->dev);
4468 }
4469
4470 /*
4471  * Get the size and object order for an image snapshot, or if
4472  * snap_id is CEPH_NOSNAP, gets this information for the base
4473  * image.
4474  */
4475 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4476                                 u8 *order, u64 *snap_size)
4477 {
4478         __le64 snapid = cpu_to_le64(snap_id);
4479         int ret;
4480         struct {
4481                 u8 order;
4482                 __le64 size;
4483         } __attribute__ ((packed)) size_buf = { 0 };
4484
4485         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4486                                   &rbd_dev->header_oloc, "get_size",
4487                                   &snapid, sizeof(snapid),
4488                                   &size_buf, sizeof(size_buf));
4489         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4490         if (ret < 0)
4491                 return ret;
4492         if (ret < sizeof (size_buf))
4493                 return -ERANGE;
4494
4495         if (order) {
4496                 *order = size_buf.order;
4497                 dout("  order %u", (unsigned int)*order);
4498         }
4499         *snap_size = le64_to_cpu(size_buf.size);
4500
4501         dout("  snap_id 0x%016llx snap_size = %llu\n",
4502                 (unsigned long long)snap_id,
4503                 (unsigned long long)*snap_size);
4504
4505         return 0;
4506 }
4507
4508 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4509 {
4510         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4511                                         &rbd_dev->header.obj_order,
4512                                         &rbd_dev->header.image_size);
4513 }
4514
4515 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4516 {
4517         void *reply_buf;
4518         int ret;
4519         void *p;
4520
4521         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4522         if (!reply_buf)
4523                 return -ENOMEM;
4524
4525         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4526                                   &rbd_dev->header_oloc, "get_object_prefix",
4527                                   NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4528         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4529         if (ret < 0)
4530                 goto out;
4531
4532         p = reply_buf;
4533         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4534                                                 p + ret, NULL, GFP_NOIO);
4535         ret = 0;
4536
4537         if (IS_ERR(rbd_dev->header.object_prefix)) {
4538                 ret = PTR_ERR(rbd_dev->header.object_prefix);
4539                 rbd_dev->header.object_prefix = NULL;
4540         } else {
4541                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4542         }
4543 out:
4544         kfree(reply_buf);
4545
4546         return ret;
4547 }
4548
4549 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4550                 u64 *snap_features)
4551 {
4552         __le64 snapid = cpu_to_le64(snap_id);
4553         struct {
4554                 __le64 features;
4555                 __le64 incompat;
4556         } __attribute__ ((packed)) features_buf = { 0 };
4557         u64 unsup;
4558         int ret;
4559
4560         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4561                                   &rbd_dev->header_oloc, "get_features",
4562                                   &snapid, sizeof(snapid),
4563                                   &features_buf, sizeof(features_buf));
4564         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4565         if (ret < 0)
4566                 return ret;
4567         if (ret < sizeof (features_buf))
4568                 return -ERANGE;
4569
4570         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4571         if (unsup) {
4572                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4573                          unsup);
4574                 return -ENXIO;
4575         }
4576
4577         *snap_features = le64_to_cpu(features_buf.features);
4578
4579         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4580                 (unsigned long long)snap_id,
4581                 (unsigned long long)*snap_features,
4582                 (unsigned long long)le64_to_cpu(features_buf.incompat));
4583
4584         return 0;
4585 }
4586
4587 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4588 {
4589         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4590                                                 &rbd_dev->header.features);
4591 }
4592
/*
 * Parent information as decoded from the OSD replies in
 * __get_parent_info() / __get_parent_info_legacy().
 *
 * pool_ns and image_id are strings allocated by
 * ceph_extract_encoded_string(); whoever owns the structure must
 * kfree() them (rbd_dev_v2_parent_info() does so unless it moved them
 * into the parent spec).
 */
struct parent_image_info {
        u64             pool_id;
        const char      *pool_ns;       /* not filled in by the legacy call */
        const char      *image_id;
        u64             snap_id;

        bool            has_overlap;    /* legacy call always sets this */
        u64             overlap;        /* decoded only if has_overlap */
};
4602
4603 /*
4604  * The caller is responsible for @pii.
4605  */
4606 static int decode_parent_image_spec(void **p, void *end,
4607                                     struct parent_image_info *pii)
4608 {
4609         u8 struct_v;
4610         u32 struct_len;
4611         int ret;
4612
4613         ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
4614                                   &struct_v, &struct_len);
4615         if (ret)
4616                 return ret;
4617
4618         ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
4619         pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
4620         if (IS_ERR(pii->pool_ns)) {
4621                 ret = PTR_ERR(pii->pool_ns);
4622                 pii->pool_ns = NULL;
4623                 return ret;
4624         }
4625         pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
4626         if (IS_ERR(pii->image_id)) {
4627                 ret = PTR_ERR(pii->image_id);
4628                 pii->image_id = NULL;
4629                 return ret;
4630         }
4631         ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
4632         return 0;
4633
4634 e_inval:
4635         return -EINVAL;
4636 }
4637
4638 static int __get_parent_info(struct rbd_device *rbd_dev,
4639                              struct page *req_page,
4640                              struct page *reply_page,
4641                              struct parent_image_info *pii)
4642 {
4643         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4644         size_t reply_len = PAGE_SIZE;
4645         void *p, *end;
4646         int ret;
4647
4648         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
4649                              "rbd", "parent_get", CEPH_OSD_FLAG_READ,
4650                              req_page, sizeof(u64), reply_page, &reply_len);
4651         if (ret)
4652                 return ret == -EOPNOTSUPP ? 1 : ret;
4653
4654         p = page_address(reply_page);
4655         end = p + reply_len;
4656         ret = decode_parent_image_spec(&p, end, pii);
4657         if (ret)
4658                 return ret;
4659
4660         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
4661                              "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
4662                              req_page, sizeof(u64), reply_page, &reply_len);
4663         if (ret)
4664                 return ret;
4665
4666         p = page_address(reply_page);
4667         end = p + reply_len;
4668         ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
4669         if (pii->has_overlap)
4670                 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
4671
4672         return 0;
4673
4674 e_inval:
4675         return -EINVAL;
4676 }
4677
4678 /*
4679  * The caller is responsible for @pii.
4680  */
4681 static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
4682                                     struct page *req_page,
4683                                     struct page *reply_page,
4684                                     struct parent_image_info *pii)
4685 {
4686         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4687         size_t reply_len = PAGE_SIZE;
4688         void *p, *end;
4689         int ret;
4690
4691         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
4692                              "rbd", "get_parent", CEPH_OSD_FLAG_READ,
4693                              req_page, sizeof(u64), reply_page, &reply_len);
4694         if (ret)
4695                 return ret;
4696
4697         p = page_address(reply_page);
4698         end = p + reply_len;
4699         ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
4700         pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4701         if (IS_ERR(pii->image_id)) {
4702                 ret = PTR_ERR(pii->image_id);
4703                 pii->image_id = NULL;
4704                 return ret;
4705         }
4706         ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
4707         pii->has_overlap = true;
4708         ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
4709
4710         return 0;
4711
4712 e_inval:
4713         return -EINVAL;
4714 }
4715
4716 static int get_parent_info(struct rbd_device *rbd_dev,
4717                            struct parent_image_info *pii)
4718 {
4719         struct page *req_page, *reply_page;
4720         void *p;
4721         int ret;
4722
4723         req_page = alloc_page(GFP_KERNEL);
4724         if (!req_page)
4725                 return -ENOMEM;
4726
4727         reply_page = alloc_page(GFP_KERNEL);
4728         if (!reply_page) {
4729                 __free_page(req_page);
4730                 return -ENOMEM;
4731         }
4732
4733         p = page_address(req_page);
4734         ceph_encode_64(&p, rbd_dev->spec->snap_id);
4735         ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
4736         if (ret > 0)
4737                 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
4738                                                pii);
4739
4740         __free_page(req_page);
4741         __free_page(reply_page);
4742         return ret;
4743 }
4744
4745 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4746 {
4747         struct rbd_spec *parent_spec;
4748         struct parent_image_info pii = { 0 };
4749         int ret;
4750
4751         parent_spec = rbd_spec_alloc();
4752         if (!parent_spec)
4753                 return -ENOMEM;
4754
4755         ret = get_parent_info(rbd_dev, &pii);
4756         if (ret)
4757                 goto out_err;
4758
4759         dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
4760              __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
4761              pii.has_overlap, pii.overlap);
4762
4763         if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
4764                 /*
4765                  * Either the parent never existed, or we have
4766                  * record of it but the image got flattened so it no
4767                  * longer has a parent.  When the parent of a
4768                  * layered image disappears we immediately set the
4769                  * overlap to 0.  The effect of this is that all new
4770                  * requests will be treated as if the image had no
4771                  * parent.
4772                  *
4773                  * If !pii.has_overlap, the parent image spec is not
4774                  * applicable.  It's there to avoid duplication in each
4775                  * snapshot record.
4776                  */
4777                 if (rbd_dev->parent_overlap) {
4778                         rbd_dev->parent_overlap = 0;
4779                         rbd_dev_parent_put(rbd_dev);
4780                         pr_info("%s: clone image has been flattened\n",
4781                                 rbd_dev->disk->disk_name);
4782                 }
4783
4784                 goto out;       /* No parent?  No problem. */
4785         }
4786
4787         /* The ceph file layout needs to fit pool id in 32 bits */
4788
4789         ret = -EIO;
4790         if (pii.pool_id > (u64)U32_MAX) {
4791                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
4792                         (unsigned long long)pii.pool_id, U32_MAX);
4793                 goto out_err;
4794         }
4795
4796         /*
4797          * The parent won't change (except when the clone is
4798          * flattened, already handled that).  So we only need to
4799          * record the parent spec we have not already done so.
4800          */
4801         if (!rbd_dev->parent_spec) {
4802                 parent_spec->pool_id = pii.pool_id;
4803                 if (pii.pool_ns && *pii.pool_ns) {
4804                         parent_spec->pool_ns = pii.pool_ns;
4805                         pii.pool_ns = NULL;
4806                 }
4807                 parent_spec->image_id = pii.image_id;
4808                 pii.image_id = NULL;
4809                 parent_spec->snap_id = pii.snap_id;
4810
4811                 rbd_dev->parent_spec = parent_spec;
4812                 parent_spec = NULL;     /* rbd_dev now owns this */
4813         }
4814
4815         /*
4816          * We always update the parent overlap.  If it's zero we issue
4817          * a warning, as we will proceed as if there was no parent.
4818          */
4819         if (!pii.overlap) {
4820                 if (parent_spec) {
4821                         /* refresh, careful to warn just once */
4822                         if (rbd_dev->parent_overlap)
4823                                 rbd_warn(rbd_dev,
4824                                     "clone now standalone (overlap became 0)");
4825                 } else {
4826                         /* initial probe */
4827                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
4828                 }
4829         }
4830         rbd_dev->parent_overlap = pii.overlap;
4831
4832 out:
4833         ret = 0;
4834 out_err:
4835         kfree(pii.pool_ns);
4836         kfree(pii.image_id);
4837         rbd_spec_put(parent_spec);
4838         return ret;
4839 }
4840
4841 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4842 {
4843         struct {
4844                 __le64 stripe_unit;
4845                 __le64 stripe_count;
4846         } __attribute__ ((packed)) striping_info_buf = { 0 };
4847         size_t size = sizeof (striping_info_buf);
4848         void *p;
4849         int ret;
4850
4851         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4852                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
4853                                 NULL, 0, &striping_info_buf, size);
4854         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4855         if (ret < 0)
4856                 return ret;
4857         if (ret < size)
4858                 return -ERANGE;
4859
4860         p = &striping_info_buf;
4861         rbd_dev->header.stripe_unit = ceph_decode_64(&p);
4862         rbd_dev->header.stripe_count = ceph_decode_64(&p);
4863         return 0;
4864 }
4865
4866 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
4867 {
4868         __le64 data_pool_id;
4869         int ret;
4870
4871         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4872                                   &rbd_dev->header_oloc, "get_data_pool",
4873                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
4874         if (ret < 0)
4875                 return ret;
4876         if (ret < sizeof(data_pool_id))
4877                 return -EBADMSG;
4878
4879         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
4880         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
4881         return 0;
4882 }
4883
4884 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4885 {
4886         CEPH_DEFINE_OID_ONSTACK(oid);
4887         size_t image_id_size;
4888         char *image_id;
4889         void *p;
4890         void *end;
4891         size_t size;
4892         void *reply_buf = NULL;
4893         size_t len = 0;
4894         char *image_name = NULL;
4895         int ret;
4896
4897         rbd_assert(!rbd_dev->spec->image_name);
4898
4899         len = strlen(rbd_dev->spec->image_id);
4900         image_id_size = sizeof (__le32) + len;
4901         image_id = kmalloc(image_id_size, GFP_KERNEL);
4902         if (!image_id)
4903                 return NULL;
4904
4905         p = image_id;
4906         end = image_id + image_id_size;
4907         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4908
4909         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4910         reply_buf = kmalloc(size, GFP_KERNEL);
4911         if (!reply_buf)
4912                 goto out;
4913
4914         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
4915         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
4916                                   "dir_get_name", image_id, image_id_size,
4917                                   reply_buf, size);
4918         if (ret < 0)
4919                 goto out;
4920         p = reply_buf;
4921         end = reply_buf + ret;
4922
4923         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4924         if (IS_ERR(image_name))
4925                 image_name = NULL;
4926         else
4927                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4928 out:
4929         kfree(reply_buf);
4930         kfree(image_id);
4931
4932         return image_name;
4933 }
4934
4935 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4936 {
4937         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4938         const char *snap_name;
4939         u32 which = 0;
4940
4941         /* Skip over names until we find the one we are looking for */
4942
4943         snap_name = rbd_dev->header.snap_names;
4944         while (which < snapc->num_snaps) {
4945                 if (!strcmp(name, snap_name))
4946                         return snapc->snaps[which];
4947                 snap_name += strlen(snap_name) + 1;
4948                 which++;
4949         }
4950         return CEPH_NOSNAP;
4951 }
4952
4953 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4954 {
4955         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4956         u32 which;
4957         bool found = false;
4958         u64 snap_id;
4959
4960         for (which = 0; !found && which < snapc->num_snaps; which++) {
4961                 const char *snap_name;
4962
4963                 snap_id = snapc->snaps[which];
4964                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4965                 if (IS_ERR(snap_name)) {
4966                         /* ignore no-longer existing snapshots */
4967                         if (PTR_ERR(snap_name) == -ENOENT)
4968                                 continue;
4969                         else
4970                                 break;
4971                 }
4972                 found = !strcmp(name, snap_name);
4973                 kfree(snap_name);
4974         }
4975         return found ? snap_id : CEPH_NOSNAP;
4976 }
4977
4978 /*
4979  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4980  * no snapshot by that name is found, or if an error occurs.
4981  */
4982 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4983 {
4984         if (rbd_dev->image_format == 1)
4985                 return rbd_v1_snap_id_by_name(rbd_dev, name);
4986
4987         return rbd_v2_snap_id_by_name(rbd_dev, name);
4988 }
4989
4990 /*
4991  * An image being mapped will have everything but the snap id.
4992  */
4993 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4994 {
4995         struct rbd_spec *spec = rbd_dev->spec;
4996
4997         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4998         rbd_assert(spec->image_id && spec->image_name);
4999         rbd_assert(spec->snap_name);
5000
5001         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5002                 u64 snap_id;
5003
5004                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5005                 if (snap_id == CEPH_NOSNAP)
5006                         return -ENOENT;
5007
5008                 spec->snap_id = snap_id;
5009         } else {
5010                 spec->snap_id = CEPH_NOSNAP;
5011         }
5012
5013         return 0;
5014 }
5015
5016 /*
5017  * A parent image will have all ids but none of the names.
5018  *
5019  * All names in an rbd spec are dynamically allocated.  It's OK if we
5020  * can't figure out the name for an image id.
5021  */
5022 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5023 {
5024         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5025         struct rbd_spec *spec = rbd_dev->spec;
5026         const char *pool_name;
5027         const char *image_name;
5028         const char *snap_name;
5029         int ret;
5030
5031         rbd_assert(spec->pool_id != CEPH_NOPOOL);
5032         rbd_assert(spec->image_id);
5033         rbd_assert(spec->snap_id != CEPH_NOSNAP);
5034
5035         /* Get the pool name; we have to make our own copy of this */
5036
5037         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5038         if (!pool_name) {
5039                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5040                 return -EIO;
5041         }
5042         pool_name = kstrdup(pool_name, GFP_KERNEL);
5043         if (!pool_name)
5044                 return -ENOMEM;
5045
5046         /* Fetch the image name; tolerate failure here */
5047
5048         image_name = rbd_dev_image_name(rbd_dev);
5049         if (!image_name)
5050                 rbd_warn(rbd_dev, "unable to get image name");
5051
5052         /* Fetch the snapshot name */
5053
5054         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5055         if (IS_ERR(snap_name)) {
5056                 ret = PTR_ERR(snap_name);
5057                 goto out_err;
5058         }
5059
5060         spec->pool_name = pool_name;
5061         spec->image_name = image_name;
5062         spec->snap_name = snap_name;
5063
5064         return 0;
5065
5066 out_err:
5067         kfree(image_name);
5068         kfree(pool_name);
5069         return ret;
5070 }
5071
5072 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5073 {
5074         size_t size;
5075         int ret;
5076         void *reply_buf;
5077         void *p;
5078         void *end;
5079         u64 seq;
5080         u32 snap_count;
5081         struct ceph_snap_context *snapc;
5082         u32 i;
5083
5084         /*
5085          * We'll need room for the seq value (maximum snapshot id),
5086          * snapshot count, and array of that many snapshot ids.
5087          * For now we have a fixed upper limit on the number we're
5088          * prepared to receive.
5089          */
5090         size = sizeof (__le64) + sizeof (__le32) +
5091                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
5092         reply_buf = kzalloc(size, GFP_KERNEL);
5093         if (!reply_buf)
5094                 return -ENOMEM;
5095
5096         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5097                                   &rbd_dev->header_oloc, "get_snapcontext",
5098                                   NULL, 0, reply_buf, size);
5099         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5100         if (ret < 0)
5101                 goto out;
5102
5103         p = reply_buf;
5104         end = reply_buf + ret;
5105         ret = -ERANGE;
5106         ceph_decode_64_safe(&p, end, seq, out);
5107         ceph_decode_32_safe(&p, end, snap_count, out);
5108
5109         /*
5110          * Make sure the reported number of snapshot ids wouldn't go
5111          * beyond the end of our buffer.  But before checking that,
5112          * make sure the computed size of the snapshot context we
5113          * allocate is representable in a size_t.
5114          */
5115         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5116                                  / sizeof (u64)) {
5117                 ret = -EINVAL;
5118                 goto out;
5119         }
5120         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5121                 goto out;
5122         ret = 0;
5123
5124         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5125         if (!snapc) {
5126                 ret = -ENOMEM;
5127                 goto out;
5128         }
5129         snapc->seq = seq;
5130         for (i = 0; i < snap_count; i++)
5131                 snapc->snaps[i] = ceph_decode_64(&p);
5132
5133         ceph_put_snap_context(rbd_dev->header.snapc);
5134         rbd_dev->header.snapc = snapc;
5135
5136         dout("  snap context seq = %llu, snap_count = %u\n",
5137                 (unsigned long long)seq, (unsigned int)snap_count);
5138 out:
5139         kfree(reply_buf);
5140
5141         return ret;
5142 }
5143
5144 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5145                                         u64 snap_id)
5146 {
5147         size_t size;
5148         void *reply_buf;
5149         __le64 snapid;
5150         int ret;
5151         void *p;
5152         void *end;
5153         char *snap_name;
5154
5155         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5156         reply_buf = kmalloc(size, GFP_KERNEL);
5157         if (!reply_buf)
5158                 return ERR_PTR(-ENOMEM);
5159
5160         snapid = cpu_to_le64(snap_id);
5161         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5162                                   &rbd_dev->header_oloc, "get_snapshot_name",
5163                                   &snapid, sizeof(snapid), reply_buf, size);
5164         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5165         if (ret < 0) {
5166                 snap_name = ERR_PTR(ret);
5167                 goto out;
5168         }
5169
5170         p = reply_buf;
5171         end = reply_buf + ret;
5172         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5173         if (IS_ERR(snap_name))
5174                 goto out;
5175
5176         dout("  snap_id 0x%016llx snap_name = %s\n",
5177                 (unsigned long long)snap_id, snap_name);
5178 out:
5179         kfree(reply_buf);
5180
5181         return snap_name;
5182 }
5183
5184 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5185 {
5186         bool first_time = rbd_dev->header.object_prefix == NULL;
5187         int ret;
5188
5189         ret = rbd_dev_v2_image_size(rbd_dev);
5190         if (ret)
5191                 return ret;
5192
5193         if (first_time) {
5194                 ret = rbd_dev_v2_header_onetime(rbd_dev);
5195                 if (ret)
5196                         return ret;
5197         }
5198
5199         ret = rbd_dev_v2_snap_context(rbd_dev);
5200         if (ret && first_time) {
5201                 kfree(rbd_dev->header.object_prefix);
5202                 rbd_dev->header.object_prefix = NULL;
5203         }
5204
5205         return ret;
5206 }
5207
5208 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5209 {
5210         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5211
5212         if (rbd_dev->image_format == 1)
5213                 return rbd_dev_v1_header_info(rbd_dev);
5214
5215         return rbd_dev_v2_header_info(rbd_dev);
5216 }
5217
/*
 * Advances *buf past any leading white space, so that it points at
 * the first character of the next token (or at the terminating
 * '\0' if there is none).  Returns the length of that token, i.e.
 * the number of non-white space characters found there.  Note that
 * *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf;

	start += strspn(start, whitespace);	/* Skip to the token */
	*buf = start;

	return strcspn(start, whitespace);	/* Measure the token */
}
5236
5237 /*
5238  * Finds the next token in *buf, dynamically allocates a buffer big
5239  * enough to hold a copy of it, and copies the token into the new
5240  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
5241  * that a duplicate buffer is created even for a zero-length token.
5242  *
5243  * Returns a pointer to the newly-allocated duplicate, or a null
5244  * pointer if memory for the duplicate was not available.  If
5245  * the lenp argument is a non-null pointer, the length of the token
5246  * (not including the '\0') is returned in *lenp.
5247  *
5248  * If successful, the *buf pointer will be updated to point beyond
5249  * the end of the found token.
5250  *
5251  * Note: uses GFP_KERNEL for allocation.
5252  */
5253 static inline char *dup_token(const char **buf, size_t *lenp)
5254 {
5255         char *dup;
5256         size_t len;
5257
5258         len = next_token(buf);
5259         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5260         if (!dup)
5261                 return NULL;
5262         *(dup + len) = '\0';
5263         *buf += len;
5264
5265         if (lenp)
5266                 *lenp = len;
5267
5268         return dup;
5269 }
5270
5271 /*
5272  * Parse the options provided for an "rbd add" (i.e., rbd image
5273  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
5274  * and the data written is passed here via a NUL-terminated buffer.
5275  * Returns 0 if successful or an error code otherwise.
5276  *
5277  * The information extracted from these options is recorded in
5278  * the other parameters which return dynamically-allocated
5279  * structures:
5280  *  ceph_opts
5281  *      The address of a pointer that will refer to a ceph options
5282  *      structure.  Caller must release the returned pointer using
5283  *      ceph_destroy_options() when it is no longer needed.
5284  *  rbd_opts
5285  *      Address of an rbd options pointer.  Fully initialized by
5286  *      this function; caller must release with kfree().
5287  *  spec
5288  *      Address of an rbd image specification pointer.  Fully
5289  *      initialized by this function based on parsed options.
5290  *      Caller must release with rbd_spec_put().
5291  *
5292  * The options passed take this form:
5293  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
5294  * where:
5295  *  <mon_addrs>
5296  *      A comma-separated list of one or more monitor addresses.
5297  *      A monitor address is an ip address, optionally followed
5298  *      by a port number (separated by a colon).
5299  *        I.e.:  ip1[:port1][,ip2[:port2]...]
5300  *  <options>
5301  *      A comma-separated list of ceph and/or rbd options.
5302  *  <pool_name>
5303  *      The name of the rados pool containing the rbd image.
5304  *  <image_name>
5305  *      The name of the image in that pool to map.
5306  *  <snap_id>
5307  *      An optional snapshot id.  If provided, the mapping will
5308  *      present data from the image at the time that snapshot was
5309  *      created.  The image head is used if no snapshot id is
5310  *      provided.  Snapshot mappings are always read-only.
5311  */
5312 static int rbd_add_parse_args(const char *buf,
5313                                 struct ceph_options **ceph_opts,
5314                                 struct rbd_options **opts,
5315                                 struct rbd_spec **rbd_spec)
5316 {
5317         size_t len;
5318         char *options;
5319         const char *mon_addrs;
5320         char *snap_name;
5321         size_t mon_addrs_size;
5322         struct parse_rbd_opts_ctx pctx = { 0 };
5323         struct ceph_options *copts;
5324         int ret;
5325
5326         /* The first four tokens are required */
5327
5328         len = next_token(&buf);
5329         if (!len) {
5330                 rbd_warn(NULL, "no monitor address(es) provided");
5331                 return -EINVAL;
5332         }
5333         mon_addrs = buf;
5334         mon_addrs_size = len + 1;
5335         buf += len;
5336
5337         ret = -EINVAL;
5338         options = dup_token(&buf, NULL);
5339         if (!options)
5340                 return -ENOMEM;
5341         if (!*options) {
5342                 rbd_warn(NULL, "no options provided");
5343                 goto out_err;
5344         }
5345
5346         pctx.spec = rbd_spec_alloc();
5347         if (!pctx.spec)
5348                 goto out_mem;
5349
5350         pctx.spec->pool_name = dup_token(&buf, NULL);
5351         if (!pctx.spec->pool_name)
5352                 goto out_mem;
5353         if (!*pctx.spec->pool_name) {
5354                 rbd_warn(NULL, "no pool name provided");
5355                 goto out_err;
5356         }
5357
5358         pctx.spec->image_name = dup_token(&buf, NULL);
5359         if (!pctx.spec->image_name)
5360                 goto out_mem;
5361         if (!*pctx.spec->image_name) {
5362                 rbd_warn(NULL, "no image name provided");
5363                 goto out_err;
5364         }
5365
5366         /*
5367          * Snapshot name is optional; default is to use "-"
5368          * (indicating the head/no snapshot).
5369          */
5370         len = next_token(&buf);
5371         if (!len) {
5372                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5373                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5374         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5375                 ret = -ENAMETOOLONG;
5376                 goto out_err;
5377         }
5378         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5379         if (!snap_name)
5380                 goto out_mem;
5381         *(snap_name + len) = '\0';
5382         pctx.spec->snap_name = snap_name;
5383
5384         /* Initialize all rbd options to the defaults */
5385
5386         pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
5387         if (!pctx.opts)
5388                 goto out_mem;
5389
5390         pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
5391         pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5392         pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
5393         pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5394         pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5395         pctx.opts->trim = RBD_TRIM_DEFAULT;
5396
5397         copts = ceph_parse_options(options, mon_addrs,
5398                                    mon_addrs + mon_addrs_size - 1,
5399                                    parse_rbd_opts_token, &pctx);
5400         if (IS_ERR(copts)) {
5401                 ret = PTR_ERR(copts);
5402                 goto out_err;
5403         }
5404         kfree(options);
5405
5406         *ceph_opts = copts;
5407         *opts = pctx.opts;
5408         *rbd_spec = pctx.spec;
5409
5410         return 0;
5411 out_mem:
5412         ret = -ENOMEM;
5413 out_err:
5414         kfree(pctx.opts);
5415         rbd_spec_put(pctx.spec);
5416         kfree(options);
5417
5418         return ret;
5419 }
5420
5421 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5422 {
5423         down_write(&rbd_dev->lock_rwsem);
5424         if (__rbd_is_lock_owner(rbd_dev))
5425                 rbd_unlock(rbd_dev);
5426         up_write(&rbd_dev->lock_rwsem);
5427 }
5428
5429 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5430 {
5431         int ret;
5432
5433         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5434                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5435                 return -EINVAL;
5436         }
5437
5438         /* FIXME: "rbd map --exclusive" should be in interruptible */
5439         down_read(&rbd_dev->lock_rwsem);
5440         ret = rbd_wait_state_locked(rbd_dev, true);
5441         up_read(&rbd_dev->lock_rwsem);
5442         if (ret) {
5443                 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5444                 return -EROFS;
5445         }
5446
5447         return 0;
5448 }
5449
5450 /*
5451  * An rbd format 2 image has a unique identifier, distinct from the
5452  * name given to it by the user.  Internally, that identifier is
5453  * what's used to specify the names of objects related to the image.
5454  *
5455  * A special "rbd id" object is used to map an rbd image name to its
5456  * id.  If that object doesn't exist, then there is no v2 rbd image
5457  * with the supplied name.
5458  *
5459  * This function will record the given rbd_dev's image_id field if
5460  * it can be determined, and in that case will return 0.  If any
5461  * errors occur a negative errno will be returned and the rbd_dev's
5462  * image_id field will be unchanged (and should be NULL).
5463  */
5464 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5465 {
5466         int ret;
5467         size_t size;
5468         CEPH_DEFINE_OID_ONSTACK(oid);
5469         void *response;
5470         char *image_id;
5471
5472         /*
5473          * When probing a parent image, the image id is already
5474          * known (and the image name likely is not).  There's no
5475          * need to fetch the image id again in this case.  We
5476          * do still need to set the image format though.
5477          */
5478         if (rbd_dev->spec->image_id) {
5479                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5480
5481                 return 0;
5482         }
5483
5484         /*
5485          * First, see if the format 2 image id file exists, and if
5486          * so, get the image's persistent id from it.
5487          */
5488         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5489                                rbd_dev->spec->image_name);
5490         if (ret)
5491                 return ret;
5492
5493         dout("rbd id object name is %s\n", oid.name);
5494
5495         /* Response will be an encoded string, which includes a length */
5496
5497         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5498         response = kzalloc(size, GFP_NOIO);
5499         if (!response) {
5500                 ret = -ENOMEM;
5501                 goto out;
5502         }
5503
5504         /* If it doesn't exist we'll assume it's a format 1 image */
5505
5506         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5507                                   "get_id", NULL, 0,
5508                                   response, RBD_IMAGE_ID_LEN_MAX);
5509         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5510         if (ret == -ENOENT) {
5511                 image_id = kstrdup("", GFP_KERNEL);
5512                 ret = image_id ? 0 : -ENOMEM;
5513                 if (!ret)
5514                         rbd_dev->image_format = 1;
5515         } else if (ret >= 0) {
5516                 void *p = response;
5517
5518                 image_id = ceph_extract_encoded_string(&p, p + ret,
5519                                                 NULL, GFP_NOIO);
5520                 ret = PTR_ERR_OR_ZERO(image_id);
5521                 if (!ret)
5522                         rbd_dev->image_format = 2;
5523         }
5524
5525         if (!ret) {
5526                 rbd_dev->spec->image_id = image_id;
5527                 dout("image_id is %s\n", image_id);
5528         }
5529 out:
5530         kfree(response);
5531         ceph_oid_destroy(&oid);
5532         return ret;
5533 }
5534
5535 /*
5536  * Undo whatever state changes are made by v1 or v2 header info
5537  * call.
5538  */
5539 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5540 {
5541         struct rbd_image_header *header;
5542
5543         rbd_dev_parent_put(rbd_dev);
5544
5545         /* Free dynamic fields from the header, then zero it out */
5546
5547         header = &rbd_dev->header;
5548         ceph_put_snap_context(header->snapc);
5549         kfree(header->snap_sizes);
5550         kfree(header->snap_names);
5551         kfree(header->object_prefix);
5552         memset(header, 0, sizeof (*header));
5553 }
5554
5555 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5556 {
5557         int ret;
5558
5559         ret = rbd_dev_v2_object_prefix(rbd_dev);
5560         if (ret)
5561                 goto out_err;
5562
5563         /*
5564          * Get the and check features for the image.  Currently the
5565          * features are assumed to never change.
5566          */
5567         ret = rbd_dev_v2_features(rbd_dev);
5568         if (ret)
5569                 goto out_err;
5570
5571         /* If the image supports fancy striping, get its parameters */
5572
5573         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5574                 ret = rbd_dev_v2_striping_info(rbd_dev);
5575                 if (ret < 0)
5576                         goto out_err;
5577         }
5578
5579         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5580                 ret = rbd_dev_v2_data_pool(rbd_dev);
5581                 if (ret)
5582                         goto out_err;
5583         }
5584
5585         rbd_init_layout(rbd_dev);
5586         return 0;
5587
5588 out_err:
5589         rbd_dev->header.features = 0;
5590         kfree(rbd_dev->header.object_prefix);
5591         rbd_dev->header.object_prefix = NULL;
5592         return ret;
5593 }
5594
5595 /*
5596  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5597  * rbd_dev_image_probe() recursion depth, which means it's also the
5598  * length of the already discovered part of the parent chain.
5599  */
5600 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5601 {
5602         struct rbd_device *parent = NULL;
5603         int ret;
5604
5605         if (!rbd_dev->parent_spec)
5606                 return 0;
5607
5608         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5609                 pr_info("parent chain is too long (%d)\n", depth);
5610                 ret = -EINVAL;
5611                 goto out_err;
5612         }
5613
5614         parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5615         if (!parent) {
5616                 ret = -ENOMEM;
5617                 goto out_err;
5618         }
5619
5620         /*
5621          * Images related by parent/child relationships always share
5622          * rbd_client and spec/parent_spec, so bump their refcounts.
5623          */
5624         __rbd_get_client(rbd_dev->rbd_client);
5625         rbd_spec_get(rbd_dev->parent_spec);
5626
5627         ret = rbd_dev_image_probe(parent, depth);
5628         if (ret < 0)
5629                 goto out_err;
5630
5631         rbd_dev->parent = parent;
5632         atomic_set(&rbd_dev->parent_ref, 1);
5633         return 0;
5634
5635 out_err:
5636         rbd_dev_unparent(rbd_dev);
5637         rbd_dev_destroy(parent);
5638         return ret;
5639 }
5640
5641 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5642 {
5643         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5644         rbd_dev_mapping_clear(rbd_dev);
5645         rbd_free_disk(rbd_dev);
5646         if (!single_major)
5647                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5648 }
5649
5650 /*
5651  * rbd_dev->header_rwsem must be locked for write and will be unlocked
5652  * upon return.
5653  */
5654 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5655 {
5656         int ret;
5657
5658         /* Record our major and minor device numbers. */
5659
5660         if (!single_major) {
5661                 ret = register_blkdev(0, rbd_dev->name);
5662                 if (ret < 0)
5663                         goto err_out_unlock;
5664
5665                 rbd_dev->major = ret;
5666                 rbd_dev->minor = 0;
5667         } else {
5668                 rbd_dev->major = rbd_major;
5669                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5670         }
5671
5672         /* Set up the blkdev mapping. */
5673
5674         ret = rbd_init_disk(rbd_dev);
5675         if (ret)
5676                 goto err_out_blkdev;
5677
5678         ret = rbd_dev_mapping_set(rbd_dev);
5679         if (ret)
5680                 goto err_out_disk;
5681
5682         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5683         set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
5684
5685         ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
5686         if (ret)
5687                 goto err_out_mapping;
5688
5689         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5690         up_write(&rbd_dev->header_rwsem);
5691         return 0;
5692
5693 err_out_mapping:
5694         rbd_dev_mapping_clear(rbd_dev);
5695 err_out_disk:
5696         rbd_free_disk(rbd_dev);
5697 err_out_blkdev:
5698         if (!single_major)
5699                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5700 err_out_unlock:
5701         up_write(&rbd_dev->header_rwsem);
5702         return ret;
5703 }
5704
5705 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5706 {
5707         struct rbd_spec *spec = rbd_dev->spec;
5708         int ret;
5709
5710         /* Record the header object name for this rbd image. */
5711
5712         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5713         if (rbd_dev->image_format == 1)
5714                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5715                                        spec->image_name, RBD_SUFFIX);
5716         else
5717                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5718                                        RBD_HEADER_PREFIX, spec->image_id);
5719
5720         return ret;
5721 }
5722
5723 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5724 {
5725         rbd_dev_unprobe(rbd_dev);
5726         if (rbd_dev->opts)
5727                 rbd_unregister_watch(rbd_dev);
5728         rbd_dev->image_format = 0;
5729         kfree(rbd_dev->spec->image_id);
5730         rbd_dev->spec->image_id = NULL;
5731 }
5732
5733 /*
5734  * Probe for the existence of the header object for the given rbd
5735  * device.  If this image is the one being mapped (i.e., not a
5736  * parent), initiate a watch on its header object before using that
5737  * object to get detailed information about the rbd image.
5738  */
5739 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
5740 {
5741         int ret;
5742
5743         /*
5744          * Get the id from the image id object.  Unless there's an
5745          * error, rbd_dev->spec->image_id will be filled in with
5746          * a dynamically-allocated string, and rbd_dev->image_format
5747          * will be set to either 1 or 2.
5748          */
5749         ret = rbd_dev_image_id(rbd_dev);
5750         if (ret)
5751                 return ret;
5752
5753         ret = rbd_dev_header_name(rbd_dev);
5754         if (ret)
5755                 goto err_out_format;
5756
5757         if (!depth) {
5758                 ret = rbd_register_watch(rbd_dev);
5759                 if (ret) {
5760                         if (ret == -ENOENT)
5761                                 pr_info("image %s/%s%s%s does not exist\n",
5762                                         rbd_dev->spec->pool_name,
5763                                         rbd_dev->spec->pool_ns ?: "",
5764                                         rbd_dev->spec->pool_ns ? "/" : "",
5765                                         rbd_dev->spec->image_name);
5766                         goto err_out_format;
5767                 }
5768         }
5769
5770         ret = rbd_dev_header_info(rbd_dev);
5771         if (ret)
5772                 goto err_out_watch;
5773
5774         /*
5775          * If this image is the one being mapped, we have pool name and
5776          * id, image name and id, and snap name - need to fill snap id.
5777          * Otherwise this is a parent image, identified by pool, image
5778          * and snap ids - need to fill in names for those ids.
5779          */
5780         if (!depth)
5781                 ret = rbd_spec_fill_snap_id(rbd_dev);
5782         else
5783                 ret = rbd_spec_fill_names(rbd_dev);
5784         if (ret) {
5785                 if (ret == -ENOENT)
5786                         pr_info("snap %s/%s%s%s@%s does not exist\n",
5787                                 rbd_dev->spec->pool_name,
5788                                 rbd_dev->spec->pool_ns ?: "",
5789                                 rbd_dev->spec->pool_ns ? "/" : "",
5790                                 rbd_dev->spec->image_name,
5791                                 rbd_dev->spec->snap_name);
5792                 goto err_out_probe;
5793         }
5794
5795         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5796                 ret = rbd_dev_v2_parent_info(rbd_dev);
5797                 if (ret)
5798                         goto err_out_probe;
5799
5800                 /*
5801                  * Need to warn users if this image is the one being
5802                  * mapped and has a parent.
5803                  */
5804                 if (!depth && rbd_dev->parent_spec)
5805                         rbd_warn(rbd_dev,
5806                                  "WARNING: kernel layering is EXPERIMENTAL!");
5807         }
5808
5809         ret = rbd_dev_probe_parent(rbd_dev, depth);
5810         if (ret)
5811                 goto err_out_probe;
5812
5813         dout("discovered format %u image, header name is %s\n",
5814                 rbd_dev->image_format, rbd_dev->header_oid.name);
5815         return 0;
5816
5817 err_out_probe:
5818         rbd_dev_unprobe(rbd_dev);
5819 err_out_watch:
5820         if (!depth)
5821                 rbd_unregister_watch(rbd_dev);
5822 err_out_format:
5823         rbd_dev->image_format = 0;
5824         kfree(rbd_dev->spec->image_id);
5825         rbd_dev->spec->image_id = NULL;
5826         return ret;
5827 }
5828
5829 static ssize_t do_rbd_add(struct bus_type *bus,
5830                           const char *buf,
5831                           size_t count)
5832 {
5833         struct rbd_device *rbd_dev = NULL;
5834         struct ceph_options *ceph_opts = NULL;
5835         struct rbd_options *rbd_opts = NULL;
5836         struct rbd_spec *spec = NULL;
5837         struct rbd_client *rbdc;
5838         int rc;
5839
5840         if (!try_module_get(THIS_MODULE))
5841                 return -ENODEV;
5842
5843         /* parse add command */
5844         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5845         if (rc < 0)
5846                 goto out;
5847
5848         rbdc = rbd_get_client(ceph_opts);
5849         if (IS_ERR(rbdc)) {
5850                 rc = PTR_ERR(rbdc);
5851                 goto err_out_args;
5852         }
5853
5854         /* pick the pool */
5855         rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
5856         if (rc < 0) {
5857                 if (rc == -ENOENT)
5858                         pr_info("pool %s does not exist\n", spec->pool_name);
5859                 goto err_out_client;
5860         }
5861         spec->pool_id = (u64)rc;
5862
5863         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
5864         if (!rbd_dev) {
5865                 rc = -ENOMEM;
5866                 goto err_out_client;
5867         }
5868         rbdc = NULL;            /* rbd_dev now owns this */
5869         spec = NULL;            /* rbd_dev now owns this */
5870         rbd_opts = NULL;        /* rbd_dev now owns this */
5871
5872         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
5873         if (!rbd_dev->config_info) {
5874                 rc = -ENOMEM;
5875                 goto err_out_rbd_dev;
5876         }
5877
5878         down_write(&rbd_dev->header_rwsem);
5879         rc = rbd_dev_image_probe(rbd_dev, 0);
5880         if (rc < 0) {
5881                 up_write(&rbd_dev->header_rwsem);
5882                 goto err_out_rbd_dev;
5883         }
5884
5885         /* If we are mapping a snapshot it must be marked read-only */
5886         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5887                 rbd_dev->opts->read_only = true;
5888
5889         rc = rbd_dev_device_setup(rbd_dev);
5890         if (rc)
5891                 goto err_out_image_probe;
5892
5893         if (rbd_dev->opts->exclusive) {
5894                 rc = rbd_add_acquire_lock(rbd_dev);
5895                 if (rc)
5896                         goto err_out_device_setup;
5897         }
5898
5899         /* Everything's ready.  Announce the disk to the world. */
5900
5901         rc = device_add(&rbd_dev->dev);
5902         if (rc)
5903                 goto err_out_image_lock;
5904
5905         add_disk(rbd_dev->disk);
5906         /* see rbd_init_disk() */
5907         blk_put_queue(rbd_dev->disk->queue);
5908
5909         spin_lock(&rbd_dev_list_lock);
5910         list_add_tail(&rbd_dev->node, &rbd_dev_list);
5911         spin_unlock(&rbd_dev_list_lock);
5912
5913         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
5914                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
5915                 rbd_dev->header.features);
5916         rc = count;
5917 out:
5918         module_put(THIS_MODULE);
5919         return rc;
5920
5921 err_out_image_lock:
5922         rbd_dev_image_unlock(rbd_dev);
5923 err_out_device_setup:
5924         rbd_dev_device_release(rbd_dev);
5925 err_out_image_probe:
5926         rbd_dev_image_release(rbd_dev);
5927 err_out_rbd_dev:
5928         rbd_dev_destroy(rbd_dev);
5929 err_out_client:
5930         rbd_put_client(rbdc);
5931 err_out_args:
5932         rbd_spec_put(spec);
5933         kfree(rbd_opts);
5934         goto out;
5935 }
5936
5937 static ssize_t rbd_add(struct bus_type *bus,
5938                        const char *buf,
5939                        size_t count)
5940 {
5941         if (single_major)
5942                 return -EINVAL;
5943
5944         return do_rbd_add(bus, buf, count);
5945 }
5946
5947 static ssize_t rbd_add_single_major(struct bus_type *bus,
5948                                     const char *buf,
5949                                     size_t count)
5950 {
5951         return do_rbd_add(bus, buf, count);
5952 }
5953
5954 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5955 {
5956         while (rbd_dev->parent) {
5957                 struct rbd_device *first = rbd_dev;
5958                 struct rbd_device *second = first->parent;
5959                 struct rbd_device *third;
5960
5961                 /*
5962                  * Follow to the parent with no grandparent and
5963                  * remove it.
5964                  */
5965                 while (second && (third = second->parent)) {
5966                         first = second;
5967                         second = third;
5968                 }
5969                 rbd_assert(second);
5970                 rbd_dev_image_release(second);
5971                 rbd_dev_destroy(second);
5972                 first->parent = NULL;
5973                 first->parent_overlap = 0;
5974
5975                 rbd_assert(first->parent_spec);
5976                 rbd_spec_put(first->parent_spec);
5977                 first->parent_spec = NULL;
5978         }
5979 }
5980
5981 static ssize_t do_rbd_remove(struct bus_type *bus,
5982                              const char *buf,
5983                              size_t count)
5984 {
5985         struct rbd_device *rbd_dev = NULL;
5986         struct list_head *tmp;
5987         int dev_id;
5988         char opt_buf[6];
5989         bool force = false;
5990         int ret;
5991
5992         dev_id = -1;
5993         opt_buf[0] = '\0';
5994         sscanf(buf, "%d %5s", &dev_id, opt_buf);
5995         if (dev_id < 0) {
5996                 pr_err("dev_id out of range\n");
5997                 return -EINVAL;
5998         }
5999         if (opt_buf[0] != '\0') {
6000                 if (!strcmp(opt_buf, "force")) {
6001                         force = true;
6002                 } else {
6003                         pr_err("bad remove option at '%s'\n", opt_buf);
6004                         return -EINVAL;
6005                 }
6006         }
6007
6008         ret = -ENOENT;
6009         spin_lock(&rbd_dev_list_lock);
6010         list_for_each(tmp, &rbd_dev_list) {
6011                 rbd_dev = list_entry(tmp, struct rbd_device, node);
6012                 if (rbd_dev->dev_id == dev_id) {
6013                         ret = 0;
6014                         break;
6015                 }
6016         }
6017         if (!ret) {
6018                 spin_lock_irq(&rbd_dev->lock);
6019                 if (rbd_dev->open_count && !force)
6020                         ret = -EBUSY;
6021                 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6022                                           &rbd_dev->flags))
6023                         ret = -EINPROGRESS;
6024                 spin_unlock_irq(&rbd_dev->lock);
6025         }
6026         spin_unlock(&rbd_dev_list_lock);
6027         if (ret)
6028                 return ret;
6029
6030         if (force) {
6031                 /*
6032                  * Prevent new IO from being queued and wait for existing
6033                  * IO to complete/fail.
6034                  */
6035                 blk_mq_freeze_queue(rbd_dev->disk->queue);
6036                 blk_set_queue_dying(rbd_dev->disk->queue);
6037         }
6038
6039         del_gendisk(rbd_dev->disk);
6040         spin_lock(&rbd_dev_list_lock);
6041         list_del_init(&rbd_dev->node);
6042         spin_unlock(&rbd_dev_list_lock);
6043         device_del(&rbd_dev->dev);
6044
6045         rbd_dev_image_unlock(rbd_dev);
6046         rbd_dev_device_release(rbd_dev);
6047         rbd_dev_image_release(rbd_dev);
6048         rbd_dev_destroy(rbd_dev);
6049         return count;
6050 }
6051
6052 static ssize_t rbd_remove(struct bus_type *bus,
6053                           const char *buf,
6054                           size_t count)
6055 {
6056         if (single_major)
6057                 return -EINVAL;
6058
6059         return do_rbd_remove(bus, buf, count);
6060 }
6061
6062 static ssize_t rbd_remove_single_major(struct bus_type *bus,
6063                                        const char *buf,
6064                                        size_t count)
6065 {
6066         return do_rbd_remove(bus, buf, count);
6067 }
6068
6069 /*
6070  * create control files in sysfs
6071  * /sys/bus/rbd/...
6072  */
6073 static int __init rbd_sysfs_init(void)
6074 {
6075         int ret;
6076
6077         ret = device_register(&rbd_root_dev);
6078         if (ret < 0)
6079                 return ret;
6080
6081         ret = bus_register(&rbd_bus_type);
6082         if (ret < 0)
6083                 device_unregister(&rbd_root_dev);
6084
6085         return ret;
6086 }
6087
6088 static void __exit rbd_sysfs_cleanup(void)
6089 {
6090         bus_unregister(&rbd_bus_type);
6091         device_unregister(&rbd_root_dev);
6092 }
6093
6094 static int __init rbd_slab_init(void)
6095 {
6096         rbd_assert(!rbd_img_request_cache);
6097         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6098         if (!rbd_img_request_cache)
6099                 return -ENOMEM;
6100
6101         rbd_assert(!rbd_obj_request_cache);
6102         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6103         if (!rbd_obj_request_cache)
6104                 goto out_err;
6105
6106         return 0;
6107
6108 out_err:
6109         kmem_cache_destroy(rbd_img_request_cache);
6110         rbd_img_request_cache = NULL;
6111         return -ENOMEM;
6112 }
6113
6114 static void rbd_slab_exit(void)
6115 {
6116         rbd_assert(rbd_obj_request_cache);
6117         kmem_cache_destroy(rbd_obj_request_cache);
6118         rbd_obj_request_cache = NULL;
6119
6120         rbd_assert(rbd_img_request_cache);
6121         kmem_cache_destroy(rbd_img_request_cache);
6122         rbd_img_request_cache = NULL;
6123 }
6124
6125 static int __init rbd_init(void)
6126 {
6127         int rc;
6128
6129         if (!libceph_compatible(NULL)) {
6130                 rbd_warn(NULL, "libceph incompatibility (quitting)");
6131                 return -EINVAL;
6132         }
6133
6134         rc = rbd_slab_init();
6135         if (rc)
6136                 return rc;
6137
6138         /*
6139          * The number of active work items is limited by the number of
6140          * rbd devices * queue depth, so leave @max_active at default.
6141          */
6142         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6143         if (!rbd_wq) {
6144                 rc = -ENOMEM;
6145                 goto err_out_slab;
6146         }
6147
6148         if (single_major) {
6149                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6150                 if (rbd_major < 0) {
6151                         rc = rbd_major;
6152                         goto err_out_wq;
6153                 }
6154         }
6155
6156         rc = rbd_sysfs_init();
6157         if (rc)
6158                 goto err_out_blkdev;
6159
6160         if (single_major)
6161                 pr_info("loaded (major %d)\n", rbd_major);
6162         else
6163                 pr_info("loaded\n");
6164
6165         return 0;
6166
6167 err_out_blkdev:
6168         if (single_major)
6169                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6170 err_out_wq:
6171         destroy_workqueue(rbd_wq);
6172 err_out_slab:
6173         rbd_slab_exit();
6174         return rc;
6175 }
6176
6177 static void __exit rbd_exit(void)
6178 {
6179         ida_destroy(&rbd_dev_id_ida);
6180         rbd_sysfs_cleanup();
6181         if (single_major)
6182                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6183         destroy_workqueue(rbd_wq);
6184         rbd_slab_exit();
6185 }
6186
6187 module_init(rbd_init);  /* rbd_init() is the module's initialization entry point */
6188 module_exit(rbd_exit);  /* rbd_exit() is the module's teardown entry point */
6189
6190 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6191 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6192 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6193 /* The following authorship is retained from the original osdblk.c */
6194 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6195
6196 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6197 MODULE_LICENSE("GPL");